This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 236658b4cc9 [HUDI-6157] Fix potential data loss for flink streaming
source from table with multi writer (#8611)
236658b4cc9 is described below
commit 236658b4cc9ea28e9fa4a3a94d87315a31be4ccc
Author: Danny Chan <[email protected]>
AuthorDate: Thu May 4 20:42:05 2023 +0800
[HUDI-6157] Fix potential data loss for flink streaming source from table
with multi writer (#8611)
---
.../apache/hudi/configuration/OptionsResolver.java | 11 ++
.../sink/partitioner/profile/WriteProfiles.java | 18 ++
.../apache/hudi/source/IncrementalInputSplits.java | 197 +++++++++++++++------
.../hudi/source/StreamReadMonitoringFunction.java | 29 +--
.../org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 14 ++
.../hudi/source/TestIncrementalInputSplits.java | 12 +-
.../source/TestStreamReadMonitoringFunction.java | 104 +++++++++++
.../apache/hudi/table/format/TestInputFormat.java | 115 ++++++++++--
.../test/java/org/apache/hudi/utils/TestUtils.java | 18 ++
10 files changed, 436 insertions(+), 84 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 1cceb619c54..113155f47eb 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -260,6 +260,17 @@ public class OptionsResolver {
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
}
+ /**
+ * Returns whether to read the instants using completion time.
+ *
+ * <p>A Hudi instant contains both the txn start time and completion time,
for incremental subscription
+ * of the source reader, using completion time to filter the candidate
instants can avoid data loss
+ * in scenarios like multiple writers.
+ */
+ public static boolean isReadByTxnCompletionTime(Configuration conf) {
+ return
conf.getBoolean(HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.key(),
HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.defaultValue());
+ }
+
/**
* Returns the index type.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index 7c4886852f1..2f959b241dd 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -82,6 +82,24 @@ public class WriteProfiles {
PROFILES.remove(path);
}
+ /**
+ * Returns all the incremental write file statuses with the given commits
metadata.
+ * Only existing files are included.
+ *
+ * @param basePath Table base path
+ * @param hadoopConf The hadoop conf
+   * @param metadataList The commit metadata list (should be in ascending
order)
+ * @param tableType The table type
+ * @return the file status array
+ */
+ public static FileStatus[] getFilesFromMetadata(
+ Path basePath,
+ Configuration hadoopConf,
+ List<HoodieCommitMetadata> metadataList,
+ HoodieTableType tableType) {
+ return getFilesFromMetadata(basePath, hadoopConf, metadataList, tableType,
true);
+ }
+
/**
* Returns all the incremental write file statuses with the given commits
metadata.
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 2c21f40ce57..ba96b6acead 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -41,6 +41,7 @@ import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -67,6 +68,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
/**
@@ -125,14 +127,12 @@ public class IncrementalInputSplits implements
Serializable {
* Returns the incremental input splits.
*
* @param metaClient The meta client
- * @param hadoopConf The hadoop configuration
* @param cdcEnabled Whether cdc is enabled
*
* @return The list of incremental input splits or empty if there are no new
instants
*/
public Result inputSplits(
HoodieTableMetaClient metaClient,
- org.apache.hadoop.conf.Configuration hadoopConf,
boolean cdcEnabled) {
HoodieTimeline commitTimeline = getReadTimeline(metaClient);
if (commitTimeline.empty()) {
@@ -209,7 +209,7 @@ public class IncrementalInputSplits implements Serializable
{
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
- FileStatus[] files = WriteProfiles.getFilesFromMetadata(path,
hadoopConf, metadataList, metaClient.getTableType(), false);
+ FileStatus[] files = WriteProfiles.getFilesFromMetadata(path,
metaClient.getHadoopConf(), metadataList, metaClient.getTableType(), false);
if (files == null) {
LOG.warn("Found deleted files in metadata, fall back to full table
scan.");
// fallback to full table scan
@@ -241,16 +241,16 @@ public class IncrementalInputSplits implements
Serializable {
* Returns the incremental input splits.
*
* @param metaClient The meta client
- * @param hadoopConf The hadoop configuration
* @param issuedInstant The last issued instant, only valid in streaming read
+ * @param issuedOffset The last issued offset, only valid in streaming read
* @param cdcEnabled Whether cdc is enabled
*
* @return The list of incremental input splits or empty if there are no new
instants
*/
public Result inputSplits(
HoodieTableMetaClient metaClient,
- @Nullable org.apache.hadoop.conf.Configuration hadoopConf,
- String issuedInstant,
+ @Nullable String issuedInstant,
+ @Nullable String issuedOffset,
boolean cdcEnabled) {
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = getReadTimeline(metaClient);
@@ -258,85 +258,152 @@ public class IncrementalInputSplits implements
Serializable {
LOG.warn("No splits found for the table under path " + path);
return Result.EMPTY;
}
+
+ // Assumes a timeline:
+ // c1.inflight, c2(issued instant), c3, c4
+ // -> c1, c2(issued instant), c3, c4
+ // c1, c3 and c4 are the candidate instants,
+ // we call c1 a 'hollow' instant which has lower version number but
greater completion time,
+ // filtering the timeline using just c2 could cause data loss,
+ // check these hollow instants first.
+ Result hollowSplits = getHollowInputSplits(metaClient,
metaClient.getHadoopConf(), issuedInstant, issuedOffset, commitTimeline,
cdcEnabled);
+
List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline,
issuedInstant);
// get the latest instant that satisfies condition
- final HoodieInstant instantToIssue = instants.size() == 0 ? null :
instants.get(instants.size() - 1);
+ final String endInstant = instants.size() == 0 ? null :
instants.get(instants.size() - 1).getTimestamp();
final InstantRange instantRange;
- if (instantToIssue != null) {
+ if (endInstant != null) {
// when cdc is enabled, returns instant range with nullable boundary
// to filter the reading instants on the timeline
- instantRange = getInstantRange(issuedInstant,
instantToIssue.getTimestamp(), cdcEnabled);
- } else {
+ instantRange = getInstantRange(issuedInstant, endInstant, cdcEnabled);
+ } else if (hollowSplits.isEmpty()) {
LOG.info("No new instant found for the table under path " + path + ",
skip reading");
return Result.EMPTY;
+ } else {
+ return hollowSplits;
}
- final Set<String> readPartitions;
- final FileStatus[] fileStatuses;
+ // version number should be monotonically increasing
+ // fetch the instant offset by completion time
+ String offsetToIssue =
instants.stream().map(HoodieInstant::getStateTransitionTime).max(String::compareTo).orElse(endInstant);
if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = getFileIndex();
- readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
+
+ Set<String> readPartitions = new
TreeSet<>(fileIndex.getOrBuildPartitionPaths());
if (readPartitions.size() == 0) {
LOG.warn("No partitions found for reading under path: " + path);
return Result.EMPTY;
}
- fileStatuses = fileIndex.getFilesInPartitions();
+ FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
if (fileStatuses.length == 0) {
LOG.warn("No files found for reading under path: " + path);
return Result.EMPTY;
}
- final String endInstant = instantToIssue.getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient,
commitTimeline,
fileStatuses, readPartitions, endInstant, null, false);
- return Result.instance(inputSplits, endInstant);
+ return Result.instance(inputSplits, endInstant, offsetToIssue);
} else {
- // streaming read
- if (cdcEnabled) {
- // case1: cdc change log enabled
- final String endInstant = instantToIssue.getTimestamp();
- List<MergeOnReadInputSplit> inputSplits =
getCdcInputSplits(metaClient, instantRange);
- return Result.instance(inputSplits, endInstant);
- }
- // case2: normal streaming read
- String tableName = conf.getString(FlinkOptions.TABLE_NAME);
- List<HoodieCommitMetadata> activeMetadataList = instants.stream()
- .map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, commitTimeline)).collect(Collectors.toList());
- List<HoodieCommitMetadata> archivedMetadataList =
getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
- if (archivedMetadataList.size() > 0) {
- LOG.warn("\n"
- +
"--------------------------------------------------------------------------------\n"
- + "---------- caution: the reader has fall behind too much from
the writer,\n"
- + "---------- tweak 'read.tasks' option to add parallelism of read
tasks.\n"
- +
"--------------------------------------------------------------------------------");
- }
- List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
- // IMPORTANT: the merged metadata list must be in ascending order by
instant time
- ? mergeList(archivedMetadataList, activeMetadataList)
- : activeMetadataList;
+ List<MergeOnReadInputSplit> inputSplits = getIncInputSplits(metaClient,
metaClient.getHadoopConf(), commitTimeline, instants, instantRange, endInstant,
cdcEnabled);
+ return Result.instance(mergeList(hollowSplits.getInputSplits(),
inputSplits), endInstant, offsetToIssue);
+ }
+ }
- readPartitions = getReadPartitions(metadataList);
- if (readPartitions.size() == 0) {
- LOG.warn("No partitions found for reading under path: " + path);
- return Result.EMPTY;
- }
- fileStatuses = WriteProfiles.getFilesFromMetadata(path, hadoopConf,
metadataList, metaClient.getTableType(), true);
+ /**
+ * Returns the input splits for streaming incremental read.
+ */
+ private List<MergeOnReadInputSplit> getIncInputSplits(
+ HoodieTableMetaClient metaClient,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ HoodieTimeline commitTimeline,
+ List<HoodieInstant> instants,
+ InstantRange instantRange,
+ String endInstant,
+ boolean cdcEnabled) {
+ // streaming read
+ if (cdcEnabled) {
+ // case1: cdc change log enabled
+ return getCdcInputSplits(metaClient, instantRange);
+ }
+ // case2: normal streaming read
+ String tableName = conf.getString(FlinkOptions.TABLE_NAME);
+ List<HoodieCommitMetadata> activeMetadataList = instants.stream()
+ .map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, commitTimeline)).collect(Collectors.toList());
+ List<HoodieCommitMetadata> archivedMetadataList =
getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
+ if (archivedMetadataList.size() > 0) {
+ LOG.warn("\n"
+ +
"--------------------------------------------------------------------------------\n"
+ + "---------- caution: the reader has fall behind too much from the
writer,\n"
+ + "---------- tweak 'read.tasks' option to add parallelism of read
tasks.\n"
+ +
"--------------------------------------------------------------------------------");
+ }
+ // IMPORTANT: the merged metadata list must be in ascending order by
instant time
+ List<HoodieCommitMetadata> metadataList = mergeList(archivedMetadataList,
activeMetadataList);
- if (fileStatuses.length == 0) {
- LOG.warn("No files found for reading under path: " + path);
- return Result.EMPTY;
- }
+ Set<String> readPartitions = getReadPartitions(metadataList);
+ if (readPartitions.size() == 0) {
+ LOG.warn("No partitions found for reading under path: " + path);
+ return Collections.emptyList();
+ }
+ FileStatus[] fileStatuses = WriteProfiles.getFilesFromMetadata(path,
hadoopConf, metadataList, metaClient.getTableType());
- final String endInstant = instantToIssue.getTimestamp();
- List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient,
commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange,
skipCompaction);
+ if (fileStatuses.length == 0) {
+ LOG.warn("No files found for reading under path: " + path);
+ return Collections.emptyList();
+ }
+
+ return getInputSplits(metaClient, commitTimeline,
+ fileStatuses, readPartitions, endInstant, instantRange,
skipCompaction);
+ }
- return Result.instance(inputSplits, endInstant);
+ /**
+ * Returns the input splits for 'hollow' instants.
+ */
+ private Result getHollowInputSplits(
+ HoodieTableMetaClient metaClient,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ @Nullable String issuedInstant,
+ @Nullable String issuedOffset,
+ HoodieTimeline commitTimeline,
+ boolean cdcEnabled) {
+ if (issuedInstant == null || issuedOffset == null) {
+ return Result.EMPTY;
+ }
+ // find the write commit instant that finishes later than the issued
instant
+    // but with a smaller txn start time.
+ List<HoodieInstant> instants = commitTimeline.getInstantsAsStream()
+ .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, issuedInstant))
+ .filter(s ->
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN,
issuedOffset))
+ .filter(s -> StreamerUtil.isWriteCommit(metaClient.getTableType(), s,
commitTimeline)).collect(Collectors.toList());
+ if (instants.isEmpty()) {
+ return Result.EMPTY;
}
+ String offsetToIssue =
instants.stream().map(HoodieInstant::getStateTransitionTime).max(String::compareTo).orElse(issuedOffset);
+ List<MergeOnReadInputSplit> inputSplits = instants.stream().map(instant ->
{
+ String instantTs = instant.getTimestamp();
+
+ // Assumes we consume from timeline:
+ // c0, c1.inflight, c2(issued instant), c3, c4
+ // -> c0, c1, c2(issued instant), c3, c4
+ // c1, c3 and c4 are the candidate instants,
+
+ // c4 data file could include overlapping records from c2,
+ // use (c2, c4] instant range for c3 and c4,
+
+ // c1 data file could include overlapping records from c0,
+ // use the [c1, c1] instant range for c1.
+ InstantRange instantRange = InstantRange.builder()
+ .startInstant(instantTs)
+ .endInstant(instantTs)
+ .nullableBoundary(cdcEnabled)
+ .rangeType(InstantRange.RangeType.CLOSE_CLOSE).build();
+ return getIncInputSplits(metaClient, hadoopConf, commitTimeline,
Collections.singletonList(instant), instantRange, instantTs, cdcEnabled);
+ }).flatMap(Collection::stream).collect(Collectors.toList());
+ return Result.instance(inputSplits, issuedInstant, offsetToIssue);
}
@Nullable
@@ -491,12 +558,13 @@ public class IncrementalInputSplits implements
Serializable {
*
* @param commitTimeline The completed commits timeline
* @param issuedInstant The last issued instant that has already been
delivered to downstream
+ *
* @return the filtered hoodie instants
*/
@VisibleForTesting
public List<HoodieInstant> filterInstantsWithRange(
HoodieTimeline commitTimeline,
- final String issuedInstant) {
+ @Nullable final String issuedInstant) {
HoodieTimeline completedTimeline =
commitTimeline.filterCompletedInstants();
if (issuedInstant != null) {
// returns early for streaming mode
@@ -546,6 +614,12 @@ public class IncrementalInputSplits implements
Serializable {
}
private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
+ if (list1.isEmpty()) {
+ return list2;
+ }
+ if (list2.isEmpty()) {
+ return list1;
+ }
List<T> merged = new ArrayList<>(list1);
merged.addAll(list2);
return merged;
@@ -561,6 +635,7 @@ public class IncrementalInputSplits implements Serializable
{
public static class Result {
private final List<MergeOnReadInputSplit> inputSplits; // input splits
private final String endInstant; // end instant to consume to
+ private final String offset; // monotonic increasing consumption offset
public static final Result EMPTY = instance(Collections.emptyList(), "");
@@ -576,13 +651,23 @@ public class IncrementalInputSplits implements
Serializable {
return this.endInstant;
}
- private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant)
{
+ @Nullable
+ public String getOffset() {
+ return offset;
+ }
+
+ private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant,
@Nullable String offset) {
this.inputSplits = inputSplits;
this.endInstant = endInstant;
+ this.offset = offset;
}
public static Result instance(List<MergeOnReadInputSplit> inputSplits,
String endInstant) {
- return new Result(inputSplits, endInstant);
+ return new Result(inputSplits, endInstant, null);
+ }
+
+ public static Result instance(List<MergeOnReadInputSplit> inputSplits,
String endInstant, String offset) {
+ return new Result(inputSplits, endInstant, offset);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 5bd7abeef4d..4d325b3a649 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -92,12 +92,12 @@ public class StreamReadMonitoringFunction
private String issuedInstant;
+ private String issuedOffset;
+
private transient ListState<String> instantState;
private final Configuration conf;
- private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
private HoodieTableMetaClient metaClient;
private final IncrementalInputSplits incrementalInputSplits;
@@ -145,7 +145,7 @@ public class StreamReadMonitoringFunction
retrievedStates.add(entry);
}
- ValidationUtils.checkArgument(retrievedStates.size() <= 1,
+ ValidationUtils.checkArgument(retrievedStates.size() <= 2,
getClass().getSimpleName() + " retrieved invalid state.");
if (retrievedStates.size() == 1 && issuedInstant != null) {
@@ -156,21 +156,23 @@ public class StreamReadMonitoringFunction
"The " + getClass().getSimpleName() + " has already restored from
a previous Flink version.");
} else if (retrievedStates.size() == 1) {
+ // for forward compatibility
this.issuedInstant = retrievedStates.get(0);
if (LOG.isDebugEnabled()) {
- LOG.debug("{} retrieved a issued instant of time {} for table {}
with path {}.",
+ LOG.debug("{} retrieved an issued instant of time {} for table {}
with path {}.",
getClass().getSimpleName(), issuedInstant,
conf.get(FlinkOptions.TABLE_NAME), path);
}
+ } else if (retrievedStates.size() == 2) {
+ this.issuedInstant = retrievedStates.get(0);
+ this.issuedOffset = retrievedStates.get(1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} retrieved an issued instant of time [{}, {}] for table
{} with path {}.",
+ getClass().getSimpleName(), issuedInstant, issuedOffset,
conf.get(FlinkOptions.TABLE_NAME), path);
+ }
}
}
}
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
- }
-
@Override
public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context)
throws Exception {
checkpointLock = context.getCheckpointLock();
@@ -187,6 +189,7 @@ public class StreamReadMonitoringFunction
if (this.metaClient != null) {
return this.metaClient;
}
+ org.apache.hadoop.conf.Configuration hadoopConf =
HadoopConfigurations.getHadoopConf(conf);
if (StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
this.metaClient = StreamerUtil.createMetaClient(this.path.toString(),
hadoopConf);
return this.metaClient;
@@ -203,7 +206,7 @@ public class StreamReadMonitoringFunction
return;
}
IncrementalInputSplits.Result result =
- incrementalInputSplits.inputSplits(metaClient, this.hadoopConf,
this.issuedInstant, this.cdcEnabled);
+ incrementalInputSplits.inputSplits(metaClient, this.issuedInstant,
this.issuedOffset, this.cdcEnabled);
if (result.isEmpty()) {
// no new instants, returns early
return;
@@ -214,6 +217,7 @@ public class StreamReadMonitoringFunction
}
// update the issued instant time
this.issuedInstant = result.getEndInstant();
+ this.issuedOffset = result.getOffset();
LOG.info("\n"
+ "------------------------------------------------------------\n"
+ "---------- consumed to instant: {}\n"
@@ -259,5 +263,8 @@ public class StreamReadMonitoringFunction
if (this.issuedInstant != null) {
this.instantState.add(this.issuedInstant);
}
+ if (this.issuedOffset != null) {
+ this.instantState.add(this.issuedOffset);
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index a9c72754b7a..4136c67cb28 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -398,7 +398,7 @@ public class HoodieTableSource implements
.partitionPruner(partitionPruner)
.build();
final boolean cdcEnabled =
this.conf.getBoolean(FlinkOptions.CDC_ENABLED);
- final IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, cdcEnabled);
+ final IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, cdcEnabled);
if (result.isEmpty()) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for incremental read, returns
empty collection instead");
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 61643e68214..071646de61a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -22,6 +22,7 @@ import
org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -448,6 +449,19 @@ public class StreamerUtil {
}
}
+ /**
+ * Returns whether the given instant is a data writing commit.
+ *
+ * @param tableType The table type
+ * @param instant The instant
+ * @param timeline The timeline
+ */
+ public static boolean isWriteCommit(HoodieTableType tableType, HoodieInstant
instant, HoodieTimeline timeline) {
+ return tableType == HoodieTableType.MERGE_ON_READ
+ ? !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) // not a
compaction
+ : !ClusteringUtil.isClusteringInstant(instant, timeline); // not a
clustering
+ }
+
/**
* Returns the auxiliary path.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index a0cdb588ae4..609e099f997 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -62,7 +62,7 @@ import static
org.junit.jupiter.api.Assertions.assertIterableEquals;
public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
@BeforeEach
- private void init() throws IOException {
+ void init() throws IOException {
initPath();
initMetaClient();
}
@@ -92,7 +92,7 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
assertEquals(2, instantRange2.size());
assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2);
- // simulate first iteration cycle with read from LATEST commit
+ // simulate first iteration cycle with read from the LATEST commit
List<HoodieInstant> instantRange1 = iis.filterInstantsWithRange(timeline,
null);
assertEquals(1, instantRange1.size());
assertIterableEquals(Collections.singletonList(commit3), instantRange1);
@@ -161,7 +161,7 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
.path(new Path(basePath))
.rowType(TestConfigurations.ROW_TYPE)
.build();
- IncrementalInputSplits.Result result = iis.inputSplits(metaClient,
metaClient.getHadoopConf(), null, false);
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
null, false);
List<String> partitions = getFilteredPartitions(result);
assertEquals(Arrays.asList("par1", "par2", "par3", "par4", "par5",
"par6"), partitions);
}
@@ -186,11 +186,15 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
.rowType(TestConfigurations.ROW_TYPE)
.partitionPruner(partitionPruner)
.build();
- IncrementalInputSplits.Result result = iis.inputSplits(metaClient,
metaClient.getHadoopConf(), null, false);
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
null, false);
List<String> partitions = getFilteredPartitions(result);
assertEquals(expectedPartitions, partitions);
}
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
private static Stream<Arguments> partitionEvaluators() {
FieldReferenceExpression partitionFieldRef = new
FieldReferenceExpression("partition", DataTypes.STRING(), 0, 0);
// `partition` != 'par3'
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 616edc37f1c..3449f7a950f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -18,7 +18,12 @@
package org.apache.hudi.source;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -31,6 +36,7 @@ import
org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -39,6 +45,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -199,6 +206,96 @@ public class TestStreamReadMonitoringFunction {
}
}
+ @Test
+ public void testConsumingHollowInstants() throws Exception {
+ // write 4 commits
+ conf.setString("hoodie.parquet.small.file.limit", "0"); // invalidate the
small file strategy
+ for (int i = 0; i < 8; i += 2) {
+ List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+ TestData.writeData(dataset, conf);
+ }
+
+ // we got 4 commits on the timeline: c1, c2, c3, c4
+ // re-create the metadata file for c2 and c3 so that they have greater
completion time than c4.
+ // the completion time sequence become: c1, c4, c2, c3,
+ // we will test with the same consumption sequence.
+ HoodieTableMetaClient metaClient =
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(),
HadoopConfigurations.getHadoopConf(conf));
+ List<HoodieInstant> oriInstants =
metaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
+ assertThat(oriInstants.size(), is(4));
+ List<HoodieCommitMetadata> metadataList = new ArrayList<>();
+ // timeline: c1, c2.inflight, c3.inflight, c4
+ for (int i = 1; i <= 2; i++) {
+ HoodieInstant instant = oriInstants.get(i);
+ metadataList.add(TestUtils.deleteInstantFile(metaClient, instant));
+ }
+
+ List<HoodieInstant> instants =
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
+ assertThat(instants.size(), is(2));
+
+ String c2 = oriInstants.get(1).getTimestamp();
+ String c3 = oriInstants.get(2).getTimestamp();
+ String c4 = instants.get(1).getTimestamp();
+
+ conf.setString(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
+ StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+ try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function)) {
+ harness.setup();
+ harness.open();
+
+ // timeline: c1, c2.inflight, c3.inflight, c4
+ // -> c1
+ CountDownLatch latch = new CountDownLatch(2);
+ CollectingSourceContext sourceContext = new
CollectingSourceContext(latch);
+
+ runAsync(sourceContext, function);
+
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ assertThat("Should produce the expected splits",
+ sourceContext.getPartitionPaths(), is("par1"));
+ assertTrue(sourceContext.splits.stream().noneMatch(split ->
split.getInstantRange().isPresent()),
+ "No instants should have range limit");
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getLatestCommit().equals(c4)),
+ "All the splits should be with specified instant time");
+
+ // reset the source context
+ latch = new CountDownLatch(1);
+ sourceContext.reset(latch);
+
+ // timeline: c1, c2, c3.inflight, c4
+ // c4 -> c2
+ TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(1),
metadataList.get(0)); // complete c2
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ assertThat("Should produce the expected splits",
+ sourceContext.getPartitionPaths(), is("par1"));
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getInstantRange().isPresent()),
+ "All instants should have range limit");
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
isPointInstantRange(split.getInstantRange().get(), c2)),
+ "All the splits should have point instant range");
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getLatestCommit().equals(c2)),
+ "All the splits should be with specified instant time");
+
+ // reset the source context
+ latch = new CountDownLatch(1);
+ sourceContext.reset(latch);
+
+ // timeline: c1, c2, c3, c4
+ // c4 -> c3
+ TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(2),
metadataList.get(1)); // complete c3
+ assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should
finish splits generation");
+ assertThat("Should produce the expected splits",
+ sourceContext.getPartitionPaths(), is("par1"));
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getInstantRange().isPresent()),
+ "All instants should have range limit");
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
isPointInstantRange(split.getInstantRange().get(), c3)),
+ "All the splits should have point instant range");
+ assertTrue(sourceContext.splits.stream().allMatch(split ->
split.getLatestCommit().equals(c3)),
+ "All the splits should be with specified instant time");
+
+ // Stop the stream task.
+ function.close();
+ }
+ }
+
@Test
public void testCheckpointRestore() throws Exception {
TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -317,6 +414,12 @@ public class TestStreamReadMonitoringFunction {
}
}
+ private static boolean isPointInstantRange(InstantRange instantRange, String
timestamp) {
+ return instantRange != null
+ && Objects.equals(timestamp, instantRange.getStartInstant())
+ && Objects.equals(timestamp, instantRange.getEndInstant());
+ }
+
private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit>
createHarness(
StreamReadMonitoringFunction function) throws Exception {
StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction>
streamSource = new StreamSource<>(function);
@@ -387,6 +490,7 @@ public class TestStreamReadMonitoringFunction {
public String getPartitionPaths() {
return this.splits.stream()
.map(TestUtils::getSplitPartitionPath)
+ .distinct()
.sorted(Comparator.naturalOrder())
.collect(Collectors.joining(","));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 56730368f6f..d1b5516d1ad 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.format;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -72,6 +73,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for MergeOnReadInputFormat and ParquetInputFormat.
@@ -431,7 +433,7 @@ public class TestInputFormat {
// default read the latest commit
// the compaction base files are skipped
- IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -447,7 +449,7 @@ public class TestInputFormat {
String secondCommit =
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0,
HoodieTimeline.COMMIT_ACTION);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual2 = TestData.rowDataToString(result2);
@@ -463,7 +465,7 @@ public class TestInputFormat {
inputFormat = this.tableSource.getInputFormat(true);
// filter out the last commit by partition pruning
- IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits3.isEmpty());
List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual3 = TestData.rowDataToString(result3);
@@ -475,8 +477,6 @@ public class TestInputFormat {
void testReadSkipClustering() throws Exception {
beforeEach(HoodieTableType.COPY_ON_WRITE);
- org.apache.hadoop.conf.Configuration hadoopConf =
HadoopConfigurations.getHadoopConf(conf);
-
// write base first with clustering
conf.setString(FlinkOptions.OPERATION, "insert");
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
@@ -498,7 +498,7 @@ public class TestInputFormat {
// default read the latest commit
// the clustering files are skipped
- IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -514,7 +514,7 @@ public class TestInputFormat {
String secondCommit =
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0,
HoodieTimeline.REPLACE_COMMIT_ACTION);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual2 = TestData.rowDataToString(result2);
@@ -531,7 +531,7 @@ public class TestInputFormat {
inputFormat = this.tableSource.getInputFormat(true);
// filter out the last commit by partition pruning
- IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits3.isEmpty());
List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual3 = TestData.rowDataToString(result3);
@@ -539,12 +539,103 @@ public class TestInputFormat {
assertThat(actual3, is(expected3));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testReadHollowInstants(HoodieTableType tableType) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("hoodie.parquet.small.file.limit", "0"); // invalidate the
small file strategy
+ beforeEach(tableType, options);
+
+ // write 4 commits
+ for (int i = 0; i < 8; i += 2) {
+ List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+ TestData.writeData(dataset, conf);
+ }
+
+ // we got 4 commits on the timeline: c1, c2, c3, c4
+ // re-create the metadata file for c2 and c3 so that they have greater
completion time than c4.
+ // the completion time sequence becomes: c1, c4, c2, c3.
+ // we will test with the same consumption sequence.
+ HoodieTableMetaClient metaClient =
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(),
HadoopConfigurations.getHadoopConf(conf));
+ List<HoodieInstant> oriInstants =
metaClient.getCommitsTimeline().filterCompletedInstants().getInstants();
+ assertThat(oriInstants.size(), is(4));
+ List<HoodieCommitMetadata> metadataList = new ArrayList<>();
+ // timeline: c1, c2.inflight, c3.inflight, c4
+ for (int i = 1; i <= 2; i++) {
+ HoodieInstant instant = oriInstants.get(i);
+ metadataList.add(TestUtils.deleteInstantFile(metaClient, instant));
+ }
+
+ List<HoodieInstant> instants =
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants();
+ assertThat(instants.size(), is(2));
+
+ String c4 = instants.get(1).getTimestamp();
+
+ InputFormat<RowData, ?> inputFormat =
this.tableSource.getInputFormat(true);
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ IncrementalInputSplits incrementalInputSplits =
IncrementalInputSplits.builder()
+ .rowType(TestConfigurations.ROW_TYPE)
+ .conf(conf)
+ .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+ .build();
+
+ // timeline: c1, c2.inflight, c3.inflight, c4
+ // default read the latest commit
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ assertFalse(splits1.isEmpty());
+ List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ TestData.assertRowDataEquals(result1, TestData.dataSetInsert(7, 8));
+
+ // timeline: c1, c2.inflight, c3.inflight, c4
+ // -> c1
+ conf.setString(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
+ assertFalse(splits2.isEmpty());
+ List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ TestData.assertRowDataEquals(result2, TestData.dataSetInsert(1, 2, 7, 8));
+
+ // timeline: c1, c2, c3.inflight, c4
+ // c4 -> c2
+ TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(1),
metadataList.get(0)); // complete c2
+ assertThat(splits2.getEndInstant(), is(c4));
+ IncrementalInputSplits.Result splits3 =
incrementalInputSplits.inputSplits(metaClient, splits2.getEndInstant(),
splits2.getOffset(), false);
+ assertFalse(splits3.isEmpty());
+ List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ TestData.assertRowDataEquals(result3, TestData.dataSetInsert(3, 4));
+
+ // test c2 and c4, c2 completion time > c1, so it is not a hollow instant
+ IncrementalInputSplits.Result splits4 =
incrementalInputSplits.inputSplits(metaClient,
oriInstants.get(0).getTimestamp(), oriInstants.get(0).getStateTransitionTime(),
false);
+ assertFalse(splits4.isEmpty());
+ List<RowData> result4 = readData(inputFormat,
splits4.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ TestData.assertRowDataEquals(result4, TestData.dataSetInsert(3, 4, 7, 8));
+
+ // timeline: c1, c2, c3, c4
+ // c4 -> c3
+ TestUtils.saveInstantAsComplete(metaClient, oriInstants.get(2),
metadataList.get(1)); // complete c3
+ assertThat(splits3.getEndInstant(), is(c4));
+ IncrementalInputSplits.Result splits5 =
incrementalInputSplits.inputSplits(metaClient, splits3.getEndInstant(),
splits3.getOffset(), false);
+ assertFalse(splits5.isEmpty());
+ List<RowData> result5 = readData(inputFormat,
splits5.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ TestData.assertRowDataEquals(result5, TestData.dataSetInsert(5, 6));
+
+ // c4 ->
+ assertThat(splits5.getEndInstant(), is(c4));
+ IncrementalInputSplits.Result splits6 =
incrementalInputSplits.inputSplits(metaClient, splits5.getEndInstant(),
splits5.getOffset(), false);
+ assertTrue(splits6.isEmpty());
+
+ // test c2 and c4, c2 is recognized as a hollow instant
+ // the (version_number, completion_time) pair is not consistent, just for
test purpose
+ IncrementalInputSplits.Result splits7 =
incrementalInputSplits.inputSplits(metaClient,
oriInstants.get(2).getTimestamp(), oriInstants.get(3).getStateTransitionTime(),
false);
+ assertFalse(splits7.isEmpty());
+ List<RowData> result7 = readData(inputFormat,
splits7.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ TestData.assertRowDataEquals(result7, TestData.dataSetInsert(3, 4, 7, 8));
+ }
+
@Test
void testReadBaseFilesWithStartCommit() throws Exception {
beforeEach(HoodieTableType.COPY_ON_WRITE);
- org.apache.hadoop.conf.Configuration hadoopConf =
HadoopConfigurations.getHadoopConf(conf);
-
// write base files
TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -560,7 +651,7 @@ public class TestInputFormat {
.build();
// default read the latest commit
- IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits1 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
@@ -575,7 +666,7 @@ public class TestInputFormat {
String secondCommit =
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 1,
HoodieTimeline.COMMIT_ACTION);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
- IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ IncrementalInputSplits.Result splits2 =
incrementalInputSplits.inputSplits(metaClient, null, null, false);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual2 = TestData.rowDataToString(result2);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 46af5fa7a82..1e951dc3cb0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -18,9 +18,14 @@
package org.apache.hudi.utils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -32,6 +37,8 @@ import org.apache.flink.core.fs.Path;
import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -117,4 +124,15 @@ public class TestUtils {
.countInstants();
}
+ public static HoodieCommitMetadata deleteInstantFile(HoodieTableMetaClient
metaClient, HoodieInstant instant) throws Exception {
+ ValidationUtils.checkArgument(instant.isCompleted());
+ HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant,
metaClient.getActiveTimeline());
+ HoodieActiveTimeline.deleteInstantFile(metaClient.getFs(),
metaClient.getMetaPath(), instant);
+ return metadata;
+ }
+
+ public static void saveInstantAsComplete(HoodieTableMetaClient metaClient,
HoodieInstant instant, HoodieCommitMetadata metadata) throws Exception {
+ metaClient.getActiveTimeline().saveAsComplete(new HoodieInstant(true,
instant.getAction(), instant.getTimestamp()),
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
}