This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f3d4ce919d [HUDI-4916] Implement change log feed for Flink (#6840)
f3d4ce919d is described below
commit f3d4ce919d4909f9533255ee2a9a0450c8e44c73
Author: Danny Chan <[email protected]>
AuthorDate: Sat Oct 1 18:21:23 2022 +0800
[HUDI-4916] Implement change log feed for Flink (#6840)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 5 +
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 2 +-
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 8 +-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 50 +-
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 6 -
.../FlinkMergeAndReplaceHandleWithChangeLog.java} | 51 +-
.../hudi/io/FlinkMergeHandleWithChangeLog.java} | 52 +-
.../apache/hudi/io/FlinkWriteHandleFactory.java | 283 ++++++++
.../java/org/apache/hudi/io/MiniBatchHandle.java | 8 -
.../hudi/common/model/WriteOperationType.java | 14 +
.../hudi/common/table/HoodieTableConfig.java | 4 +-
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 100 ++-
.../hudi/common/table/cdc/HoodieCDCFileSplit.java | 31 +-
.../cdc/HoodieCDCSupplementalLoggingMode.java | 4 +-
.../table/log/HoodieCDCLogRecordIterator.java | 14 +-
.../table/log/HoodieUnMergedLogRecordScanner.java | 6 +-
.../apache/hudi/configuration/FlinkOptions.java | 22 +
.../apache/hudi/configuration/OptionsResolver.java | 9 +
.../apache/hudi/source/IncrementalInputSplits.java | 114 +++-
.../hudi/source/StreamReadMonitoringFunction.java | 11 +-
.../org/apache/hudi/table/HoodieTableSource.java | 34 +-
.../org/apache/hudi/table/format/FormatUtils.java | 20 +
.../hudi/table/format/cdc/CdcInputFormat.java | 723 +++++++++++++++++++++
.../hudi/table/format/cdc/CdcInputSplit.java | 61 ++
.../table/format/mor/MergeOnReadInputFormat.java | 166 +++--
.../table/format/mor/MergeOnReadInputSplit.java | 2 +-
.../org/apache/hudi/util/RowDataProjection.java | 1 +
.../java/org/apache/hudi/util/StreamerUtil.java | 4 +
.../apache/hudi/table/ITTestHoodieDataSource.java | 36 +
.../apache/hudi/table/format/TestInputFormat.java | 95 ++-
.../test/java/org/apache/hudi/utils/TestData.java | 30 +
.../scala/org/apache/hudi/cdc/CDCRelation.scala | 15 +-
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 32 +-
33 files changed, 1732 insertions(+), 281 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index fd7972a719..3b3995c8f8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -975,6 +975,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(KEYGENERATOR_CLASS_NAME);
}
+ public boolean isCDCEnabled() {
+ return getBooleanOrDefault(
+ HoodieTableConfig.CDC_ENABLED,
HoodieTableConfig.CDC_ENABLED.defaultValue());
+ }
+
public boolean isConsistentLogicalTimestampEnabled() {
return
getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index f57b195c76..303eea76db 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -95,7 +95,7 @@ public class HoodieCDCLogger implements Closeable {
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: tableConfig.getRecordKeyFieldProp();
this.cdcWriter = cdcWriter;
- this.cdcSupplementalLoggingMode = config.getCDCSupplementalLoggingMode();
+ this.cdcSupplementalLoggingMode =
tableConfig.cdcSupplementalLoggingMode();
this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
cdcSupplementalLoggingMode,
dataSchema
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 12e48ffbb4..910bc42158 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
@@ -75,14 +76,17 @@ public class HoodieMergeHandleWithChangeLog<T extends
HoodieRecordPayload, I, K,
protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
indexedRecord);
if (result) {
- cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+ boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+ cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() :
indexedRecord);
}
return result;
}
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord,
Option<IndexedRecord> insertRecord) {
super.writeInsertRecord(hoodieRecord, insertRecord);
- cdcLogger.put(hoodieRecord, null, insertRecord);
+ if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+ cdcLogger.put(hoodieRecord, null, insertRecord);
+ }
}
@Override
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 53a5799508..191eb003b9 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -27,10 +27,8 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -46,12 +44,7 @@ import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.io.FlinkAppendHandle;
-import org.apache.hudi.io.FlinkConcatAndReplaceHandle;
-import org.apache.hudi.io.FlinkConcatHandle;
-import org.apache.hudi.io.FlinkCreateHandle;
-import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
-import org.apache.hudi.io.FlinkMergeHandle;
+import org.apache.hudi.io.FlinkWriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
@@ -566,41 +559,12 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> table,
Iterator<HoodieRecord<T>> recordItr) {
- final HoodieRecordLocation loc = record.getCurrentLocation();
- final String fileID = loc.getFileId();
- final String partitionPath = record.getPartitionPath();
- final boolean insertClustering = config.allowDuplicateInserts();
-
- if (bucketToHandles.containsKey(fileID)) {
- MiniBatchHandle lastHandle = (MiniBatchHandle)
bucketToHandles.get(fileID);
- if (lastHandle.shouldReplace()) {
- HoodieWriteHandle<?, ?, ?, ?> writeHandle = insertClustering
- ? new FlinkConcatAndReplaceHandle<>(config, instantTime, table,
recordItr, partitionPath, fileID,
- table.getTaskContextSupplier(), lastHandle.getWritePath())
- : new FlinkMergeAndReplaceHandle<>(config, instantTime, table,
recordItr, partitionPath, fileID,
- table.getTaskContextSupplier(), lastHandle.getWritePath());
- this.bucketToHandles.put(fileID, writeHandle); // override with new
replace handle
- return writeHandle;
- }
- }
-
- final boolean isDelta =
table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
- if (isDelta) {
- writeHandle = new FlinkAppendHandle<>(config, instantTime, table,
partitionPath, fileID, recordItr,
- table.getTaskContextSupplier());
- } else if (loc.getInstantTime().equals("I")) {
- writeHandle = new FlinkCreateHandle<>(config, instantTime, table,
partitionPath,
- fileID, table.getTaskContextSupplier());
- } else {
- writeHandle = insertClustering
- ? new FlinkConcatHandle<>(config, instantTime, table, recordItr,
partitionPath,
- fileID, table.getTaskContextSupplier())
- : new FlinkMergeHandle<>(config, instantTime, table, recordItr,
partitionPath,
- fileID, table.getTaskContextSupplier());
- }
- this.bucketToHandles.put(fileID, writeHandle);
- return writeHandle;
+ // caution: it's not a good practice to modify the handles internal.
+ FlinkWriteHandleFactory.Factory<T,
+ List<HoodieRecord<T>>,
+ List<HoodieKey>,
+ List<WriteStatus>> writeHandleFactory =
FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(),
config);
+ return writeHandleFactory.create(this.bucketToHandles, record, config,
instantTime, table, recordItr);
}
public HoodieFlinkTable<T> getHoodieTable() {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index b514896aa1..2258375fdd 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -120,10 +120,4 @@ public class FlinkAppendHandle<T extends
HoodieRecordPayload, I, K, O>
public Path getWritePath() {
return writer.getLogFile().getPath();
}
-
- @Override
- public boolean shouldReplace() {
- // log files can append new data buffer directly
- return false;
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
similarity index 56%
copy from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
copy to
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 12e48ffbb4..62b5481cf9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,49 +20,36 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
/**
- * A merge handle that supports logging change logs.
+ * A flink merge and replace handle that supports logging change logs.
+ *
+ * <p>The cdc about logic is copied from {@link
HoodieMergeHandleWithChangeLog},
+ * we should refactor it out when there are good abstractions.
*/
-public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I,
K, O> extends HoodieMergeHandle<T, I, K, O> {
- protected final HoodieCDCLogger cdcLogger;
+public class FlinkMergeAndReplaceHandleWithChangeLog<T extends
HoodieRecordPayload, I, K, O>
+ extends FlinkMergeAndReplaceHandle<T, I, K, O> {
+ private final HoodieCDCLogger cdcLogger;
- public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
- Iterator<HoodieRecord<T>> recordItr,
String partitionPath, String fileId,
- TaskContextSupplier
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
- super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId,
taskContextSupplier, keyGeneratorOpt);
- this.cdcLogger = new HoodieCDCLogger(
- instantTime,
- config,
- hoodieTable.getMetaClient().getTableConfig(),
- tableSchema,
- createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
- IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
- }
-
- /**
- * Called by compactor code path.
- */
- public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
- Map<String, HoodieRecord<T>>
keyToNewRecords, String partitionPath, String fileId,
- HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator>
keyGeneratorOpt) {
- super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath,
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config,
String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Iterator<HoodieRecord<T>>
recordItr, String partitionPath, String fileId,
+ TaskContextSupplier
taskContextSupplier, Path basePath) {
+ super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId,
taskContextSupplier, basePath);
this.cdcLogger = new HoodieCDCLogger(
instantTime,
config,
@@ -75,22 +62,24 @@ public class HoodieMergeHandleWithChangeLog<T extends
HoodieRecordPayload, I, K,
protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
indexedRecord);
if (result) {
- cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+ boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+ cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() :
indexedRecord);
}
return result;
}
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord,
Option<IndexedRecord> insertRecord) {
super.writeInsertRecord(hoodieRecord, insertRecord);
- cdcLogger.put(hoodieRecord, null, insertRecord);
+ if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+ cdcLogger.put(hoodieRecord, null, insertRecord);
+ }
}
@Override
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
// if there are cdc data written, set the CDC-related information.
- Option<AppendResult> cdcResult =
- HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten,
insertRecordsWritten);
+ Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(),
cdcResult, partitionPath, fs);
return writeStatuses;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
similarity index 56%
copy from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
copy to
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 12e48ffbb4..f6adbbf0d4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,49 +20,39 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
/**
- * A merge handle that supports logging change logs.
+ * A flink merge handle that supports logging change logs.
+ *
+ * <p>The cdc about logic is copied from {@link
HoodieMergeHandleWithChangeLog},
+ * we should refactor it out when there are good abstractions.
*/
-public class HoodieMergeHandleWithChangeLog<T extends HoodieRecordPayload, I,
K, O> extends HoodieMergeHandle<T, I, K, O> {
- protected final HoodieCDCLogger cdcLogger;
+public class FlinkMergeHandleWithChangeLog<T extends HoodieRecordPayload, I,
K, O>
+ extends FlinkMergeHandle<T, I, K, O> {
+ private final HoodieCDCLogger cdcLogger;
- public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
- Iterator<HoodieRecord<T>> recordItr,
String partitionPath, String fileId,
- TaskContextSupplier
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
- super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId,
taskContextSupplier, keyGeneratorOpt);
- this.cdcLogger = new HoodieCDCLogger(
- instantTime,
- config,
- hoodieTable.getMetaClient().getTableConfig(),
- tableSchema,
- createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
- IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
- }
+ private static final Logger LOG =
LogManager.getLogger(FlinkMergeHandleWithChangeLog.class);
- /**
- * Called by compactor code path.
- */
- public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
- Map<String, HoodieRecord<T>>
keyToNewRecords, String partitionPath, String fileId,
- HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator>
keyGeneratorOpt) {
- super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath,
fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ public FlinkMergeHandleWithChangeLog(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Iterator<HoodieRecord<T>> recordItr,
String partitionPath, String fileId,
+ TaskContextSupplier
taskContextSupplier) {
+ super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId,
taskContextSupplier);
this.cdcLogger = new HoodieCDCLogger(
instantTime,
config,
@@ -75,22 +65,24 @@ public class HoodieMergeHandleWithChangeLog<T extends
HoodieRecordPayload, I, K,
protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
final boolean result = super.writeUpdateRecord(hoodieRecord, oldRecord,
indexedRecord);
if (result) {
- cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
+ boolean isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
+ cdcLogger.put(hoodieRecord, oldRecord, isDelete ? Option.empty() :
indexedRecord);
}
return result;
}
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord,
Option<IndexedRecord> insertRecord) {
super.writeInsertRecord(hoodieRecord, insertRecord);
- cdcLogger.put(hoodieRecord, null, insertRecord);
+ if (!HoodieOperation.isDelete(hoodieRecord.getOperation())) {
+ cdcLogger.put(hoodieRecord, null, insertRecord);
+ }
}
@Override
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
// if there are cdc data written, set the CDC-related information.
- Option<AppendResult> cdcResult =
- HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten,
insertRecordsWritten);
+ Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(),
cdcResult, partitionPath, fs);
return writeStatuses;
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
new file mode 100644
index 0000000000..fbc1c7ec55
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Factory clazz for flink write handles.
+ */
+public class FlinkWriteHandleFactory {
+
+ /**
+ * Returns the write handle factory with given write config.
+ */
+ public static <T extends HoodieRecordPayload, I, K, O> Factory<T, I, K, O>
getFactory(
+ HoodieTableConfig tableConfig,
+ HoodieWriteConfig writeConfig) {
+ if (writeConfig.allowDuplicateInserts()) {
+ return ClusterWriteHandleFactory.getInstance();
+ }
+ if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
+ return DeltaCommitWriteHandleFactory.getInstance();
+ } else if (tableConfig.isCDCEnabled()) {
+ return CdcWriteHandleFactory.getInstance();
+ } else {
+ return CommitWriteHandleFactory.getInstance();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ public interface Factory<T extends HoodieRecordPayload, I, K, O> {
+ /**
+ * Get or create a new write handle in order to reuse the file handles.
+ *
+ * <p>CAUTION: the method is not thread safe.
+ *
+ * @param bucketToHandles The existing write handles
+ * @param record The first record in the bucket
+ * @param config Write config
+ * @param instantTime The instant time
+ * @param table The table
+ * @param recordItr Record iterator
+ *
+ * @return Existing write handle or create a new one
+ */
+ HoodieWriteHandle<?, ?, ?, ?> create(
+ final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
+ HoodieRecord<T> record,
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr
+ );
+ }
+
+ /**
+ * Base clazz for commit write handle factory,
+ * it encapsulates the handle switching logic: INSERT OR UPSERT.
+ */
+ private abstract static class BaseCommitWriteHandleFactory<T extends
HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+ @Override
+ public HoodieWriteHandle<?, ?, ?, ?> create(
+ Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
+ HoodieRecord<T> record,
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr) {
+ final HoodieRecordLocation loc = record.getCurrentLocation();
+ final String fileID = loc.getFileId();
+ final String partitionPath = record.getPartitionPath();
+
+ if (bucketToHandles.containsKey(fileID)) {
+ MiniBatchHandle lastHandle = (MiniBatchHandle)
bucketToHandles.get(fileID);
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle =
+ createReplaceHandle(config, instantTime, table, recordItr,
partitionPath, fileID, lastHandle.getWritePath());
+ bucketToHandles.put(fileID, writeHandle); // override with new replace
handle
+ return writeHandle;
+ }
+
+ final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
+ if (loc.getInstantTime().equals("I")) {
+ writeHandle = new FlinkCreateHandle<>(config, instantTime, table,
partitionPath,
+ fileID, table.getTaskContextSupplier());
+ } else {
+ writeHandle = createMergeHandle(config, instantTime, table, recordItr,
partitionPath, fileID);
+ }
+ bucketToHandles.put(fileID, writeHandle);
+ return writeHandle;
+ }
+
+ protected abstract HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId,
+ Path basePath);
+
+ protected abstract HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId);
+ }
+
+ /**
+ * Write handle factory for commit.
+ */
+ private static class CommitWriteHandleFactory<T extends HoodieRecordPayload,
I, K, O>
+ extends BaseCommitWriteHandleFactory<T, I, K, O> {
+ private static final CommitWriteHandleFactory<?, ?, ?, ?> INSTANCE = new
CommitWriteHandleFactory<>();
+
+ @SuppressWarnings("unchecked")
+ public static <T extends HoodieRecordPayload, I, K, O>
CommitWriteHandleFactory<T, I, K, O> getInstance() {
+ return (CommitWriteHandleFactory<T, I, K, O>) INSTANCE;
+ }
+
+ @Override
+ protected HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId,
+ Path basePath) {
+ return new FlinkMergeAndReplaceHandle<>(config, instantTime, table,
recordItr, partitionPath, fileId,
+ table.getTaskContextSupplier(), basePath);
+ }
+
+ @Override
+ protected HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId) {
+ return new FlinkMergeHandle<>(config, instantTime, table, recordItr,
partitionPath,
+ fileId, table.getTaskContextSupplier());
+ }
+ }
+
+ /**
+ * Write handle factory for inline clustering.
+ */
+ private static class ClusterWriteHandleFactory<T extends
HoodieRecordPayload, I, K, O>
+ extends BaseCommitWriteHandleFactory<T, I, K, O> {
+ private static final ClusterWriteHandleFactory<?, ?, ?, ?> INSTANCE = new
ClusterWriteHandleFactory<>();
+
+ @SuppressWarnings("unchecked")
+ public static <T extends HoodieRecordPayload, I, K, O>
ClusterWriteHandleFactory<T, I, K, O> getInstance() {
+ return (ClusterWriteHandleFactory<T, I, K, O>) INSTANCE;
+ }
+
+ @Override
+ protected HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId,
+ Path basePath) {
+ return new FlinkConcatAndReplaceHandle<>(config, instantTime, table,
recordItr, partitionPath, fileId,
+ table.getTaskContextSupplier(), basePath);
+ }
+
+ @Override
+ protected HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId) {
+ return new FlinkConcatHandle<>(config, instantTime, table, recordItr,
partitionPath,
+ fileId, table.getTaskContextSupplier());
+ }
+ }
+
+ /**
+ * Write handle factory for commit, the write handle supports logging change
logs.
+ */
+ private static class CdcWriteHandleFactory<T extends HoodieRecordPayload, I,
K, O>
+ extends BaseCommitWriteHandleFactory<T, I, K, O> {
+ private static final CdcWriteHandleFactory<?, ?, ?, ?> INSTANCE = new
CdcWriteHandleFactory<>();
+
+ @SuppressWarnings("unchecked")
+ public static <T extends HoodieRecordPayload, I, K, O>
CdcWriteHandleFactory<T, I, K, O> getInstance() {
+ return (CdcWriteHandleFactory<T, I, K, O>) INSTANCE;
+ }
+
+ @Override
+ protected HoodieWriteHandle<?, ?, ?, ?> createReplaceHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId,
+ Path basePath) {
+ return new FlinkMergeAndReplaceHandleWithChangeLog<>(config,
instantTime, table, recordItr, partitionPath, fileId,
+ table.getTaskContextSupplier(), basePath);
+ }
+
+ @Override
+ protected HoodieWriteHandle<?, ?, ?, ?> createMergeHandle(
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr,
+ String partitionPath,
+ String fileId) {
+ return new FlinkMergeHandleWithChangeLog<>(config, instantTime, table,
recordItr, partitionPath,
+ fileId, table.getTaskContextSupplier());
+ }
+ }
+
+ /**
+ * Write handle factory for delta commit.
+ */
+ private static class DeltaCommitWriteHandleFactory<T extends
HoodieRecordPayload, I, K, O> implements Factory<T, I, K, O> {
+ private static final DeltaCommitWriteHandleFactory<?, ?, ?, ?> INSTANCE =
new DeltaCommitWriteHandleFactory<>();
+
+ @SuppressWarnings("unchecked")
+ public static <T extends HoodieRecordPayload, I, K, O>
DeltaCommitWriteHandleFactory<T, I, K, O> getInstance() {
+ return (DeltaCommitWriteHandleFactory<T, I, K, O>) INSTANCE;
+ }
+
+ @Override
+ public HoodieWriteHandle<?, ?, ?, ?> create(
+ Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles,
+ HoodieRecord<T> record,
+ HoodieWriteConfig config,
+ String instantTime,
+ HoodieTable<T, I, K, O> table,
+ Iterator<HoodieRecord<T>> recordItr) {
+ final String fileID = record.getCurrentLocation().getFileId();
+ final String partitionPath = record.getPartitionPath();
+
+ final HoodieWriteHandle<?, ?, ?, ?> writeHandle =
+ new FlinkAppendHandle<>(config, instantTime, table, partitionPath,
fileID, recordItr,
+ table.getTaskContextSupplier());
+ bucketToHandles.put(fileID, writeHandle);
+ return writeHandle;
+ }
+ }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
index 7d3d7fa5ff..91b8f6630c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
@@ -43,12 +43,4 @@ public interface MiniBatchHandle {
* Returns the write file path.
*/
Path getWritePath();
-
- /**
- * Whether the old write file should be replaced with the same name new file
- * using content merged with incremental new data batch.
- */
- default boolean shouldReplace() {
- return true;
- }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index f2f3809cf5..e0574d5119 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -118,4 +118,18 @@ public enum WriteOperationType {
public static boolean isOverwrite(WriteOperationType operationType) {
return operationType == INSERT_OVERWRITE || operationType ==
INSERT_OVERWRITE_TABLE;
}
+
+ /**
+ * Whether the operation changes the dataset.
+ */
+ public static boolean isDataChange(WriteOperationType operation) {
+ return operation == WriteOperationType.INSERT
+ || operation == WriteOperationType.UPSERT
+ || operation == WriteOperationType.DELETE
+ || operation == WriteOperationType.BULK_INSERT
+ || operation == WriteOperationType.DELETE_PARTITION
+ || operation == WriteOperationType.INSERT_OVERWRITE
+ || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
+ || operation == WriteOperationType.BOOTSTRAP;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 56862be803..f295b0019c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -614,8 +614,8 @@ public class HoodieTableConfig extends HoodieConfig {
return getBooleanOrDefault(CDC_ENABLED);
}
- public String cdcSupplementalLoggingMode() {
- return getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE);
+ public HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode() {
+ return
HoodieCDCSupplementalLoggingMode.parse(getStringOrDefault(CDC_SUPPLEMENTAL_LOGGING_MODE));
}
public String getKeyGeneratorClassName() {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 8cae90d59a..a9da1be77a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -28,12 +28,14 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -45,6 +47,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
@@ -60,17 +63,16 @@ import static
org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELE
import static
org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
/**
* This class helps to extract all the information which will be used when CDC
query.
*
* There are some steps:
- * 1. filter out the completed commit instants, and get the related
[[HoodieCommitMetadata]] objects.
- * 2. initialize the [[HoodieTableFileSystemView]] by the touched data files.
+ * 1. filter out the completed commit instants, and get the related {@link
HoodieCommitMetadata} objects.
+ * 2. initialize the {@link HoodieTableFileSystemView} by the touched data
files.
* 3. extract the cdc information:
- * generate a [[CDCFileSplit]] object for each of the instant in
(startInstant, endInstant)
+ * generate a {@link HoodieCDCFileSplit} object for each of the instant in
(startInstant, endInstant)
* and each of the file group which is touched in the range of instants.
*/
public class HoodieCDCExtractor {
@@ -83,9 +85,7 @@ public class HoodieCDCExtractor {
private final HoodieCDCSupplementalLoggingMode supplementalLoggingMode;
- private final String startInstant;
-
- private final String endInstant;
+ private final InstantRange instantRange;
private Map<HoodieInstant, HoodieCommitMetadata> commits;
@@ -93,33 +93,27 @@ public class HoodieCDCExtractor {
public HoodieCDCExtractor(
HoodieTableMetaClient metaClient,
- String startInstant,
- String endInstant) {
+ InstantRange range) {
this.metaClient = metaClient;
this.basePath = metaClient.getBasePathV2();
this.fs = metaClient.getFs().getFileSystem();
- this.supplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
- metaClient.getTableConfig().cdcSupplementalLoggingMode());
- this.startInstant = startInstant;
- this.endInstant = endInstant;
+ this.supplementalLoggingMode =
metaClient.getTableConfig().cdcSupplementalLoggingMode();
+ this.instantRange = range;
init();
}
private void init() {
initInstantAndCommitMetadatas();
- initFSView();
}
/**
* At the granularity of a file group, trace the mapping between
* each commit/instant and changes to this file group.
*/
- public Map<HoodieFileGroupId, List<Pair<HoodieInstant, HoodieCDCFileSplit>>>
extractCDCFileSplits() {
- if (commits == null || fsView == null) {
- throw new HoodieException("Fail to init CDCExtractor");
- }
+ public Map<HoodieFileGroupId, List<HoodieCDCFileSplit>>
extractCDCFileSplits() {
+ ValidationUtils.checkState(commits != null, "Empty commits");
- Map<HoodieFileGroupId, List<Pair<HoodieInstant, HoodieCDCFileSplit>>>
fgToCommitChanges = new HashMap<>();
+ Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fgToCommitChanges = new
HashMap<>();
for (HoodieInstant instant : commits.keySet()) {
HoodieCommitMetadata commitMetadata = commits.get(instant);
@@ -134,7 +128,7 @@ public class HoodieCDCExtractor {
HoodieCDCFileSplit changeFile =
parseWriteStat(fileGroupId, instant, writeStat,
commitMetadata.getOperationType());
fgToCommitChanges.computeIfAbsent(fileGroupId, k -> new
ArrayList<>());
- fgToCommitChanges.get(fileGroupId).add(Pair.of(instant, changeFile));
+ fgToCommitChanges.get(fileGroupId).add(changeFile);
});
}
@@ -144,15 +138,15 @@ public class HoodieCDCExtractor {
for (String partition : ptToReplacedFileId.keySet()) {
List<String> fileIds = ptToReplacedFileId.get(partition);
fileIds.forEach(fileId -> {
- Option<FileSlice> latestFileSliceOpt =
fsView.fetchLatestFileSlice(partition, fileId);
+ Option<FileSlice> latestFileSliceOpt =
getOrCreateFsView().fetchLatestFileSlice(partition, fileId);
if (latestFileSliceOpt.isPresent()) {
HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition,
fileId);
- HoodieCDCFileSplit changeFile = new HoodieCDCFileSplit(
+ HoodieCDCFileSplit changeFile = new
HoodieCDCFileSplit(instant.getTimestamp(),
REPLACE_COMMIT, null, latestFileSliceOpt, Option.empty());
if (!fgToCommitChanges.containsKey(fileGroupId)) {
fgToCommitChanges.put(fileGroupId, new ArrayList<>());
}
- fgToCommitChanges.get(fileGroupId).add(Pair.of(instant,
changeFile));
+ fgToCommitChanges.get(fileGroupId).add(changeFile);
}
});
}
@@ -161,11 +155,23 @@ public class HoodieCDCExtractor {
return fgToCommitChanges;
}
+ /**
+ * Returns the fs view directly or creates a new one.
+ *
+ * <p>There is no need to initialize the fs view when supplemental logging
mode is: WITH_BEFORE_AFTER.
+ */
+ private HoodieTableFileSystemView getOrCreateFsView() {
+ if (this.fsView == null) {
+ this.fsView = initFSView();
+ }
+ return this.fsView;
+ }
+
/**
* Parse the commit metadata between (startInstant, endInstant], and extract
the touched partitions
* and files to build the filesystem view.
*/
- private void initFSView() {
+ private HoodieTableFileSystemView initFSView() {
Set<String> touchedPartitions = new HashSet<>();
for (Map.Entry<HoodieInstant, HoodieCommitMetadata> entry :
commits.entrySet()) {
HoodieCommitMetadata commitMetadata = entry.getValue();
@@ -182,7 +188,7 @@ public class HoodieCDCExtractor {
Path partitionPath = FSUtils.getPartitionPath(basePath,
touchedPartition);
touchedFiles.addAll(Arrays.asList(fs.listStatus(partitionPath)));
}
- this.fsView = new HoodieTableFileSystemView(
+ return new HoodieTableFileSystemView(
metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(),
touchedFiles.toArray(new FileStatus[0])
@@ -192,7 +198,6 @@ public class HoodieCDCExtractor {
}
}
-
/**
* Extract the required instants from all the instants between
(startInstant, endInstant].
*
@@ -206,12 +211,12 @@ public class HoodieCDCExtractor {
*/
private void initInstantAndCommitMetadatas() {
try {
- List<String> requiredActions = Arrays.asList(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+ Set<String> requiredActions = new HashSet<>(Arrays.asList(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
HoodieActiveTimeline activeTimeLine = metaClient.getActiveTimeline();
- Map<HoodieInstant, HoodieCommitMetadata> result =
activeTimeLine.getInstants()
+ this.commits = activeTimeLine.getInstants()
.filter(instant ->
instant.isCompleted()
- && isInRange(instant.getTimestamp(), startInstant,
endInstant)
+ && instantRange.isInRange(instant.getTimestamp())
&&
requiredActions.contains(instant.getAction().toLowerCase(Locale.ROOT))
).map(instant -> {
HoodieCommitMetadata commitMetadata;
@@ -228,25 +233,13 @@ public class HoodieCDCExtractor {
}
return Pair.of(instant, commitMetadata);
}).filter(pair ->
- maybeChangeData(pair.getRight().getOperationType())
+
WriteOperationType.isDataChange(pair.getRight().getOperationType())
).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
- this.commits = result;
} catch (Exception e) {
throw new HoodieIOException("Fail to get the commit metadata for CDC");
}
}
- private Boolean maybeChangeData(WriteOperationType operation) {
- return operation == WriteOperationType.INSERT
- || operation == WriteOperationType.UPSERT
- || operation == WriteOperationType.DELETE
- || operation == WriteOperationType.BULK_INSERT
- || operation == WriteOperationType.DELETE_PARTITION
- || operation == WriteOperationType.INSERT_OVERWRITE
- || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
- || operation == WriteOperationType.BOOTSTRAP;
- }
-
/**
* Parse HoodieWriteStat, judge which type the file is, and what strategy
should be used to parse CDC data.
* Then build a [[HoodieCDCFileSplit]] object.
@@ -256,8 +249,9 @@ public class HoodieCDCExtractor {
HoodieInstant instant,
HoodieWriteStat writeStat,
WriteOperationType operation) {
- Path basePath = metaClient.getBasePathV2();
- FileSystem fs = metaClient.getFs().getFileSystem();
+ final Path basePath = metaClient.getBasePathV2();
+ final FileSystem fs = metaClient.getFs().getFileSystem();
+ final String instantTs = instant.getTimestamp();
HoodieCDCFileSplit cdcFileSplit;
if (StringUtils.isNullOrEmpty(writeStat.getCdcPath())) {
@@ -268,35 +262,35 @@ public class HoodieCDCExtractor {
if (operation == WriteOperationType.DELETE && writeStat.getNumWrites()
== 0L
&& writeStat.getNumDeletes() != 0) {
// This is a delete operation wherein all the records in this file
group are deleted
- // and no records have been writen out a new file.
+ // and no records have been written out a new file.
// So, we find the previous file that this operation delete from,
and treat each of
// records as a deleted one.
- HoodieBaseFile beforeBaseFile = fsView.getBaseFileOn(
+ HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn(
fileGroupId.getPartitionPath(), writeStat.getPrevCommit(),
fileGroupId.getFileId()
).orElseThrow(() ->
new HoodieIOException("Can not get the previous version of the
base file")
);
- FileSlice beforeFileSlice = new FileSlice(fileGroupId,
writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
- cdcFileSplit = new HoodieCDCFileSplit(BASE_FILE_DELETE, null,
Option.empty(), Option.of(beforeFileSlice));
+ FileSlice beforeFileSlice = new FileSlice(fileGroupId,
writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList());
+ cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE,
null, Option.empty(), Option.of(beforeFileSlice));
} else if (writeStat.getNumUpdateWrites() == 0L &&
writeStat.getNumDeletes() == 0
&& writeStat.getNumWrites() == writeStat.getNumInserts()) {
// all the records in this file are new.
- cdcFileSplit = new HoodieCDCFileSplit(BASE_FILE_INSERT, path);
+ cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_INSERT,
path);
} else {
throw new HoodieException("There should be a cdc log file.");
}
} else {
// this is a log file
Option<FileSlice> beforeFileSliceOpt =
getDependentFileSliceForLogFile(fileGroupId, instant, path);
- cdcFileSplit = new HoodieCDCFileSplit(LOG_FILE, path,
beforeFileSliceOpt, Option.empty());
+ cdcFileSplit = new HoodieCDCFileSplit(instantTs, LOG_FILE, path,
beforeFileSliceOpt, Option.empty());
}
} else {
// this is a cdc log
if
(supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
{
- cdcFileSplit = new HoodieCDCFileSplit(AS_IS, writeStat.getCdcPath());
+ cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS,
writeStat.getCdcPath());
} else {
try {
- HoodieBaseFile beforeBaseFile = fsView.getBaseFileOn(
+ HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn(
fileGroupId.getPartitionPath(), writeStat.getPrevCommit(),
fileGroupId.getFileId()
).orElseThrow(() ->
new HoodieIOException("Can not get the previous version of the
base file")
@@ -307,7 +301,7 @@ public class HoodieCDCExtractor {
if
(supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) {
beforeFileSlice = new FileSlice(fileGroupId,
writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
}
- cdcFileSplit = new HoodieCDCFileSplit(AS_IS, writeStat.getCdcPath(),
+ cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS,
writeStat.getCdcPath(),
Option.ofNullable(beforeFileSlice),
Option.ofNullable(currentFileSlice));
} catch (Exception e) {
throw new HoodieException("Fail to parse HoodieWriteStat", e);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
index 43ad3bf176..ae082b1945 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
@@ -21,6 +21,8 @@ package org.apache.hudi.common.table.cdc;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.util.Option;
+import org.jetbrains.annotations.NotNull;
+
import java.io.Serializable;
/**
@@ -39,43 +41,53 @@ import java.io.Serializable;
* For `cdcInferCase` = {@link HoodieCDCInferCase#REPLACE_COMMIT}, `cdcFile`
is null,
* `beforeFileSlice` is the current version of the file slice.
*/
-public class HoodieCDCFileSplit implements Serializable {
+public class HoodieCDCFileSplit implements Serializable,
Comparable<HoodieCDCFileSplit> {
+ /**
+ * The instant time at which the changes happened.
+ */
+ private final String instant;
/**
- * * the change type, which decide to how to retrieve the change data. more
details see: `HoodieCDCLogicalFileType#`
+ * Flag that decides how to retrieve the change data. For more details see:
`HoodieCDCLogicalFileType`.
*/
private final HoodieCDCInferCase cdcInferCase;
/**
- * the file that the change data can be parsed from.
+ * The file that the change data can be parsed from.
*/
private final String cdcFile;
/**
- * the file slice that are required when retrieve the before data.
+ * The file slice that is required when retrieving the before data.
*/
private final Option<FileSlice> beforeFileSlice;
/**
- * the file slice that are required when retrieve the after data.
+ * The file slice that is required when retrieving the after data.
*/
private final Option<FileSlice> afterFileSlice;
- public HoodieCDCFileSplit(HoodieCDCInferCase cdcInferCase, String cdcFile) {
- this(cdcInferCase, cdcFile, Option.empty(), Option.empty());
+ public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase,
String cdcFile) {
+ this(instant, cdcInferCase, cdcFile, Option.empty(), Option.empty());
}
public HoodieCDCFileSplit(
+ String instant,
HoodieCDCInferCase cdcInferCase,
String cdcFile,
Option<FileSlice> beforeFileSlice,
Option<FileSlice> afterFileSlice) {
+ this.instant = instant;
this.cdcInferCase = cdcInferCase;
this.cdcFile = cdcFile;
this.beforeFileSlice = beforeFileSlice;
this.afterFileSlice = afterFileSlice;
}
+ public String getInstant() {
+ return this.instant;
+ }
+
public HoodieCDCInferCase getCdcInferCase() {
return this.cdcInferCase;
}
@@ -91,4 +103,9 @@ public class HoodieCDCFileSplit implements Serializable {
public Option<FileSlice> getAfterFileSlice() {
return this.afterFileSlice;
}
+
+ @Override
+ public int compareTo(@NotNull HoodieCDCFileSplit o) {
+ return this.instant.compareTo(o.instant);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
index 35a232206f..13a51a4f07 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java
@@ -28,8 +28,8 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
*
* <ul>
* <li>OP_KEY: record keys, the reader needs to figure out the update before
image and after image;</li>
- * <li>OP_KEY: before images, the reader needs to figure out the update
after images;</li>
- * <li>OP_KEY: before and after images, the reader can generate the details
directly from the log.</li>
+ * <li>WITH_BEFORE: before images, the reader needs to figure out the update
after images;</li>
+ * <li>WITH_BEFORE_AFTER: before and after images, the reader can generate
the details directly from the log.</li>
* </ul>
*/
public enum HoodieCDCSupplementalLoggingMode {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
index 18db9850a0..f194ddf8f4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.util.ClosableIterator;
@@ -26,6 +27,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,18 +35,22 @@ import java.io.IOException;
public class HoodieCDCLogRecordIterator implements
ClosableIterator<IndexedRecord> {
- private final HoodieLogFile cdcLogFile;
-
private final HoodieLogFormat.Reader reader;
private ClosableIterator<IndexedRecord> itr;
+ public HoodieCDCLogRecordIterator(
+ Configuration hadoopConf,
+ Path cdcLogPath,
+ Schema cdcSchema) throws IOException {
+ this(FSUtils.getFs(cdcLogPath, hadoopConf), cdcLogPath, cdcSchema);
+ }
+
public HoodieCDCLogRecordIterator(
FileSystem fs,
Path cdcLogPath,
Schema cdcSchema) throws IOException {
- this.cdcLogFile = new HoodieLogFile(fs.getFileStatus(cdcLogPath));
- this.reader = new HoodieLogFileReader(fs, cdcLogFile, cdcSchema,
+ this.reader = new HoodieLogFileReader(fs, new
HoodieLogFile(fs.getFileStatus(cdcLogPath)), cdcSchema,
HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 8ea34d6f2f..16d3a7a3e4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -21,12 +21,14 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.Option;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import java.util.List;
+import java.util.stream.Collectors;
/**
* A scanner used to scan hoodie unmerged log records.
@@ -96,7 +98,9 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
}
public Builder withLogFilePaths(List<String> logFilePaths) {
- this.logFilePaths = logFilePaths;
+ this.logFilePaths = logFilePaths.stream()
+ .filter(p -> !p.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ .collect(Collectors.toList());
return this;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a9e10d3e55..27f19f4a66 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -122,6 +123,10 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("The default partition name in case the dynamic
partition"
+ " column value is null/empty string");
+ // ------------------------------------------------------------------------
+ // Changelog Capture Options
+ // ------------------------------------------------------------------------
+
public static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions
.key("changelog.enabled")
.booleanType()
@@ -133,6 +138,23 @@ public class FlinkOptions extends HoodieConfig {
+ "The semantics is best effort because the compaction job would
finally merge all changes of a record into one.\n"
+ " default false to have UPSERT semantics");
+ public static final ConfigOption<Boolean> CDC_ENABLED = ConfigOptions
+ .key("cdc.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withFallbackKeys(HoodieTableConfig.CDC_ENABLED.key())
+ .withDescription("When enable, persist the change data if necessary, and
can be queried as a CDC query mode");
+
+ public static final ConfigOption<String> SUPPLEMENTAL_LOGGING_MODE =
ConfigOptions
+ .key("cdc.supplemental.logging.mode")
+ .stringType()
+ .defaultValue("cdc_data_before_after") // default record all the change
log images
+ .withFallbackKeys(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key())
+ .withDescription("The supplemental logging mode:"
+ + "1) 'cdc_op_key': persist the 'op' and the record key only,"
+ + "2) 'cdc_data_before': persist the additional 'before' image,"
+ + "3) 'cdc_data_before_after': persist the 'before' and 'after'
images at the same time");
+
// ------------------------------------------------------------------------
// Metadata table Options
// ------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 00ebf09426..93044f6ca3 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -20,6 +20,7 @@ package org.apache.hudi.configuration;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.format.FilePathUtils;
@@ -189,4 +190,12 @@ public class OptionsResolver {
return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
&&
!conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST);
}
+
+ /**
+ * Returns the supplemental logging mode.
+ */
+ public static HoodieCDCSupplementalLoggingMode
getCDCSupplementalLoggingMode(Configuration conf) {
+ String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE);
+ return HoodieCDCSupplementalLoggingMode.parse(mode);
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 0be2a5300f..dc10970b05 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -21,8 +21,11 @@ package org.apache.hudi.source;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCExtractor;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -33,6 +36,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -52,6 +56,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -243,23 +248,7 @@ public class IncrementalInputSplits implements
Serializable {
final HoodieInstant instantToIssue = instants.size() == 0 ? null :
instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
- if (issuedInstant != null) {
- // the streaming reader may record the last issued instant, if the
issued instant is present,
- // the instant range should be: (issued instant, the latest instant].
- instantRange =
InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue.getTimestamp())
- .rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
- } else if
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
- // first time consume and has a start commit
- final String startCommit =
this.conf.getString(FlinkOptions.READ_START_COMMIT);
- instantRange =
startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
- ? null
- :
InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue.getTimestamp())
- .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
- } else {
- // first time consume and no start commit, consumes the latest
incremental data set.
- instantRange =
InstantRange.builder().startInstant(instantToIssue.getTimestamp()).endInstant(instantToIssue.getTimestamp())
- .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
- }
+ instantRange = getInstantRange(issuedInstant,
instantToIssue.getTimestamp(), false);
} else {
LOG.info("No new instant found for the table under path " + path + ",
skip reading");
return Result.EMPTY;
@@ -315,6 +304,97 @@ public class IncrementalInputSplits implements
Serializable {
return Result.instance(inputSplits, endInstant);
}
+ /**
+ * Returns the incremental cdc input splits.
+ *
+ * @param metaClient The meta client
+ * @param issuedInstant The last issued instant, only valid in streaming read
+ * @return The list of incremental input splits or empty if there are no new
instants
+ */
+ public Result inputSplitsCDC(
+ HoodieTableMetaClient metaClient,
+ String issuedInstant) {
+ metaClient.reloadActiveTimeline();
+ HoodieTimeline commitTimeline =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+ if (commitTimeline.empty()) {
+ LOG.warn("No splits found for the table under path " + path);
+ return Result.EMPTY;
+ }
+ List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline,
issuedInstant);
+ // get the latest instant that satisfies condition
+ final HoodieInstant instantToIssue = instants.size() == 0 ? null :
instants.get(instants.size() - 1);
+ final InstantRange instantRange;
+ if (instantToIssue != null) {
+ instantRange = getInstantRange(issuedInstant,
instantToIssue.getTimestamp(), true);
+ } else {
+ LOG.info("No new instant found for the table under path " + path + ",
skip reading");
+ return Result.EMPTY;
+ }
+
+ Set<String> readPartitions;
+ final FileStatus[] fileStatuses;
+
+ if (instantRange == null) {
+ // reading from the earliest, scans the partitions and files directly.
+ FileIndex fileIndex = getFileIndex();
+ readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
+ if (readPartitions.size() == 0) {
+ LOG.warn("No partitions found for reading in path: " + path);
+ return Result.EMPTY;
+ }
+ fileStatuses = fileIndex.getFilesInPartitions();
+
+ if (fileStatuses.length == 0) {
+ LOG.warn("No files found for reading in path: " + path);
+ return Result.EMPTY;
+ }
+
+ final String endInstant = instantToIssue.getTimestamp();
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient,
commitTimeline,
+ fileStatuses, readPartitions, endInstant, null);
+
+ return Result.instance(inputSplits, endInstant);
+ } else {
+ HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient,
instantRange);
+ final String endInstant = instantToIssue.getTimestamp();
+ Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits =
extractor.extractCDCFileSplits();
+
+ if (fileSplits.isEmpty()) {
+ LOG.warn("No change logs found for reading in path: " + path);
+ return Result.EMPTY;
+ }
+
+ final AtomicInteger cnt = new AtomicInteger(0);
+ List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
+ .map(splits ->
+ new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(),
maxCompactionMemoryInBytes,
+ splits.getKey().getFileId(),
splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
+ .collect(Collectors.toList());
+ return Result.instance(inputSplits, endInstant);
+ }
+ }
+
+ @Nullable
+ private InstantRange getInstantRange(String issuedInstant, String
instantToIssue, boolean nullableBoundary) {
+ if (issuedInstant != null) {
+ // the streaming reader may record the last issued instant, if the
issued instant is present,
+ // the instant range should be: (issued instant, the latest instant].
+ return
InstantRange.builder().startInstant(issuedInstant).endInstant(instantToIssue)
+
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.OPEN_CLOSE).build();
+ } else if
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
+ // first time consume and has a start commit
+ final String startCommit =
this.conf.getString(FlinkOptions.READ_START_COMMIT);
+ return startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
+ ? null
+ :
InstantRange.builder().startInstant(startCommit).endInstant(instantToIssue)
+
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
+ } else {
+ // first time consume and no start commit, consumes the latest
incremental data set.
+ return
InstantRange.builder().startInstant(instantToIssue).endInstant(instantToIssue)
+
.nullableBoundary(nullableBoundary).rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
+ }
+ }
+
private List<MergeOnReadInputSplit> getInputSplits(
HoodieTableMetaClient metaClient,
HoodieTimeline commitTimeline,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 3318cecf10..1812262ab4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -81,6 +81,11 @@ public class StreamReadMonitoringFunction
*/
private final long interval;
+ /**
+ * Flag saying whether the change log capture is enabled.
+ */
+ private final boolean cdcEnabled;
+
private transient Object checkpointLock;
private volatile boolean isRunning = true;
@@ -106,6 +111,7 @@ public class StreamReadMonitoringFunction
this.conf = conf;
this.path = path;
this.interval =
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
+ this.cdcEnabled = conf.getBoolean(FlinkOptions.CDC_ENABLED);
this.incrementalInputSplits = IncrementalInputSplits.builder()
.conf(conf)
.path(path)
@@ -195,8 +201,9 @@ public class StreamReadMonitoringFunction
// table does not exist
return;
}
- IncrementalInputSplits.Result result =
- incrementalInputSplits.inputSplits(metaClient, this.hadoopConf,
this.issuedInstant);
+ IncrementalInputSplits.Result result = cdcEnabled
+ ? incrementalInputSplits.inputSplitsCDC(metaClient, this.issuedInstant)
+ : incrementalInputSplits.inputSplits(metaClient, this.hadoopConf,
this.issuedInstant);
if (result.isEmpty()) {
// no new instants, returns early
return;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6fac5e4b88..4533ef7179 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -42,6 +42,7 @@ import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -393,8 +394,12 @@ public class HoodieTableSource implements
if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
final HoodieTableType tableType =
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
- return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
- rowDataType, Collections.emptyList(), emitDelete);
+ if (this.conf.getBoolean(FlinkOptions.CDC_ENABLED)) {
+ return cdcInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, Collections.emptyList());
+ } else {
+ return mergeOnReadInputFormat(rowType, requiredRowType,
tableAvroSchema,
+ rowDataType, Collections.emptyList(), emitDelete);
+ }
}
String errMsg = String.format("Invalid query type : '%s', options ['%s']
are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT);
@@ -410,6 +415,31 @@ public class HoodieTableSource implements
return instantAndCommitMetadata.isPresent();
}
+ private MergeOnReadInputFormat cdcInputFormat(
+ RowType rowType,
+ RowType requiredRowType,
+ Schema tableAvroSchema,
+ DataType rowDataType,
+ List<MergeOnReadInputSplit> inputSplits) {
+ final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+ rowType,
+ requiredRowType,
+ tableAvroSchema.toString(),
+ AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+ inputSplits,
+ conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+ return CdcInputFormat.builder()
+ .config(this.conf)
+ .tableState(hoodieTableState)
+ // use the explicit fields' data type because the AvroSchemaConverter
+ // is not very stable.
+ .fieldTypes(rowDataType.getChildren())
+ .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+ .limit(this.limit)
+ .emitDelete(false) // the change logs iterator can handle the DELETE
records
+ .build();
+ }
+
private MergeOnReadInputFormat mergeOnReadInputFormat(
RowType rowType,
RowType requiredRowType,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 8adbde355c..f44ec67e5e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -26,11 +26,13 @@ import
org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -44,6 +46,7 @@ import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -118,6 +121,23 @@ public class FormatUtils {
return pos == -1 ? null : record.get(pos);
}
+ public static ExternalSpillableMap<String, byte[]> spillableMap(
+ HoodieWriteConfig writeConfig,
+ long maxCompactionMemoryInBytes) {
+ try {
+ return new ExternalSpillableMap<>(
+ maxCompactionMemoryInBytes,
+ writeConfig.getSpillableMapBasePath(),
+ new DefaultSizeEstimator<>(),
+ new DefaultSizeEstimator<>(),
+ writeConfig.getCommonConfig().getSpillableDiskMapType(),
+ writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
+ } catch (IOException e) {
+ throw new HoodieIOException(
+ "IOException when creating ExternalSpillableMap at " +
writeConfig.getSpillableMapBasePath(), e);
+ }
+ }
+
public static HoodieMergedLogRecordScanner logScanner(
MergeOnReadInputSplit split,
Schema logSchema,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
new file mode 100644
index 0000000000..41b05c9f1e
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.format.FormatUtils;
+import
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
+import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.table.format.mor.MergeOnReadTableState;
+import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.RowDataProjection;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.hadoop.fs.Path;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
+
+/**
+ * The base InputFormat class to read Hoodie data set as change logs.
+ */
+public class CdcInputFormat extends MergeOnReadInputFormat {
+ private static final long serialVersionUID = 1L;
+
/**
 * Private constructor, instances are created through {@link #builder()}.
 *
 * @param conf            the Flink configuration
 * @param tableState      the merge-on-read table state
 * @param fieldTypes      the explicit field data types of the full row
 * @param defaultPartName the default partition name for null partition values
 * @param limit           the read limit, negative for unlimited
 * @param emitDelete      whether DELETE records should be emitted downstream
 */
private CdcInputFormat(
    Configuration conf,
    MergeOnReadTableState tableState,
    List<DataType> fieldTypes,
    String defaultPartName,
    long limit,
    boolean emitDelete) {
  super(conf, tableState, fieldTypes, defaultPartName, limit, emitDelete);
}
+
+ @Override
+ protected RecordIterator initIterator(MergeOnReadInputSplit split) throws
IOException {
+ if (split instanceof CdcInputSplit) {
+ HoodieCDCSupplementalLoggingMode mode =
OptionsResolver.getCDCSupplementalLoggingMode(conf);
+ ImageManager manager = new ImageManager(conf, tableState.getRowType(),
this::getFileSliceIterator);
+ Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc =
+ cdcFileSplit -> getRecordIteratorV2(split.getTablePath(),
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
+ return new CdcFileSplitsIterator((CdcInputSplit) split, manager,
recordIteratorFunc);
+ } else {
+ return super.initIterator(split);
+ }
+ }
+
+ /**
+ * Returns the builder for {@link MergeOnReadInputFormat}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ private RecordIterator getFileSliceIterator(MergeOnReadInputSplit split) {
+ if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
+ // base file only
+ return new
BaseFileOnlyIterator(getFullSchemaReader(split.getBasePath().get()));
+ } else if (!split.getBasePath().isPresent()) {
+ // log files only
+ return new LogFileOnlyIterator(getFullLogFileIterator(split));
+ } else {
+ Schema tableSchema = new
Schema.Parser().parse(this.tableState.getAvroSchema());
+ return new MergeIterator(
+ conf,
+ hadoopConf,
+ split,
+ this.tableState.getRowType(),
+ this.tableState.getRowType(),
+ tableSchema,
+ Option.empty(),
+ Option.empty(),
+ false,
+ this.tableState.getOperationPos(),
+ getFullSchemaReader(split.getBasePath().get()));
+ }
+ }
+
+ private RecordIterator getRecordIteratorV2(
+ String tablePath,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ HoodieCDCSupplementalLoggingMode mode,
+ ImageManager imageManager) {
+ try {
+ return getRecordIterator(tablePath, maxCompactionMemoryInBytes,
fileSplit, mode, imageManager);
+ } catch (IOException e) {
+ throw new HoodieException("Get record iterator error", e);
+ }
+ }
+
+ private RecordIterator getRecordIterator(
+ String tablePath,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ HoodieCDCSupplementalLoggingMode mode,
+ ImageManager imageManager) throws IOException {
+ switch (fileSplit.getCdcInferCase()) {
+ case BASE_FILE_INSERT:
+ ValidationUtils.checkState(fileSplit.getCdcFile() != null,
+ "CDC file path should exist");
+ String path = new Path(tablePath, fileSplit.getCdcFile()).toString();
+ return new AddBaseFileIterator(getRequiredSchemaReader(path));
+ case BASE_FILE_DELETE:
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice should exist");
+ FileSlice fileSlice = fileSplit.getBeforeFileSlice().get();
+ MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath,
fileSlice, maxCompactionMemoryInBytes);
+ return new RemoveBaseFileIterator(tableState,
getFileSliceIterator(inputSplit));
+ case AS_IS:
+ Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new
Schema.Parser().parse(tableState.getAvroSchema()));
+ Schema cdcSchema =
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
+ switch (mode) {
+ case WITH_BEFORE_AFTER:
+ return new BeforeAfterImageIterator(tablePath, tableState,
hadoopConf, cdcSchema, fileSplit);
+ case WITH_BEFORE:
+ return new BeforeImageIterator(conf, hadoopConf, tablePath,
tableState, cdcSchema, fileSplit, imageManager);
+ case OP_KEY:
+ return new RecordKeyImageIterator(conf, hadoopConf, tablePath,
tableState, cdcSchema, fileSplit, imageManager);
+ default:
+ throw new AssertionError("Unexpected mode" + mode);
+ }
+ case REPLACE_COMMIT:
+ return new ReplaceCommitIterator(conf, tablePath, tableState,
fileSplit, this::getFileSliceIterator);
+ default:
+ throw new AssertionError("Unexpected cdc file split infer case: " +
fileSplit.getCdcInferCase());
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+ static class CdcFileSplitsIterator implements RecordIterator {
+ private ImageManager imageManager; // keep a reference to release resource
+ private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
+ private final Function<HoodieCDCFileSplit, RecordIterator>
recordIteratorFunc;
+ private RecordIterator recordIterator;
+
+ CdcFileSplitsIterator(
+ CdcInputSplit inputSplit,
+ ImageManager imageManager,
+ Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc) {
+ this.fileSplitIterator =
Arrays.asList(inputSplit.getChanges()).iterator();
+ this.imageManager = imageManager;
+ this.recordIteratorFunc = recordIteratorFunc;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if (recordIterator != null) {
+ if (!recordIterator.reachedEnd()) {
+ return false;
+ } else {
+ recordIterator.close(); // release resource
+ recordIterator = null;
+ }
+ }
+ if (fileSplitIterator.hasNext()) {
+ HoodieCDCFileSplit fileSplit = fileSplitIterator.next();
+ recordIterator = recordIteratorFunc.apply(fileSplit);
+ return recordIterator.reachedEnd();
+ }
+ return true;
+ }
+
+ @Override
+ public RowData nextRecord() {
+ return recordIterator.nextRecord();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (recordIterator != null) {
+ recordIterator.close();
+ }
+ if (imageManager != null) {
+ imageManager.close();
+ imageManager = null;
+ }
+ }
+ }
+
+ static class AddBaseFileIterator implements RecordIterator {
+ // base file reader
+ private ParquetColumnarRowSplitReader reader;
+
+ private RowData currentRecord;
+
+ AddBaseFileIterator(ParquetColumnarRowSplitReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if (!this.reader.reachedEnd()) {
+ currentRecord = this.reader.nextRecord();
+ currentRecord.setRowKind(RowKind.INSERT);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public RowData nextRecord() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.reader != null) {
+ this.reader.close();
+ this.reader = null;
+ }
+ }
+ }
+
+ static class RemoveBaseFileIterator implements RecordIterator {
+ private RecordIterator nested;
+ private final RowDataProjection projection;
+
+ RemoveBaseFileIterator(MergeOnReadTableState tableState, RecordIterator
iterator) {
+ this.nested = iterator;
+ this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return nested.reachedEnd();
+ }
+
+ @Override
+ public RowData nextRecord() {
+ RowData row = nested.nextRecord();
+ row.setRowKind(RowKind.DELETE);
+ return this.projection.project(row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.nested != null) {
+ this.nested.close();
+ this.nested = null;
+ }
+ }
+ }
+
+ abstract static class BaseImageIterator implements RecordIterator {
+ private final Schema requiredSchema;
+ private final int[] requiredPos;
+ private final GenericRecordBuilder recordBuilder;
+ private final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
+
+ // the changelog records iterator
+ private HoodieCDCLogRecordIterator cdcItr;
+
+ private GenericRecord cdcRecord;
+
+ private RowData sideImage;
+
+ private RowData currentImage;
+
+ BaseImageIterator(
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ MergeOnReadTableState tableState,
+ Schema cdcSchema,
+ HoodieCDCFileSplit fileSplit) throws IOException {
+ this.requiredSchema = new
Schema.Parser().parse(tableState.getRequiredAvroSchema());
+ this.requiredPos = getRequiredPos(tableState.getAvroSchema(),
this.requiredSchema);
+ this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+ this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+ Path cdcFilePath = new Path(tablePath, fileSplit.getCdcFile());
+ this.cdcItr = new HoodieCDCLogRecordIterator(hadoopConf, cdcFilePath,
cdcSchema);
+ }
+
+ private int[] getRequiredPos(String tableSchema, Schema required) {
+ Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new
Schema.Parser().parse(tableSchema));
+ List<String> fields =
dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ return required.getFields().stream()
+ .map(f -> fields.indexOf(f.name()))
+ .mapToInt(i -> i)
+ .toArray();
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ if (this.sideImage != null) {
+ currentImage = this.sideImage;
+ this.sideImage = null;
+ return false;
+ } else if (this.cdcItr.hasNext()) {
+ cdcRecord = (GenericRecord) this.cdcItr.next();
+ String op = String.valueOf(cdcRecord.get(0));
+ resolveImage(op);
+ return false;
+ }
+ return true;
+ }
+
+ protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord
cdcRecord);
+
+ protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord
cdcRecord);
+
+ @Override
+ public RowData nextRecord() {
+ return currentImage;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.cdcItr != null) {
+ this.cdcItr.close();
+ this.cdcItr = null;
+ }
+ }
+
+ private void resolveImage(String op) {
+ switch (op) {
+ case "i":
+ currentImage = getAfterImage(RowKind.INSERT, cdcRecord);
+ break;
+ case "u":
+ currentImage = getBeforeImage(RowKind.UPDATE_BEFORE, cdcRecord);
+ sideImage = getAfterImage(RowKind.UPDATE_AFTER, cdcRecord);
+ break;
+ case "d":
+ currentImage = getBeforeImage(RowKind.DELETE, cdcRecord);
+ break;
+ default:
+ throw new AssertionError("Unexpected");
+ }
+ }
+
+ protected RowData resolveAvro(RowKind rowKind, GenericRecord avroRecord) {
+ GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+ avroRecord,
+ requiredSchema,
+ requiredPos,
+ recordBuilder);
+ RowData resolved = (RowData)
avroToRowDataConverter.convert(requiredAvroRecord);
+ resolved.setRowKind(rowKind);
+ return resolved;
+ }
+ }
+
+ // op, ts, before_image, after_image
+ static class BeforeAfterImageIterator extends BaseImageIterator {
+ BeforeAfterImageIterator(
+ String tablePath,
+ MergeOnReadTableState tableState,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ Schema cdcSchema,
+ HoodieCDCFileSplit fileSplit) throws IOException {
+ super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(3));
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+ }
+
+ // op, key, before_image
+ static class BeforeImageIterator extends BaseImageIterator {
+ protected ExternalSpillableMap<String, byte[]> afterImages;
+
+ protected final long maxCompactionMemoryInBytes;
+
+ protected final RowDataProjection projection;
+ protected final ImageManager imageManager;
+
+ BeforeImageIterator(
+ Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ MergeOnReadTableState tableState,
+ Schema cdcSchema,
+ HoodieCDCFileSplit fileSplit,
+ ImageManager imageManager) throws IOException {
+ super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
+ this.maxCompactionMemoryInBytes =
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf);
+ this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
+ this.imageManager = imageManager;
+ initImages(fileSplit);
+ }
+
+ protected void initImages(
+ HoodieCDCFileSplit fileSplit) throws IOException {
+ ValidationUtils.checkState(fileSplit.getAfterFileSlice().isPresent(),
+ "Current file slice does not exist for instant: " +
fileSplit.getInstant());
+ this.afterImages = this.imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getAfterFileSlice().get());
+ }
+
+ @Override
+ protected RowData getAfterImage(RowKind rowKind, GenericRecord cdcRecord) {
+ String recordKey = cdcRecord.get(1).toString();
+ RowData row = imageManager.getImageRecord(recordKey, this.afterImages,
rowKind);
+ row.setRowKind(rowKind);
+ return this.projection.project(row);
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ return resolveAvro(rowKind, (GenericRecord) cdcRecord.get(2));
+ }
+ }
+
+ // op, key
+ static class RecordKeyImageIterator extends BeforeImageIterator {
+ protected ExternalSpillableMap<String, byte[]> beforeImages;
+
+ RecordKeyImageIterator(
+ Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String tablePath,
+ MergeOnReadTableState tableState,
+ Schema cdcSchema,
+ HoodieCDCFileSplit fileSplit,
+ ImageManager imageManager) throws IOException {
+ super(flinkConf, hadoopConf, tablePath, tableState, cdcSchema,
fileSplit, imageManager);
+ }
+
+ protected void initImages(HoodieCDCFileSplit fileSplit) throws IOException
{
+ // init after images
+ super.initImages(fileSplit);
+ // init before images
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice does not exist for instant: " +
fileSplit.getInstant());
+ this.beforeImages = this.imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+ }
+
+ @Override
+ protected RowData getBeforeImage(RowKind rowKind, GenericRecord cdcRecord)
{
+ String recordKey = cdcRecord.get(1).toString();
+ RowData row = this.imageManager.getImageRecord(recordKey,
this.beforeImages, rowKind);
+ row.setRowKind(rowKind);
+ return this.projection.project(row);
+ }
+ }
+
+ static class ReplaceCommitIterator implements RecordIterator {
+ private final RecordIterator itr;
+ private final RowDataProjection projection;
+
+ ReplaceCommitIterator(
+ Configuration flinkConf,
+ String tablePath,
+ MergeOnReadTableState tableState,
+ HoodieCDCFileSplit fileSplit,
+ Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+ this.itr = initIterator(tablePath,
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit,
splitIteratorFunc);
+ this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
+ }
+
+ private RecordIterator initIterator(
+ String tablePath,
+ long maxCompactionMemoryInBytes,
+ HoodieCDCFileSplit fileSplit,
+ Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+ // init before images
+
+ // the before file slice must exist,
+ // see HoodieCDCExtractor#extractCDCFileSplits for details
+ ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
+ "Before file slice does not exist for instant: " +
fileSplit.getInstant());
+ MergeOnReadInputSplit inputSplit = CdcInputFormat.fileSlice2Split(
+ tablePath, fileSplit.getBeforeFileSlice().get(),
maxCompactionMemoryInBytes);
+ return splitIteratorFunc.apply(inputSplit);
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return this.itr.reachedEnd();
+ }
+
+ @Override
+ public RowData nextRecord() {
+ RowData row = this.itr.nextRecord();
+ row.setRowKind(RowKind.DELETE);
+ return this.projection.project(row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.itr.close();
+ }
+ }
+
+ public static final class BytesArrayInputView extends DataInputStream
implements DataInputView {
+ public BytesArrayInputView(byte[] data) {
+ super(new ByteArrayInputStream(data));
+ }
+
+ public void skipBytesToRead(int numBytes) throws IOException {
+ while (numBytes > 0) {
+ int skipped = this.skipBytes(numBytes);
+ numBytes -= skipped;
+ }
+ }
+ }
+
+ public static final class BytesArrayOutputView extends DataOutputStream
implements DataOutputView {
+ public BytesArrayOutputView(ByteArrayOutputStream baos) {
+ super(baos);
+ }
+
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ for (int i = 0; i < numBytes; ++i) {
+ this.write(0);
+ }
+ }
+
+ public void write(DataInputView source, int numBytes) throws IOException {
+ byte[] buffer = new byte[numBytes];
+ source.readFully(buffer);
+ this.write(buffer);
+ }
+ }
+
+ /**
+ * A before/after image manager
+ * that caches the image records by versions(file slices).
+ */
+ private static class ImageManager implements AutoCloseable {
+ private final HoodieWriteConfig writeConfig;
+
+ private final RowDataSerializer serializer;
+ private final Function<MergeOnReadInputSplit, RecordIterator>
splitIteratorFunc;
+
+ private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
+
+ public ImageManager(
+ Configuration flinkConf,
+ RowType rowType,
+ Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+ this.serializer = new RowDataSerializer(rowType);
+ this.splitIteratorFunc = splitIteratorFunc;
+ this.cache = new TreeMap<>();
+ this.writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
+ }
+
+ public ExternalSpillableMap<String, byte[]> getOrLoadImages(
+ long maxCompactionMemoryInBytes,
+ FileSlice fileSlice) throws IOException {
+ final String instant = fileSlice.getBaseInstantTime();
+ if (this.cache.containsKey(instant)) {
+ return cache.get(instant);
+ }
+ // clean the earliest file slice first
+ if (this.cache.size() > 1) {
+ // keep at most 2 versions: before & after
+ String instantToClean = this.cache.keySet().iterator().next();
+ this.cache.remove(instantToClean).close();
+ }
+ ExternalSpillableMap<String, byte[]> images =
loadImageRecords(maxCompactionMemoryInBytes, fileSlice);
+ this.cache.put(instant, images);
+ return images;
+ }
+
+ private ExternalSpillableMap<String, byte[]> loadImageRecords(
+ long maxCompactionMemoryInBytes,
+ FileSlice fileSlice) throws IOException {
+ MergeOnReadInputSplit inputSplit =
CdcInputFormat.fileSlice2Split(writeConfig.getBasePath(), fileSlice,
maxCompactionMemoryInBytes);
+ RecordIterator itr = splitIteratorFunc.apply(inputSplit);
+ // initialize the image records map
+ ExternalSpillableMap<String, byte[]> imageRecordsMap =
+ FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes);
+ while (!itr.reachedEnd()) {
+ RowData row = itr.nextRecord();
+ String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ serializer.serialize(row, new BytesArrayOutputView(baos));
+ imageRecordsMap.put(recordKey, baos.toByteArray());
+ }
+ itr.close(); // release resource
+ return imageRecordsMap;
+ }
+
+ public RowData getImageRecord(
+ String recordKey,
+ ExternalSpillableMap<String, byte[]> cache,
+ RowKind rowKind) {
+ byte[] bytes = cache.get(recordKey);
+ ValidationUtils.checkState(bytes != null,
+ "Key " + recordKey + " does not exist in current file group");
+ try {
+ RowData row = serializer.deserialize(new BytesArrayInputView(bytes));
+ row.setRowKind(rowKind);
+ return row;
+ } catch (IOException e) {
+ throw new HoodieException("Deserialize bytes into row data exception",
e);
+ }
+ }
+
+ @Override
+ public void close() {
+ this.cache.values().forEach(ExternalSpillableMap::close);
+ this.cache.clear();
+ }
+ }
+
+ /**
+ * Builder for {@link CdcInputFormat}.
+ */
+ public static class Builder extends MergeOnReadInputFormat.Builder {
+
+ public Builder config(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder tableState(MergeOnReadTableState tableState) {
+ this.tableState = tableState;
+ return this;
+ }
+
+ public Builder fieldTypes(List<DataType> fieldTypes) {
+ this.fieldTypes = fieldTypes;
+ return this;
+ }
+
+ public Builder defaultPartName(String defaultPartName) {
+ this.defaultPartName = defaultPartName;
+ return this;
+ }
+
+ public Builder limit(long limit) {
+ this.limit = limit;
+ return this;
+ }
+
+ public Builder emitDelete(boolean emitDelete) {
+ this.emitDelete = emitDelete;
+ return this;
+ }
+
+ public CdcInputFormat build() {
+ return new CdcInputFormat(conf, tableState, fieldTypes,
+ defaultPartName, limit, emitDelete);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+ public static MergeOnReadInputSplit fileSlice2Split(
+ String tablePath,
+ FileSlice fileSlice,
+ long maxCompactionMemoryInBytes) {
+ Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ // filter out the cdc logs
+ .filter(path -> !path.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ .collect(Collectors.toList()));
+ String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ return new MergeOnReadInputSplit(0, basePath, logPaths,
+ fileSlice.getBaseInstantTime(), tablePath, maxCompactionMemoryInBytes,
+ FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileSlice.getFileId());
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
new file mode 100644
index 0000000000..914d8518c7
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+
+import java.util.Arrays;
+
+/**
+ * Represents a CDC input split of source, actually a data bucket of change log file splits.
+ */
+public class CdcInputSplit extends MergeOnReadInputSplit {
+ private static final long serialVersionUID = 1L;
+
+ private HoodieCDCFileSplit[] changes;
+
+ public CdcInputSplit(
+ int splitNum,
+ String tablePath,
+ long maxCompactionMemoryInBytes,
+ String fileId,
+ HoodieCDCFileSplit[] changes) {
+ super(splitNum, null, Option.empty(), "", tablePath,
+ maxCompactionMemoryInBytes, "", null, fileId);
+ this.changes = changes;
+ }
+
+ public HoodieCDCFileSplit[] getChanges() {
+ return changes;
+ }
+
+ public void setChanges(HoodieCDCFileSplit[] changes) {
+ this.changes = changes;
+ }
+
+ @Override
+ public String toString() {
+ return "CdcInputSplit{"
+ + "changes=" + Arrays.toString(changes)
+ + ", fileId='" + fileId + '\''
+ + '}';
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index c9b6561bde..e2e8079a47 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -66,6 +66,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.IntStream;
import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
@@ -83,11 +84,11 @@ public class MergeOnReadInputFormat
private static final long serialVersionUID = 1L;
- private final Configuration conf;
+ protected final Configuration conf;
- private transient org.apache.hadoop.conf.Configuration hadoopConf;
+ protected transient org.apache.hadoop.conf.Configuration hadoopConf;
- private final MergeOnReadTableState tableState;
+ protected final MergeOnReadTableState tableState;
/**
* Uniform iterator view for the underneath records.
@@ -137,7 +138,7 @@ public class MergeOnReadInputFormat
*/
private boolean closed = true;
- private MergeOnReadInputFormat(
+ protected MergeOnReadInputFormat(
Configuration conf,
MergeOnReadTableState tableState,
List<DataType> fieldTypes,
@@ -168,30 +169,35 @@ public class MergeOnReadInputFormat
this.currentReadCount = 0L;
this.closed = false;
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
+ this.iterator = initIterator(split);
+ mayShiftInputSplit(split);
+ }
+
+ protected RecordIterator initIterator(MergeOnReadInputSplit split) throws
IOException {
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
if (split.getInstantRange() != null) {
// base file only with commit time filtering
- this.iterator = new BaseFileOnlyFilteringIterator(
+ return new BaseFileOnlyFilteringIterator(
split.getInstantRange(),
this.tableState.getRequiredRowType(),
getReader(split.getBasePath().get(),
getRequiredPosWithCommitTime(this.requiredPos)));
} else {
// base file only
- this.iterator = new
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+ return new
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
}
} else if (!split.getBasePath().isPresent()) {
// log files only
if (OptionsResolver.emitChangelog(conf)) {
- this.iterator = new
LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+ return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
} else {
- this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
+ return new LogFileOnlyIterator(getLogFileIterator(split));
}
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
- this.iterator = new SkipMergeIterator(
+ return new SkipMergeIterator(
getRequiredSchemaReader(split.getBasePath().get()),
getLogFileIterator(split));
} else if
(split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
- this.iterator = new MergeIterator(
+ return new MergeIterator(
conf,
hadoopConf,
split,
@@ -211,7 +217,6 @@ public class MergeOnReadInputFormat
+ "spark partition Index: " + split.getSplitNumber()
+ "merge type: " + split.getMergeType());
}
- mayShiftInputSplit(split);
}
@Override
@@ -284,11 +289,15 @@ public class MergeOnReadInputFormat
}
}
- private ParquetColumnarRowSplitReader getFullSchemaReader(String path)
throws IOException {
- return getReader(path, IntStream.range(0,
this.tableState.getRowType().getFieldCount()).toArray());
+ protected ParquetColumnarRowSplitReader getFullSchemaReader(String path) {
+ try {
+ return getReader(path, IntStream.range(0,
this.tableState.getRowType().getFieldCount()).toArray());
+ } catch (IOException e) {
+ throw new HoodieException("Get reader error for path: " + path);
+ }
}
- private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path)
throws IOException {
+ protected ParquetColumnarRowSplitReader getRequiredSchemaReader(String path)
throws IOException {
return getReader(path, this.requiredPos);
}
@@ -457,10 +466,63 @@ public class MergeOnReadInputFormat
};
}
+ protected ClosableIterator<RowData>
getFullLogFileIterator(MergeOnReadInputSplit split) {
+ final Schema tableSchema = new
Schema.Parser().parse(tableState.getAvroSchema());
+ final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter =
+ AvroToRowDataConverters.createRowConverter(tableState.getRowType());
+ final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split,
tableSchema, conf, hadoopConf);
+ final Iterator<String> logRecordsKeyIterator =
scanner.getRecords().keySet().iterator();
+
+ return new ClosableIterator<RowData>() {
+ private RowData currentRecord;
+
+ @Override
+ public boolean hasNext() {
+ while (logRecordsKeyIterator.hasNext()) {
+ String curAvroKey = logRecordsKeyIterator.next();
+ Option<IndexedRecord> curAvroRecord = null;
+ final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord)
scanner.getRecords().get(curAvroKey);
+ try {
+ curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
+ } catch (IOException e) {
+ throw new HoodieException("Get avro insert value error for key: "
+ curAvroKey, e);
+ }
+ if (curAvroRecord.isPresent()) {
+ final IndexedRecord avroRecord = curAvroRecord.get();
+ final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord,
tableState.getOperationPos());
+ if (rowKind == RowKind.DELETE) {
+ // skip the delete record
+ continue;
+ }
+ currentRecord = (RowData)
avroToRowDataConverter.convert(avroRecord);
+ currentRecord.setRowKind(rowKind);
+ return true;
+ }
+          // else:
+          //   the insert value is absent, i.e. the record is a DELETE,
+          //   so skip it and keep scanning the remaining keys
+          //   continue;
+
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return currentRecord;
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
+ };
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
- private interface RecordIterator {
+ protected interface RecordIterator {
boolean reachedEnd() throws IOException;
RowData nextRecord();
@@ -468,11 +530,11 @@ public class MergeOnReadInputFormat
void close() throws IOException;
}
- static class BaseFileOnlyIterator implements RecordIterator {
+ protected static class BaseFileOnlyIterator implements RecordIterator {
// base file reader
private final ParquetColumnarRowSplitReader reader;
- BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
+ public BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
this.reader = reader;
}
@@ -545,11 +607,11 @@ public class MergeOnReadInputFormat
}
}
- static class LogFileOnlyIterator implements RecordIterator {
+ protected static class LogFileOnlyIterator implements RecordIterator {
// iterator for log files
private final ClosableIterator<RowData> iterator;
- LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
+ public LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
this.iterator = iterator;
}
@@ -619,7 +681,7 @@ public class MergeOnReadInputFormat
}
}
- static class MergeIterator implements RecordIterator {
+ protected static class MergeIterator implements RecordIterator {
// base file reader
private final ParquetColumnarRowSplitReader reader;
// log keys used for merging
@@ -628,15 +690,13 @@ public class MergeOnReadInputFormat
private final HoodieMergedLogRecordScanner scanner;
private final Schema tableSchema;
- private final Schema requiredSchema;
- private final int[] requiredPos;
private final boolean emitDelete;
private final int operationPos;
private final RowDataToAvroConverters.RowDataToAvroConverter
rowDataToAvroConverter;
private final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
- private final GenericRecordBuilder recordBuilder;
- private final RowDataProjection projection;
+ private final Option<RowDataProjection> projection;
+ private final Option<Function<IndexedRecord, GenericRecord>>
avroProjection;
private final InstantRange instantRange;
@@ -651,7 +711,7 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
- MergeIterator(
+ public MergeIterator(
Configuration flinkConf,
org.apache.hadoop.conf.Configuration hadoopConf,
MergeOnReadInputSplit split,
@@ -663,19 +723,35 @@ public class MergeOnReadInputFormat
boolean emitDelete,
int operationPos,
ParquetColumnarRowSplitReader reader) { // the reader should be with
full schema
+ this(flinkConf, hadoopConf, split, tableRowType, requiredRowType,
tableSchema,
+ Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
+ Option.of(record -> buildAvroRecordBySchema(record, requiredSchema,
requiredPos, new GenericRecordBuilder(requiredSchema))),
+ emitDelete, operationPos, reader);
+ }
+
+ public MergeIterator(
+ Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ MergeOnReadInputSplit split,
+ RowType tableRowType,
+ RowType requiredRowType,
+ Schema tableSchema,
+ Option<RowDataProjection> projection,
+ Option<Function<IndexedRecord, GenericRecord>> avroProjection,
+ boolean emitDelete,
+ int operationPos,
+ ParquetColumnarRowSplitReader reader) { // the reader should be with
full schema
this.tableSchema = tableSchema;
this.reader = reader;
this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf,
hadoopConf);
this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
this.logKeysIterator = scanner.getRecords().keySet().iterator();
- this.requiredSchema = requiredSchema;
- this.requiredPos = requiredPos;
this.emitDelete = emitDelete;
this.operationPos = operationPos;
- this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+ this.avroProjection = avroProjection;
this.rowDataToAvroConverter =
RowDataToAvroConverters.createConverter(tableRowType);
this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(requiredRowType);
- this.projection = RowDataProjection.instance(requiredRowType,
requiredPos);
+ this.projection = projection;
this.instantRange = split.getInstantRange().orElse(null);
}
@@ -703,18 +779,18 @@ public class MergeOnReadInputFormat
// deleted
continue;
}
- GenericRecord avroRecord = buildAvroRecordBySchema(
- mergedAvroRecord.get(),
- requiredSchema,
- requiredPos,
- recordBuilder);
+ IndexedRecord avroRecord = avroProjection.isPresent()
+ ? avroProjection.get().apply(mergedAvroRecord.get())
+ : mergedAvroRecord.get();
this.currentRecord = (RowData)
avroToRowDataConverter.convert(avroRecord);
this.currentRecord.setRowKind(rowKind);
return false;
}
}
// project the full record in base with required positions
- currentRecord = projection.project(currentRecord);
+ if (projection.isPresent()) {
+ currentRecord = projection.get().project(currentRecord);
+ }
return false;
}
// read the logs
@@ -725,11 +801,9 @@ public class MergeOnReadInputFormat
Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey);
if (insertAvroRecord.isPresent()) {
// the record is a DELETE if insertAvroRecord not present, skipping
- GenericRecord avroRecord = buildAvroRecordBySchema(
- insertAvroRecord.get(),
- requiredSchema,
- requiredPos,
- recordBuilder);
+ IndexedRecord avroRecord = avroProjection.isPresent()
+ ? avroProjection.get().apply(insertAvroRecord.get())
+ : insertAvroRecord.get();
this.currentRecord = (RowData)
avroToRowDataConverter.convert(avroRecord);
FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(),
this.operationPos);
return false;
@@ -775,12 +849,12 @@ public class MergeOnReadInputFormat
* Builder for {@link MergeOnReadInputFormat}.
*/
public static class Builder {
- private Configuration conf;
- private MergeOnReadTableState tableState;
- private List<DataType> fieldTypes;
- private String defaultPartName;
- private long limit = -1;
- private boolean emitDelete = false;
+ protected Configuration conf;
+ protected MergeOnReadTableState tableState;
+ protected List<DataType> fieldTypes;
+ protected String defaultPartName;
+ protected long limit = -1;
+ protected boolean emitDelete = false;
public Builder config(Configuration conf) {
this.conf = conf;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index cde646e41f..7a54fa4b40 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -43,7 +43,7 @@ public class MergeOnReadInputSplit implements InputSplit {
private final long maxCompactionMemoryInBytes;
private final String mergeType;
private final Option<InstantRange> instantRange;
- private String fileId;
+ protected String fileId;
// for streaming reader to record the consumed offset,
// which is the start of next round reading.
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
index 8076d982b9..72a57df5ab 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
@@ -68,6 +68,7 @@ public class RowDataProjection implements Serializable {
*/
public RowData project(RowData rowData) {
GenericRowData genericRowData = new
GenericRowData(this.fieldGetters.length);
+ genericRowData.setRowKind(rowData.getRowKind());
for (int i = 0; i < this.fieldGetters.length; i++) {
final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
genericRowData.setField(i, val);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 5a42f79aff..cdacefbf17 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -316,6 +317,8 @@ public class StreamerUtil {
conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse(SimpleAvroKeyGenerator.class.getName()))
.setHiveStylePartitioningEnable(conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING))
+ .setCDCEnabled(conf.getBoolean(FlinkOptions.CDC_ENABLED))
+
.setCDCSupplementalLoggingMode(conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE))
.setTimelineLayoutVersion(1)
.initTable(hadoopConf, basePath);
LOG.info("Table initialized under base path {}", basePath);
@@ -421,6 +424,7 @@ public class StreamerUtil {
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
// build the write client to start the embedded timeline server
final HoodieFlinkWriteClient writeClient = new
HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
+
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
// create the filesystem view storage properties for client
final FileSystemViewStorageConfig viewStorageConfig =
writeConfig.getViewStorageConfig();
// rebuild the view storage config with simplified options.
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 49fb82709a..538b78d988 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
@@ -141,6 +142,41 @@ public class ITTestHoodieDataSource extends
AbstractTestBase {
assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
+ void
testStreamReadFromSpecifiedCommitWithChangelog(HoodieCDCSupplementalLoggingMode
mode) throws Exception {
+ streamTableEnv.getConfig().getConfiguration()
+ .setBoolean("table.dynamic-table-options.enabled", true);
+ // create filesystem table named source
+ String createSource = TestConfigurations.getFileSourceDDL("source");
+ streamTableEnv.executeSql(createSource);
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.CDC_ENABLED, true)
+ .option(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE, mode.getValue())
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 select * from source";
+ execInsertSql(streamTableEnv, insertInto);
+
+ String firstCommit =
TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+ List<Row> rows = execSelectSql(streamTableEnv,
+ "select * from t1/*+options('read.start-commit'='" + firstCommit +
"')*/", 10);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+
+ // insert another batch of data
+ execInsertSql(streamTableEnv, TestSQL.UPDATE_INSERT_T1);
+ List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+ assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_CHANGELOG);
+
+ // specify the start commit as earliest
+ List<Row> rows3 = execSelectSql(streamTableEnv,
+ "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
+ assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_MERGED);
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 9f82161908..f066a0b702 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -23,12 +23,16 @@ import
org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.table.HoodieTableSource;
+import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -48,8 +52,10 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -309,6 +315,89 @@ public class TestInputFormat {
assertThat(actual, is(expected));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
+ void testReadWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode) throws
Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+ options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+ beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+ // write the insert data sets
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ // write another commit to read again
+ TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat =
this.tableSource.getInputFormat(true);
+ assertThat(inputFormat, instanceOf(CdcInputFormat.class));
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ IncrementalInputSplits incrementalInputSplits =
IncrementalInputSplits.builder()
+ .rowType(TestConfigurations.ROW_TYPE)
+ .conf(conf)
+ .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+ .requiredPartitions(Collections.emptySet())
+ .skipCompaction(false)
+ .build();
+
+ // default read the latest commit
+ IncrementalInputSplits.Result splits =
incrementalInputSplits.inputSplitsCDC(metaClient, null);
+
+ List<RowData> result = readData(inputFormat,
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+ final String actual = TestData.rowDataToString(result);
+ final String expected = "["
+ + "-U[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
+ + "+U[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+ + "-U[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
+ + "+U[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+ + "-D[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+ + "-D[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3]]";
+ assertThat(actual, is(expected));
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieCDCSupplementalLoggingMode.class)
+ void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode
mode) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+ options.put(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE.key(), mode.getValue());
+ options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
+ beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+ // write the insert data sets
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ // write another commit to read again
+ TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat =
this.tableSource.getInputFormat(true);
+ assertThat(inputFormat, instanceOf(CdcInputFormat.class));
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ IncrementalInputSplits incrementalInputSplits =
IncrementalInputSplits.builder()
+ .rowType(TestConfigurations.ROW_TYPE)
+ .conf(conf)
+ .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+ .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2",
"par3", "par4")))
+ .skipCompaction(false)
+ .build();
+
+    // read from the earliest commit ('read.start-commit' = 'earliest')
+ IncrementalInputSplits.Result splits =
incrementalInputSplits.inputSplitsCDC(metaClient, null);
+
+ List<RowData> result = readData(inputFormat,
splits.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+ final String actual = TestData.rowDataToString(result);
+ final String expected = "["
+ + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+ + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+ + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+ + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]";
+ assertThat(actual, is(expected));
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
@@ -631,10 +720,14 @@ public class TestInputFormat {
conf);
}
- @SuppressWarnings("unchecked, rawtypes")
+ @SuppressWarnings("rawtypes")
private static List<RowData> readData(InputFormat inputFormat) throws
IOException {
InputSplit[] inputSplits = inputFormat.createInputSplits(1);
+ return readData(inputFormat, inputSplits);
+ }
+ @SuppressWarnings("unchecked, rawtypes")
+ private static List<RowData> readData(InputFormat inputFormat, InputSplit[]
inputSplits) throws IOException {
List<RowData> result = new ArrayList<>();
for (InputSplit inputSplit : inputSplits) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 8e1dd9964c..98053f7821 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -228,6 +228,36 @@ public class TestData {
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
+ // changelog details of test_source.data and test_source_2.data
+ public static List<RowData> DATA_SET_SOURCE_CHANGELOG = Arrays.asList(
+ updateBeforeRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 24,
+ TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33,
+ TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+ updateAfterRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 34,
+ TimestampData.fromEpochMillis(2000), StringData.fromString("par1")),
+ updateBeforeRow(StringData.fromString("id3"),
StringData.fromString("Julian"), 53,
+ TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+ updateAfterRow(StringData.fromString("id3"),
StringData.fromString("Julian"), 54,
+ TimestampData.fromEpochMillis(3000), StringData.fromString("par2")),
+ updateBeforeRow(StringData.fromString("id4"),
StringData.fromString("Fabian"), 31,
+ TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+ updateAfterRow(StringData.fromString("id4"),
StringData.fromString("Fabian"), 32,
+ TimestampData.fromEpochMillis(4000), StringData.fromString("par2")),
+ updateBeforeRow(StringData.fromString("id5"),
StringData.fromString("Sophia"), 18,
+ TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+ updateAfterRow(StringData.fromString("id5"),
StringData.fromString("Sophia"), 18,
+ TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+ insertRow(StringData.fromString("id9"), StringData.fromString("Jane"),
19,
+ TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+ insertRow(StringData.fromString("id10"), StringData.fromString("Ella"),
38,
+ TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+ insertRow(StringData.fromString("id11"),
StringData.fromString("Phoebe"), 52,
+ TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+ );
+
// data set of test_source.data with partition 'par1' overwrite
public static List<RowData> DATA_SET_SOURCE_INSERT_OVERWRITE = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
24,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index c4c8f10360..4e90303b3b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -27,9 +27,9 @@ import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.TableSchemaResolver
import org.apache.hudi.common.table.cdc.HoodieCDCExtractor
+import org.apache.hudi.common.table.log.InstantRange
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.internal.schema.InternalSchema
-
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -73,7 +73,14 @@ class CDCRelation(
val tableStructSchema: StructType =
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
- val cdcExtractor: HoodieCDCExtractor = new HoodieCDCExtractor(metaClient,
startInstant, endInstant)
+ val cdcExtractor: HoodieCDCExtractor =
+ new HoodieCDCExtractor(
+ metaClient,
+ InstantRange.builder()
+ .startInstant(startInstant)
+ .endInstant(endInstant)
+ .nullableBoundary(true)
+ .rangeType(InstantRange.RangeType.OPEN_CLOSE).build())
override final def needConversion: Boolean = false
@@ -98,9 +105,9 @@ class CDCRelation(
hadoopConf = spark.sessionState.newHadoopConf()
)
- val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map {
pairs =>
+ val changes = cdcExtractor.extractCDCFileSplits().values().asScala.map {
splits =>
HoodieCDCFileGroupSplit(
- pairs.asScala.map(pair => (pair.getLeft,
pair.getRight)).sortBy(_._1).toArray
+ splits.asScala.sorted.toArray
)
}
val cdcRdd = new HoodieCDCRDD(
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 49d5e19b16..7d96689ae0 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -30,7 +30,6 @@ import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
-import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
@@ -67,10 +66,9 @@ import scala.collection.mutable
/**
* The split that will be processed by spark task.
+ * The [[changes]] should be sorted first.
*/
-case class HoodieCDCFileGroupSplit(
- commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)]
-)
+case class HoodieCDCFileGroupSplit(changes: Array[HoodieCDCFileSplit])
/**
* The Spark [[Partition]]'s implementation.
@@ -94,9 +92,7 @@ class HoodieCDCRDD(
private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))
- private val cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
- metaClient.getTableConfig.cdcSupplementalLoggingMode
- )
+ private val cdcSupplementalLoggingMode = metaClient.getTableConfig.cdcSupplementalLoggingMode
private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
@@ -158,7 +154,7 @@ class HoodieCDCRDD(
.build();
HoodieTableState(
pathToString(basePath),
- split.commitToChanges.map(_._1.getTimestamp).max,
+ split.changes.last.getInstant,
recordKeyField,
preCombineFieldOpt,
usesVirtualKeys = false,
@@ -201,10 +197,10 @@ class HoodieCDCRDD(
private lazy val projection: UnsafeProjection =
generateUnsafeProjection(cdcSchema, requiredCdcSchema)
// Iterator on cdc file
- private val cdcFileIter = split.commitToChanges.sortBy(_._1).iterator
+ private val cdcFileIter = split.changes.iterator
// The instant that is currently being processed
- private var currentInstant: HoodieInstant = _
+ private var currentInstant: String = _
// The change file that is currently being processed
private var currentChangeFile: HoodieCDCFileSplit = _
@@ -420,9 +416,9 @@ class HoodieCDCRDD(
}
if (cdcFileIter.hasNext) {
- val pair = cdcFileIter.next()
- currentInstant = pair._1
- currentChangeFile = pair._2
+ val split = cdcFileIter.next()
+ currentInstant = split.getInstant
+ currentChangeFile = split
currentChangeFile.getCdcInferCase match {
case BASE_FILE_INSERT =>
assert(currentChangeFile.getCdcFile != null)
@@ -480,23 +476,23 @@ class HoodieCDCRDD(
recordToLoad = currentChangeFile.getCdcInferCase match {
case BASE_FILE_INSERT =>
InternalRow.fromSeq(Array(
- CDCRelation.CDC_OPERATION_INSERT, convertToUTF8String(currentInstant.getTimestamp),
+ CDCRelation.CDC_OPERATION_INSERT, convertToUTF8String(currentInstant),
null, null))
case BASE_FILE_DELETE =>
InternalRow.fromSeq(Array(
- CDCRelation.CDC_OPERATION_DELETE, convertToUTF8String(currentInstant.getTimestamp),
+ CDCRelation.CDC_OPERATION_DELETE, convertToUTF8String(currentInstant),
null, null))
case LOG_FILE =>
InternalRow.fromSeq(Array(
- null, convertToUTF8String(currentInstant.getTimestamp),
+ null, convertToUTF8String(currentInstant),
null, null))
case AS_IS =>
InternalRow.fromSeq(Array(
- null, convertToUTF8String(currentInstant.getTimestamp),
+ null, convertToUTF8String(currentInstant),
null, null))
case REPLACE_COMMIT =>
InternalRow.fromSeq(Array(
- CDCRelation.CDC_OPERATION_DELETE, convertToUTF8String(currentInstant.getTimestamp),
+ CDCRelation.CDC_OPERATION_DELETE, convertToUTF8String(currentInstant),
null, null))
}
}