This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ac3589f  [HUDI-1814] Non partitioned table for Flink writer (#2859)
ac3589f is described below

commit ac3589f00659985c39ef29e5edd089279f6c2f70
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 21 20:07:27 2021 +0800

    [HUDI-1814] Non partitioned table for Flink writer (#2859)
---
 .../apache/hudi/configuration/FlinkOptions.java    |  4 +--
 .../org/apache/hudi/table/HoodieTableFactory.java  | 17 +++++++---
 .../apache/hudi/table/HoodieDataSourceITCase.java  | 36 ++++++++++++++++++++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 17 ++++++++++
 4 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c47ea95..9925fc7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -239,9 +239,9 @@ public class FlinkOptions {
   public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
       .key(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
       .stringType()
-      .defaultValue("partition-path")
+      .defaultValue("")
       .withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
-          + "Actual value obtained by invoking .toString()");
+          + "Actual value obtained by invoking .toString(), default ''");
 
   public static final ConfigOption<Boolean> PARTITION_PATH_URL_ENCODE = ConfigOptions
       .key("write.partition.url_encode")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 22dcd3e..abdfdcb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
 
 import org.apache.flink.configuration.ConfigOption;
@@ -129,13 +130,21 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       String recordKey = String.join(",", pkColumns);
       conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
     }
-    List<String> partitions = table.getPartitionKeys();
-    if (partitions.size() > 0) {
+    List<String> partitionKeys = table.getPartitionKeys();
+    if (partitionKeys.size() > 0) {
       // the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
-      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitions));
+      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
     }
     // tweak the key gen class if possible
-    boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1;
+    final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+    if (partitions.length == 1 && partitions[0].equals("")) {
+      conf.setString(FlinkOptions.KEYGEN_CLASS, NonpartitionedAvroKeyGenerator.class.getName());
+      LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table",
+          FlinkOptions.KEYGEN_CLASS.key(), NonpartitionedAvroKeyGenerator.class.getName());
+      return;
+    }
+    final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+    boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
     if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS)) {
       conf.setString(FlinkOptions.KEYGEN_CLASS, ComplexAvroKeyGenerator.class.getName());
       LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields",
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 29a7d7d..fe652c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -352,6 +352,42 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = ExecMode.class)
+  void testWriteNonPartitionedTable(ExecMode execMode) {
+    TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
+    String hoodieTableDDL = "create table t1(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(10),\n"
+        + "  age int,\n"
+        + "  ts timestamp(3),\n"
+        + "  `partition` varchar(20),\n"
+        + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
+        + ")\n"
+        + "with (\n"
+        + "  'connector' = 'hudi',\n"
+        + "  'path' = '" + tempFile.getAbsolutePath() + "'\n"
+        + ")";
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto1 = "insert into t1 values\n"
+        + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
+
+    execInsertSql(tableEnv, insertInto1);
+
+    final String insertInto2 = "insert into t1 values\n"
+        + "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n"
+        + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n"
+        + "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n"
+        + "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')";
+
+    execInsertSql(tableEnv, insertInto2);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index bbb5964..1f2059e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
@@ -119,6 +120,14 @@ public class TestHoodieTableFactory {
     final Configuration conf2 = tableSource2.getConf();
     assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
     assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
+
+    // definition with complex primary keys and empty partition paths
+    this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
+    final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema2, "");
+    final HoodieTableSource tableSource3 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
+    final Configuration conf3 = tableSource3.getConf();
+    assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
   }
 
   @Test
@@ -167,6 +176,14 @@ public class TestHoodieTableFactory {
     final Configuration conf2 = tableSink2.getConf();
     assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
     assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), is(ComplexAvroKeyGenerator.class.getName()));
+
+    // definition with complex primary keys and empty partition paths
+    this.conf.setString(FlinkOptions.KEYGEN_CLASS, FlinkOptions.KEYGEN_CLASS.defaultValue());
+    final MockContext sinkContext3 = MockContext.getInstance(this.conf, schema2, "");
+    final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext3);
+    final Configuration conf3 = tableSink3.getConf();
+    assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
   }
 
   // -------------------------------------------------------------------------

Reply via email to