This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 48977a61e26e93603085bfc6f349bce8dd72b940
Author: Bingeng Huang <[email protected]>
AuthorDate: Sun Jan 29 21:13:33 2023 +0800

    [HUDI-5503] Optimize flink table factory option check (#7608)
    
    Co-authored-by: hbg <[email protected]>
---
 .../org/apache/hudi/sink/bulk/RowDataKeyGen.java   | 22 +++++-
 .../org/apache/hudi/table/HoodieTableFactory.java  | 51 ++++++++++--
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  7 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    | 11 +--
 .../apache/hudi/sink/bulk/TestRowDataKeyGen.java   | 20 +++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 92 ++++++++++++++++++++--
 6 files changed, 177 insertions(+), 26 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index a2414abc3de..57b95788d9a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -56,6 +56,8 @@ public class RowDataKeyGen implements Serializable {
 
   private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
 
+  private final boolean hasRecordKey;
+
   private final String[] recordKeyFields;
   private final String[] partitionPathFields;
 
@@ -90,7 +92,11 @@ public class RowDataKeyGen implements Serializable {
 
     this.hiveStylePartitioning = hiveStylePartitioning;
     this.encodePartitionPath = encodePartitionPath;
-    if (this.recordKeyFields.length == 1) {
+
+    this.hasRecordKey = hasRecordKey(fieldNames);
+    if (!hasRecordKey) {
+      this.recordKeyProjection = null;
+    } else if (this.recordKeyFields.length == 1) {
       // efficient code path
       this.simpleRecordKey = true;
       int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
@@ -115,6 +121,14 @@ public class RowDataKeyGen implements Serializable {
     this.keyGenOpt = keyGenOpt;
   }
 
+  /**
+   * Checks whether user provides any record key.
+   */
+  private boolean hasRecordKey(List<String> fieldNames) {
+    return recordKeyFields.length != 1
+        || fieldNames.contains(recordKeyFields[0]);
+  }
+
   public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
     Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
     if 
(TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME)))
 {
@@ -134,7 +148,11 @@ public class RowDataKeyGen implements Serializable {
   }
 
   public String getRecordKey(RowData rowData) {
-    if (this.simpleRecordKey) {
+    if (!hasRecordKey) {
+      // should be optimized to unique values that can be easily calculated 
with low cost
+      // for e.g, fileId + auto inc integer
+      return EMPTY_RECORDKEY_PLACEHOLDER;
+    } else if (this.simpleRecordKey) {
       return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), 
this.recordKeyFields[0]);
     } else {
       Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index c7a79561b3f..d57d971f476 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.index.HoodieIndex;
@@ -30,6 +32,7 @@ import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -68,12 +71,11 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
     Configuration conf = 
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
-    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
-    sanityCheck(conf, schema);
-    setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
-
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
         new ValidationException("Option [path] should not be empty.")));
+    setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
+    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+    setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
     return new HoodieTableSource(
         schema,
         path,
@@ -87,12 +89,34 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     Configuration conf = 
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
     
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
         "Option [path] should not be empty.");
+    setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
 
+  /**
+   * Supplement the table config options if not specified.
+   */
+  private void setupTableOptions(String basePath, Configuration conf) {
+    StreamerUtil.getTableConfig(basePath, 
HadoopConfigurations.getHadoopConf(conf))
+        .ifPresent(tableConfig -> {
+          if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
+              && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
+            conf.setString(FlinkOptions.RECORD_KEY_FIELD, 
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
+          }
+          if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD)
+              && !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
+            conf.setString(FlinkOptions.PRECOMBINE_FIELD, 
tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
+          }
+          if 
(tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)
+              && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
+            conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, 
tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+          }
+        });
+  }
+
   @Override
   public String factoryIdentifier() {
     return FACTORY_ID;
@@ -119,9 +143,17 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
    * @param schema The table schema
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
-    List<String> fields = schema.getColumnNames();
+    if (!OptionsResolver.isAppendMode(conf)) {
+      checkRecordKey(conf, schema);
+      checkPreCombineKey(conf, schema);
+    }
+  }
 
-    // validate record key in pk absence.
+  /**
+   * Validate the record key.
+   */
+  private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
+    List<String> fields = schema.getColumnNames();
     if (!schema.getPrimaryKey().isPresent()) {
       String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
       if (recordKeys.length == 1
@@ -139,8 +171,13 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
                 + "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not 
exist in the table schema.");
           });
     }
+  }
 
-    // validate pre_combine key
+  /**
+   * Validate pre_combine key.
+   */
+  private void checkPreCombineKey(Configuration conf, ResolvedSchema schema) {
+    List<String> fields = schema.getColumnNames();
     String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
     if (!fields.contains(preCombineField)) {
       if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 8644435b5ab..6dcdf118415 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -386,9 +387,9 @@ public class HoodieHiveCatalog extends AbstractCatalog {
         if 
(!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
           // read the table config first
           final boolean hiveStyle;
-          HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(path, 
hiveConf);
-          if (tableConfig != null && 
tableConfig.contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
-            hiveStyle = 
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+          Option<HoodieTableConfig> tableConfig = 
StreamerUtil.getTableConfig(path, hiveConf);
+          if (tableConfig.isPresent() && 
tableConfig.get().contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
+            hiveStyle = 
Boolean.parseBoolean(tableConfig.get().getHiveStylePartitioningEnable());
           } else {
             // fallback to the partition path pattern
             Path hoodieTablePath = new Path(path);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index a6bddf1e82f..1e5af896928 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -57,8 +57,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.StringReader;
@@ -287,20 +285,19 @@ public class StreamerUtil {
   }
 
   /**
-   * Returns the table config or null if the table does not exist.
+   * Returns the table config or empty if the table does not exist.
    */
-  @Nullable
-  public static HoodieTableConfig getTableConfig(String basePath, 
org.apache.hadoop.conf.Configuration hadoopConf) {
+  public static Option<HoodieTableConfig> getTableConfig(String basePath, 
org.apache.hadoop.conf.Configuration hadoopConf) {
     FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
     Path metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
     try {
       if (fs.exists(metaPath)) {
-        return new HoodieTableConfig(fs, metaPath.toString(), null, null);
+        return Option.of(new HoodieTableConfig(fs, metaPath.toString(), null, 
null));
       }
     } catch (IOException e) {
       throw new HoodieIOException("Get table config error", e);
     }
-    return null;
+    return Option.empty();
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index a66874c4864..6fae13811f4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -165,4 +165,24 @@ public class TestRowDataKeyGen {
     assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + 
expectedPartition2));
     assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + 
expectedPartition3));
   }
+
+  @Test
+  void testPrimaryKeylessWrite() {
+    Configuration conf = TestConfigurations.getDefaultConf("path1");
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, "");
+    final RowData rowData1 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
+        TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
+    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, 
TestConfigurations.ROW_TYPE);
+    assertThat(keyGen1.getRecordKey(rowData1), is("__empty__"));
+
+    // null record key and partition path
+    final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, 
StringData.fromString("Danny"), 23,
+        TimestampData.fromEpochMillis(1), null);
+    assertThat(keyGen1.getRecordKey(rowData2), is("__empty__"));
+
+    // empty record key and partition path
+    final RowData rowData3 = insertRow(StringData.fromString(""), 
StringData.fromString("Danny"), 23,
+        TimestampData.fromEpochMillis(1), StringData.fromString(""));
+    assertThat(keyGen1.getRecordKey(rowData3), is("__empty__"));
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f7a35e57f2b..b9964b70f79 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -86,17 +86,26 @@ public class TestHoodieTableFactory {
   }
 
   @Test
-  void testRequiredOptionsForSource() {
-    // miss pk and precombine key will throw exception
+  void testRequiredOptions() {
     ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
         .build();
     final MockContext sourceContext1 = MockContext.getInstance(this.conf, 
schema1, "f2");
-    assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext1));
+
+    // createDynamicTableSource doesn't call sanity check, will not throw 
exception
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext1));
+    // miss pk and precombine key will throw exception when create sink
     assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext1));
 
+    // append mode does not throw
+    this.conf.set(FlinkOptions.OPERATION, "insert");
+    final MockContext sourceContext11 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext11));
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext11));
+    this.conf.set(FlinkOptions.OPERATION, 
FlinkOptions.OPERATION.defaultValue());
+
     // a non-exists precombine key will throw exception
     ResolvedSchema schema2 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
@@ -105,7 +114,8 @@ public class TestHoodieTableFactory {
         .build();
     this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
     final MockContext sourceContext2 = MockContext.getInstance(this.conf, 
schema2, "f2");
-    assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext2));
+    // createDynamicTableSource doesn't call sanity check, will not throw 
exception
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext2));
     assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext2));
     this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, 
FlinkOptions.PRECOMBINE_FIELD.defaultValue());
 
@@ -120,17 +130,17 @@ public class TestHoodieTableFactory {
     HoodieTableSource tableSource = (HoodieTableSource) new 
HoodieTableFactory().createDynamicTableSource(sourceContext3);
     HoodieTableSink tableSink = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(sourceContext3);
     // the precombine field is overwritten
-    assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), 
is(FlinkOptions.NO_PRE_COMBINE));
     assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), 
is(FlinkOptions.NO_PRE_COMBINE));
     // precombine field not specified, use the default payload clazz
     
assertThat(tableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), 
is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
     assertThat(tableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), 
is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
 
-    // given pk but miss the pre combine key with DefaultHoodieRecordPayload 
should throw
     this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, 
DefaultHoodieRecordPayload.class.getName());
     final MockContext sourceContext4 = MockContext.getInstance(this.conf, 
schema3, "f2");
 
-    assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext4));
+    // createDynamicTableSource doesn't call sanity check, will not throw 
exception
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext4));
+    // given pk but miss the pre combine key with DefaultHoodieRecordPayload 
should throw
     assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext4));
     this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, 
FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());
 
@@ -167,6 +177,74 @@ public class TestHoodieTableFactory {
     assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext6));
   }
 
+  @Test
+  void testSupplementTableConfig() throws Exception {
+    String tablePath = new File(tempFile.getAbsolutePath(), 
"dummy").getAbsolutePath();
+    // add pk and pre-combine key to table config
+    Configuration tableConf = new Configuration();
+    tableConf.setString(FlinkOptions.PATH, tablePath);
+    tableConf.setString(FlinkOptions.TABLE_NAME, "t2");
+    tableConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0,f1");
+    tableConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");
+
+    StreamerUtil.initTableIfNotExists(tableConf);
+
+    Configuration writeConf = new Configuration();
+    writeConf.set(FlinkOptions.PATH, tablePath);
+    writeConf.set(FlinkOptions.TABLE_NAME, "t2");
+
+    // fallback to table config
+    ResolvedSchema schema1 = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .build();
+    final MockContext sourceContext1 = MockContext.getInstance(writeConf, 
schema1, "f2");
+    HoodieTableSource source1 = (HoodieTableSource) new 
HoodieTableFactory().createDynamicTableSource(sourceContext1);
+    HoodieTableSink sink1 = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(sourceContext1);
+    assertThat("pk not provided, fallback to table config",
+        source1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat("pk not provided, fallback to table config",
+        sink1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+    assertThat("pre-combine key not provided, fallback to table config",
+        source1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+    assertThat("pre-combine key not provided, fallback to table config",
+        sink1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+
+    // write config always has higher priority
+    // set up a different primary key and pre_combine key with table config 
options
+    writeConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0");
+    writeConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f1");
+
+    final MockContext sourceContext2 = MockContext.getInstance(writeConf, 
schema1, "f2");
+    HoodieTableSource source2 = (HoodieTableSource) new 
HoodieTableFactory().createDynamicTableSource(sourceContext2);
+    HoodieTableSink sink2 = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(sourceContext2);
+    assertThat("choose pk from write config",
+        source2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+    assertThat("choose pk from write config",
+        sink2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+    assertThat("choose preCombine key from write config",
+        source2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));
+    assertThat("choose preCombine pk from write config",
+        sink2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));
+
+    writeConf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
+    writeConf.removeConfig(FlinkOptions.PRECOMBINE_FIELD);
+
+    // pk defined in table config but missing in schema will throw
+    ResolvedSchema schema2 = SchemaBuilder.instance()
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .build();
+    final MockContext sourceContext3 = MockContext.getInstance(writeConf, 
schema2, "f2");
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext3),
+        "createDynamicTableSource won't call sanity check");
+    assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext3),
+        "f0 is in table config as record key, but missing in input schema");
+  }
+
   @Test
   void testInferAvroSchemaForSource() {
     // infer the schema if not specified

Reply via email to