This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 87f9fdaee90 [HUDI-7911] Enable cdc log for MOR table (#11490)
87f9fdaee90 is described below
commit 87f9fdaee9047caeb5bb0628feabec58a5b2ed10
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jul 9 08:28:43 2024 +0800
[HUDI-7911] Enable cdc log for MOR table (#11490)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 18 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 2 +-
.../apache/hudi/io/HoodieMergeHandleFactory.java | 6 +
.../hudi/common/model/WriteOperationType.java | 2 +-
.../hudi/common/table/cdc/HoodieCDCExtractor.java | 14 +-
.../table/read/IncrementalQueryAnalyzer.java | 40 ++++-
.../apache/hudi/configuration/FlinkOptions.java | 11 ++
.../apache/hudi/configuration/OptionsResolver.java | 23 ++-
.../apache/hudi/source/IncrementalInputSplits.java | 4 +-
.../hudi/table/format/cdc/CdcInputFormat.java | 188 ++++++++++++++++++++-
.../table/format/mor/MergeOnReadInputFormat.java | 21 ++-
.../utils/BucketStreamWriteFunctionWrapper.java | 11 ++
.../sink/utils/StreamWriteFunctionWrapper.java | 11 ++
.../hudi/sink/utils/TestFunctionWrapper.java | 8 +
.../apache/hudi/table/ITTestHoodieDataSource.java | 40 +++++
.../apache/hudi/table/format/TestInputFormat.java | 63 ++++++-
.../org/apache/hudi/utils/TestCompactionUtil.java | 1 +
.../test/java/org/apache/hudi/utils/TestData.java | 2 +
.../scala/org/apache/hudi/cdc/CDCRelation.scala | 3 +-
19 files changed, 435 insertions(+), 33 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ac728ec8de6..268a0c81fae 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1295,11 +1295,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return
ExecutorType.valueOf(getStringOrDefault(WRITE_EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
}
- public boolean isCDCEnabled() {
- return getBooleanOrDefault(
- HoodieTableConfig.CDC_ENABLED,
HoodieTableConfig.CDC_ENABLED.defaultValue());
- }
-
public boolean isConsistentHashingEnabled() {
return getIndexType() == HoodieIndex.IndexType.BUCKET &&
getBucketIndexEngineType() ==
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING;
}
@@ -1309,6 +1304,19 @@ public class HoodieWriteConfig extends HoodieConfig {
&&
HoodieIndex.BucketIndexEngineType.SIMPLE.equals(getBucketIndexEngineType());
}
+ /**
+ * Returns whether the table writer would generate pure log files in the
very first place.
+ */
+ public boolean isYieldingPureLogForMor() {
+ switch (getIndexType()) {
+ case BUCKET:
+ case FLINK_STATE:
+ return true;
+ default:
+ return false;
+ }
+ }
+
public boolean isConsistentLogicalTimestampEnabled() {
return
getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index b02fe871b27..30bcc767274 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -172,7 +172,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
String prevCommit = instantTime;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
- if (config.isCDCEnabled()) {
+ if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic
update sequence.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath,
fileId);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
index ac8a08c96ad..d049f884b51 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java
@@ -88,6 +88,12 @@ public class HoodieMergeHandleFactory {
if (table.requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table,
keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
+ } else if (table.getMetaClient().getTableConfig().isCDCEnabled() &&
writeConfig.isYieldingPureLogForMor()) {
+ // IMPORTANT: only index types that yield pure log files need to enable
the cdc log files for compaction,
+ // index types such as BLOOM do not need this because they would do
delta merge for inserts and generate logs for updates,
+ // both of these cases are already handled in HoodieCDCExtractor.
+ return new HoodieMergeHandleWithChangeLog<>(writeConfig, instantTime,
table, keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
} else {
return new HoodieMergeHandle<>(writeConfig, instantTime, table,
keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 96e00e6b955..268703710c5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -126,7 +126,7 @@ public enum WriteOperationType {
/**
* Whether the operation changes the dataset.
*/
- public static boolean isDataChange(WriteOperationType operation) {
+ public static boolean yieldChanges(WriteOperationType operation) {
return operation == WriteOperationType.INSERT
|| operation == WriteOperationType.UPSERT
|| operation == WriteOperationType.UPSERT_PREPPED
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index e38651336ad..28de790050a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -90,14 +90,18 @@ public class HoodieCDCExtractor {
private HoodieTableFileSystemView fsView;
+ private final boolean consumeChangesFromCompaction;
+
public HoodieCDCExtractor(
HoodieTableMetaClient metaClient,
- InstantRange range) {
+ InstantRange range,
+ boolean consumeChangesFromCompaction) {
this.metaClient = metaClient;
this.basePath = metaClient.getBasePath();
this.storage = metaClient.getStorage();
this.supplementalLoggingMode =
metaClient.getTableConfig().cdcSupplementalLoggingMode();
this.instantRange = range;
+ this.consumeChangesFromCompaction = consumeChangesFromCompaction;
init();
}
@@ -226,7 +230,8 @@ public class HoodieCDCExtractor {
}
return Pair.of(instant, commitMetadata);
}).filter(pair ->
-
WriteOperationType.isDataChange(pair.getRight().getOperationType())
+
WriteOperationType.yieldChanges(pair.getRight().getOperationType())
+ || (this.consumeChangesFromCompaction &&
pair.getRight().getOperationType() == WriteOperationType.COMPACT)
).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
} catch (Exception e) {
throw new HoodieIOException("Fail to get the commit metadata for CDC");
@@ -322,8 +327,9 @@ public class HoodieCDCExtractor {
if (fileSliceOpt.isPresent()) {
Pair<String, List<String>> fileSlice = fileSliceOpt.get();
try {
- HoodieBaseFile baseFile = new HoodieBaseFile(
- storage.getPathInfo(new StoragePath(partitionPath,
fileSlice.getLeft())));
+ HoodieBaseFile baseFile = fileSlice.getLeft().isEmpty()
+ ? null
+ : new HoodieBaseFile(storage.getPathInfo(new
StoragePath(partitionPath, fileSlice.getLeft())));
List<StoragePath> logFilePaths = fileSlice.getRight().stream()
.filter(logFile -> !logFile.equals(currentLogFileName))
.map(logFile -> new StoragePath(partitionPath, logFile))
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
index ca8ae575898..03c1f85f598 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/IncrementalQueryAnalyzer.java
@@ -112,6 +112,7 @@ public class IncrementalQueryAnalyzer {
private final boolean skipCompaction;
private final boolean skipClustering;
private final boolean skipInsertOverwrite;
+ private final boolean readCdcFromChangelog;
private final int limit;
private IncrementalQueryAnalyzer(
@@ -122,6 +123,7 @@ public class IncrementalQueryAnalyzer {
boolean skipCompaction,
boolean skipClustering,
boolean skipInsertOverwrite,
+ boolean readCdcFromChangelog,
int limit) {
this.metaClient = metaClient;
this.startTime = Option.ofNullable(startTime);
@@ -130,6 +132,7 @@ public class IncrementalQueryAnalyzer {
this.skipCompaction = skipCompaction;
this.skipClustering = skipClustering;
this.skipInsertOverwrite = skipInsertOverwrite;
+ this.readCdcFromChangelog = readCdcFromChangelog;
this.limit = limit;
}
@@ -209,13 +212,23 @@ public class IncrementalQueryAnalyzer {
private HoodieTimeline getFilteredTimeline(HoodieTableMetaClient metaClient)
{
HoodieTimeline timeline =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
- return filterInstantsAsPerUserConfigs(metaClient, timeline,
this.skipCompaction, this.skipClustering, this.skipInsertOverwrite);
+ return filterInstantsAsPerUserConfigs(metaClient, timeline,
this.skipCompaction, this.skipClustering, this.skipInsertOverwrite,
this.readCdcFromChangelog);
}
private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient
metaClient, String startInstant) {
HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline(startInstant, false);
HoodieTimeline archivedCompleteTimeline =
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
- return filterInstantsAsPerUserConfigs(metaClient,
archivedCompleteTimeline, this.skipCompaction, this.skipClustering,
this.skipInsertOverwrite);
+ return filterInstantsAsPerUserConfigs(metaClient,
archivedCompleteTimeline, this.skipCompaction, this.skipClustering,
this.skipInsertOverwrite, this.readCdcFromChangelog);
+ }
+
+ @VisibleForTesting
+ public static HoodieTimeline filterInstantsAsPerUserConfigs(
+ HoodieTableMetaClient metaClient,
+ HoodieTimeline timeline,
+ boolean skipCompaction,
+ boolean skipClustering,
+ boolean skipInsertOverwrite) {
+ return filterInstantsAsPerUserConfigs(metaClient, timeline,
skipCompaction, skipClustering, skipInsertOverwrite, false);
}
/**
@@ -225,13 +238,22 @@ public class IncrementalQueryAnalyzer {
*
* @return the filtered timeline
*/
- @VisibleForTesting
- public static HoodieTimeline
filterInstantsAsPerUserConfigs(HoodieTableMetaClient metaClient, HoodieTimeline
timeline, boolean skipCompaction, boolean skipClustering, boolean
skipInsertOverwrite) {
+ private static HoodieTimeline filterInstantsAsPerUserConfigs(
+ HoodieTableMetaClient metaClient,
+ HoodieTimeline timeline,
+ boolean skipCompaction,
+ boolean skipClustering,
+ boolean skipInsertOverwrite,
+ boolean readCdcFromChangelog) {
final HoodieTimeline oriTimeline = timeline;
- if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ &
skipCompaction) {
+ if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ &&
skipCompaction) {
// the compaction commit uses 'commit' as action which is tricky
timeline = timeline.filter(instant ->
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
}
+ if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ &&
metaClient.getTableConfig().isCDCEnabled() && readCdcFromChangelog) {
+ // only compaction yields changelog file
+ timeline = timeline.filter(instant ->
!instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+ }
if (skipClustering) {
timeline = timeline.filter(instant ->
!ClusteringUtils.isCompletedClusteringInstant(instant, oriTimeline));
}
@@ -261,6 +283,7 @@ public class IncrementalQueryAnalyzer {
private boolean skipCompaction = false;
private boolean skipClustering = false;
private boolean skipInsertOverwrite = false;
+ private boolean readCdcFromChangelog = false;
/**
* Maximum number of instants to read per run.
*/
@@ -304,6 +327,11 @@ public class IncrementalQueryAnalyzer {
return this;
}
+ public Builder readCdcFromChangelog(boolean readCdcFromChangelog) {
+ this.readCdcFromChangelog = readCdcFromChangelog;
+ return this;
+ }
+
public Builder limit(int limit) {
this.limit = limit;
return this;
@@ -311,7 +339,7 @@ public class IncrementalQueryAnalyzer {
public IncrementalQueryAnalyzer build() {
return new
IncrementalQueryAnalyzer(Objects.requireNonNull(this.metaClient),
this.startTime, this.endTime,
- Objects.requireNonNull(this.rangeType), this.skipCompaction,
this.skipClustering, this.skipInsertOverwrite, this.limit);
+ Objects.requireNonNull(this.rangeType), this.skipCompaction,
this.skipClustering, this.skipInsertOverwrite, this.readCdcFromChangelog,
this.limit);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9a8b99a85c5..0a0f0612693 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -361,6 +361,17 @@ public class FlinkOptions extends HoodieConfig {
+ "the avg read instants number per-second would be
'read.commits.limit'/'read.streaming.check-interval', by "
+ "default no limit");
+ @AdvancedConfig
+ public static final ConfigOption<Boolean> READ_CDC_FROM_CHANGELOG =
ConfigOptions
+ .key("read.cdc.from.changelog")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to consume the delta changes only from the cdc
changelog files.\n"
+ + "When CDC is enabled, i). for COW table, the changelog is
generated on each file update;\n"
+ + "ii). for MOR table, the changelog is generated on compaction.\n"
+ + "By default, always read from the changelog file,\n"
+ + "once it is disabled, the reader would infer the changes based on
the file slice dependencies.");
+
@AdvancedConfig
public static final ConfigOption<Boolean> READ_DATA_SKIPPING_ENABLED =
ConfigOptions
.key("read.data.skipping.enabled")
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index e2a1ab0c320..9d4e289cb20 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -194,7 +194,7 @@ public class OptionsResolver {
/**
* Returns whether the source should emit changelog.
*
- * @return true if the source is read as streaming with changelog mode
enabled
+ * @return true if the source should emit changes.
*/
public static boolean emitChangelog(Configuration conf) {
return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) &&
conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
@@ -202,6 +202,15 @@ public class OptionsResolver {
|| isIncrementalQuery(conf) &&
conf.getBoolean(FlinkOptions.CDC_ENABLED);
}
+ /**
+ * Returns whether the source should emit deletes.
+ *
+ * @return true if the source is read as streaming with changelog mode
enabled.
+ */
+ public static boolean emitDeletes(Configuration conf) {
+ return conf.getBoolean(FlinkOptions.READ_AS_STREAMING) &&
conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+ }
+
/**
* Returns whether there is need to schedule the async compaction.
*
@@ -354,6 +363,18 @@ public class OptionsResolver {
return handlingMode == HollowCommitHandling.USE_TRANSITION_TIME;
}
+ /**
+ * Returns whether to read the data changes only from changelog files. When
CDC is enabled,
+ * i) for COW table, the changelog is generated on each file update;
+ * ii) for MOR table, the changelog is generated on compaction.
+ *
+ * <p>By default, always read from the changelog file,
+ * once it is disabled, the reader would infer the changes based on the file
slice dependencies.
+ */
+ public static boolean readCDCFromChangelog(Configuration conf) {
+ return conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG);
+ }
+
/**
* Returns the index type.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 3ffed285584..f2c13a1165e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -141,6 +141,7 @@ public class IncrementalInputSplits implements Serializable
{
.skipCompaction(skipCompaction)
.skipClustering(skipClustering)
.skipInsertOverwrite(skipInsertOverwrite)
+
.readCdcFromChangelog(this.conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG))
.build();
IncrementalQueryAnalyzer.QueryContext analyzingResult = analyzer.analyze();
@@ -251,6 +252,7 @@ public class IncrementalInputSplits implements Serializable
{
.skipCompaction(skipCompaction)
.skipClustering(skipClustering)
.skipInsertOverwrite(skipInsertOverwrite)
+
.readCdcFromChangelog(this.conf.getBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG))
.limit(OptionsResolver.getReadCommitsLimit(conf))
.build();
@@ -383,7 +385,7 @@ public class IncrementalInputSplits implements Serializable
{
private List<MergeOnReadInputSplit> getCdcInputSplits(
HoodieTableMetaClient metaClient,
InstantRange instantRange) {
- HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient,
instantRange);
+ HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient,
instantRange, OptionsResolver.readCDCFromChangelog(this.conf));
Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits =
extractor.extractCDCFileSplits();
if (fileSplits.isEmpty()) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 8cd8be8b5cc..97d3278977d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -19,17 +19,27 @@
package org.apache.hudi.table.format.cdc;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
@@ -48,11 +58,13 @@ import
org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.RowDataProjection;
+import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -62,6 +74,8 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -69,6 +83,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -84,6 +99,7 @@ import static
org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
*/
public class CdcInputFormat extends MergeOnReadInputFormat {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(CdcInputFormat.class);
private CdcInputFormat(
Configuration conf,
@@ -163,7 +179,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
switch (fileSplit.getCdcInferCase()) {
case BASE_FILE_INSERT:
ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
- "CDC file path should exist and be only one");
+ "CDC file path should exist and be singleton");
String path = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
return new AddBaseFileIterator(getBaseFileIterator(path));
case BASE_FILE_DELETE:
@@ -185,6 +201,12 @@ public class CdcInputFormat extends MergeOnReadInputFormat
{
default:
throw new AssertionError("Unexpected mode" + mode);
}
+ case LOG_FILE:
+ ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
+ "CDC file path should exist and be singleton");
+ String logFilepath = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
+ return new DataLogFileIterator(conf, hadoopConf,
internalSchemaManager, maxCompactionMemoryInBytes, imageManager, fileSplit,
+ singleLogFile2Split(tablePath, logFilepath,
maxCompactionMemoryInBytes), tableState);
case REPLACE_COMMIT:
return new ReplaceCommitIterator(conf, tablePath, tableState,
fileSplit, this::getFileSliceIterator);
default:
@@ -309,6 +331,135 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
}
}
+ // according to HoodieCDCInferenceCase.LOG_FILE
+ static class DataLogFileIterator implements ClosableIterator<RowData> {
+ private final Schema tableSchema;
+ private final long maxCompactionMemoryInBytes;
+ private final ImageManager imageManager;
+ private final HoodieMergedLogRecordScanner scanner;
+ private final Iterator<String> logRecordsKeyIterator;
+ private final RowDataProjection projection;
+ private final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
+ private final RowDataToAvroConverters.RowDataToAvroConverter
rowDataToAvroConverter;
+ private final HoodieRecordMerger recordMerger;
+ private final TypedProperties payloadProps;
+
+ private ExternalSpillableMap<String, byte[]> beforeImages;
+ private RowData currentImage;
+ private RowData sideImage;
+
+ DataLogFileIterator(
+ Configuration flinkConf,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ InternalSchemaManager schemaManager,
+ long maxCompactionMemoryInBytes,
+ ImageManager imageManager,
+ HoodieCDCFileSplit cdcFileSplit,
+ MergeOnReadInputSplit split,
+ MergeOnReadTableState tableState) throws IOException {
+ this.tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.imageManager = imageManager;
+ this.scanner = FormatUtils.logScanner(split, tableSchema,
schemaManager.getQuerySchema(), flinkConf, hadoopConf);
+ this.logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
+ this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRowType(),
flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
+ this.rowDataToAvroConverter =
RowDataToAvroConverters.createConverter(tableState.getRowType(),
flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
+ this.projection =
tableState.getRequiredRowType().equals(tableState.getRowType())
+ ? null
+ : RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
+
+ List<String> mergers =
Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
+ .map(String::trim)
+ .distinct()
+ .collect(Collectors.toList());
+ this.recordMerger =
HoodieRecordUtils.createRecordMerger(split.getTablePath(), EngineType.FLINK,
mergers, flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY));
+ this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
+ initImages(cdcFileSplit);
+ }
+
+ private void initImages(HoodieCDCFileSplit fileSplit) throws IOException {
+ // init before images
+ if (fileSplit.getBeforeFileSlice().isPresent() &&
!fileSplit.getBeforeFileSlice().get().isEmpty()) {
+ this.beforeImages = this.imageManager.getOrLoadImages(
+ maxCompactionMemoryInBytes, fileSplit.getBeforeFileSlice().get());
+ } else {
+ // still initializes an empty map
+ this.beforeImages =
FormatUtils.spillableMap(this.imageManager.writeConfig,
maxCompactionMemoryInBytes);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (this.sideImage != null) {
+ this.currentImage = this.sideImage;
+ this.sideImage = null;
+ return true;
+ }
+ while (logRecordsKeyIterator.hasNext()) {
+ String recordKey = logRecordsKeyIterator.next();
+ HoodieAvroRecord<?> record = (HoodieAvroRecord<?>)
scanner.getRecords().get(recordKey);
+ Option<IndexedRecord> val =
MergeOnReadInputFormat.getInsertVal(record, this.tableSchema);
+ RowData existed =
imageManager.removeImageRecord(record.getRecordKey(), beforeImages);
+ if (val.isEmpty()) {
+ // it's a deleted record.
+ if (existed != null) {
+ // there is a real record deleted.
+ existed.setRowKind(RowKind.DELETE);
+ this.currentImage = existed;
+ return true;
+ }
+ } else {
+ IndexedRecord newAvroVal = val.get();
+ if (existed == null) {
+ // a new record is inserted.
+ RowData newRow = (RowData)
avroToRowDataConverter.convert(newAvroVal);
+ newRow.setRowKind(RowKind.INSERT);
+ this.currentImage = newRow;
+ return true;
+ } else {
+ // an existed record is updated.
+ GenericRecord historyAvroRecord = (GenericRecord)
rowDataToAvroConverter.convert(tableSchema, existed);
+ HoodieRecord<IndexedRecord> merged =
mergeRowWithLog(historyAvroRecord, record).get();
+ if (merged.getData() != historyAvroRecord) {
+ // update happens
+ existed.setRowKind(RowKind.UPDATE_BEFORE);
+ this.currentImage = existed;
+
+ RowData mergedRow = (RowData)
avroToRowDataConverter.convert(merged.getData());
+ mergedRow.setRowKind(RowKind.UPDATE_AFTER);
+ this.imageManager.updateImageRecord(record.getRecordKey(),
beforeImages, mergedRow);
+ this.sideImage = mergedRow;
+
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public RowData next() {
+ return this.projection != null ?
this.projection.project(this.currentImage) : this.currentImage;
+ }
+
+ @Override
+ public void close() {
+ this.scanner.close();
+ this.imageManager.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Option<HoodieRecord<IndexedRecord>> mergeRowWithLog(GenericRecord
historyAvroRecord, HoodieRecord<?> newRecord) {
+ HoodieAvroIndexedRecord historyAvroIndexedRecord = new
HoodieAvroIndexedRecord(historyAvroRecord);
+ try {
+ return recordMerger.merge(historyAvroIndexedRecord, tableSchema,
newRecord, tableSchema, payloadProps).map(Pair::getLeft);
+ } catch (IOException e) {
+ throw new HoodieIOException("Merge base and delta payloads exception",
e);
+ }
+ }
+ }
+
abstract static class BaseImageIterator implements ClosableIterator<RowData>
{
private final Schema requiredSchema;
private final int[] requiredPos;
@@ -673,6 +824,33 @@ public class CdcInputFormat extends MergeOnReadInputFormat
{
}
}
+ public void updateImageRecord(
+ String recordKey,
+ ExternalSpillableMap<String, byte[]> cache,
+ RowData row) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ try {
+ serializer.serialize(row, new BytesArrayOutputView(baos));
+ } catch (IOException e) {
+ throw new HoodieException("Serialize row data into bytes exception",
e);
+ }
+ cache.put(recordKey, baos.toByteArray());
+ }
+
+ public RowData removeImageRecord(
+ String recordKey,
+ ExternalSpillableMap<String, byte[]> cache) {
+ byte[] bytes = cache.remove(recordKey);
+ if (bytes == null) {
+ return null;
+ }
+ try {
+ return serializer.deserialize(new BytesArrayInputView(bytes));
+ } catch (IOException e) {
+ throw new HoodieException("Deserialize bytes into row data exception",
e);
+ }
+ }
+
@Override
public void close() {
this.cache.values().forEach(ExternalSpillableMap::close);
@@ -741,7 +919,13 @@ public class CdcInputFormat extends MergeOnReadInputFormat
{
.collect(Collectors.toList()));
String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(0, basePath, logPaths,
- fileSlice.getBaseInstantTime(), tablePath, maxCompactionMemoryInBytes,
+ fileSlice.getLatestInstantTime(), tablePath,
maxCompactionMemoryInBytes,
FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileSlice.getFileId());
}
+
+ public static MergeOnReadInputSplit singleLogFile2Split(String tablePath,
String filePath, long maxCompactionMemoryInBytes) {
+ return new MergeOnReadInputSplit(0, null,
Option.of(Collections.singletonList(filePath)),
+ FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)),
tablePath, maxCompactionMemoryInBytes,
+ FlinkOptions.REALTIME_PAYLOAD_COMBINE, null,
FSUtils.getFileIdFromLogPath(new StoragePath(filePath)));
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 9946eae930c..66b44576f80 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -149,7 +149,7 @@ public class MergeOnReadInputFormat
*/
private boolean closed = true;
- private final InternalSchemaManager internalSchemaManager;
+ protected final InternalSchemaManager internalSchemaManager;
protected MergeOnReadInputFormat(
Configuration conf,
@@ -205,7 +205,7 @@ public class MergeOnReadInputFormat
}
} else if (!split.getBasePath().isPresent()) {
// log files only
- if (OptionsResolver.emitChangelog(conf)) {
+ if (OptionsResolver.emitDeletes(conf)) {
return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
} else {
return new LogFileOnlyIterator(getLogFileIterator(split));
@@ -312,7 +312,7 @@ public class MergeOnReadInputFormat
try {
return getBaseFileIterator(path, IntStream.range(0,
this.tableState.getRowType().getFieldCount()).toArray());
} catch (IOException e) {
- throw new HoodieException("Get reader error for path: " + path);
+ throw new HoodieException("Get reader error for path: " + path, e);
}
}
@@ -441,13 +441,8 @@ public class MergeOnReadInputFormat
@Override
public boolean hasNext() {
while (recordsIterator.hasNext()) {
- Option<IndexedRecord> curAvroRecord = null;
final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord)
recordsIterator.next();
- try {
- curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
- } catch (IOException e) {
- throw new HoodieException("Get avro insert value error for key: "
+ hoodieRecord.getRecordKey(), e);
- }
+ Option<IndexedRecord> curAvroRecord = getInsertVal(hoodieRecord,
tableSchema);
if (curAvroRecord.isPresent()) {
final IndexedRecord avroRecord = curAvroRecord.get();
GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
@@ -475,6 +470,14 @@ public class MergeOnReadInputFormat
};
}
+ protected static Option<IndexedRecord> getInsertVal(HoodieAvroRecord<?>
hoodieRecord, Schema tableSchema) {
+ try {
+ return hoodieRecord.getData().getInsertValue(tableSchema);
+ } catch (IOException e) {
+ throw new HoodieException("Get avro insert value error for key: " +
hoodieRecord.getRecordKey(), e);
+ }
+ }
+
protected ClosableIterator<RowData>
getFullLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new
Schema.Parser().parse(tableState.getAvroSchema());
final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter =
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index df648df9ac0..319ff170ca3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -169,6 +169,17 @@ public class BucketStreamWriteFunctionWrapper<I>
implements TestFunctionWrapper<
}
}
+ @Override
+ public void inlineCompaction() {
+ if (asyncCompaction) {
+ try {
+ compactFunctionWrapper.compact(1); // always uses a constant
checkpoint ID.
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+ }
+
public void close() throws Exception {
coordinator.close();
ioManager.close();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c65e42f1521..fff916bbcc1 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -216,6 +216,17 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
}
+ @Override
+ public void inlineCompaction() {
+ if (asyncCompaction) {
+ try {
+ compactFunctionWrapper.compact(1); // always uses a constant
checkpoint ID.
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+ }
+
public void jobFailover() throws Exception {
coordinatorFails();
subTaskFails(0, 0);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index faee168bf25..860b3aee9c0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -68,6 +68,14 @@ public interface TestFunctionWrapper<I> {
*/
void checkpointComplete(long checkpointId);
+ /**
+ * Keep this interface for batch inline compaction job. The batch pipeline
triggers the commit of Hudi table
+ * with "endInput" events in the coordinator whereas there is no good chance
to plug in the compaction sub-pipeline.
+ */
+ default void inlineCompaction() {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Triggers the job failover, including the coordinator and the write tasks.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index d9f3badf1fb..7cd1f4b3e7b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1459,6 +1459,46 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result2.subList(result2.size() - 2, result2.size()),
"[-U[1], +U[2]]");
}
+ @Test
+ void testReadChangelogIncrementalMor() throws Exception {
+ TableEnvironment tableEnv = streamTableEnv;
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.TABLE_NAME, "t1");
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+ conf.setBoolean(FlinkOptions.READ_CDC_FROM_CHANGELOG, false); // calculate
the changes on the fly
+ conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); // for batch
upsert
+ conf.setBoolean(FlinkOptions.CDC_ENABLED, true);
+
+ // write 3 batches of the same data set
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+
+ String latestCommit =
TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+ .option(FlinkOptions.CDC_ENABLED, true)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.dataSetUpsert(2, 1));
+
+ // write another 10 batches of dataset
+ for (int i = 0; i < 10; i++) {
+ TestData.writeDataAsBatch(TestData.dataSetInsert(1, 2), conf);
+ }
+
+ String firstCommit =
TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
+ final String query = String.format("select count(*) from t1/*+
options('read.start-commit'='%s')*/", firstCommit);
+ List<Row> result2 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery(query).execute().collect());
+ assertRowsEquals(result2.subList(result2.size() - 2, result2.size()),
"[-U[1], +U[2]]");
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws
Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 235d2421c38..37c2c704a1c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -101,7 +101,9 @@ public class TestInputFormat {
void beforeEach(HoodieTableType tableType, Map<String, String> options)
throws IOException {
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
- conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close
the async compaction
+ if (!conf.contains(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
+ conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by
default close the async compaction
+ }
options.forEach((key, value) -> conf.setString(key, value));
StreamerUtil.initTableIfNotExists(conf);
@@ -875,6 +877,64 @@ public class TestInputFormat {
assertThat(commits.size(), is(3));
+ testReadChangelogInternal(commits);
+ }
+
+ @Test
+ void testReadChangelogIncrementallyForMor() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.QUERY_TYPE.key(),
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+ options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true"); // for
batch update
+ options.put(FlinkOptions.READ_CDC_FROM_CHANGELOG.key(), "false"); //
infers the data changes on the fly
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write 3 commits first
+ // write the same dataset 3 times to generate changelog
+ for (int i = 0; i < 3; i++) {
+ List<RowData> dataset = TestData.dataSetInsert(1, 2);
+ TestData.writeDataAsBatch(dataset, conf);
+ }
+
+ HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+ new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)),
tempFile.getAbsolutePath());
+ List<String> commits =
metaClient.getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
+ .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
+
+ assertThat(commits.size(), is(3));
+
+ testReadChangelogInternal(commits);
+ }
+
+ @Test
+ void testReadChangelogIncrementallyForMorWithCompaction() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.QUERY_TYPE.key(),
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ options.put(FlinkOptions.CDC_ENABLED.key(), "true");
+ options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "true");
+ options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1"); //
compact for every commit
+ options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true"); // for
batch update
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write 3 commits first
+ // write the same dataset 3 times to generate changelog
+ for (int i = 0; i < 3; i++) {
+ List<RowData> dataset = TestData.dataSetInsert(1, 2);
+ TestData.writeDataAsBatch(dataset, conf);
+ }
+
+ HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(
+ new
HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)),
tempFile.getAbsolutePath());
+ List<String> commits =
metaClient.getCommitsTimeline().filterCompletedInstants()
+ .filter(instant ->
instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)).getInstantsAsStream()
+ .map(HoodieInstant::getCompletionTime).collect(Collectors.toList());
+
+ assertThat(commits.size(), is(3));
+
+ testReadChangelogInternal(commits);
+ }
+
+ private void testReadChangelogInternal(List<String> commits) throws
IOException {
// only the start commit
conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
this.tableSource = getTableSource(conf);
@@ -884,7 +944,6 @@ public class TestInputFormat {
List<RowData> actual1 = readData(inputFormat1);
final List<RowData> expected1 = TestData.dataSetUpsert(2, 1, 2, 1);
TestData.assertRowDataEquals(actual1, expected1);
-
// only the start commit: earliest
conf.setString(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
this.tableSource = getTableSource(conf);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 932d9972bf9..3845b6150e5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -132,6 +132,7 @@ public class TestCompactionUtil {
void testScheduleCompaction() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+ options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(),
FlinkOptions.TIME_ELAPSED);
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
beforeEach(options);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 7b14614cd37..3dd5d183049 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -560,6 +560,8 @@ public class TestData {
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+ funcWrapper.inlineCompaction();
+
funcWrapper.close();
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
index a951a60698d..eb91c255c22 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala
@@ -80,7 +80,8 @@ class CDCRelation(
.startInstant(startInstant)
.endInstant(endInstant)
.nullableBoundary(true)
- .rangeType(InstantRange.RangeType.OPEN_CLOSED).build())
+ .rangeType(InstantRange.RangeType.OPEN_CLOSED).build(),
+ false)
override final def needConversion: Boolean = false