This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 431ade06fc [HUDI-5234] Streaming read skip clustering (#7296)
431ade06fc is described below
commit 431ade06fc2753fb5f691caa043dd2bfa4fcccb3
Author: Danny Chan <[email protected]>
AuthorDate: Fri Nov 25 08:33:08 2022 +0800
[HUDI-5234] Streaming read skip clustering (#7296)
Co-authored-by: zhuanshenbsj1 <[email protected]>
---
.../apache/hudi/configuration/FlinkOptions.java | 8 +++
.../apache/hudi/source/IncrementalInputSplits.java | 65 ++++++++++++++-----
.../hudi/source/StreamReadMonitoringFunction.java | 1 +
.../java/org/apache/hudi/util/ClusteringUtil.java | 18 ++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 31 +++++++++-
.../apache/hudi/table/format/TestInputFormat.java | 72 +++++++++++++++++++++-
.../test/java/org/apache/hudi/utils/TestData.java | 10 ++-
.../test/java/org/apache/hudi/utils/TestUtils.java | 4 +-
8 files changed, 187 insertions(+), 22 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 55abeaaa56..8706c5960a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -280,6 +280,14 @@ public class FlinkOptions extends HoodieConfig {
+ "usually with delta time compaction strategy that is long enough,
for e.g, one week;\n"
+ "2) changelog mode is enabled, this option is a solution to keep
data integrity");
+ // this option is experimental
+ public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING =
ConfigOptions
+ .key("read.streaming.skip_clustering")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to skip clustering instants for streaming
read,\n"
+ + "to avoid reading duplicates");
+
public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
.key("read.start-commit")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 09f7054cd7..bb826ce79b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -39,6 +39,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
@@ -95,6 +96,8 @@ public class IncrementalInputSplits implements Serializable {
private final Set<String> requiredPartitions;
// skip compaction
private final boolean skipCompaction;
+ // skip clustering
+ private final boolean skipClustering;
private IncrementalInputSplits(
Configuration conf,
@@ -102,13 +105,15 @@ public class IncrementalInputSplits implements Serializable {
RowType rowType,
long maxCompactionMemoryInBytes,
@Nullable Set<String> requiredPartitions,
- boolean skipCompaction) {
+ boolean skipCompaction,
+ boolean skipClustering) {
this.conf = conf;
this.path = path;
this.rowType = rowType;
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.requiredPartitions = requiredPartitions;
this.skipCompaction = skipCompaction;
+ this.skipClustering = skipClustering;
}
/**
@@ -128,7 +133,7 @@ public class IncrementalInputSplits implements Serializable {
public Result inputSplits(
HoodieTableMetaClient metaClient,
org.apache.hadoop.conf.Configuration hadoopConf) {
- HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+ HoodieTimeline commitTimeline = getReadTimeline(metaClient);
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return Result.EMPTY;
@@ -240,7 +245,7 @@ public class IncrementalInputSplits implements Serializable {
String issuedInstant,
boolean cdcEnabled) {
metaClient.reloadActiveTimeline();
- HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+ HoodieTimeline commitTimeline = getReadTimeline(metaClient);
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return Result.EMPTY;
@@ -442,17 +447,26 @@ public class IncrementalInputSplits implements Serializable {
String tableName) {
if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
// read the archived metadata if the start instant is archived.
- HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(instantRange.getStartInstant());
- HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
- if (!archivedCompleteTimeline.empty()) {
- Stream<HoodieInstant> instantStream = archivedCompleteTimeline.getInstants();
- return maySkipCompaction(instantStream)
+ HoodieTimeline archivedTimeline = getArchivedReadTimeline(metaClient, instantRange.getStartInstant());
+ if (!archivedTimeline.empty()) {
+ return archivedTimeline.getInstants()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
}
}
return Collections.emptyList();
}
+ private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
+ HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+ return filterInstantsByCondition(timeline);
+ }
+
+ private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
+ HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(startInstant);
+ HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+ return filterInstantsByCondition(archivedCompleteTimeline);
+ }
+
/**
* Returns the instants with a given issuedInstant to start from.
*
@@ -466,7 +480,8 @@ public class IncrementalInputSplits implements Serializable {
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
if (issuedInstant != null) {
// returns early for streaming mode
- return maySkipCompaction(completedTimeline.getInstants())
+ return commitTimeline
+ .getInstants()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
}
@@ -482,13 +497,26 @@ public class IncrementalInputSplits implements Serializable {
final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit));
}
- return maySkipCompaction(instantStream).collect(Collectors.toList());
+ return instantStream.collect(Collectors.toList());
}
- private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant>
instants) {
- return this.skipCompaction
- ? instants.filter(instant ->
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
- : instants;
+ /**
+ * Filters out the unnecessary instants by user specified condition.
+ *
+ * @param timeline The timeline
+ *
+ * @return the filtered timeline
+ */
+ private HoodieTimeline filterInstantsByCondition(HoodieTimeline timeline) {
+ final HoodieTimeline oriTimeline = timeline;
+ if (this.skipCompaction) {
+ // the compaction commit uses 'commit' as action which is tricky
+ timeline = timeline.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ }
+ if (this.skipClustering) {
+ timeline = timeline.filter(instant -> !ClusteringUtil.isClusteringInstant(instant, oriTimeline));
+ }
+ return timeline;
}
private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
@@ -544,6 +572,8 @@ public class IncrementalInputSplits implements Serializable {
private Set<String> requiredPartitions;
// skip compaction
private boolean skipCompaction = false;
+ // skip clustering
+ private boolean skipClustering = true;
public Builder() {
}
@@ -578,10 +608,15 @@ public class IncrementalInputSplits implements Serializable {
return this;
}
+ public Builder skipClustering(boolean skipClustering) {
+ this.skipClustering = skipClustering;
+ return this;
+ }
+
public IncrementalInputSplits build() {
return new IncrementalInputSplits(
Objects.requireNonNull(this.conf),
Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
- this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction);
+ this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction, this.skipClustering);
}
}
}
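As a rough sketch (not part of the commit), the new builder flag is wired from the read options in the same way as the existing compaction flag; `conf`, `path` and `rowType` below are assumed to come from the caller, and the builder methods are the ones shown in this diff:

    // Sketch: propagate the streaming-read skip flags into the split generator.
    IncrementalInputSplits inputSplits = IncrementalInputSplits.builder()
        .conf(conf)
        .path(path)
        .rowType(rowType)
        .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
        .skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
        .build();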
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 2ad312241e..c8757bf10f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -119,6 +119,7 @@ public class StreamReadMonitoringFunction
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(requiredPartitionPaths)
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+ .skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
.build();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index cb1e54b44d..34352ec80b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -19,17 +19,21 @@
package org.apache.hudi.util;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -94,4 +98,18 @@ public class ClusteringUtil {
commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
}
}
+
+ /**
+ * Returns whether the given instant {@code instant} is with clustering operation.
+ */
+ public static boolean isClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) {
+ if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ return false;
+ }
+ try {
+ return TimelineUtils.getCommitMetadata(instant, timeline).getOperationType().equals(WriteOperationType.CLUSTER);
+ } catch (IOException e) {
+ throw new HoodieException("Resolve replace commit metadata error for
instant: " + instant, e);
+ }
+ }
}
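A replace commit is not necessarily a clustering commit (insert overwrite, for instance, also writes replacecommit instants), which is why the helper above inspects the operation type in the commit metadata rather than the action alone. A hedged usage sketch, assuming `timeline` is a completed HoodieTimeline loaded elsewhere:

    // Sketch: drop clustering instants from a timeline, mirroring filterInstantsByCondition above.
    HoodieTimeline nonClustering = timeline.filter(
        instant -> !ClusteringUtil.isClusteringInstant(instant, timeline));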
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 0b383870d6..88072e6b8d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -316,7 +316,7 @@ public class ITTestHoodieDataSource {
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);
- String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
+ String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, HoodieTimeline.DELTA_COMMIT_ACTION);
streamTableEnv.getConfig().getConfiguration()
.setBoolean("table.dynamic-table-options.enabled", true);
@@ -325,6 +325,35 @@ public class ITTestHoodieDataSource {
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}
+ @Test
+ void testAppendWriteReadSkippingClustering() throws Exception {
+ // create filesystem table named source
+ String createSource = TestConfigurations.getFileSourceDDL("source", 4);
+ streamTableEnv.executeSql(createSource);
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.OPERATION, "insert")
+ .option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true)
+ .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
+ .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
+ .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)
+ .option(FlinkOptions.CLUSTERING_TASKS, 1)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 select * from source";
+ execInsertSql(streamTableEnv, insertInto);
+
+ String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, HoodieTimeline.COMMIT_ACTION);
+
+ streamTableEnv.getConfig().getConfiguration()
+ .setBoolean("table.dynamic-table-options.enabled", true);
+ final String query = String.format("select * from t1/*+
options('read.start-commit'='%s')*/", instant);
+ List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+ }
+
@Test
void testStreamWriteWithCleaning() {
// create filesystem table named source
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index d8f292e9f5..4480465075 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.IncrementalInputSplits;
@@ -439,7 +440,7 @@ public class TestInputFormat {
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
// read from the compaction commit
- String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, false);
+ String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, HoodieTimeline.COMMIT_ACTION);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
@@ -457,6 +458,75 @@ public class TestInputFormat {
this.tableSource.reset();
inputFormat = this.tableSource.getInputFormat(true);
+ // filter out the last commit by partition pruning
+ IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits3.isEmpty());
+ List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ String actual3 = TestData.rowDataToString(result3);
+ String expected3 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+ assertThat(actual3, is(expected3));
+ }
+
+ @Test
+ void testReadSkipClustering() throws Exception {
+ beforeEach(HoodieTableType.COPY_ON_WRITE);
+
+ org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+
+ // write base first with clustering
+ conf.setString(FlinkOptions.OPERATION, "insert");
+ conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+ conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+ conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ InputFormat<RowData, ?> inputFormat =
this.tableSource.getInputFormat(true);
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
+ .rowType(TestConfigurations.ROW_TYPE)
+ .conf(conf)
+ .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+ .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2",
"par3", "par4")))
+ .skipClustering(true)
+ .build();
+
+ // default read the latest commit
+ // the clustering files are skipped
+ IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits1.isEmpty());
+ List<RowData> result1 = readData(inputFormat,
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+ String actual1 = TestData.rowDataToString(result1);
+ String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+ assertThat(actual1, is(expected1));
+
+ // write another commit and read again
+ conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
+ TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+ // read from the clustering commit
+ String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+ IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+ assertFalse(splits2.isEmpty());
+ List<RowData> result2 = readData(inputFormat,
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+ String actual2 = TestData.rowDataToString(result2);
+ String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+ assertThat(actual2, is(expected2));
+
+ // write another commit with separate partition
+ // so the file group has only base files
+ conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+ TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
+ // refresh the input format
+ this.tableSource.reset();
+ inputFormat = this.tableSource.getInputFormat(true);
+
+ // filter out the last commit by partition pruning
IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
assertFalse(splits3.isEmpty());
List<RowData> result3 = readData(inputFormat,
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 0461868bb1..ed5b178bb7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -34,7 +34,10 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.utils.InsertFunctionWrapper;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.TestFunctionWrapper;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.avro.Schema;
@@ -410,9 +413,10 @@ public class TestData {
public static void writeData(
List<RowData> dataBuffer,
Configuration conf) throws Exception {
- StreamWriteFunctionWrapper<RowData> funcWrapper = new
StreamWriteFunctionWrapper<>(
- conf.getString(FlinkOptions.PATH),
- conf);
+ TestFunctionWrapper<RowData> funcWrapper =
+ OptionsResolver.isInsertOperation(conf)
+ ? new InsertFunctionWrapper<>(conf.getString(FlinkOptions.PATH),
conf)
+ : new
StreamWriteFunctionWrapper<>(conf.getString(FlinkOptions.PATH), conf);
funcWrapper.openFunction();
for (RowData rowData : dataBuffer) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 4b3d87e387..bd437a9070 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -78,12 +78,12 @@ public class TestUtils {
}
@Nullable
- public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) {
+ public static String getNthCompleteInstant(String basePath, int n, String action) {
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build();
return metaClient.getActiveTimeline()
.filterCompletedInstants()
- .filter(instant -> isDelta ? HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))
+ .filter(instant -> action.equals(instant.getAction()))
.nthInstant(n).map(HoodieInstant::getTimestamp)
.orElse(null);
}