This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit dac93e22f0b64cc0e2e33024c4b37ade50841282
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Jan 5 10:31:22 2023 +0800

    [HUDI-5477] Optimize timeline loading in Hudi sync client (#7561)
    
    Before this change, the Hudi archived timeline is always loaded during the 
metastore sync process if the last sync time is given. Besides, the archived 
timeline is not cached inside the meta client if the start instant time is 
given. These cause performance issues and read timeouts on cloud storage, due 
to request rate limiting triggered by loading the archived timeline from 
storage when the archived timeline is huge, e.g., hundreds of log files in the 
.hoodie/archived folder.
    
    This change improves the timeline loading by
    (1) only reading the active timeline if the last sync time is the same as 
or after the start of the active timeline;
    (2) caching the archived timeline, keyed by the start instant time, in the 
meta client, to avoid unnecessary repeated loading of the same archived 
timeline.
    
    (cherry picked from commit ab61f61df9686793406300c0018924a119b02855)
---
 .../hudi/common/table/HoodieTableMetaClient.java   |  55 ++++++++---
 .../hudi/common/table/timeline/TimelineUtils.java  |  37 ++++++++
 .../hudi/common/table/TestTimelineUtils.java       | 101 +++++++++++++++++++--
 .../apache/hudi/configuration/FlinkOptions.java    |   8 ++
 .../apache/hudi/source/IncrementalInputSplits.java |  47 +++++++++-
 .../hudi/source/StreamReadMonitoringFunction.java  |   1 +
 .../java/org/apache/hudi/util/ClusteringUtil.java  |  19 ++++
 .../apache/hudi/sync/common/HoodieSyncClient.java  |  12 +--
 8 files changed, 251 insertions(+), 29 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 39f27f4160f..990142f496c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -97,6 +98,10 @@ public class HoodieTableMetaClient implements Serializable {
 
   public static final String MARKER_EXTN = ".marker";
 
+  // In-memory cache for archived timeline based on the start instant time
+  // Only one entry should be present in this map
+  private final Map<String, HoodieArchivedTimeline> archivedTimelineMap = new 
HashMap<>();
+
   // NOTE: Since those two parameters lay on the hot-path of a lot of 
computations, we
   //       use tailored extension of the {@code Path} class allowing to avoid 
repetitive
   //       computations secured by its immutability
@@ -110,7 +115,6 @@ public class HoodieTableMetaClient implements Serializable {
   private TimelineLayoutVersion timelineLayoutVersion;
   protected HoodieTableConfig tableConfig;
   protected HoodieActiveTimeline activeTimeline;
-  private HoodieArchivedTimeline archivedTimeline;
   private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
   private FileSystemRetryConfig fileSystemRetryConfig = 
FileSystemRetryConfig.newBuilder().build();
   protected HoodieMetastoreConfig metastoreConfig;
@@ -365,10 +369,7 @@ public class HoodieTableMetaClient implements Serializable 
{
    * @return Archived commit timeline
    */
   public synchronized HoodieArchivedTimeline getArchivedTimeline() {
-    if (archivedTimeline == null) {
-      archivedTimeline = new HoodieArchivedTimeline(this);
-    }
-    return archivedTimeline;
+    return getArchivedTimeline(StringUtils.EMPTY_STRING);
   }
 
   public HoodieMetastoreConfig getMetastoreConfig() {
@@ -379,21 +380,49 @@ public class HoodieTableMetaClient implements 
Serializable {
   }
 
   /**
-   * Returns fresh new archived commits as a timeline from startTs (inclusive).
-   *
-   * <p>This is costly operation if really early endTs is specified.
-   * Be caution to use this only when the time range is short.
-   *
-   * <p>This method is not thread safe.
+   * Returns the cached archived timeline from startTs (inclusive).
    *
-   * @return Archived commit timeline
+   * @param startTs The start instant time (inclusive) of the archived 
timeline.
+   * @return the archived timeline.
    */
   public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
-    return new HoodieArchivedTimeline(this, startTs);
+    return getArchivedTimeline(startTs, true);
+  }
+
+  /**
+   * Returns the cached archived timeline if using in-memory cache or a fresh 
new archived
+   * timeline if not using cache, from startTs (inclusive).
+   * <p>
+   * Instantiating an archived timeline is costly operation if really early 
startTs is
+   * specified.
+   * <p>
+   * This method is not thread safe.
+   *
+   * @param startTs  The start instant time (inclusive) of the archived 
timeline.
+   * @param useCache Whether to use in-memory cache.
+   * @return the archived timeline based on the arguments.
+   */
+  public HoodieArchivedTimeline getArchivedTimeline(String startTs, boolean 
useCache) {
+    if (useCache) {
+      if (!archivedTimelineMap.containsKey(startTs)) {
+        // Only keep one entry in the map
+        archivedTimelineMap.clear();
+        archivedTimelineMap.put(startTs, instantiateArchivedTimeline(startTs));
+      }
+      return archivedTimelineMap.get(startTs);
+    }
+    return instantiateArchivedTimeline(startTs);
+  }
+
+  private HoodieArchivedTimeline instantiateArchivedTimeline(String startTs) {
+    return StringUtils.isNullOrEmpty(startTs)
+        ? new HoodieArchivedTimeline(this)
+        : new HoodieArchivedTimeline(this, startTs);
   }
 
   /**
    * Validate table properties.
+   *
    * @param properties Properties from writeConfig.
    */
   public void validateTableProperties(Properties properties) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 1b8450eecca..5cc52295c49 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -209,4 +209,41 @@ public class TimelineUtils {
     }
     return activeTimeline;
   }
+
+  /**
+   * Returns a Hudi timeline with commits after the given instant time 
(exclusive).
+   *
+   * @param metaClient                {@link HoodieTableMetaClient} instance.
+   * @param exclusiveStartInstantTime Start instant time (exclusive).
+   * @return Hudi timeline.
+   */
+  public static HoodieTimeline getCommitsTimelineAfter(
+      HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieDefaultTimeline timeline =
+        activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+            ? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
+            .mergeTimeline(activeTimeline)
+            : activeTimeline;
+    return timeline.getCommitsTimeline()
+        .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Returns the commit metadata of the given instant.
+   *
+   * @param instant  The hoodie instant
+   * @param timeline The timeline
+   * @return the commit metadata
+   */
+  public static HoodieCommitMetadata getCommitMetadata(
+      HoodieInstant instant,
+      HoodieTimeline timeline) throws IOException {
+    byte[] data = timeline.getInstantDetails(instant).get();
+    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return HoodieReplaceCommitMetadata.fromBytes(data, 
HoodieReplaceCommitMetadata.class);
+    } else {
+      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+    }
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index da078372b5c..0cb1036eddb 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -30,6 +30,8 @@ import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -49,10 +51,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestTimelineUtils extends HoodieCommonTestHarness {
 
@@ -109,7 +122,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     String olderPartition = "0"; // older partitions that is modified by all 
cleans
     for (int i = 1; i <= 5; i++) {
       String ts = i + "";
-      HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, ts);
+      HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, 
Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
@@ -148,7 +161,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     String partitionPath = "";
     for (int i = 1; i <= 5; i++) {
       String ts = i + "";
-      HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, ts);
+      HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, 
Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, 
Collections.emptyMap())));
 
@@ -177,7 +190,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
       String ts = i + "";
       HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.RESTORE_ACTION, ts);
       activeTimeline.createNewInstant(instant);
-      activeTimeline.saveAsComplete(instant, 
Option.of(getRestoreMetadata(basePath, ts, ts, 2, 
HoodieTimeline.COMMIT_ACTION)));
+      activeTimeline.saveAsComplete(instant, 
Option.of(getRestoreMetadata(basePath, ts, ts, 2, COMMIT_ACTION)));
     }
 
     metaClient.reloadActiveTimeline();
@@ -200,12 +213,12 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, 
extraMetadataKey).isPresent());
 
     String ts = "0";
-    HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, ts);
+    HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
     activeTimeline.createNewInstant(instant);
     activeTimeline.saveAsComplete(instant, 
Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
     ts = "1";
-    instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+    instant = new HoodieInstant(true, COMMIT_ACTION, ts);
     activeTimeline.createNewInstant(instant);
     Map<String, String> extraMetadata = new HashMap<>();
     extraMetadata.put(extraMetadataKey, extraMetadataValue1);
@@ -241,6 +254,82 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
   }
   
+
+  @Test
+  public void testGetCommitsTimelineAfter() throws IOException {
+    // Should only load active timeline
+    String startTs = "010";
+    HoodieTableMetaClient mockMetaClient = prepareMetaClient(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+        startTs
+    );
+    verifyTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+    verify(mockMetaClient, never()).getArchivedTimeline(any());
+
+    // Should load both archived and active timeline
+    startTs = "001";
+    mockMetaClient = prepareMetaClient(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+        startTs
+    );
+    verifyTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+    verify(mockMetaClient, times(1)).getArchivedTimeline(any());
+  }
+
+  private HoodieTableMetaClient prepareMetaClient(
+      List<HoodieInstant> activeInstants,
+      List<HoodieInstant> archivedInstants,
+      String startTs
+  ) throws IOException {
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    HoodieArchivedTimeline mockArchivedTimeline = 
mock(HoodieArchivedTimeline.class);
+    when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true)))
+        .thenReturn(activeInstants);
+    HoodieActiveTimeline activeTimeline = new 
HoodieActiveTimeline(mockMetaClient);
+    when(mockMetaClient.getActiveTimeline())
+        .thenReturn(activeTimeline);
+    when(mockMetaClient.getArchivedTimeline(any()))
+        .thenReturn(mockArchivedTimeline);
+    HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline(
+        archivedInstants.stream()
+            .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0),
+        i -> Option.empty())
+        .mergeTimeline(activeTimeline);
+    when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline)))
+        .thenReturn(mergedTimeline);
+
+    return mockMetaClient;
+  }
+
+  public void verifyTimeline(List<HoodieInstant> expectedInstants, 
HoodieTimeline timeline) {
+    assertEquals(
+        expectedInstants.stream().sorted().collect(Collectors.toList()),
+        timeline.getInstants().stream().sorted().collect(Collectors.toList())
+    );
+  }
+
   private void verifyExtraMetadataLatestValue(String extraMetadataKey, String 
expected, boolean includeClustering) {
     final Option<String> extraLatestValue;
     if (includeClustering) {       
@@ -344,4 +433,4 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata);
   }
 
-}
\ No newline at end of file
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index df2c96c8a98..420b2c8d9cf 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -279,6 +279,14 @@ public class FlinkOptions extends HoodieConfig {
           + "usually with delta time compaction strategy that is long enough, 
for e.g, one week;\n"
           + "2) changelog mode is enabled, this option is a solution to keep 
data integrity");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = 
ConfigOptions
+      .key("read.streaming.skip_clustering")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to skip clustering instants for streaming 
read,\n"
+          + "to avoid reading duplicates");
+
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
       .key("read.start-commit")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 0be2a5300f0..ba8cca30cef 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -34,8 +34,10 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.types.logical.RowType;
@@ -89,6 +91,8 @@ public class IncrementalInputSplits implements Serializable {
   private final Set<String> requiredPartitions;
   // skip compaction
   private final boolean skipCompaction;
+  // skip clustering
+  private final boolean skipClustering;
 
   private IncrementalInputSplits(
       Configuration conf,
@@ -96,13 +100,15 @@ public class IncrementalInputSplits implements 
Serializable {
       RowType rowType,
       long maxCompactionMemoryInBytes,
       @Nullable Set<String> requiredPartitions,
-      boolean skipCompaction) {
+      boolean skipCompaction,
+      boolean skipClustering) {
     this.conf = conf;
     this.path = path;
     this.rowType = rowType;
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
     this.requiredPartitions = requiredPartitions;
     this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
   }
 
   /**
@@ -397,6 +403,17 @@ public class IncrementalInputSplits implements 
Serializable {
     return Collections.emptyList();
   }
 
+  private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
+    return filterInstantsByCondition(timeline);
+  }
+
+  private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
+    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
+    HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+    return filterInstantsByCondition(archivedCompleteTimeline);
+  }
+
   /**
    * Returns the instants with a given issuedInstant to start from.
    *
@@ -429,6 +446,25 @@ public class IncrementalInputSplits implements 
Serializable {
     return maySkipCompaction(instantStream).collect(Collectors.toList());
   }
 
+  /**
+   * Filters out the unnecessary instants by user specified condition.
+   *
+   * @param timeline The timeline
+   *
+   * @return the filtered timeline
+   */
+  private HoodieTimeline filterInstantsByCondition(HoodieTimeline timeline) {
+    final HoodieTimeline oriTimeline = timeline;
+    if (this.skipCompaction) {
+      // the compaction commit uses 'commit' as action which is tricky
+      timeline = timeline.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    }
+    if (this.skipClustering) {
+      timeline = timeline.filter(instant -> 
!ClusteringUtil.isClusteringInstant(instant, oriTimeline));
+    }
+    return timeline;
+  }
+
   private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> 
instants) {
     return this.skipCompaction
         ? instants.filter(instant -> 
!instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
@@ -488,6 +524,8 @@ public class IncrementalInputSplits implements Serializable 
{
     private Set<String> requiredPartitions;
     // skip compaction
     private boolean skipCompaction = false;
+    // skip clustering
+    private boolean skipClustering = true;
 
     public Builder() {
     }
@@ -522,10 +560,15 @@ public class IncrementalInputSplits implements 
Serializable {
       return this;
     }
 
+    public Builder skipClustering(boolean skipClustering) {
+      this.skipClustering = skipClustering;
+      return this;
+    }
+
     public IncrementalInputSplits build() {
       return new IncrementalInputSplits(
           Objects.requireNonNull(this.conf), 
Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
-          this.maxCompactionMemoryInBytes, this.requiredPartitions, 
this.skipCompaction);
+          this.maxCompactionMemoryInBytes, this.requiredPartitions, 
this.skipCompaction, this.skipClustering);
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index fde5130237c..8224278cdd1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -113,6 +113,7 @@ public class StreamReadMonitoringFunction
         .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
         .requiredPartitions(requiredPartitionPaths)
         
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
+        
.skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING))
         .build();
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 580dbacc4d3..e6ce6d4800e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -19,16 +19,21 @@
 package org.apache.hudi.util;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -77,4 +82,18 @@ public class ClusteringUtil {
       table.getMetaClient().reloadActiveTimeline();
     });
   }
+
+  /**
+   * Returns whether the given instant {@code instant} is with clustering 
operation.
+   */
+  public static boolean isClusteringInstant(HoodieInstant instant, 
HoodieTimeline timeline) {
+    if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return false;
+    }
+    try {
+      return TimelineUtils.getCommitMetadata(instant, 
timeline).getOperationType().equals(WriteOperationType.CLUSTER);
+    } catch (IOException e) {
+      throw new HoodieException("Resolve replace commit metadata error for 
instant: " + instant, e);
+    }
+  }
 }
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index af06f5908ce..d73bf2ede24 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -88,10 +88,9 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
    * Going through archive timeline is a costly operation, and it should be 
avoided unless some start time is given.
    */
   public Set<String> getDroppedPartitionsSince(Option<String> 
lastCommitTimeSynced) {
-    HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? 
metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
-        .mergeTimeline(metaClient.getActiveTimeline())
-        .getCommitsTimeline()
-        .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : 
metaClient.getActiveTimeline();
+    HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
+        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get())
+        : metaClient.getActiveTimeline();
     return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
   }
 
@@ -117,10 +116,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", 
Getting commits since then");
       return TimelineUtils.getWrittenPartitions(
-          metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
-              .mergeTimeline(metaClient.getActiveTimeline())
-              .getCommitsTimeline()
-              .findInstantsAfter(lastCommitTimeSynced.get(), 
Integer.MAX_VALUE));
+          TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get()));
     }
   }
 

Reply via email to