This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f1e0018  [HUDI-1704] Use PRIMARY KEY syntax to define record keys for Flink Hudi table (#2694)
f1e0018 is described below

commit f1e0018f12b66770ba3785be7aa8f5f6a80bab6f
Author: Danny Chan <[email protected]>
AuthorDate: Thu Mar 18 20:21:52 2021 +0800

    [HUDI-1704] Use PRIMARY KEY syntax to define record keys for Flink Hudi table (#2694)
    
    The SQL PRIMARY KEY semantics are the same as those of the Hoodie record
    key, so declaring a PRIMARY KEY (e.g. `PRIMARY KEY(uuid) NOT ENFORCED`)
    is a more straightforward way to define it than setting the table option
    hoodie.datasource.write.recordkey.field.
    
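    After this change, either the PRIMARY KEY or the table option can define
    the Hoodie record key; if both are defined, the PRIMARY KEY takes
    priority. Likewise, PARTITIONED BY takes priority over the partition path
    option. When the record key or partition path has two or more fields and
    the key generator option is left at its default value, the key generator
    is switched to ComplexAvroKeyGenerator.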
    
    Note: a column with a PRIMARY KEY constraint is forced to be non-nullable.
---
 .../apache/hudi/configuration/FlinkOptions.java    |   8 ++
 .../org/apache/hudi/table/HoodieTableFactory.java  |  71 +++++++++++---
 .../apache/hudi/table/TestHoodieTableFactory.java  | 106 +++++++++++++++++++--
 .../org/apache/hudi/utils/TestConfigurations.java  |   3 +-
 4 files changed, 168 insertions(+), 20 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 17eee61..5919238 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -377,4 +377,12 @@ public class FlinkOptions {
     map.forEach(configuration::setString);
     return configuration;
   }
+
+  /**
+   * Returns whether the given conf defines default value for the option 
{@code option}.
+   */
+  public static <T> boolean isDefaultValueDefined(Configuration conf, 
ConfigOption<T> option) {
+    return !conf.getOptional(option).isPresent()
+        || conf.get(option).equals(option.defaultValue());
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 36020f4..a2dac36 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,11 +19,14 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.TableSinkFactory;
@@ -33,6 +36,8 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,19 +48,19 @@ import java.util.Map;
  * Hoodie data source/sink factory.
  */
 public class HoodieTableFactory implements TableSourceFactory<RowData>, 
TableSinkFactory<RowData> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTableFactory.class);
+
   public static final String FACTORY_ID = "hudi";
 
   @Override
   public TableSource<RowData> createTableSource(TableSourceFactory.Context 
context) {
     Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
-    conf.setString(FlinkOptions.TABLE_NAME.key(), 
context.getObjectIdentifier().getObjectName());
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", 
context.getTable().getPartitionKeys()));
+    TableSchema schema = 
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
+    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), 
context.getTable(), schema);
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
         new ValidationException("Option [path] should be not empty.")));
-    TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    inferAvroSchema(conf, 
tableSchema.toRowDataType().notNull().getLogicalType());
     return new HoodieTableSource(
-        tableSchema,
+        schema,
         path,
         context.getTable().getPartitionKeys(),
         conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
@@ -65,11 +70,9 @@ public class HoodieTableFactory implements 
TableSourceFactory<RowData>, TableSin
   @Override
   public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
     Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
-    conf.setString(FlinkOptions.TABLE_NAME.key(), 
context.getObjectIdentifier().getObjectName());
-    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", 
context.getTable().getPartitionKeys()));
-    TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    inferAvroSchema(conf, 
tableSchema.toRowDataType().notNull().getLogicalType());
-    return new HoodieTableSink(conf, tableSchema);
+    TableSchema schema = 
TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
+    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), 
context.getTable(), schema);
+    return new HoodieTableSink(conf, schema);
   }
 
   @Override
@@ -90,6 +93,52 @@ public class HoodieTableFactory implements 
TableSourceFactory<RowData>, TableSin
   // -------------------------------------------------------------------------
 
   /**
+   * Setup the config options based on the table definition, for e.g the table 
name, primary key.
+   *
+   * @param conf      The configuration to setup
+   * @param tableName The table name
+   * @param table     The catalog table
+   * @param schema    The physical schema
+   */
+  private static void setupConfOptions(
+      Configuration conf,
+      String tableName,
+      CatalogTable table,
+      TableSchema schema) {
+    // table name
+    conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
+    // hoodie key about options
+    setupHoodieKeyOptions(conf, table);
+    // infer avro schema from physical DDL schema
+    inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
+  }
+
+  /**
+   * Sets up the hoodie key options (e.g. record key and partition key) from 
the table definition.
+   */
+  private static void setupHoodieKeyOptions(Configuration conf, CatalogTable 
table) {
+    List<String> pkColumns = table.getSchema().getPrimaryKey()
+        .map(UniqueConstraint::getColumns).orElse(Collections.emptyList());
+    if (pkColumns.size() > 0) {
+      // the PRIMARY KEY syntax always has higher priority than option 
FlinkOptions#RECORD_KEY_FIELD
+      String recordKey = String.join(",", pkColumns);
+      conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
+    }
+    List<String> partitions = table.getPartitionKeys();
+    if (partitions.size() > 0) {
+      // the PARTITIONED BY syntax always has higher priority than option 
FlinkOptions#PARTITION_PATH_FIELD
+      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", 
partitions));
+    }
+    // tweak the key gen class if possible
+    boolean complexHoodieKey = pkColumns.size() > 1 || partitions.size() > 1;
+    if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, 
FlinkOptions.KEYGEN_CLASS)) {
+      conf.setString(FlinkOptions.KEYGEN_CLASS, 
ComplexAvroKeyGenerator.class.getName());
+      LOG.info("Table option [{}] is reset to {} because record key or 
partition path has two or more fields",
+          FlinkOptions.KEYGEN_CLASS.key(), 
ComplexAvroKeyGenerator.class.getName());
+    }
+  }
+
+  /**
    * Inferences the deserialization Avro schema from the table schema (e.g. 
the DDL)
    * if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
    * {@link FlinkOptions#READ_AVRO_SCHEMA} are not specified.
@@ -97,7 +146,7 @@ public class HoodieTableFactory implements 
TableSourceFactory<RowData>, TableSin
    * @param conf    The configuration
    * @param rowType The specified table row type
    */
-  private void inferAvroSchema(Configuration conf, LogicalType rowType) {
+  private static void inferAvroSchema(Configuration conf, LogicalType rowType) 
{
     if (!conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()
         && !conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
       String inferredSchema = 
AvroSchemaConverter.convertToSchema(rowType).toString();
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f64808e..44c030a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -19,11 +19,14 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -36,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -87,6 +91,38 @@ public class TestHoodieTableFactory {
   }
 
   @Test
+  void testSetupHoodieKeyOptionsForSource() {
+    this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
+    this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
+    // definition with simple primary key and partition path
+    TableSchema schema1 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    final MockSourceContext sourceContext1 = 
MockSourceContext.getInstance(this.conf, schema1, "f2");
+    final HoodieTableSource tableSource1 = (HoodieTableSource) new 
HoodieTableFactory().createTableSource(sourceContext1);
+    final Configuration conf1 = tableSource1.getConf();
+    assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+    assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
+
+    // definition with complex primary keys and partition paths
+    this.conf.setString(FlinkOptions.KEYGEN_CLASS, 
FlinkOptions.KEYGEN_CLASS.defaultValue());
+    TableSchema schema2 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20).notNull())
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0", "f1")
+        .build();
+    final MockSourceContext sourceContext2 = 
MockSourceContext.getInstance(this.conf, schema2, "f2");
+    final HoodieTableSource tableSource2 = (HoodieTableSource) new 
HoodieTableFactory().createTableSource(sourceContext2);
+    final Configuration conf2 = tableSource2.getConf();
+    assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), 
is(ComplexAvroKeyGenerator.class.getName()));
+  }
+
+  @Test
   void testInferAvroSchemaForSink() {
     // infer the schema if not specified
     final HoodieTableSink tableSink1 =
@@ -102,6 +138,38 @@ public class TestHoodieTableFactory {
     assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string 
as null");
   }
 
+  @Test
+  void testSetupHoodieKeyOptionsForSink() {
+    this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
+    this.conf.setString(FlinkOptions.KEYGEN_CLASS, "dummyKeyGenClass");
+    // definition with simple primary key and partition path
+    TableSchema schema1 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    final MockSinkContext sinkContext1 = 
MockSinkContext.getInstance(this.conf, schema1, "f2");
+    final HoodieTableSink tableSink1 = (HoodieTableSink) new 
HoodieTableFactory().createTableSink(sinkContext1);
+    final Configuration conf1 = tableSink1.getConf();
+    assertThat(conf1.get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+    assertThat(conf1.get(FlinkOptions.KEYGEN_CLASS), is("dummyKeyGenClass"));
+
+    // definition with complex primary keys and partition paths
+    this.conf.setString(FlinkOptions.KEYGEN_CLASS, 
FlinkOptions.KEYGEN_CLASS.defaultValue());
+    TableSchema schema2 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20).notNull())
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0", "f1")
+        .build();
+    final MockSinkContext sinkContext2 = 
MockSinkContext.getInstance(this.conf, schema2, "f2");
+    final HoodieTableSink tableSink2 = (HoodieTableSink) new 
HoodieTableFactory().createTableSink(sinkContext2);
+    final Configuration conf2 = tableSink2.getConf();
+    assertThat(conf2.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat(conf2.get(FlinkOptions.KEYGEN_CLASS), 
is(ComplexAvroKeyGenerator.class.getName()));
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
@@ -111,13 +179,25 @@ public class TestHoodieTableFactory {
    */
   private static class MockSourceContext implements TableSourceFactory.Context 
{
     private final Configuration conf;
+    private final TableSchema schema;
+    private final List<String> partitions;
 
-    private MockSourceContext(Configuration conf) {
+    private MockSourceContext(Configuration conf, TableSchema schema, 
List<String> partitions) {
       this.conf = conf;
+      this.schema = schema;
+      this.partitions = partitions;
     }
 
     static MockSourceContext getInstance(Configuration conf) {
-      return new MockSourceContext(conf);
+      return getInstance(conf, TestConfigurations.TABLE_SCHEMA, 
Collections.singletonList("partition"));
+    }
+
+    static MockSourceContext getInstance(Configuration conf, TableSchema 
schema, String partition) {
+      return getInstance(conf, schema, Collections.singletonList(partition));
+    }
+
+    static MockSourceContext getInstance(Configuration conf, TableSchema 
schema, List<String> partitions) {
+      return new MockSourceContext(conf, schema, partitions);
     }
 
     @Override
@@ -127,8 +207,7 @@ public class TestHoodieTableFactory {
 
     @Override
     public CatalogTable getTable() {
-      return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA, 
Collections.singletonList("partition"),
-          conf.toMap(), "mock source table");
+      return new CatalogTableImpl(schema, partitions, conf.toMap(), "mock 
source table");
     }
 
     @Override
@@ -142,13 +221,25 @@ public class TestHoodieTableFactory {
    */
   private static class MockSinkContext implements TableSinkFactory.Context {
     private final Configuration conf;
+    private final TableSchema schema;
+    private final List<String> partitions;
 
-    private MockSinkContext(Configuration conf) {
+    private MockSinkContext(Configuration conf, TableSchema schema, 
List<String> partitions) {
       this.conf = conf;
+      this.schema = schema;
+      this.partitions = partitions;
     }
 
     static MockSinkContext getInstance(Configuration conf) {
-      return new MockSinkContext(conf);
+      return getInstance(conf, TestConfigurations.TABLE_SCHEMA, "partition");
+    }
+
+    static MockSinkContext getInstance(Configuration conf, TableSchema schema, 
String partition) {
+      return getInstance(conf, schema, Collections.singletonList(partition));
+    }
+
+    static MockSinkContext getInstance(Configuration conf, TableSchema schema, 
List<String> partitions) {
+      return new MockSinkContext(conf, schema, partitions);
     }
 
     @Override
@@ -158,8 +249,7 @@ public class TestHoodieTableFactory {
 
     @Override
     public CatalogTable getTable() {
-      return new CatalogTableImpl(TestConfigurations.TABLE_SCHEMA, 
Collections.singletonList("partition"),
-          conf.toMap(), "mock sink table");
+      return new CatalogTableImpl(this.schema, this.partitions, conf.toMap(), 
"mock sink table");
     }
 
     @Override
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index e32b9c0..eaf5979 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -73,7 +73,8 @@ public class TestConfigurations {
         + "  name varchar(10),\n"
         + "  age int,\n"
         + "  ts timestamp(3),\n"
-        + "  `partition` varchar(20)\n"
+        + "  `partition` varchar(20),\n"
+        + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
         + ")\n"
         + "PARTITIONED BY (`partition`)\n"
         + "with (\n"

Reply via email to