This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 39b16c58c9 [HUDI-4638] Rename payload clazz and preCombine field
options for flink sql (#6434)
39b16c58c9 is described below
commit 39b16c58c9c6792d68bd46264af823e75086b419
Author: Danny Chan <[email protected]>
AuthorDate: Fri Aug 19 10:19:27 2022 +0800
[HUDI-4638] Rename payload clazz and preCombine field options for flink sql
(#6434)
---
.../apache/hudi/configuration/FlinkOptions.java | 79 ++++++++++++----------
1 file changed, 43 insertions(+), 36 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7b78fb8d6a..38dfdd5810 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -67,6 +67,7 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Base Options
// ------------------------------------------------------------------------
+
public static final ConfigOption<String> PATH = ConfigOptions
.key("path")
.stringType()
@@ -79,6 +80,38 @@ public class FlinkOptions extends HoodieConfig {
// Common Options
// ------------------------------------------------------------------------
+ public static final ConfigOption<String> TABLE_NAME = ConfigOptions
+ .key(HoodieWriteConfig.TBL_NAME.key())
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name to register to Hive metastore");
+
+ public static final String TABLE_TYPE_COPY_ON_WRITE =
HoodieTableType.COPY_ON_WRITE.name();
+ public static final String TABLE_TYPE_MERGE_ON_READ =
HoodieTableType.MERGE_ON_READ.name();
+ public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
+ .key("table.type")
+ .stringType()
+ .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
+ .withDescription("Type of table to write. COPY_ON_WRITE (or)
MERGE_ON_READ");
+
+ public static final String NO_PRE_COMBINE = "no_precombine";
+ public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
+ .key("payload.ordering.field")
+ .stringType()
+ .defaultValue("ts")
+ .withFallbackKeys("write.precombine.field")
+ .withDescription("Field used in preCombining before actual write. When
two records have the same\n"
+ + "key value, we will pick the one with the largest value for the
precombine field,\n"
+ + "determined by Object.compareTo(..)");
+
+ public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
+ .key("payload.class")
+ .stringType()
+ .defaultValue(EventTimeAvroPayload.class.getName())
+ .withFallbackKeys("write.payload.class")
+ .withDescription("Payload class used. Override this, if you like to roll
your own merge logic, when upserting/inserting.\n"
+ + "This will render any value set for the option ineffective");
+
public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
ConfigOptions
.key("partition.default_name")
.stringType()
@@ -116,6 +149,7 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Index Options
// ------------------------------------------------------------------------
+
public static final ConfigOption<String> INDEX_TYPE = ConfigOptions
.key("index.type")
.stringType()
@@ -150,6 +184,7 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
+
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
.key("read.tasks")
.intType()
@@ -247,19 +282,6 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
- public static final ConfigOption<String> TABLE_NAME = ConfigOptions
- .key(HoodieWriteConfig.TBL_NAME.key())
- .stringType()
- .noDefaultValue()
- .withDescription("Table name to register to Hive metastore");
-
- public static final String TABLE_TYPE_COPY_ON_WRITE =
HoodieTableType.COPY_ON_WRITE.name();
- public static final String TABLE_TYPE_MERGE_ON_READ =
HoodieTableType.MERGE_ON_READ.name();
- public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
- .key("table.type")
- .stringType()
- .defaultValue(TABLE_TYPE_COPY_ON_WRITE)
- .withDescription("Type of table to write. COPY_ON_WRITE (or)
MERGE_ON_READ");
public static final ConfigOption<Boolean> INSERT_CLUSTER = ConfigOptions
.key("write.insert.cluster")
@@ -275,22 +297,6 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue("upsert")
.withDescription("The write operation, that this write should do");
- public static final String NO_PRE_COMBINE = "no_precombine";
- public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
- .key("write.precombine.field")
- .stringType()
- .defaultValue("ts")
- .withDescription("Field used in preCombining before actual write. When
two records have the same\n"
- + "key value, we will pick the one with the largest value for the
precombine field,\n"
- + "determined by Object.compareTo(..)");
-
- public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
- .key("write.payload.class")
- .stringType()
- .defaultValue(EventTimeAvroPayload.class.getName())
- .withDescription("Payload class used. Override this, if you like to roll
your own merge logic, when upserting/inserting.\n"
- + "This will render any value set for the option in-effective");
-
/**
* Flag to indicate whether to drop duplicates before insert/upsert.
* By default false to gain extra performance.
@@ -395,7 +401,7 @@ public class FlinkOptions extends HoodieConfig {
.key("write.index_bootstrap.tasks")
.intType()
.noDefaultValue()
- .withDescription("Parallelism of tasks that do index bootstrap, default
same as the sink parallelism");
+ .withDescription("Parallelism of tasks that do index bootstrap, default
same as the write task parallelism");
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
.key("write.bucket_assign.tasks")
@@ -580,12 +586,12 @@ public class FlinkOptions extends HoodieConfig {
+ "This also directly translates into how much you can incrementally
pull on this table, default 30");
public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
- .key("clean.retain_hours")
- .intType()
- .defaultValue(24)// default 24 hours
- .withDescription("Number of hours for which commits need to be
retained. This config provides a more flexible option as"
- + "compared to number of commits retained for cleaning
service. Setting this property ensures all the files, but the latest in a file
group,"
- + " corresponding to commits with commit times older than
the configured number of hours to be retained are cleaned.");
+ .key("clean.retain_hours")
+ .intType()
+ .defaultValue(24)// default 24 hours
+ .withDescription("Number of hours for which commits need to be retained.
This config provides a more flexible option as"
+ + " compared to number of commits retained for cleaning service.
Setting this property ensures all the files, but the latest in a file group,"
+ + " corresponding to commits with commit times older than the
configured number of hours to be retained are cleaned.");
public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS =
ConfigOptions
.key("clean.retain_file_versions")
@@ -691,6 +697,7 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------
+
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
.key("hive_sync.enable")
.booleanType()