This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1ad0e9560e [HUDI-4529] Tweak some default config options for flink
(#6287)
1ad0e9560e is described below
commit 1ad0e9560e4805b682fe661d78f2ad0f2fa1025b
Author: Danny Chan <[email protected]>
AuthorDate: Wed Aug 17 14:20:36 2022 +0800
[HUDI-4529] Tweak some default config options for flink (#6287)
---
.../apache/hudi/configuration/FlinkOptions.java | 10 +++----
.../apache/hudi/streamer/FlinkStreamerConfig.java | 6 ++--
.../org/apache/hudi/table/HoodieTableFactory.java | 34 ++++++++++------------
.../apache/hudi/table/TestHoodieTableFactory.java | 19 +++++++++---
4 files changed, 38 insertions(+), 31 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 164106a4e8..3638113288 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -22,13 +22,13 @@ import
org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringP
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -287,7 +287,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
.key("write.payload.class")
.stringType()
- .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+ .defaultValue(EventTimeAvroPayload.class.getName())
.withDescription("Payload class used. Override this, if you like to roll
your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option ineffective");
@@ -718,7 +718,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
.key("hive_sync.mode")
.stringType()
- .defaultValue("jdbc")
+ .defaultValue("hms")
.withDescription("Mode to choose for Hive ops. Valid values are hms,
jdbc and hiveql, default 'hms'");
public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
@@ -754,7 +754,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String>
HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions
.key("hive_sync.partition_extractor_class")
.stringType()
-
.defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
+ .defaultValue(MultiPartKeysValueExtractor.class.getName())
.withDescription("Tool to extract the partition value from HDFS path, "
+ "default 'MultiPartKeysValueExtractor'");
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 3ba1c6230f..3447a23851 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -24,7 +24,7 @@ import
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.util.FlinkStateBackendConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -321,8 +321,8 @@ public class FlinkStreamerConfig extends Configuration {
public String hiveSyncPartitionFields = "";
@Parameter(names = {"--hive-sync-partition-extractor-class"}, description =
"Tool to extract the partition value from HDFS path, "
- + "default 'SlashEncodedDayPartitionValueExtractor'")
- public String hiveSyncPartitionExtractorClass =
SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
+ + "default 'MultiPartKeysValueExtractor'")
+ public String hiveSyncPartitionExtractorClass =
MultiPartKeysValueExtractor.class.getCanonicalName();
@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description =
"Assume partitioning is yyyy/mm/dd, default false")
public Boolean hiveSyncAssumeDatePartition = false;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 51bbb2dc87..1cf66ea343 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,12 +19,10 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
-import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -38,6 +36,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -71,7 +70,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
Configuration conf =
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
- setupConfOptions(conf, context.getObjectIdentifier().getObjectName(),
context.getCatalogTable(), schema);
+ setupConfOptions(conf, context.getObjectIdentifier(),
context.getCatalogTable(), schema);
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
@@ -90,7 +89,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
"Option [path] should not be empty.");
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
- setupConfOptions(conf, context.getObjectIdentifier().getObjectName(),
context.getCatalogTable(), schema);
+ setupConfOptions(conf, context.getObjectIdentifier(),
context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
}
@@ -154,35 +153,30 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
throw new HoodieValidationException("Field " + preCombineField + "
does not exist in the table schema."
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "'
option.");
}
- } else if (FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.PAYLOAD_CLASS_NAME)) {
- // if precombine field is specified but payload clazz is default,
- // use DefaultHoodieRecordPayload to make sure the precombine field is
always taken for
- // comparing.
- conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME,
EventTimeAvroPayload.class.getName());
}
}
/**
- * Sets up the config options based on the table definition, for e.g the
table name, primary key.
+ * Sets up the config options based on the table definition, for e.g, the
table name, primary key.
*
- * @param conf The configuration to setup
- * @param tableName The table name
+ * @param conf The configuration to set up
+ * @param tablePath The table path
* @param table The catalog table
* @param schema The physical schema
*/
private static void setupConfOptions(
Configuration conf,
- String tableName,
+ ObjectIdentifier tablePath,
CatalogTable table,
ResolvedSchema schema) {
// table name
- conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
+ conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// compaction options
setupCompactionOptions(conf);
// hive options
- setupHiveOptions(conf);
+ setupHiveOptions(conf, tablePath);
// read options
setupReadOptions(conf);
// write options
@@ -309,10 +303,12 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
/**
* Sets up the hive options from the table definition.
*/
- private static void setupHiveOptions(Configuration conf) {
- if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
- && FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {
- conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME,
MultiPartKeysValueExtractor.class.getName());
+ private static void setupHiveOptions(Configuration conf, ObjectIdentifier
tablePath) {
+ if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
+ conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
+ }
+ if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
+ conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f27ab4ca53..f7a35e57f2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
-import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -240,15 +239,21 @@ public class TestHoodieTableFactory {
final MockContext sourceContext1 = MockContext.getInstance(this.conf,
schema1, "f2");
final HoodieTableSource tableSource1 = (HoodieTableSource) new
HoodieTableFactory().createDynamicTableSource(sourceContext1);
final Configuration conf1 = tableSource1.getConf();
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME),
is(MultiPartKeysValueExtractor.class.getName()));
// set up hive style partitioning is true.
+ this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
+ this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
final MockContext sourceContext2 = MockContext.getInstance(this.conf,
schema1, "f2");
final HoodieTableSource tableSource2 = (HoodieTableSource) new
HoodieTableFactory().createDynamicTableSource(sourceContext2);
final Configuration conf2 = tableSource2.getConf();
-
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME),
is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME),
is(MultiPartKeysValueExtractor.class.getName()));
}
@Test
@@ -430,15 +435,21 @@ public class TestHoodieTableFactory {
final MockContext sinkContext1 = MockContext.getInstance(this.conf,
schema1, "f2");
final HoodieTableSink tableSink1 = (HoodieTableSink) new
HoodieTableFactory().createDynamicTableSink(sinkContext1);
final Configuration conf1 = tableSink1.getConf();
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME),
is(MultiPartKeysValueExtractor.class.getName()));
// set up hive style partitioning is true.
+ this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
+ this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
final MockContext sinkContext2 = MockContext.getInstance(this.conf,
schema1, "f2");
final HoodieTableSink tableSink2 = (HoodieTableSink) new
HoodieTableFactory().createDynamicTableSink(sinkContext2);
final Configuration conf2 = tableSink2.getConf();
-
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME),
is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
+
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME),
is(MultiPartKeysValueExtractor.class.getName()));
}
@Test
@@ -542,7 +553,7 @@ public class TestHoodieTableFactory {
@Override
public ObjectIdentifier getObjectIdentifier() {
- return ObjectIdentifier.of("hudi", "default", "t1");
+ return ObjectIdentifier.of("hudi", "db1", "t1");
}
@Override