This is an automated email from the ASF dual-hosted git repository.
lirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a94745e [FLINK-23178][hive] Raise an error for writing stream data into partitioned hive tables without a partition committer
a94745e is described below
commit a94745ec85bf9e8ca3bc2fced5c1a466b836e0be
Author: Rui Li <[email protected]>
AuthorDate: Mon Jul 5 15:31:30 2021 +0800
[FLINK-23178][hive] Raise an error for writing stream data into partitioned hive tables without a partition committer
This closes #16370
---
.../flink/connectors/hive/HiveTableSink.java | 21 ++++++++-----
.../flink/connectors/hive/HiveTableSinkITCase.java | 35 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 8 deletions(-)
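
For readers who run into the new error: the check added below only fires for streaming writes into partitioned Hive tables that declare no partition commit policy. The following is a minimal sketch of a setup that satisfies the check, assuming the filesystem connector option key 'sink.partition-commit.policy.kind' and the 'metastore,success-file' policies; the table names and DDL are illustrative only and are not part of this commit.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PartitionedHiveStreamingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // A HiveCatalog must be registered and selected here (omitted for brevity).

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // Declaring a commit policy in the table properties satisfies the new check;
        // 'metastore,success-file' is a commonly used value for Hive streaming writes.
        tableEnv.executeSql(
                "create table dest (x int) partitioned by (p string) tblproperties ("
                        + "'sink.partition-commit.policy.kind'='metastore,success-file')");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql(
                "create table src (i int, p string) with ("
                        + "'connector'='datagen',"
                        + "'number-of-rows'='5')");
        // Without the tblproperties above, this streaming insert would now fail with
        // the FlinkHiveException introduced in this commit.
        tableEnv.executeSql("insert into dest select i, p from src").await();
    }
}
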
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 138befc..b31da9b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.read.HiveCompactReaderFactory;
import org.apache.flink.connectors.hive.util.HiveConfUtils;
+import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.connectors.hive.write.HiveBulkWriterFactory;
import org.apache.flink.connectors.hive.write.HiveOutputFormatFactory;
import org.apache.flink.connectors.hive.write.HiveWriterFactory;
@@ -67,6 +68,7 @@ import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -217,7 +219,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
builder.setPartitionComputer(
new HiveRowPartitionComputer(
hiveShim,
- defaultPartName(),
+ JobConfUtils.getDefaultPartitionName(jobConf),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
getPartitionKeyArray()));
@@ -248,10 +250,19 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
new org.apache.flink.configuration.Configuration();
catalogTable.getOptions().forEach(conf::setString);
+ String commitPolicies = conf.getString(FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
+ if (!getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) {
+ throw new FlinkHiveException(
+ String.format(
+ "Streaming write to partitioned hive table %s without providing a commit policy. "
+ + "Make sure to set a proper value for %s",
+ identifier, FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key()));
+ }
+
HiveRowDataPartitionComputer partComputer =
new HiveRowDataPartitionComputer(
hiveShim,
- defaultPartName(),
+ JobConfUtils.getDefaultPartitionName(jobConf),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
getPartitionKeyArray());
@@ -332,12 +343,6 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
writerStream, path, identifier, getPartitionKeys(),
msFactory(), fsFactory(), conf);
}
- private String defaultPartName() {
- return jobConf.get(
- HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
- HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
- }
-
private CompactReader.Factory<RowData> createCompactReaderFactory(
StorageDescriptor sd, Properties properties) {
return new HiveCompactReaderFactory(
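
Side note on the refactor above: the inline defaultPartName() helper is removed in favor of JobConfUtils.getDefaultPartitionName(jobConf), whose source is not part of this diff. Judging from the removed method, the utility presumably looks roughly like the sketch below; this is an assumption for illustration, not the actual JobConfUtils code.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical reconstruction of the helper the sink now delegates to.
public final class JobConfUtilsSketch {

    private JobConfUtilsSketch() {}

    // Mirrors the removed HiveTableSink#defaultPartName(): read Hive's default partition
    // name from the job configuration, falling back to Hive's built-in default
    // (typically "__HIVE_DEFAULT_PARTITION__").
    public static String getDefaultPartitionName(JobConf jobConf) {
        return jobConf.get(
                HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
                HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
    }
}
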
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 4ae3a07..5f2e0e4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -65,6 +65,8 @@ import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResourc
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId;
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/** Tests {@link HiveTableSink}. */
public class HiveTableSinkITCase {
@@ -375,6 +377,39 @@ public class HiveTableSinkITCase {
}
}
+ @Test
+ public void testStreamingSinkWithoutCommitPolicy() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tableEnv = HiveTestUtils.createTableEnvInStreamingMode(env);
+ tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tableEnv.useCatalog(hiveCatalog.getName());
+
+ tableEnv.executeSql("create database db1");
+ try {
+ tableEnv.useDatabase("db1");
+ tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tableEnv.executeSql("create table dest(x int) partitioned by (p
string)");
+
+ tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tableEnv.executeSql(
+ "create table src (i int, p string) with ("
+ + "'connector'='datagen',"
+ + "'number-of-rows'='5')");
+ tableEnv.executeSql("insert into dest select * from src").await();
+ fail("Streaming write partitioned table without commit policy
should fail");
+ } catch (FlinkHiveException e) {
+ // expected
+ assertTrue(
+ e.getMessage()
+ .contains(
+ String.format(
+ "Streaming write to partitioned
hive table `%s`.`%s`.`%s` without providing a commit policy",
+ hiveCatalog.getName(), "db1",
"dest")));
+ } finally {
+ tableEnv.executeSql("drop database db1 cascade");
+ }
+ }
+
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> strings = new ArrayList<>(size);
for (int i = 0; i < size; i++) {