This is an automated email from the ASF dual-hosted git repository.

lirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a94745e  [FLINK-23178][hive] Raise an error for writing stream data into partitioned hive tables without a partition committer
a94745e is described below

commit a94745ec85bf9e8ca3bc2fced5c1a466b836e0be
Author: Rui Li <[email protected]>
AuthorDate: Mon Jul 5 15:31:30 2021 +0800

    [FLINK-23178][hive] Raise an error for writing stream data into partitioned hive tables without a partition committer
    
    This closes #16370
---
 .../flink/connectors/hive/HiveTableSink.java       | 21 ++++++++-----
 .../flink/connectors/hive/HiveTableSinkITCase.java | 35 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 138befc..b31da9b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
+import org.apache.flink.connectors.hive.util.JobConfUtils;
 import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
 import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
 import org.apache.flink.connectors.hive.write.HiveWriterFactory;
@@ -67,6 +68,7 @@ import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -217,7 +219,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
         builder.setPartitionComputer(
                 new HiveRowPartitionComputer(
                         hiveShim,
-                        defaultPartName(),
+                        JobConfUtils.getDefaultPartitionName(jobConf),
                         tableSchema.getFieldNames(),
                         tableSchema.getFieldDataTypes(),
                         getPartitionKeyArray()));
@@ -248,10 +250,19 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
                 new org.apache.flink.configuration.Configuration();
         catalogTable.getOptions().forEach(conf::setString);
 
+        String commitPolicies = conf.getString(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
+        if (!getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) {
+            throw new FlinkHiveException(
+                    String.format(
+                            "Streaming write to partitioned hive table %s without providing a commit policy. "
+                                    + "Make sure to set a proper value for %s",
+                            identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
+        }
+
         HiveRowDataPartitionComputer partComputer =
                 new HiveRowDataPartitionComputer(
                         hiveShim,
-                        defaultPartName(),
+                        JobConfUtils.getDefaultPartitionName(jobConf),
                         tableSchema.getFieldNames(),
                         tableSchema.getFieldDataTypes(),
                         getPartitionKeyArray());
@@ -332,12 +343,6 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
                 writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
     }
 
-    private String defaultPartName() {
-        return jobConf.get(
-                HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
-                HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
-    }
-
     private CompactReader.Factory<RowData> createCompactReaderFactory(
             StorageDescriptor sd, Properties properties) {
         return new HiveCompactReaderFactory(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 4ae3a07..5f2e0e4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -65,6 +65,8 @@ import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResourc
 import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId;
 import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /** Tests {@link HiveTableSink}. */
 public class HiveTableSinkITCase {
@@ -375,6 +377,39 @@ public class HiveTableSinkITCase {
         }
     }
 
+    @Test
+    public void testStreamingSinkWithoutCommitPolicy() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+
+        tableEnv.executeSql("create database db1");
+        try {
+            tableEnv.useDatabase("db1");
+            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+            tableEnv.executeSql("create table dest(x int) partitioned by (p string)");
+
+            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+            tableEnv.executeSql(
+                    "create table src (i int, p string) with ("
+                            + "'connector'='datagen',"
+                            + "'number-of-rows'='5')");
+            tableEnv.executeSql("insert into dest select * from src").await();
+            fail("Streaming write partitioned table without commit policy should fail");
+        } catch (FlinkHiveException e) {
+            // expected
+            assertTrue(
+                    e.getMessage()
+                            .contains(
+                                    String.format(
+                                            "Streaming write to partitioned hive table `%s`.`%s`.`%s` without providing a commit policy",
+                                            hiveCatalog.getName(), "db1", "dest")));
+        } finally {
+            tableEnv.executeSql("drop database db1 cascade");
+        }
+    }
+
     private static List<String> fetchRows(Iterator<Row> iter, int size) {
         List<String> strings = new ArrayList<>(size);
         for (int i = 0; i < size; i++) {

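A note for users who run into the new error: below is a minimal sketch (not part of the commit above) of a streaming insert into a partitioned Hive table that declares a partition commit policy up front, so the check added to HiveTableSink does not throw. The catalog name, hive-conf directory, database/table names and the datagen source are placeholders; 'metastore,success-file' is one documented value for sink.partition-commit.policy.kind.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class PartitionedHiveStreamingWriteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Point the catalog at an existing Hive metastore; "/opt/hive-conf" is a placeholder.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // Declare the commit policy in TBLPROPERTIES so it ends up in the catalog table
        // options that the new check reads (key: sink.partition-commit.policy.kind).
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(
                "create table dest (x int) partitioned by (p string) tblproperties ("
                        + "'sink.partition-commit.policy.kind'='metastore,success-file')");

        // Small bounded source, similar to the one used in the new ITCase.
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql(
                "create table src (x int, p string) with ("
                        + "'connector'='datagen', 'number-of-rows'='5')");

        // With the policy declared, the streaming insert passes the new commit-policy check.
        tableEnv.executeSql("insert into dest select * from src").await();
    }
}

If dynamic table options are enabled, the same option can also be supplied per query via a hint, e.g. insert into dest /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */ select * from src, instead of table properties.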