This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ac3589f [HUDI-1814] Non partitioned table for Flink writer (#2859)
ac3589f is described below
commit ac3589f00659985c39ef29e5edd089279f6c2f70
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 21 20:07:27 2021 +0800
[HUDI-1814] Non partitioned table for Flink writer (#2859)
---
.../apache/hudi/configuration/FlinkOptions.java | 4 +--
.../org/apache/hudi/table/HoodieTableFactory.java | 17 +++++++---
.../apache/hudi/table/HoodieDataSourceITCase.java | 36 ++++++++++++++++++++++
.../apache/hudi/table/TestHoodieTableFactory.java | 17 ++++++++++
4 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c47ea95..9925fc7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -239,9 +239,9 @@ public class FlinkOptions {
public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
.key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
.stringType()
- .defaultValue("partition-path")
+ .defaultValue("")
.withDescription("Partition path field. Value to be used at the
`partitionPath` component of `HoodieKey`.\n"
- + "Actual value obtained by invoking .toString()");
+ + "Actual value obtained by invoking .toString(), default ''");
public static final ConfigOption<Boolean> PARTITION_PATH_URL_ENCODE =
ConfigOptions
.key("write.partition.url_encode")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 22dcd3e..abdfdcb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.flink.configuration.ConfigOption;
@@ -129,13 +130,21 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
String recordKey = String.join(",", pkColumns);
conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
}
- List<String> partitions = table.getPartitionKeys();
- if (partitions.size() > 0) {
+ List<String> partitionKeys = table.getPartitionKeys();
+ if (partitionKeys.size() > 0) {
// the PARTITIONED BY syntax always has higher priority than option
FlinkOptions#PARTITION_PATH_FIELD
- conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",",
partitions));
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",",
partitionKeys));
}
// tweak the key gen class if possible
- boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1;
+ final String[] partitions =
conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+ if (partitions.length == 1 && partitions[0].equals("")) {
+ conf.setString(FlinkOptions.KEYGEN_CLASS,
NonpartitionedAvroKeyGenerator.class.getName());
+ LOG.info("Table option [{}] is reset to {} because this is a
non-partitioned table",
+ FlinkOptions.KEYGEN_CLASS.key(),
NonpartitionedAvroKeyGenerator.class.getName());
+ return;
+ }
+ final String[] pks =
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.KEYGEN_CLASS)) {
conf.setString(FlinkOptions.KEYGEN_CLASS,
ComplexAvroKeyGenerator.class.getName());
LOG.info("Table option [{}] is reset to {} because record key or
partition path has two or more fields",
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 29a7d7d..fe652c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -352,6 +352,42 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]");
}
+ @ParameterizedTest
+ @EnumSource(value = ExecMode.class)
+ void testWriteNonPartitionedTable(ExecMode execMode) {
+ TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv :
streamTableEnv;
+ String hoodieTableDDL = "create table t1(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " ts timestamp(3),\n"
+ + " `partition` varchar(20),\n"
+ + " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ + ")\n"
+ + "with (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'path' = '" + tempFile.getAbsolutePath() + "'\n"
+ + ")";
+ tableEnv.executeSql(hoodieTableDDL);
+
+ final String insertInto1 = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+
+ execInsertSql(tableEnv, insertInto1);
+
+ final String insertInto2 = "insert into t1 values\n"
+ + "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n"
+ + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n"
+ + "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n"
+ + "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')";
+
+ execInsertSql(tableEnv, insertInto2);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index bbb5964..1f2059e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -119,6 +120,14 @@ public class TestHoodieTableFactory {
final Configuration conf2 = tableSource2.getConf();
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS),
is(ComplexAvroKeyGenerator.class.getName()));
+
+ // definition with complex primary keys and empty partition paths
+ this.conf.setString(FlinkOptions.KEYGEN_CLASS,
FlinkOptions.KEYGEN_CLASS.defaultValue());
+ final MockContext sourceContext3 = MockContext.getInstance(this.conf,
schema2, "");
+ final HoodieTableSource tableSource3 = (HoodieTableSource) new
HoodieTableFactory().createDynamicTableSource(sourceContext3);
+ final Configuration conf3 = tableSource3.getConf();
+ assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+ assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS),
is(NonpartitionedAvroKeyGenerator.class.getName()));
}
@Test
@@ -167,6 +176,14 @@ public class TestHoodieTableFactory {
final Configuration conf2 = tableSink2.getConf();
assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS),
is(ComplexAvroKeyGenerator.class.getName()));
+
+ // definition with complex primary keys and empty partition paths
+ this.conf.setString(FlinkOptions.KEYGEN_CLASS,
FlinkOptions.KEYGEN_CLASS.defaultValue());
+ final MockContext sinkContext3 = MockContext.getInstance(this.conf,
schema2, "");
+ final HoodieTableSink tableSink3 = (HoodieTableSink) new
HoodieTableFactory().createDynamicTableSink(sinkContext3);
+ final Configuration conf3 = tableSink3.getConf();
+ assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+ assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS),
is(NonpartitionedAvroKeyGenerator.class.getName()));
}
// -------------------------------------------------------------------------