This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ab61f61df9 [HUDI-5477] Optimize timeline loading in Hudi sync client
(#7561)
ab61f61df9 is described below
commit ab61f61df9686793406300c0018924a119b02855
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 4 18:31:22 2023 -0800
[HUDI-5477] Optimize timeline loading in Hudi sync client (#7561)
Before this change, the Hudi archived timeline is always loaded during the
metastore sync process if the last sync time is given. Besides, the archived
timeline is not cached inside the meta client if the start instant time is
given. These cause performance issues and read timeouts on cloud storage due to
rate limiting on requests caused by loading the archived timeline from
storage, when the archived timeline is huge, e.g., hundreds of log files in
the .hoodie/archived folder.
This change improves the timeline loading by
(1) only reading the active timeline if the last sync time is the same as or
after the start of the active timeline;
(2) caching the archived timeline based on the start instant time in the
meta client, to avoid unnecessary repeated loading of the same archived
timeline.
---
.../hudi/common/table/HoodieTableMetaClient.java | 55 +++++++++---
.../hudi/common/table/timeline/TimelineUtils.java | 23 ++++-
.../hudi/common/table/TestTimelineUtils.java | 98 ++++++++++++++++++++--
.../apache/hudi/source/IncrementalInputSplits.java | 4 +-
.../apache/hudi/sync/common/HoodieSyncClient.java | 12 +--
5 files changed, 162 insertions(+), 30 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 9dcd50c1cd..70e9473db3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -97,6 +98,10 @@ public class HoodieTableMetaClient implements Serializable {
public static final String MARKER_EXTN = ".marker";
+ // In-memory cache for archived timeline based on the start instant time
+ // Only one entry should be present in this map
+ private final Map<String, HoodieArchivedTimeline> archivedTimelineMap = new
HashMap<>();
+
// NOTE: Since those two parameters lay on the hot-path of a lot of
computations, we
// use tailored extension of the {@code Path} class allowing to avoid
repetitive
// computations secured by its immutability
@@ -109,7 +114,6 @@ public class HoodieTableMetaClient implements Serializable {
private TimelineLayoutVersion timelineLayoutVersion;
protected HoodieTableConfig tableConfig;
protected HoodieActiveTimeline activeTimeline;
- private HoodieArchivedTimeline archivedTimeline;
private ConsistencyGuardConfig consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig =
FileSystemRetryConfig.newBuilder().build();
protected HoodieMetastoreConfig metastoreConfig;
@@ -371,10 +375,7 @@ public class HoodieTableMetaClient implements Serializable
{
* @return Archived commit timeline
*/
public synchronized HoodieArchivedTimeline getArchivedTimeline() {
- if (archivedTimeline == null) {
- archivedTimeline = new HoodieArchivedTimeline(this);
- }
- return archivedTimeline;
+ return getArchivedTimeline(StringUtils.EMPTY_STRING);
}
public HoodieMetastoreConfig getMetastoreConfig() {
@@ -385,21 +386,49 @@ public class HoodieTableMetaClient implements
Serializable {
}
/**
- * Returns fresh new archived commits as a timeline from startTs (inclusive).
- *
- * <p>This is costly operation if really early endTs is specified.
- * Be caution to use this only when the time range is short.
- *
- * <p>This method is not thread safe.
+ * Returns the cached archived timeline from startTs (inclusive).
*
- * @return Archived commit timeline
+ * @param startTs The start instant time (inclusive) of the archived
timeline.
+ * @return the archived timeline.
*/
public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
- return new HoodieArchivedTimeline(this, startTs);
+ return getArchivedTimeline(startTs, true);
+ }
+
+ /**
+ * Returns the cached archived timeline if using in-memory cache or a fresh
new archived
+ * timeline if not using cache, from startTs (inclusive).
+ * <p>
+ * Instantiating an archived timeline is a costly operation if a really early
startTs is
+ * specified.
+ * <p>
+ * This method is not thread safe.
+ *
+ * @param startTs The start instant time (inclusive) of the archived
timeline.
+ * @param useCache Whether to use in-memory cache.
+ * @return the archived timeline based on the arguments.
+ */
+ public HoodieArchivedTimeline getArchivedTimeline(String startTs, boolean
useCache) {
+ if (useCache) {
+ if (!archivedTimelineMap.containsKey(startTs)) {
+ // Only keep one entry in the map
+ archivedTimelineMap.clear();
+ archivedTimelineMap.put(startTs, instantiateArchivedTimeline(startTs));
+ }
+ return archivedTimelineMap.get(startTs);
+ }
+ return instantiateArchivedTimeline(startTs);
+ }
+
+ private HoodieArchivedTimeline instantiateArchivedTimeline(String startTs) {
+ return StringUtils.isNullOrEmpty(startTs)
+ ? new HoodieArchivedTimeline(this)
+ : new HoodieArchivedTimeline(this, startTs);
}
/**
* Validate table properties.
+ *
* @param properties Properties from writeConfig.
*/
public void validateTableProperties(Properties properties) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 6b517d6022..1f9d416b2b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -210,11 +210,30 @@ public class TimelineUtils {
return activeTimeline;
}
+ /**
+ * Returns a Hudi timeline with commits after the given instant time
(exclusive).
+ *
+ * @param metaClient {@link HoodieTableMetaClient} instance.
+ * @param exclusiveStartInstantTime Start instant time (exclusive).
+ * @return Hudi timeline.
+ */
+ public static HoodieTimeline getCommitsTimelineAfter(
+ HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieDefaultTimeline timeline =
+ activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+ ? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
+ .mergeTimeline(activeTimeline)
+ : activeTimeline;
+ return timeline.getCommitsTimeline()
+ .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
+ }
+
/**
* Returns the commit metadata of the given instant.
*
- * @param instant The hoodie instant
- * @param timeline The timeline
+ * @param instant The hoodie instant
+ * @param timeline The timeline
* @return the commit metadata
*/
public static HoodieCommitMetadata getCommitMetadata(
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 842e7069ec..5e91118b26 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -30,6 +30,8 @@ import
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -51,10 +53,21 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests {@link TimelineUtils}.
@@ -119,7 +132,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
String olderPartition = "0"; // older partitions that is modified by all
cleans
for (int i = 1; i <= 5; i++) {
String ts = i + "";
- HoodieInstant instant = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, ts);
+ HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant,
Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
@@ -158,7 +171,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
String partitionPath = "";
for (int i = 1; i <= 5; i++) {
String ts = i + "";
- HoodieInstant instant = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, ts);
+ HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant,
Option.of(getCommitMetadata(basePath, partitionPath, ts, 2,
Collections.emptyMap())));
@@ -187,7 +200,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
String ts = i + "";
HoodieInstant instant = new HoodieInstant(true,
HoodieTimeline.RESTORE_ACTION, ts);
activeTimeline.createNewInstant(instant);
- activeTimeline.saveAsComplete(instant,
Option.of(getRestoreMetadata(basePath, ts, ts, 2,
HoodieTimeline.COMMIT_ACTION)));
+ activeTimeline.saveAsComplete(instant,
Option.of(getRestoreMetadata(basePath, ts, ts, 2, COMMIT_ACTION)));
}
metaClient.reloadActiveTimeline();
@@ -210,12 +223,12 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient,
extraMetadataKey).isPresent());
String ts = "0";
- HoodieInstant instant = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, ts);
+ HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
activeTimeline.saveAsComplete(instant,
Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
ts = "1";
- instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
+ instant = new HoodieInstant(true, COMMIT_ACTION, ts);
activeTimeline.createNewInstant(instant);
Map<String, String> extraMetadata = new HashMap<>();
extraMetadata.put(extraMetadataKey, extraMetadataValue1);
@@ -251,6 +264,81 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
}
+ @Test
+ public void testGetCommitsTimelineAfter() throws IOException {
+ // Should only load active timeline
+ String startTs = "010";
+ HoodieTableMetaClient mockMetaClient = prepareMetaClient(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+ Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+ startTs
+ );
+ verifyTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+ TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+ verify(mockMetaClient, never()).getArchivedTimeline(any());
+
+ // Should load both archived and active timeline
+ startTs = "001";
+ mockMetaClient = prepareMetaClient(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+ Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+ startTs
+ );
+ verifyTimeline(
+ Arrays.asList(
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
+ TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+ verify(mockMetaClient, times(1)).getArchivedTimeline(any());
+ }
+
+ private HoodieTableMetaClient prepareMetaClient(
+ List<HoodieInstant> activeInstants,
+ List<HoodieInstant> archivedInstants,
+ String startTs
+ ) throws IOException {
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+ HoodieArchivedTimeline mockArchivedTimeline =
mock(HoodieArchivedTimeline.class);
+ when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true)))
+ .thenReturn(activeInstants);
+ HoodieActiveTimeline activeTimeline = new
HoodieActiveTimeline(mockMetaClient);
+ when(mockMetaClient.getActiveTimeline())
+ .thenReturn(activeTimeline);
+ when(mockMetaClient.getArchivedTimeline(any()))
+ .thenReturn(mockArchivedTimeline);
+ HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline(
+ archivedInstants.stream()
+ .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0),
+ i -> Option.empty())
+ .mergeTimeline(activeTimeline);
+ when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline)))
+ .thenReturn(mergedTimeline);
+
+ return mockMetaClient;
+ }
+
+ public void verifyTimeline(List<HoodieInstant> expectedInstants,
HoodieTimeline timeline) {
+ assertEquals(
+ expectedInstants.stream().sorted().collect(Collectors.toList()),
+ timeline.getInstants().stream().sorted().collect(Collectors.toList())
+ );
+ }
+
private void verifyExtraMetadataLatestValue(String extraMetadataKey, String
expected, boolean includeClustering) {
final Option<String> extraLatestValue;
if (includeClustering) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 92ba50cf19..a5bab0e575 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -18,7 +18,6 @@
package org.apache.hudi.source;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
@@ -43,6 +42,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
@@ -462,7 +462,7 @@ public class IncrementalInputSplits implements Serializable
{
}
private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient
metaClient, String startInstant) {
- HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline(startInstant);
+ HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline(startInstant, false);
HoodieTimeline archivedCompleteTimeline =
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
return filterInstantsByCondition(archivedCompleteTimeline);
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 56ff82f5e4..1db3707895 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -88,10 +88,9 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
* Going through archive timeline is a costly operation, and it should be
avoided unless some start time is given.
*/
public Set<String> getDroppedPartitionsSince(Option<String>
lastCommitTimeSynced) {
- HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ?
metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
- .mergeTimeline(metaClient.getActiveTimeline())
- .getCommitsTimeline()
- .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) :
metaClient.getActiveTimeline();
+ HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
+ ? TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get())
+ : metaClient.getActiveTimeline();
return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}
@@ -126,10 +125,7 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ",
Getting commits since then");
return TimelineUtils.getWrittenPartitions(
- metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
- .mergeTimeline(metaClient.getActiveTimeline())
- .getCommitsTimeline()
- .findInstantsAfter(lastCommitTimeSynced.get(),
Integer.MAX_VALUE));
+ TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get()));
}
}