This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ae820af70d3 [HUDI-8631] Support of `hoodie.populate.meta.fields` for Flink append mode (#12516)
ae820af70d3 is described below

commit ae820af70d3d6064e2abaafe846cbb54ff271681
Author: Geser Dugarov <[email protected]>
AuthorDate: Sat Dec 21 10:32:11 2024 +0700

    [HUDI-8631] Support of `hoodie.populate.meta.fields` for Flink append mode (#12516)
---
 .../io/storage/row/HoodieRowDataCreateHandle.java  | 27 ++++++++++-----
 .../sink/bucket/BucketBulkInsertWriterHelper.java  |  2 +-
 .../hudi/sink/bulk/BulkInsertWriterHelper.java     | 13 ++++++--
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 38 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 13 deletions(-)

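For context on how the change is exercised from the user side: below is a minimal, hypothetical Flink job that drives the new code path. It is a sketch, not part of this commit; the option keys are standard Flink Hudi connector options, append mode is assumed here to mean a COPY_ON_WRITE table written with the insert operation, and the schema, path, and sample row are illustrative only.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Hypothetical sketch, not part of this commit.
    public class AppendWithoutMetaFieldsExample {
      public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
            "create table t1 (uuid varchar(20), name varchar(10), age int, ts timestamp(3)) with ("
                + " 'connector' = 'hudi',"
                + " 'path' = '/tmp/t1',"
                + " 'table.type' = 'COPY_ON_WRITE',"            // append mode applies to COW tables
                + " 'write.operation' = 'insert',"              // Flink append mode
                + " 'hoodie.populate.meta.fields' = 'false')"); // keep only the user columns
        // With the flag above, the written data files contain no _hoodie_* meta columns.
        tEnv.executeSql("insert into t1 values ('id1', 'Danny', 23, timestamp '2024-12-21 10:32:11')").await();
      }
    }
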
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index b08e814d15c..29bd3069922 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -71,6 +71,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
   private final Path path;
   private final String fileId;
   private final boolean preserveHoodieMetadata;
+  private final boolean skipMetadataWrite;
   private final HoodieStorage storage;
   protected final WriteStatus writeStatus;
   private final HoodieRecordLocation newRecordLocation;
@@ -79,7 +80,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
 
   public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
                                    String instantTime, int taskPartitionId, long taskId, long taskEpochId,
-                                   RowType rowType, boolean preserveHoodieMetadata) {
+                                   RowType rowType, boolean preserveHoodieMetadata, boolean skipMetadataWrite) {
     this.partitionPath = partitionPath;
     this.table = table;
     this.writeConfig = writeConfig;
@@ -90,6 +91,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
     this.fileId = fileId;
     this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
     this.preserveHoodieMetadata = preserveHoodieMetadata;
+    this.skipMetadataWrite = skipMetadataWrite;
     this.currTimer = HoodieTimer.start();
     this.storage = table.getStorage();
     this.path = makeNewPath(partitionPath);
@@ -128,14 +130,21 @@ public class HoodieRowDataCreateHandle implements Serializable {
    */
   public void write(String recordKey, String partitionPath, RowData record) throws IOException {
     try {
-      String seqId = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
-          : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
-      String commitInstant = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
-          : instantTime;
-      RowData rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
-          record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      String seqId;
+      String commitInstant;
+      RowData rowData;
+      if (!skipMetadataWrite) {
+        seqId = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
+            : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+        commitInstant = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
+            : instantTime;
+        rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
+            record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      } else {
+        rowData = record;
+      }
       try {
         fileWriter.writeRow(recordKey, rowData);
         HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 1047a4f5c00..b84c44af832 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -82,7 +82,7 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
         close();
       }
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
       handles.put(fileId, rowCreateHandle);
     }
     return handles.get(fileId);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index dc0c27d64d2..9b9b85ba6e3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -61,6 +61,9 @@ public class BulkInsertWriterHelper {
   protected final HoodieWriteConfig writeConfig;
   protected final RowType rowType;
   protected final boolean preserveHoodieMetadata;
+  protected final boolean isAppendMode;
+  // used in append mode only: when false, the initial row data is written as-is, without meta columns
+  protected final boolean populateMetaFields;
   protected final Boolean isInputSorted;
   private final List<WriteStatus> writeStatusList = new ArrayList<>();
   protected HoodieRowDataCreateHandle handle;
@@ -92,7 +95,11 @@ public class BulkInsertWriterHelper {
     this.taskPartitionId = taskPartitionId;
     this.totalSubtaskNum = totalSubtaskNum;
     this.taskEpochId = taskEpochId;
-    this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
+    this.isAppendMode = OptionsResolver.isAppendMode(conf);
+    this.populateMetaFields = writeConfig.populateMetaFields();
+    this.rowType = preserveHoodieMetadata || (isAppendMode && !populateMetaFields)
+        ? rowType
+        : addMetadataFields(rowType, writeConfig.allowOperationMetadataField());
     this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
@@ -140,7 +147,7 @@ public class BulkInsertWriterHelper {
       LOG.info("Creating new file for partition path " + partitionPath);
       writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
       handles.put(partitionPath, rowCreateHandle);
 
       writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
@@ -216,7 +223,7 @@ public class BulkInsertWriterHelper {
   private HoodieRowDataCreateHandle createWriteHandle(String  partitionPath) {
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
     HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
     writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
     return rowCreateHandle;
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 2cc964f1a26..3b5c689554b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,6 +21,8 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -2320,6 +2322,32 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result4, expected4);
   }
 
+  @ParameterizedTest
+  @MethodSource("parametersForMetaColumnsSkip")
+  void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation)
+      throws TableNotExistException, InterruptedException {
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false")
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    streamTableEnv.executeSql("drop table t1");
+    hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @Test
   void testReadWithParquetPredicatePushDown() {
     TableEnvironment tableEnv = batchTableEnv;
@@ -2419,6 +2447,16 @@ public class ITTestHoodieDataSource {
     return Stream.of(data).map(Arguments::of);
   }
 
+  private static Stream<Arguments> parametersForMetaColumnsSkip() {
+    Object[][] data =
+        new Object[][] {
+            {HoodieTableType.COPY_ON_WRITE, WriteOperationType.INSERT}
+            // add MOR upsert check after fixing of HUDI-8785
+            // {HoodieTableType.MERGE_ON_READ, WriteOperationType.UPSERT}
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish

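For quick reference, the condition threaded into HoodieRowDataCreateHandle at all three call sites reduces to a single predicate. A small sketch restating it (the class and method below are invented for illustration; the parameter names mirror the BulkInsertWriterHelper fields in the diff):

    // Hypothetical sketch, not part of this commit.
    public class SkipMetaWriteRule {

      // Meta columns are skipped only in append mode with meta field population disabled.
      static boolean skipMetadataWrite(boolean isAppendMode, boolean populateMetaFields) {
        return isAppendMode && !populateMetaFields;
      }

      public static void main(String[] args) {
        System.out.println(skipMetadataWrite(true, false));  // true: write() passes RowData through as-is
        System.out.println(skipMetadataWrite(true, true));   // false: meta columns are still prepended
        System.out.println(skipMetadataWrite(false, false)); // false: non-append paths are unchanged
      }
    }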