This is an automated email from the ASF dual-hosted git repository.
jingzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 06dc85aa67d [FLINK-30567][hive] Fix wrong behavior for insert
overwrite with Hive dialect when the table contains uppercase character
(#21602)
06dc85aa67d is described below
commit 06dc85aa67d7abb9e44102544588eaf1c292b93b
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Jan 30 11:26:40 2023 +0800
[FLINK-30567][hive] Fix wrong behavior for insert overwrite with Hive
dialect when the table contains uppercase character (#21602)
[FLINK-30567][hive] Fix wrong behavior for insert overwrite with Hive
dialect when the table contains uppercase character
This closes #21602
---
.../planner/delegation/hive/HiveParserDMLHelper.java | 11 ++++++++++-
.../apache/flink/connectors/hive/HiveDialectITCase.java | 17 +++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
index 7444bec2d43..5c99f9d9ced 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
@@ -316,7 +316,16 @@ public class HiveParserDMLHelper {
topQB.getParseInfo().getInsertOverwriteTables().keySet().stream()
.map(String::toLowerCase)
.collect(Collectors.toSet())
- .contains(tableName);
+ .contains(tableName.toLowerCase());
+
+ boolean isInsertInto =
topQB.getParseInfo().isInsertIntoTable(tableName);
+
+ Preconditions.checkArgument(
+ overwrite | isInsertInto,
+ "Inconsistent data structure detected: we are writing to "
+ + tableName
+ + ", but it's not in isInsertIntoTable() or
getInsertOverwriteTables()."
+ + " This is a bug. Please consider filing an issue.");
Tuple4<ObjectIdentifier, QueryOperation, Map<String, String>, Boolean>
insertOperationInfo =
createInsertOperationInfo(
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index f88d4346fcc..1b10e6c93a7 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -395,6 +395,23 @@ public class HiveDialectITCase {
.isEqualTo("[+I[1, a, 100.45], +I[1, b, 100.45], +I[1, c,
100.45]]");
}
+ @Test
+ public void testInsertOverwrite() throws Exception {
+ tableEnv.executeSql("create table T1(a int, b string)");
+ tableEnv.executeSql("insert into T1 values(1, 'v1')").await();
+ tableEnv.executeSql("create table T2(a int, b string) partitioned by
(dt string)");
+ tableEnv.executeSql(
+ "insert overwrite table default.T2 partition (dt =
'2023-01-01') select * from default.T1")
+ .await();
+ List<Row> rows = queryResult(tableEnv.sqlQuery("select * from T2"));
+ assertThat(rows.toString()).isEqualTo("[+I[1, v1, 2023-01-01]]");
+ tableEnv.executeSql(
+ "insert overwrite table default.T2 partition (dt =
'2023-01-01') select * from default.T1")
+ .await();
+ rows = queryResult(tableEnv.sqlQuery("select * from T2"));
+ assertThat(rows.toString()).isEqualTo("[+I[1, v1, 2023-01-01]]");
+ }
+
@Test
public void testAlterTable() throws Exception {
tableEnv.executeSql("create table tbl (x int)
tblproperties('k1'='v1')");