This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 431ade06fc [HUDI-5234] Streaming read skip clustering (#7296)
431ade06fc is described below

commit 431ade06fc2753fb5f691caa043dd2bfa4fcccb3
Author: Danny Chan <[email protected]>
AuthorDate: Fri Nov 25 08:33:08 2022 +0800

    [HUDI-5234] Streaming read skip clustering (#7296)
    
    Co-authored-by: zhuanshenbsj1 <[email protected]>
---
 .../apache/hudi/configuration/FlinkOptions.java    |  8 +++
 .../apache/hudi/source/IncrementalInputSplits.java | 65 ++++++++++++++-----
 .../hudi/source/StreamReadMonitoringFunction.java  |  1 +
 .../java/org/apache/hudi/util/ClusteringUtil.java  | 18 ++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 31 +++++++++-
 .../apache/hudi/table/format/TestInputFormat.java  | 72 +++++++++++++++++++++-
 .../test/java/org/apache/hudi/utils/TestData.java  | 10 ++-
 .../test/java/org/apache/hudi/utils/TestUtils.java |  4 +-
 8 files changed, 187 insertions(+), 22 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 55abeaaa56..8706c5960a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -280,6 +280,14 @@ public class FlinkOptions extends HoodieConfig {
           + "usually with delta time compaction strategy that is long enough, 
for e.g, one week;\n"
           + "2) changelog mode is enabled, this option is a solution to keep 
data integrity");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = 
ConfigOptions
+          .key("read.streaming.skip_clustering")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Whether to skip clustering instants for streaming 
read,\n"
+              + "to avoid reading duplicates");
+
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
       .key("read.start-commit")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 09f7054cd7..bb826ce79b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -39,6 +39,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.Configuration;
@@ -95,6 +96,8 @@ public class IncrementalInputSplits implements Serializable {
   private final Set<String> requiredPartitions;
   // skip compaction
   private final boolean skipCompaction;
+  // skip clustering
+  private final boolean skipClustering;
 
   private IncrementalInputSplits(
       Configuration conf,
@@ -102,13 +105,15 @@ public class IncrementalInputSplits implements 
Serializable {
       RowType rowType,
       long maxCompactionMemoryInBytes,
       @Nullable Set<String> requiredPartitions,
-      boolean skipCompaction) {
+      boolean skipCompaction,
+      boolean skipClustering) {
     this.conf = conf;
     this.path = path;
     this.rowType = rowType;
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
     this.requiredPartitions = requiredPartitions;
     this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
   }
 
   /**
@@ -128,7 +133,7 @@ public class IncrementalInputSplits implements Serializable 
{
   public Result inputSplits(
       HoodieTableMetaClient metaClient,
       org.apache.hadoop.conf.Configuration hadoopConf) {
-    HoodieTimeline commitTimeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    HoodieTimeline commitTimeline = getReadTimeline(metaClient);
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return Result.EMPTY;
@@ -240,7 +245,7 @@ public class IncrementalInputSplits implements Serializable 
{
       String issuedInstant,
       boolean cdcEnabled) {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    HoodieTimeline commitTimeline = getReadTimeline(metaClient);
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return Result.EMPTY;
@@ -442,17 +447,26 @@ public class IncrementalInputSplits implements 
Serializable {
       String tableName) {
     if (commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) 
{
       // read the archived metadata if the start instant is archived.
-      HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(instantRange.getStartInstant());
-      HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
-      if (!archivedCompleteTimeline.empty()) {
-        Stream<HoodieInstant> instantStream = 
archivedCompleteTimeline.getInstants();
-        return maySkipCompaction(instantStream)
+      HoodieTimeline archivedTimeline = getArchivedReadTimeline(metaClient, 
instantRange.getStartInstant());
+      if (!archivedTimeline.empty()) {
+        return archivedTimeline.getInstants()
             .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, archivedTimeline)).collect(Collectors.toList());
       }
     }
     return Collections.emptyList();
   }
 
+  private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
+    // completed-and-compaction view of the commits timeline, minus whatever the reader is told to skip
+    return filterInstantsByCondition(metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
+  }
+
+  private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) {
+    // same user filters as the active read timeline,
+    // applied to the completed commits of the archived timeline loaded for startInstant
+    return filterInstantsByCondition(metaClient.getArchivedTimeline(startInstant).getCommitsTimeline().filterCompletedInstants());
+  }
+
   /**
    * Returns the instants with a given issuedInstant to start from.
    *
@@ -466,7 +480,8 @@ public class IncrementalInputSplits implements Serializable 
{
     HoodieTimeline completedTimeline = 
commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
       // returns early for streaming mode
-      return maySkipCompaction(completedTimeline.getInstants())
+      return completedTimeline
+          .getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     }
@@ -482,13 +497,26 @@ public class IncrementalInputSplits implements 
Serializable {
       final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
       instantStream = instantStream.filter(s -> 
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, 
endCommit));
     }
-    return maySkipCompaction(instantStream).collect(Collectors.toList());
+    return instantStream.collect(Collectors.toList());
   }
 
-  private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> 
instants) {
-    return this.skipCompaction
-        ? instants.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
-        : instants;
+  /**
+   * Drops the instants that the reader is configured to skip.
+   *
+   * @param timeline The timeline to filter
+   * @return the timeline without the skipped 'commit' and/or clustering instants
+   */
+  private HoodieTimeline filterInstantsByCondition(HoodieTimeline timeline) {
+    HoodieTimeline filtered = timeline;
+    if (this.skipCompaction) {
+      // the compaction commit uses 'commit' as action which is tricky
+      filtered = filtered.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    }
+    if (this.skipClustering) {
+      // the operation type is looked up through the unfiltered input timeline
+      filtered = filtered.filter(instant -> !ClusteringUtil.isClusteringInstant(instant, timeline));
+    }
+    return filtered;
+  }
 
   private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
@@ -544,6 +572,8 @@ public class IncrementalInputSplits implements Serializable 
{
     private Set<String> requiredPartitions;
     // skip compaction
     private boolean skipCompaction = false;
+    // skip clustering, off unless requested, matching the 'read.streaming.skip_clustering' default
+    private boolean skipClustering = false;
 
     public Builder() {
     }
@@ -578,10 +608,15 @@ public class IncrementalInputSplits implements 
Serializable {
       return this;
     }
 
+    public Builder skipClustering(boolean skipClustering) {
+      this.skipClustering = skipClustering;
+      return this;
+    }
+
     public IncrementalInputSplits build() {
       return new IncrementalInputSplits(
           Objects.requireNonNull(this.conf), 
Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
-          this.maxCompactionMemoryInBytes, this.requiredPartitions, 
this.skipCompaction);
+          this.maxCompactionMemoryInBytes, this.requiredPartitions, 
this.skipCompaction, this.skipClustering);
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 2ad312241e..c8757bf10f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -119,6 +119,7 @@ public class StreamReadMonitoringFunction
         .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
         .requiredPartitions(requiredPartitionPaths)
         
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+        
.skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
         .build();
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index cb1e54b44d..34352ec80b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -19,17 +19,21 @@
 package org.apache.hudi.util;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -94,4 +98,18 @@ public class ClusteringUtil {
           commitToRollback -> 
writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, 
false));
     }
   }
+
+  /**
+   * Returns whether {@code instant} is a replace commit whose metadata records the clustering operation (a missing operation type yields false).
+   */
+  public static boolean isClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) {
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return false;
+    }
+    try {
+      return WriteOperationType.CLUSTER.equals(TimelineUtils.getCommitMetadata(instant, timeline).getOperationType());
+    } catch (IOException e) {
+      throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e);
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 0b383870d6..88072e6b8d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -316,7 +316,7 @@ public class ITTestHoodieDataSource {
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
 
-    String instant = 
TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
+    String instant = 
TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, 
HoodieTimeline.DELTA_COMMIT_ACTION);
 
     streamTableEnv.getConfig().getConfiguration()
         .setBoolean("table.dynamic-table-options.enabled", true);
@@ -325,6 +325,35 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
   }
 
+  @Test
+  void testAppendWriteReadSkippingClustering() throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source", 4);
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, "insert")
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true)
+        .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
+        .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
+        .option(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1)
+        .option(FlinkOptions.CLUSTERING_TASKS, 1)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, HoodieTimeline.COMMIT_ACTION);
+
+    streamTableEnv.getConfig().getConfiguration()
+        .setBoolean("table.dynamic-table-options.enabled", true);
+    final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
+    List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
+  }
+
   @Test
   void testStreamWriteWithCleaning() {
     // create filesystem table named source
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index d8f292e9f5..4480465075 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.source.IncrementalInputSplits;
@@ -439,7 +440,7 @@ public class TestInputFormat {
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
     // read from the compaction commit
-    String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, false);
+    String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, 
HoodieTimeline.COMMIT_ACTION);
     conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
 
     IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
@@ -457,6 +458,75 @@ public class TestInputFormat {
     this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat(true);
 
+    // filter out the last commit by partition pruning
+    IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits3.isEmpty());
+    List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual3 = TestData.rowDataToString(result3);
+    String expected3 = 
TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual3, is(expected3));
+  }
+
+  @Test
+  void testReadSkipClustering() throws Exception {
+    beforeEach(HoodieTableType.COPY_ON_WRITE);
+
+    org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
+
+    // write base first with clustering
+    conf.setString(FlinkOptions.OPERATION, "insert");
+    conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+    conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    InputFormat<RowData, ?> inputFormat = 
this.tableSource.getInputFormat(true);
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    IncrementalInputSplits incrementalInputSplits = 
IncrementalInputSplits.builder()
+        .rowType(TestConfigurations.ROW_TYPE)
+        .conf(conf)
+        .path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
+        .requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", 
"par3", "par4")))
+        .skipClustering(true)
+        .build();
+
+    // default read the latest commit
+    // the clustering files are skipped
+    IncrementalInputSplits.Result splits1 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits1.isEmpty());
+    List<RowData> result1 = readData(inputFormat, 
splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+
+    String actual1 = TestData.rowDataToString(result1);
+    String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
+    assertThat(actual1, is(expected1));
+
+    // write another commit and read again
+    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+
+    // read from the clustering commit
+    String secondCommit = 
TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+    conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);
+
+    IncrementalInputSplits.Result splits2 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
+    assertFalse(splits2.isEmpty());
+    List<RowData> result2 = readData(inputFormat, 
splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
+    String actual2 = TestData.rowDataToString(result2);
+    String expected2 = 
TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
+    assertThat(actual2, is(expected2));
+
+    // write another commit with separate partition
+    // so the file group has only base files
+    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
+    // refresh the input format
+    this.tableSource.reset();
+    inputFormat = this.tableSource.getInputFormat(true);
+
+    // filter out the last commit by partition pruning
     IncrementalInputSplits.Result splits3 = 
incrementalInputSplits.inputSplits(metaClient, hadoopConf, null, false);
     assertFalse(splits3.isEmpty());
     List<RowData> result3 = readData(inputFormat, 
splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 0461868bb1..ed5b178bb7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -34,7 +34,10 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.utils.InsertFunctionWrapper;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.TestFunctionWrapper;
 import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.avro.Schema;
@@ -410,9 +413,10 @@ public class TestData {
   public static void writeData(
       List<RowData> dataBuffer,
       Configuration conf) throws Exception {
-    StreamWriteFunctionWrapper<RowData> funcWrapper = new 
StreamWriteFunctionWrapper<>(
-        conf.getString(FlinkOptions.PATH),
-        conf);
+    TestFunctionWrapper<RowData> funcWrapper =
+        OptionsResolver.isInsertOperation(conf)
+            ? new InsertFunctionWrapper<>(conf.getString(FlinkOptions.PATH), 
conf)
+            : new 
StreamWriteFunctionWrapper<>(conf.getString(FlinkOptions.PATH), conf);
     funcWrapper.openFunction();
 
     for (RowData rowData : dataBuffer) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 4b3d87e387..bd437a9070 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -78,12 +78,12 @@ public class TestUtils {
   }
 
   @Nullable
-  public static String getNthCompleteInstant(String basePath, int n, boolean 
isDelta) {
+  public static String getNthCompleteInstant(String basePath, int n, String 
action) {
     final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
         .setConf(HadoopConfigurations.getHadoopConf(new 
Configuration())).setBasePath(basePath).build();
     return metaClient.getActiveTimeline()
         .filterCompletedInstants()
-        .filter(instant -> isDelta ? 
HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : 
HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))
+        .filter(instant -> action.equals(instant.getAction()))
         .nthInstant(n).map(HoodieInstant::getTimestamp)
         .orElse(null);
   }

Reply via email to