This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b00dac54642 [HUDI-5503] Optimize flink table factory option check (#7608)
b00dac54642 is described below
commit b00dac5464229c898d102e3278785ac922727391
Author: Bingeng Huang <[email protected]>
AuthorDate: Sun Jan 29 21:13:33 2023 +0800
[HUDI-5503] Optimize flink table factory option check (#7608)
Co-authored-by: hbg <[email protected]>
---
.../org/apache/hudi/sink/bulk/RowDataKeyGen.java | 22 +++++-
.../org/apache/hudi/table/HoodieTableFactory.java | 51 ++++++++++--
.../hudi/table/catalog/HoodieHiveCatalog.java | 7 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 11 +--
.../apache/hudi/sink/bulk/TestRowDataKeyGen.java | 20 +++++
.../apache/hudi/table/TestHoodieTableFactory.java | 92 ++++++++++++++++++++--
6 files changed, 177 insertions(+), 26 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index a2414abc3de..57b95788d9a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -56,6 +56,8 @@ public class RowDataKeyGen implements Serializable {
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+ private final boolean hasRecordKey;
+
private final String[] recordKeyFields;
private final String[] partitionPathFields;
@@ -90,7 +92,11 @@ public class RowDataKeyGen implements Serializable {
this.hiveStylePartitioning = hiveStylePartitioning;
this.encodePartitionPath = encodePartitionPath;
- if (this.recordKeyFields.length == 1) {
+
+ this.hasRecordKey = hasRecordKey(fieldNames);
+ if (!hasRecordKey) {
+ this.recordKeyProjection = null;
+ } else if (this.recordKeyFields.length == 1) {
// efficient code path
this.simpleRecordKey = true;
int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
@@ -115,6 +121,14 @@ public class RowDataKeyGen implements Serializable {
this.keyGenOpt = keyGenOpt;
}
+ /**
+ * Checks whether the user provides any record key.
+ */
+ private boolean hasRecordKey(List<String> fieldNames) {
+ return recordKeyFields.length != 1
+ || fieldNames.contains(recordKeyFields[0]);
+ }
+
public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
if (TimestampBasedAvroKeyGenerator.class.getName().equals(conf.getString(FlinkOptions.KEYGEN_CLASS_NAME))) {
@@ -134,7 +148,11 @@ public class RowDataKeyGen implements Serializable {
}
public String getRecordKey(RowData rowData) {
- if (this.simpleRecordKey) {
+ if (!hasRecordKey) {
+ // should be optimized to unique values that can be easily calculated with low cost,
+ // e.g., fileId + auto inc integer
+ return EMPTY_RECORDKEY_PLACEHOLDER;
+ } else if (this.simpleRecordKey) {
return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
} else {
Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData);
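In effect, RowDataKeyGen now degrades gracefully when no record key is configured instead of failing the key projection. A hedged sketch of the resulting behavior (mirroring the new test further below; someRow is a hypothetical RowData, and EMPTY_RECORDKEY_PLACEHOLDER is assumed to be the "__empty__" constant the test asserts against):

// no record key configured: recordKeyFields becomes [""], which matches no schema field,
// so hasRecordKey is false and every row maps to the placeholder key
Configuration conf = TestConfigurations.getDefaultConf("/tmp/hudi");
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "");
RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
keyGen.getRecordKey(someRow); // "__empty__", regardless of the row's contents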
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index c7a79561b3f..d57d971f476 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,8 +19,10 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
@@ -30,6 +32,7 @@ import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -68,12 +71,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
- ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
- sanityCheck(conf, schema);
- setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
-
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> new ValidationException("Option [path] should not be empty.")));
+ setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
+ ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+ setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
return new HoodieTableSource(
schema,
path,
@@ -87,12 +89,34 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)), "Option [path] should not be empty.");
+ setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
}
+ /**
+ * Supplement the table config options if not specified.
+ */
+ private void setupTableOptions(String basePath, Configuration conf) {
+ StreamerUtil.getTableConfig(basePath, HadoopConfigurations.getHadoopConf(conf))
+ .ifPresent(tableConfig -> {
+ if (tableConfig.contains(HoodieTableConfig.RECORDKEY_FIELDS)
+ && !conf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS));
+ }
+ if (tableConfig.contains(HoodieTableConfig.PRECOMBINE_FIELD)
+ && !conf.contains(FlinkOptions.PRECOMBINE_FIELD)) {
+ conf.setString(FlinkOptions.PRECOMBINE_FIELD, tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD));
+ }
+ if (tableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE)
+ && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
+ conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+ }
+ });
+ }
+
@Override
public String factoryIdentifier() {
return FACTORY_ID;
@@ -119,9 +143,17 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
- List<String> fields = schema.getColumnNames();
+ if (!OptionsResolver.isAppendMode(conf)) {
+ checkRecordKey(conf, schema);
+ checkPreCombineKey(conf, schema);
+ }
+ }
- // validate record key in pk absence.
+ /**
+ * Validate the record key.
+ */
+ private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
+ List<String> fields = schema.getColumnNames();
if (!schema.getPrimaryKey().isPresent()) {
String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
if (recordKeys.length == 1
@@ -139,8 +171,13 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
+ "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema.");
});
}
+ }
- // validate pre_combine key
+ /**
+ * Validate pre_combine key.
+ */
+ private void checkPreCombineKey(Configuration conf, ResolvedSchema schema) {
+ List<String> fields = schema.getColumnNames();
String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
if (!fields.contains(preCombineField)) {
if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
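Taken together, the factory now resolves options in a fixed precedence. A hedged sketch (the path, field name, and mockContext helper are illustrative, not from the commit):

// explicit DDL options always win; hoodie.properties only back-fills unset ones
Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, "/tmp/hudi/t1");      // hypothetical table path
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid");  // explicit, so table config cannot override it
new HoodieTableFactory().createDynamicTableSink(mockContext(conf, schema));
// -> setupTableOptions copies RECORD_KEY_FIELD / PRECOMBINE_FIELD / HIVE_STYLE_PARTITIONING
//    from hoodie.properties only where the DDL left them unset;
// -> sanityCheck is skipped entirely when OptionsResolver.isAppendMode(conf) is true,
//    which is what allows primary-key-less INSERT pipelines.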
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 8644435b5ab..6dcdf118415 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -386,9 +387,9 @@ public class HoodieHiveCatalog extends AbstractCatalog {
if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
// read the table config first
final boolean hiveStyle;
- HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
- if (tableConfig != null && tableConfig.contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
- hiveStyle = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+ Option<HoodieTableConfig> tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
+ if (tableConfig.isPresent() && tableConfig.get().contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
+ hiveStyle = Boolean.parseBoolean(tableConfig.get().getHiveStylePartitioningEnable());
} else {
// fallback to the partition path pattern
Path hoodieTablePath = new Path(path);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index a6bddf1e82f..1e5af896928 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -57,8 +57,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
@@ -287,20 +285,19 @@ public class StreamerUtil {
}
/**
- * Returns the table config or null if the table does not exist.
+ * Returns the table config or empty if the table does not exist.
*/
- @Nullable
- public static HoodieTableConfig getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
+ public static Option<HoodieTableConfig> getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
Path metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
try {
if (fs.exists(metaPath)) {
- return new HoodieTableConfig(fs, metaPath.toString(), null, null);
+ return Option.of(new HoodieTableConfig(fs, metaPath.toString(), null, null));
}
} catch (IOException e) {
throw new HoodieIOException("Get table config error", e);
}
- return null;
+ return Option.empty();
}
/**
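With the signature change, call sites trade the old @Nullable null check for Option combinators. A hedged usage sketch (basePath and hadoopConf assumed in scope; org.apache.hudi.common.util.Option is assumed to expose Optional-style map/orElse):

boolean hiveStyle = StreamerUtil.getTableConfig(basePath, hadoopConf)
    .map(tableConfig -> tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE))
    .orElse(false); // an empty Option means the table does not exist yet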
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index a66874c4864..6fae13811f4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -165,4 +165,24 @@ public class TestRowDataKeyGen {
assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + expectedPartition2));
assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + expectedPartition3));
}
+
+ @Test
+ void testPrimaryKeylessWrite() {
+ Configuration conf = TestConfigurations.getDefaultConf("path1");
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, "");
+ final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
+ final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
+ assertThat(keyGen1.getRecordKey(rowData1), is("__empty__"));
+
+ // null record key and partition path
+ final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), null);
+ assertThat(keyGen1.getRecordKey(rowData2), is("__empty__"));
+
+ // empty record key and partition path
+ final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1), StringData.fromString(""));
+ assertThat(keyGen1.getRecordKey(rowData3), is("__empty__"));
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f7a35e57f2b..b9964b70f79 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -86,17 +86,26 @@ public class TestHoodieTableFactory {
}
@Test
- void testRequiredOptionsForSource() {
- // miss pk and precombine key will throw exception
+ void testRequiredOptions() {
ResolvedSchema schema1 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.build();
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
- assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
+
+ // createDynamicTableSource doesn't call the sanity check, so it will not throw an exception
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext1));
+ // missing pk and precombine key will throw an exception when creating the sink
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
+ // append mode does not throw
+ this.conf.set(FlinkOptions.OPERATION, "insert");
+ final MockContext sourceContext11 = MockContext.getInstance(this.conf, schema1, "f2");
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext11));
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext11));
+ this.conf.set(FlinkOptions.OPERATION, FlinkOptions.OPERATION.defaultValue());
+
// a non-existent precombine key will throw exception
ResolvedSchema schema2 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
@@ -105,7 +114,8 @@ public class TestHoodieTableFactory {
.build();
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "non_exist_field");
final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2");
- assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
+ // createDynamicTableSource doesn't call the sanity check, so it will not throw an exception
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext2));
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.PRECOMBINE_FIELD.defaultValue());
@@ -120,17 +130,17 @@ public class TestHoodieTableFactory {
HoodieTableSource tableSource = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext3);
HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext3);
// the precombine field is overwritten
- assertThat(tableSource.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
assertThat(tableSink.getConf().getString(FlinkOptions.PRECOMBINE_FIELD), is(FlinkOptions.NO_PRE_COMBINE));
// precombine field not specified, use the default payload clazz
assertThat(tableSource.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
assertThat(tableSink.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue()));
- // given pk but miss the pre combine key with DefaultHoodieRecordPayload should throw
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName());
final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema3, "f2");
- assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
+ // createDynamicTableSource doesn't call the sanity check, so it will not throw an exception
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext4));
+ // given pk but missing the precombine key with DefaultHoodieRecordPayload should throw
assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
this.conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, FlinkOptions.PAYLOAD_CLASS_NAME.defaultValue());
@@ -167,6 +177,74 @@ public class TestHoodieTableFactory {
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
}
+ @Test
+ void testSupplementTableConfig() throws Exception {
+ String tablePath = new File(tempFile.getAbsolutePath(), "dummy").getAbsolutePath();
+ // add pk and pre-combine key to table config
+ Configuration tableConf = new Configuration();
+ tableConf.setString(FlinkOptions.PATH, tablePath);
+ tableConf.setString(FlinkOptions.TABLE_NAME, "t2");
+ tableConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0,f1");
+ tableConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");
+
+ StreamerUtil.initTableIfNotExists(tableConf);
+
+ Configuration writeConf = new Configuration();
+ writeConf.set(FlinkOptions.PATH, tablePath);
+ writeConf.set(FlinkOptions.TABLE_NAME, "t2");
+
+ // fallback to table config
+ ResolvedSchema schema1 = SchemaBuilder.instance()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .build();
+ final MockContext sourceContext1 = MockContext.getInstance(writeConf, schema1, "f2");
+ HoodieTableSource source1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
+ HoodieTableSink sink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext1);
+ assertThat("pk not provided, fallback to table config",
+ source1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+ assertThat("pk not provided, fallback to table config",
+ sink1.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
+ assertThat("pre-combine key not provided, fallback to table config",
+ source1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+ assertThat("pre-combine key not provided, fallback to table config",
+ sink1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+
+ // write config always has higher priority
+ // set up a primary key and pre_combine key different from the table config options
+ writeConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0");
+ writeConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f1");
+
+ final MockContext sourceContext2 = MockContext.getInstance(writeConf, schema1, "f2");
+ HoodieTableSource source2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
+ HoodieTableSink sink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext2);
+ assertThat("choose pk from write config",
+ source2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+ assertThat("choose pk from write config",
+ sink2.getConf().get(FlinkOptions.RECORD_KEY_FIELD), is("f0"));
+ assertThat("choose preCombine key from write config",
+ source2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));
+ assertThat("choose preCombine pk from write config",
+ sink2.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f1"));
+
+ writeConf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
+ writeConf.removeConfig(FlinkOptions.PRECOMBINE_FIELD);
+
+ // pk defined in table config but missing in schema will throw
+ ResolvedSchema schema2 = SchemaBuilder.instance()
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .build();
+ final MockContext sourceContext3 = MockContext.getInstance(writeConf, schema2, "f2");
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3),
+ "createDynamicTableSource won't call sanity check");
+ assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext3),
+ "f0 is in table config as record key, but missing in input schema");
+ }
+
@Test
void testInferAvroSchemaForSource() {
// infer the schema if not specified