This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f897e6d  [HUDI-2551] Support DefaultHoodieRecordPayload for flink (#3792)
f897e6d is described below

commit f897e6d73ebc26d32017774d452389023f53f742
Author: Danny Chan <[email protected]>
AuthorDate: Thu Oct 14 13:46:53 2021 +0800

    [HUDI-2551] Support DefaultHoodieRecordPayload for flink (#3792)
---
 .../hudi/execution/FlinkLazyInsertIterable.java    |  2 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  2 +-
 .../hudi/sink/bootstrap/BootstrapOperator.java     |  7 ++++++
 .../bootstrap/batch/BatchBootstrapOperator.java    |  5 ++++
 .../java/org/apache/hudi/util/StreamerUtil.java    |  5 ++++
 .../apache/hudi/table/HoodieDataSourceITCase.java  | 29 ++++++++++++++++++++++
 6 files changed, 48 insertions(+), 2 deletions(-)
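
For context: the stock OverwriteWithLatestAvroPayload always keeps the record
that arrives last, whereas DefaultHoodieRecordPayload compares the ordering
(precombine) field of the incoming and the stored record and keeps the one with
the larger value. A minimal sketch of that merge rule (illustrative only, not
Hudi's actual implementation; record field access is simplified):

    import org.apache.avro.generic.GenericRecord;

    // Keep the record with the larger ordering value; on a tie, prefer the
    // incoming record. The real payload reads the field name from the
    // "hoodie.payload.ordering.field" write property.
    static GenericRecord merge(GenericRecord stored, GenericRecord incoming, String orderingField) {
      Comparable storedVal = (Comparable) stored.get(orderingField);
      Comparable incomingVal = (Comparable) incoming.get(orderingField);
      return incomingVal.compareTo(storedVal) >= 0 ? incoming : stored;
    }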

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 8769f63..b0674b2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -65,7 +65,7 @@ public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
     try {
       final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
+          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema, hoodieConfig));
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
       return result;
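
Passing hoodieConfig into getTransformFunction lets the write config's
properties reach the payload when the Avro value is generated, which is how
DefaultHoodieRecordPayload can find its ordering field. Roughly (a sketch,
assuming the transform switches to the Properties-aware overload):

    // Instead of record.getData().getInsertValue(schema), the transform can
    // call the overload that also receives the write properties:
    Option<IndexedRecord> avro = record.getData().getInsertValue(schema, config.getProps());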
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 81bd517..b2359f4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -100,7 +100,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> METADATA_COMPACTION_DELTA_COMMITS = ConfigOptions
       .key("metadata.compaction.delta_commits")
       .intType()
-      .defaultValue(24)
+      .defaultValue(10)
       .withDescription("Max delta commits for metadata table to trigger compaction, default 24");
 
   // ------------------------------------------------------------------------
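
This hunk only lowers the default metadata-table compaction trigger from 24 to
10 delta commits. The option can still be set per table; for example (assumed
DDL, with option keys as defined in FlinkOptions):

    tableEnv.executeSql(
        "create table t1 (id int primary key not enforced, name string, ts bigint) "
            + "with ('connector' = 'hudi', 'path' = '/tmp/t1', "
            + "'metadata.enabled' = 'true', 'metadata.compaction.delta_commits' = '10')");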
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 3ac7aa1..0e7bb54 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -129,6 +129,13 @@ public class BootstrapOperator<I, O extends HoodieRecord>
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)));
 
+    preLoadIndexRecords();
+  }
+
+  /**
+   * Load the index records before {@link #processElement}.
+   */
+  protected void preLoadIndexRecords() throws Exception {
     String basePath = hoodieTable.getMetaClient().getBasePath();
     int taskID = getRuntimeContext().getIndexOfThisSubtask();
     LOG.info("Start loading records in table {} into the index state, taskId = 
{}", basePath, taskID);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
index ac4c2b1..258f884 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
@@ -57,6 +57,11 @@ public class BatchBootstrapOperator<I, O extends HoodieRecord>
   }
 
   @Override
+  protected void preLoadIndexRecords() {
+    // no operation
+  }
+
+  @Override
   @SuppressWarnings("unchecked")
   public void processElement(StreamRecord<I> element) throws Exception {
     final HoodieRecord record = (HoodieRecord<?>) element.getValue();
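
BatchBootstrapOperator overrides the hook as a no-op because the batch operator
does not need the full index up front; as the rest of this class suggests,
index records are instead loaded lazily in processElement, the first time each
partition is seen.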
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index cfa2980..7fb550d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -189,6 +190,10 @@ public class StreamerUtil {
                 .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))
                 .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
                 .build())
+            .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+                .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+                .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
+                .build())
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
             .withAutoCommit(false)
             .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
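
The Flink precombine field is now propagated into Hudi's payload config, where
DefaultHoodieRecordPayload picks it up as both the ordering field and the
event-time field. The equivalent standalone wiring looks roughly like this
(a sketch; "/tmp/hudi_t1" and the "ts" field are placeholders):

    // hoodie.payload.ordering.field and hoodie.payload.event.time.field are
    // both pointed at the precombine field, here "ts".
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_t1")
        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
            .withPayloadOrderingField("ts")
            .withPayloadEventTimeField("ts")
            .build())
        .build();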
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index e31f974..621cd1c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -584,6 +585,34 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, "[+I[id1, Sophia, 18, 1970-01-01T00:00:05, par1]]");
   }
 
+  @Test
+  void testUpdateWithDefaultHoodieRecordPayload() {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .field("id int")
+        .field("name string")
+        .field("price double")
+        .field("ts bigint")
+        .pkField("id")
+        .noPartition()
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.PAYLOAD_CLASS_NAME, DefaultHoodieRecordPayload.class.getName())
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String insertInto1 = "insert into t1 values\n"
+        + "(1,'a1',20,20)";
+    execInsertSql(tableEnv, insertInto1);
+
+    final String insertInto4 = "insert into t1 values\n"
+        + "(1,'a1',20,1)";
+    execInsertSql(tableEnv, insertInto4);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, "[+I[1, a1, 20.0, 20]]");
+  }
+
   @ParameterizedTest
   @MethodSource("executionModeAndTableTypeParams")
   void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType tableType) {
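
The new ITCase pins down the behavior change: with the default
OverwriteWithLatestAvroPayload the second insert (ts = 1) would simply
overwrite the first row, but with DefaultHoodieRecordPayload the row whose
precombine field is larger wins, so the first insert (ts = 20) survives and the
query returns [+I[1, a1, 20.0, 20]].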
