This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ab61f61df9 [HUDI-5477] Optimize timeline loading in Hudi sync client 
(#7561)
ab61f61df9 is described below

commit ab61f61df9686793406300c0018924a119b02855
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 4 18:31:22 2023 -0800

    [HUDI-5477] Optimize timeline loading in Hudi sync client (#7561)
    
    Before this change, the Hudi archived timeline is always loaded during the 
metastore sync process if the last sync time is given. Besides, the archived 
timeline is not cached inside the meta client if the start instant time is 
given. These cause performance issues and read timeouts on cloud storage, due 
to request rate limiting triggered by loading the archived timeline from 
storage when the archived timeline is huge, e.g., hundreds of log files in the 
.hoodie/archived folder.
    
    This change improves the timeline loading by
    (1) only reading the active timeline if the last sync time is the same as 
or after the start of the active timeline;
    (2) caching the archived timeline, keyed by the start instant time, in the 
meta client to avoid unnecessary repeated loading of the same archived 
timeline.
---
 .../hudi/common/table/HoodieTableMetaClient.java   | 55 +++++++++---
 .../hudi/common/table/timeline/TimelineUtils.java  | 23 ++++-
 .../hudi/common/table/TestTimelineUtils.java       | 98 ++++++++++++++++++++--
 .../apache/hudi/source/IncrementalInputSplits.java |  4 +-
 .../apache/hudi/sync/common/HoodieSyncClient.java  | 12 +--
 5 files changed, 162 insertions(+), 30 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 9dcd50c1cd..70e9473db3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -97,6 +98,10 @@ public class HoodieTableMetaClient implements Serializable {
 
   public static final String MARKER_EXTN = ".marker";
 
+  // In-memory cache for archived timeline based on the start instant time
+  // Only one entry should be present in this map
+  private final Map<String, HoodieArchivedTimeline> archivedTimelineMap = new 
HashMap<>();
+
   // NOTE: Since those two parameters lay on the hot-path of a lot of 
computations, we
   //       use tailored extension of the {@code Path} class allowing to avoid 
repetitive
   //       computations secured by its immutability
@@ -109,7 +114,6 @@ public class HoodieTableMetaClient implements Serializable {
   private TimelineLayoutVersion timelineLayoutVersion;
   protected HoodieTableConfig tableConfig;
   protected HoodieActiveTimeline activeTimeline;
-  private HoodieArchivedTimeline archivedTimeline;
   private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
   private FileSystemRetryConfig fileSystemRetryConfig = 
FileSystemRetryConfig.newBuilder().build();
   protected HoodieMetastoreConfig metastoreConfig;
@@ -371,10 +375,7 @@ public class HoodieTableMetaClient implements Serializable 
{
    * @return Archived commit timeline
    */
   public synchronized HoodieArchivedTimeline getArchivedTimeline() {
-    if (archivedTimeline == null) {
-      archivedTimeline = new HoodieArchivedTimeline(this);
-    }
-    return archivedTimeline;
+    return getArchivedTimeline(StringUtils.EMPTY_STRING);
   }
 
   public HoodieMetastoreConfig getMetastoreConfig() {
@@ -385,21 +386,49 @@ public class HoodieTableMetaClient implements 
Serializable {
   }
 
   /**
-   * Returns fresh new archived commits as a timeline from startTs (inclusive).
-   *
-   * <p>This is costly operation if really early endTs is specified.
-   * Be caution to use this only when the time range is short.
-   *
-   * <p>This method is not thread safe.
+   * Returns the cached archived timeline from startTs (inclusive).
    *
-   * @return Archived commit timeline
+   * @param startTs The start instant time (inclusive) of the archived 
timeline.
+   * @return the archived timeline.
    */
   public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
-    return new HoodieArchivedTimeline(this, startTs);
+    return getArchivedTimeline(startTs, true);
+  }
+
+  /**
+   * Returns the cached archived timeline if using in-memory cache or a fresh 
new archived
+   * timeline if not using cache, from startTs (inclusive).
+   * <p>
+   * Instantiating an archived timeline is costly operation if really early 
startTs is
+   * specified.
+   * <p>
+   * This method is not thread safe.
+   *
+   * @param startTs  The start instant time (inclusive) of the archived 
timeline.
+   * @param useCache Whether to use in-memory cache.
+   * @return the archived timeline based on the arguments.
+   */
+  public HoodieArchivedTimeline getArchivedTimeline(String startTs, boolean 
useCache) {
+    if (useCache) {
+      if (!archivedTimelineMap.containsKey(startTs)) {
+        // Only keep one entry in the map
+        archivedTimelineMap.clear();
+        archivedTimelineMap.put(startTs, instantiateArchivedTimeline(startTs));
+      }
+      return archivedTimelineMap.get(startTs);
+    }
+    return instantiateArchivedTimeline(startTs);
+  }
+
+  private HoodieArchivedTimeline instantiateArchivedTimeline(String startTs) {
+    return StringUtils.isNullOrEmpty(startTs)
+        ? new HoodieArchivedTimeline(this)
+        : new HoodieArchivedTimeline(this, startTs);
   }
 
   /**
    * Validate table properties.
+   *
    * @param properties Properties from writeConfig.
    */
   public void validateTableProperties(Properties properties) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 6b517d6022..1f9d416b2b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -210,11 +210,30 @@ public class TimelineUtils {
     return activeTimeline;
   }
 
+  /**
+   * Returns a Hudi timeline with commits after the given instant time 
(exclusive).
+   *
+   * @param metaClient                {@link HoodieTableMetaClient} instance.
+   * @param exclusiveStartInstantTime Start instant time (exclusive).
+   * @return Hudi timeline.
+   */
+  public static HoodieTimeline getCommitsTimelineAfter(
+      HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieDefaultTimeline timeline =
+        activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+            ? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
+            .mergeTimeline(activeTimeline)
+            : activeTimeline;
+    return timeline.getCommitsTimeline()
+        .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
+  }
+  
   /**
    * Returns the commit metadata of the given instant.
    *
-   * @param instant   The hoodie instant
-   * @param timeline  The timeline
+   * @param instant  The hoodie instant
+   * @param timeline The timeline
    * @return the commit metadata
    */
   public static HoodieCommitMetadata getCommitMetadata(
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 842e7069ec..5e91118b26 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -30,6 +30,8 @@ import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -51,10 +53,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests {@link TimelineUtils}.
@@ -119,7 +132,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     String olderPartition = "0"; // older partitions that is modified by all 
cleans
     for (int i = 1; i <= 5; i++) {
       String ts = i + "";
-      HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, ts);
+      HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, 
Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
@@ -158,7 +171,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     String partitionPath = "";
     for (int i = 1; i <= 5; i++) {
       String ts = i + "";
-      HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, ts);
+      HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, 
Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, 
Collections.emptyMap())));
 
@@ -187,7 +200,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
       String ts = i + "";
       HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.RESTORE_ACTION, ts);
       activeTimeline.createNewInstant(instant);
-      activeTimeline.saveAsComplete(instant, 
Option.of(getRestoreMetadata(basePath, ts, ts, 2, 
HoodieTimeline.COMMIT_ACTION)));
+      activeTimeline.saveAsComplete(instant, 
Option.of(getRestoreMetadata(basePath, ts, ts, 2, COMMIT_ACTION)));
     }
 
     metaClient.reloadActiveTimeline();
@@ -210,12 +223,12 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, 
extraMetadataKey).isPresent());
 
     String ts = "0";
-    HoodieInstant instant = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, ts);
+    HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
     activeTimeline.createNewInstant(instant);
     activeTimeline.saveAsComplete(instant, 
Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
     ts = "1";
-    instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+    instant = new HoodieInstant(true, COMMIT_ACTION, ts);
     activeTimeline.createNewInstant(instant);
     Map<String, String> extraMetadata = new HashMap<>();
     extraMetadata.put(extraMetadataKey, extraMetadataValue1);
@@ -251,6 +264,81 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
   }
 
+  @Test
+  public void testGetCommitsTimelineAfter() throws IOException {
+    // Should only load active timeline
+    String startTs = "010";
+    HoodieTableMetaClient mockMetaClient = prepareMetaClient(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+        startTs
+    );
+    verifyTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+    verify(mockMetaClient, never()).getArchivedTimeline(any());
+
+    // Should load both archived and active timeline
+    startTs = "001";
+    mockMetaClient = prepareMetaClient(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+        startTs
+    );
+    verifyTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+    verify(mockMetaClient, times(1)).getArchivedTimeline(any());
+  }
+
+  private HoodieTableMetaClient prepareMetaClient(
+      List<HoodieInstant> activeInstants,
+      List<HoodieInstant> archivedInstants,
+      String startTs
+  ) throws IOException {
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    HoodieArchivedTimeline mockArchivedTimeline = 
mock(HoodieArchivedTimeline.class);
+    when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true)))
+        .thenReturn(activeInstants);
+    HoodieActiveTimeline activeTimeline = new 
HoodieActiveTimeline(mockMetaClient);
+    when(mockMetaClient.getActiveTimeline())
+        .thenReturn(activeTimeline);
+    when(mockMetaClient.getArchivedTimeline(any()))
+        .thenReturn(mockArchivedTimeline);
+    HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline(
+        archivedInstants.stream()
+            .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0),
+        i -> Option.empty())
+        .mergeTimeline(activeTimeline);
+    when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline)))
+        .thenReturn(mergedTimeline);
+
+    return mockMetaClient;
+  }
+
+  public void verifyTimeline(List<HoodieInstant> expectedInstants, 
HoodieTimeline timeline) {
+    assertEquals(
+        expectedInstants.stream().sorted().collect(Collectors.toList()),
+        timeline.getInstants().stream().sorted().collect(Collectors.toList())
+    );
+  }
+
   private void verifyExtraMetadataLatestValue(String extraMetadataKey, String 
expected, boolean includeClustering) {
     final Option<String> extraLatestValue;
     if (includeClustering) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 92ba50cf19..a5bab0e575 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.source;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
@@ -43,6 +42,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.types.logical.RowType;
@@ -462,7 +462,7 @@ public class IncrementalInputSplits implements Serializable 
{
   }
 
   private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient 
metaClient, String startInstant) {
-    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant);
+    HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false);
     HoodieTimeline archivedCompleteTimeline = 
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
     return filterInstantsByCondition(archivedCompleteTimeline);
   }
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 56ff82f5e4..1db3707895 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -88,10 +88,9 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
    * Going through archive timeline is a costly operation, and it should be 
avoided unless some start time is given.
    */
   public Set<String> getDroppedPartitionsSince(Option<String> 
lastCommitTimeSynced) {
-    HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? 
metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
-        .mergeTimeline(metaClient.getActiveTimeline())
-        .getCommitsTimeline()
-        .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : 
metaClient.getActiveTimeline();
+    HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
+        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get())
+        : metaClient.getActiveTimeline();
     return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
   }
 
@@ -126,10 +125,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", 
Getting commits since then");
       return TimelineUtils.getWrittenPartitions(
-          metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
-              .mergeTimeline(metaClient.getActiveTimeline())
-              .getCommitsTimeline()
-              .findInstantsAfter(lastCommitTimeSynced.get(), 
Integer.MAX_VALUE));
+          TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get()));
     }
   }
 

Reply via email to