This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f897e6d [HUDI-2551] Support DefaultHoodieRecordPayload for flink
(#3792)
f897e6d is described below
commit f897e6d73ebc26d32017774d452389023f53f742
Author: Danny Chan <[email protected]>
AuthorDate: Thu Oct 14 13:46:53 2021 +0800
[HUDI-2551] Support DefaultHoodieRecordPayload for flink (#3792)
---
.../hudi/execution/FlinkLazyInsertIterable.java | 2 +-
.../apache/hudi/configuration/FlinkOptions.java | 2 +-
.../hudi/sink/bootstrap/BootstrapOperator.java | 7 ++++++
.../bootstrap/batch/BatchBootstrapOperator.java | 5 ++++
.../java/org/apache/hudi/util/StreamerUtil.java | 5 ++++
.../apache/hudi/table/HoodieDataSourceITCase.java | 29 ++++++++++++++++++++++
6 files changed, 48 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 8769f63..b0674b2 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -65,7 +65,7 @@ public class FlinkLazyInsertIterable<T extends
HoodieRecordPayload> extends Hood
try {
final Schema schema = new
Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
- new
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new
IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()),
getTransformFunction(schema));
+ new
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new
IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()),
getTransformFunction(schema, hoodieConfig));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() &&
!bufferedIteratorExecutor.isRemaining();
return result;
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 81bd517..b2359f4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -100,7 +100,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS
= ConfigOptions
.key("metadata.compaction.delta_commits")
.intType()
- .defaultValue(24)
+ .defaultValue(10)
.withDescription("Max delta commits for metadata table to trigger
compaction, default 10");
// ------------------------------------------------------------------------
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 3ac7aa1..0e7bb54 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -129,6 +129,13 @@ public class BootstrapOperator<I, O extends HoodieRecord>
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)));
+ preLoadIndexRecords();
+ }
+
+ /**
+ * Load the index records before {@link #processElement}.
+ */
+ protected void preLoadIndexRecords() throws Exception {
String basePath = hoodieTable.getMetaClient().getBasePath();
int taskID = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Start loading records in table {} into the index state, taskId =
{}", basePath, taskID);
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
index ac4c2b1..258f884 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
@@ -57,6 +57,11 @@ public class BatchBootstrapOperator<I, O extends
HoodieRecord>
}
@Override
+ protected void preLoadIndexRecords() {
+ // no operation
+ }
+
+ @Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<I> element) throws Exception {
final HoodieRecord record = (HoodieRecord<?>) element.getValue();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index cfa2980..7fb550d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -189,6 +190,10 @@ public class StreamerUtil {
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
+ .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+ .build())
.withEmbeddedTimelineServerReuseEnabled(true) // make write client
embedded timeline service singleton
.withAutoCommit(false)
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index e31f974..621cd1c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
@@ -584,6 +585,34 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05,
par1]]");
}
+ @Test
+ void testUpdateWithDefaultHoodieRecordPayload() {
+ TableEnvironment tableEnv = batchTableEnv;
+ String hoodieTableDDL = sql("t1")
+ .field("id int")
+ .field("name string")
+ .field("price double")
+ .field("ts bigint")
+ .pkField("id")
+ .noPartition()
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.PAYLOAD_CLASS_NAME,
DefaultHoodieRecordPayload.class.getName())
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ final String insertInto1 = "insert into t1 values\n"
+ + "(1,'a1',20,20)";
+ execInsertSql(tableEnv, insertInto1);
+
+ final String insertInto4 = "insert into t1 values\n"
+ + "(1,'a1',20,1)";
+ execInsertSql(tableEnv, insertInto4);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result, "[+I[1, a1, 20.0, 20]]");
+ }
+
@ParameterizedTest
@MethodSource("executionModeAndTableTypeParams")
void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType
tableType) {