This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new ae820af70d3 [HUDI-8631] Support of `hoodie.populate.meta.fields` for Flink append mode (#12516)
ae820af70d3 is described below
commit ae820af70d3d6064e2abaafe846cbb54ff271681
Author: Geser Dugarov <[email protected]>
AuthorDate: Sat Dec 21 10:32:11 2024 +0700
    [HUDI-8631] Support of `hoodie.populate.meta.fields` for Flink append mode (#12516)
---
.../io/storage/row/HoodieRowDataCreateHandle.java | 27 ++++++++++-----
.../sink/bucket/BucketBulkInsertWriterHelper.java | 2 +-
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 13 ++++++--
.../apache/hudi/table/ITTestHoodieDataSource.java | 38 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 13 deletions(-)
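For reference, a minimal Flink SQL sketch of the behavior this change enables: writing a Hudi table in append mode without the `_hoodie_*` meta columns. The option keys are the ones exercised by this commit and its test; the table name, schema, and path are illustrative.

    CREATE TABLE t1 (
      uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
      name VARCHAR(10),
      age INT,
      ts TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',
      'path' = '/tmp/hudi/t1',                  -- illustrative path
      'table.type' = 'COPY_ON_WRITE',
      'write.operation' = 'insert',             -- resolves to Flink append mode for COW
      'hoodie.populate.meta.fields' = 'false'   -- skip writing the meta columns
    );
    INSERT INTO t1 SELECT * FROM source;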
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index b08e814d15c..29bd3069922 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -71,6 +71,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
private final Path path;
private final String fileId;
private final boolean preserveHoodieMetadata;
+ private final boolean skipMetadataWrite;
private final HoodieStorage storage;
protected final WriteStatus writeStatus;
private final HoodieRecordLocation newRecordLocation;
@@ -79,7 +80,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
   public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
                                    String instantTime, int taskPartitionId, long taskId, long taskEpochId,
-                                   RowType rowType, boolean preserveHoodieMetadata) {
+                                   RowType rowType, boolean preserveHoodieMetadata, boolean skipMetadataWrite) {
this.partitionPath = partitionPath;
this.table = table;
this.writeConfig = writeConfig;
@@ -90,6 +91,7 @@ public class HoodieRowDataCreateHandle implements Serializable {
this.fileId = fileId;
this.newRecordLocation = new HoodieRecordLocation(instantTime, fileId);
this.preserveHoodieMetadata = preserveHoodieMetadata;
+ this.skipMetadataWrite = skipMetadataWrite;
this.currTimer = HoodieTimer.start();
this.storage = table.getStorage();
this.path = makeNewPath(partitionPath);
@@ -128,14 +130,21 @@ public class HoodieRowDataCreateHandle implements Serializable {
*/
   public void write(String recordKey, String partitionPath, RowData record) throws IOException {
try {
-      String seqId = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
-          : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
-      String commitInstant = preserveHoodieMetadata
-          ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
-          : instantTime;
-      RowData rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
-          record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      String seqId;
+      String commitInstant;
+      RowData rowData;
+      if (!skipMetadataWrite) {
+        seqId = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD).toString()
+            : HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+        commitInstant = preserveHoodieMetadata
+            ? record.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD).toString()
+            : instantTime;
+        rowData = HoodieRowDataCreation.create(commitInstant, seqId, recordKey, partitionPath, path.getName(),
+            record, writeConfig.allowOperationMetadataField(), preserveHoodieMetadata);
+      } else {
+        rowData = record;
+      }
try {
fileWriter.writeRow(recordKey, rowData);
        HoodieRecordDelegate recordDelegate = writeStatus.isTrackingSuccessfulWrites()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index 1047a4f5c00..b84c44af832 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -82,7 +82,7 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
close();
}
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
handles.put(fileId, rowCreateHandle);
}
return handles.get(fileId);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index dc0c27d64d2..9b9b85ba6e3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -61,6 +61,9 @@ public class BulkInsertWriterHelper {
protected final HoodieWriteConfig writeConfig;
protected final RowType rowType;
protected final boolean preserveHoodieMetadata;
+  protected final boolean isAppendMode;
+  // used in append mode only; if false, only the original row data, without meta columns, is written
+  protected final boolean populateMetaFields;
protected final Boolean isInputSorted;
private final List<WriteStatus> writeStatusList = new ArrayList<>();
protected HoodieRowDataCreateHandle handle;
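For context, the meta columns being skipped are Hudi's five standard meta fields, which addMetadataFields would otherwise prepend to the row type. A sketch of the resulting data file schema under each setting (the user columns are illustrative):

    -- hoodie.populate.meta.fields = 'true' (default):
    _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name, uuid, name, age, ts
    -- append mode with hoodie.populate.meta.fields = 'false':
    uuid, name, age, ts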
@@ -92,7 +95,11 @@ public class BulkInsertWriterHelper {
this.taskPartitionId = taskPartitionId;
this.totalSubtaskNum = totalSubtaskNum;
this.taskEpochId = taskEpochId;
-    this.rowType = preserveHoodieMetadata ? rowType : addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
+    this.isAppendMode = OptionsResolver.isAppendMode(conf);
+    this.populateMetaFields = writeConfig.populateMetaFields();
+    this.rowType = preserveHoodieMetadata || (isAppendMode && !populateMetaFields)
+        ? rowType
+        : addMetadataFields(rowType, writeConfig.allowOperationMetadataField());
this.preserveHoodieMetadata = preserveHoodieMetadata;
     this.isInputSorted = OptionsResolver.isBulkInsertOperation(conf) && conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
@@ -140,7 +147,7 @@ public class BulkInsertWriterHelper {
LOG.info("Creating new file for partition path " + partitionPath);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+          instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
handles.put(partitionPath, rowCreateHandle);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::increaseNumOfOpenHandle);
@@ -216,7 +223,7 @@ public class BulkInsertWriterHelper {
private HoodieRowDataCreateHandle createWriteHandle(String partitionPath) {
writeMetrics.ifPresent(FlinkStreamWriteMetrics::startHandleCreation);
     HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
-        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata);
+        instantTime, taskPartitionId, totalSubtaskNum, taskEpochId, rowType, preserveHoodieMetadata, isAppendMode && !populateMetaFields);
writeMetrics.ifPresent(FlinkStreamWriteMetrics::endHandleCreation);
return rowCreateHandle;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 2cc964f1a26..3b5c689554b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,6 +21,8 @@ package org.apache.hudi.table;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
@@ -2320,6 +2322,32 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result4, expected4);
}
+ @ParameterizedTest
+ @MethodSource("parametersForMetaColumnsSkip")
+  void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation)
+      throws TableNotExistException, InterruptedException {
+ String createSource = TestConfigurations.getFileSourceDDL("source");
+ streamTableEnv.executeSql(createSource);
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false")
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 select * from source";
+ execInsertSql(streamTableEnv, insertInto);
+
+ streamTableEnv.executeSql("drop table t1");
+ hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+ }
+
@Test
void testReadWithParquetPredicatePushDown() {
TableEnvironment tableEnv = batchTableEnv;
@@ -2419,6 +2447,16 @@ public class ITTestHoodieDataSource {
return Stream.of(data).map(Arguments::of);
}
+ private static Stream<Arguments> parametersForMetaColumnsSkip() {
+ Object[][] data =
+ new Object[][] {
+ {HoodieTableType.COPY_ON_WRITE, WriteOperationType.INSERT}
+ // add MOR upsert check after fixing of HUDI-8785
+ // {HoodieTableType.MERGE_ON_READ, WriteOperationType.UPSERT}
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish