This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dac93e22f0b64cc0e2e33024c4b37ade50841282 Author: Y Ethan Guo <[email protected]> AuthorDate: Thu Jan 5 10:31:22 2023 +0800 [HUDI-5477] Optimize timeline loading in Hudi sync client (#7561) Before this change, the Hudi archived timeline was always loaded during the metastore sync process whenever the last sync time was given. In addition, the archived timeline was not cached inside the meta client when a start instant time was given. When the archived timeline is huge (e.g., hundreds of log files in the .hoodie/archived folder), loading it from storage causes performance issues and, on cloud storage, read timeouts due to request rate limiting. This change improves timeline loading by (1) reading only the active timeline if the last sync time is the same as or after the start of the active timeline; and (2) caching the archived timeline in the meta client, keyed by the start instant time, to avoid unnecessary repeated loading of the same archived timeline. (cherry picked from commit ab61f61df9686793406300c0018924a119b02855) --- .../hudi/common/table/HoodieTableMetaClient.java | 55 ++++++++--- .../hudi/common/table/timeline/TimelineUtils.java | 37 ++++++++ .../hudi/common/table/TestTimelineUtils.java | 101 +++++++++++++++++++-- .../apache/hudi/configuration/FlinkOptions.java | 8 ++ .../apache/hudi/source/IncrementalInputSplits.java | 47 +++++++++- .../hudi/source/StreamReadMonitoringFunction.java | 1 + .../java/org/apache/hudi/util/ClusteringUtil.java | 19 ++++ .../apache/hudi/sync/common/HoodieSyncClient.java | 12 +-- 8 files changed, 251 insertions(+), 29 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 39f27f4160f..990142f496c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -58,6 +58,7 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -97,6 +98,10 @@ public class HoodieTableMetaClient implements Serializable { public static final String MARKER_EXTN = ".marker"; + // In-memory cache for archived timeline based on the start instant time + // Only one entry should be present in this map + private final Map<String, HoodieArchivedTimeline> archivedTimelineMap = new HashMap<>(); + // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we // use tailored extension of the {@code Path} class allowing to avoid repetitive // computations secured by its immutability @@ -110,7 +115,6 @@ public class HoodieTableMetaClient implements Serializable { private TimelineLayoutVersion timelineLayoutVersion; protected HoodieTableConfig tableConfig; protected HoodieActiveTimeline activeTimeline; - private HoodieArchivedTimeline archivedTimeline; private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build(); protected HoodieMetastoreConfig metastoreConfig; @@ -365,10 +369,7 @@ public class HoodieTableMetaClient implements Serializable { * @return Archived commit timeline */ public synchronized HoodieArchivedTimeline getArchivedTimeline() { - if (archivedTimeline == null) { - archivedTimeline = new HoodieArchivedTimeline(this); - } - return archivedTimeline; + return getArchivedTimeline(StringUtils.EMPTY_STRING); } public HoodieMetastoreConfig getMetastoreConfig() { @@ -379,21 +380,49 @@ public class HoodieTableMetaClient implements Serializable { } /** - * Returns fresh new archived commits as a timeline from startTs (inclusive). 
- * - * <p>This is costly operation if really early endTs is specified. - * Be caution to use this only when the time range is short. - * - * <p>This method is not thread safe. + * Returns the cached archived timeline from startTs (inclusive). * - * @return Archived commit timeline + * @param startTs The start instant time (inclusive) of the archived timeline. + * @return the archived timeline. */ public HoodieArchivedTimeline getArchivedTimeline(String startTs) { - return new HoodieArchivedTimeline(this, startTs); + return getArchivedTimeline(startTs, true); + } + + /** + * Returns the cached archived timeline if using in-memory cache or a fresh new archived + * timeline if not using cache, from startTs (inclusive). + * <p> + * Instantiating an archived timeline is costly operation if really early startTs is + * specified. + * <p> + * This method is not thread safe. + * + * @param startTs The start instant time (inclusive) of the archived timeline. + * @param useCache Whether to use in-memory cache. + * @return the archived timeline based on the arguments. + */ + public HoodieArchivedTimeline getArchivedTimeline(String startTs, boolean useCache) { + if (useCache) { + if (!archivedTimelineMap.containsKey(startTs)) { + // Only keep one entry in the map + archivedTimelineMap.clear(); + archivedTimelineMap.put(startTs, instantiateArchivedTimeline(startTs)); + } + return archivedTimelineMap.get(startTs); + } + return instantiateArchivedTimeline(startTs); + } + + private HoodieArchivedTimeline instantiateArchivedTimeline(String startTs) { + return StringUtils.isNullOrEmpty(startTs) + ? new HoodieArchivedTimeline(this) + : new HoodieArchivedTimeline(this, startTs); } /** * Validate table properties. + * * @param properties Properties from writeConfig. 
*/ public void validateTableProperties(Properties properties) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 1b8450eecca..5cc52295c49 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -209,4 +209,41 @@ public class TimelineUtils { } return activeTimeline; } + + /** + * Returns a Hudi timeline with commits after the given instant time (exclusive). + * + * @param metaClient {@link HoodieTableMetaClient} instance. + * @param exclusiveStartInstantTime Start instant time (exclusive). + * @return Hudi timeline. + */ + public static HoodieTimeline getCommitsTimelineAfter( + HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieDefaultTimeline timeline = + activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime) + ? metaClient.getArchivedTimeline(exclusiveStartInstantTime) + .mergeTimeline(activeTimeline) + : activeTimeline; + return timeline.getCommitsTimeline() + .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE); + } + + /** + * Returns the commit metadata of the given instant. 
+ * + * @param instant The hoodie instant + * @param timeline The timeline + * @return the commit metadata + */ + public static HoodieCommitMetadata getCommitMetadata( + HoodieInstant instant, + HoodieTimeline timeline) throws IOException { + byte[] data = timeline.getInstantDetails(instant).get(); + if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + return HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class); + } else { + return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index da078372b5c..0cb1036eddb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -30,6 +30,8 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -49,10 +51,21 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static org.junit.jupiter.api.Assertions.assertEquals; 
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestTimelineUtils extends HoodieCommonTestHarness { @@ -109,7 +122,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { String olderPartition = "0"; // older partitions that is modified by all cleans for (int i = 1; i <= 5; i++) { String ts = i + ""; - HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap()))); @@ -148,7 +161,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { String partitionPath = ""; for (int i = 1; i <= 5; i++) { String ts = i + ""; - HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap()))); @@ -177,7 +190,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { String ts = i + ""; HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, ts); activeTimeline.createNewInstant(instant); - activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2, HoodieTimeline.COMMIT_ACTION))); + activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2, COMMIT_ACTION))); } 
metaClient.reloadActiveTimeline(); @@ -200,12 +213,12 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent()); String ts = "0"; - HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + HoodieInstant instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap()))); ts = "1"; - instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + instant = new HoodieInstant(true, COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); Map<String, String> extraMetadata = new HashMap<>(); extraMetadata.put(extraMetadataKey, extraMetadataValue1); @@ -241,6 +254,82 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get()); } + + @Test + public void testGetCommitsTimelineAfter() throws IOException { + // Should only load active timeline + String startTs = "010"; + HoodieTableMetaClient mockMetaClient = prepareMetaClient( + Arrays.asList( + new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")), + startTs + ); + verifyTimeline( + Arrays.asList( + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs)); + verify(mockMetaClient, never()).getArchivedTimeline(any()); + + // Should load both archived and active timeline + startTs = "001"; + mockMetaClient = prepareMetaClient( + Arrays.asList( + new HoodieInstant(COMPLETED, 
ROLLBACK_ACTION, "009"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")), + startTs + ); + verifyTimeline( + Arrays.asList( + new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"), + new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")), + TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs)); + verify(mockMetaClient, times(1)).getArchivedTimeline(any()); + } + + private HoodieTableMetaClient prepareMetaClient( + List<HoodieInstant> activeInstants, + List<HoodieInstant> archivedInstants, + String startTs + ) throws IOException { + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + HoodieArchivedTimeline mockArchivedTimeline = mock(HoodieArchivedTimeline.class); + when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true))) + .thenReturn(activeInstants); + HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(mockMetaClient); + when(mockMetaClient.getActiveTimeline()) + .thenReturn(activeTimeline); + when(mockMetaClient.getArchivedTimeline(any())) + .thenReturn(mockArchivedTimeline); + HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline( + archivedInstants.stream() + .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0), + i -> Option.empty()) + .mergeTimeline(activeTimeline); + when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline))) + .thenReturn(mergedTimeline); + + return mockMetaClient; + } + + public void verifyTimeline(List<HoodieInstant> expectedInstants, HoodieTimeline timeline) { + assertEquals( + expectedInstants.stream().sorted().collect(Collectors.toList()), + timeline.getInstants().stream().sorted().collect(Collectors.toList()) + ); + } 
+ private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) { final Option<String> extraLatestValue; if (includeClustering) { @@ -344,4 +433,4 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata); } -} \ No newline at end of file +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index df2c96c8a98..420b2c8d9cf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -279,6 +279,14 @@ public class FlinkOptions extends HoodieConfig { + "usually with delta time compaction strategy that is long enough, for e.g, one week;\n" + "2) changelog mode is enabled, this option is a solution to keep data integrity"); + // this option is experimental + public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = ConfigOptions + .key("read.streaming.skip_clustering") + .booleanType() + .defaultValue(false) + .withDescription("Whether to skip clustering instants for streaming read,\n" + + "to avoid reading duplicates"); + public static final String START_COMMIT_EARLIEST = "earliest"; public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions .key("read.start-commit") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 0be2a5300f0..ba8cca30cef 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -34,8 +34,10 @@ import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.sink.partitioner.profile.WriteProfiles; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.table.types.logical.RowType; @@ -89,6 +91,8 @@ public class IncrementalInputSplits implements Serializable { private final Set<String> requiredPartitions; // skip compaction private final boolean skipCompaction; + // skip clustering + private final boolean skipClustering; private IncrementalInputSplits( Configuration conf, @@ -96,13 +100,15 @@ public class IncrementalInputSplits implements Serializable { RowType rowType, long maxCompactionMemoryInBytes, @Nullable Set<String> requiredPartitions, - boolean skipCompaction) { + boolean skipCompaction, + boolean skipClustering) { this.conf = conf; this.path = path; this.rowType = rowType; this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; this.requiredPartitions = requiredPartitions; this.skipCompaction = skipCompaction; + this.skipClustering = skipClustering; } /** @@ -397,6 +403,17 @@ public class IncrementalInputSplits implements Serializable { return Collections.emptyList(); } + private HoodieTimeline getReadTimeline(HoodieTableMetaClient metaClient) { + HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(); + return filterInstantsByCondition(timeline); + } + + private HoodieTimeline getArchivedReadTimeline(HoodieTableMetaClient metaClient, String startInstant) { + HoodieArchivedTimeline archivedTimeline = 
metaClient.getArchivedTimeline(startInstant, false); + HoodieTimeline archivedCompleteTimeline = archivedTimeline.getCommitsTimeline().filterCompletedInstants(); + return filterInstantsByCondition(archivedCompleteTimeline); + } + /** * Returns the instants with a given issuedInstant to start from. * @@ -429,6 +446,25 @@ public class IncrementalInputSplits implements Serializable { return maySkipCompaction(instantStream).collect(Collectors.toList()); } + /** + * Filters out the unnecessary instants by user specified condition. + * + * @param timeline The timeline + * + * @return the filtered timeline + */ + private HoodieTimeline filterInstantsByCondition(HoodieTimeline timeline) { + final HoodieTimeline oriTimeline = timeline; + if (this.skipCompaction) { + // the compaction commit uses 'commit' as action which is tricky + timeline = timeline.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)); + } + if (this.skipClustering) { + timeline = timeline.filter(instant -> !ClusteringUtil.isClusteringInstant(instant, oriTimeline)); + } + return timeline; + } + private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) { return this.skipCompaction ? 
instants.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) @@ -488,6 +524,8 @@ public class IncrementalInputSplits implements Serializable { private Set<String> requiredPartitions; // skip compaction private boolean skipCompaction = false; + // skip clustering + private boolean skipClustering = true; public Builder() { } @@ -522,10 +560,15 @@ public class IncrementalInputSplits implements Serializable { return this; } + public Builder skipClustering(boolean skipClustering) { + this.skipClustering = skipClustering; + return this; + } + public IncrementalInputSplits build() { return new IncrementalInputSplits( Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType), - this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction); + this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction, this.skipClustering); } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index fde5130237c..8224278cdd1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -113,6 +113,7 @@ public class StreamReadMonitoringFunction .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes) .requiredPartitions(requiredPartitionPaths) .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT)) + .skipClustering(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING)) .build(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java index 580dbacc4d3..e6ce6d4800e 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java @@ -19,16 +19,21 @@ package org.apache.hudi.util; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; @@ -77,4 +82,18 @@ public class ClusteringUtil { table.getMetaClient().reloadActiveTimeline(); }); } + + /** + * Returns whether the given instant {@code instant} is with clustering operation. 
+ */ + public static boolean isClusteringInstant(HoodieInstant instant, HoodieTimeline timeline) { + if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + return false; + } + try { + return TimelineUtils.getCommitMetadata(instant, timeline).getOperationType().equals(WriteOperationType.CLUSTER); + } catch (IOException e) { + throw new HoodieException("Resolve replace commit metadata error for instant: " + instant, e); + } + } } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index af06f5908ce..d73bf2ede24 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -88,10 +88,9 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given. */ public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) { - HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) - .mergeTimeline(metaClient.getActiveTimeline()) - .getCommitsTimeline() - .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline(); + HoodieTimeline timeline = lastCommitTimeSynced.isPresent() + ? 
TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get()) + : metaClient.getActiveTimeline(); return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline)); } @@ -117,10 +116,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto } else { LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); return TimelineUtils.getWrittenPartitions( - metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) - .mergeTimeline(metaClient.getActiveTimeline()) - .getCommitsTimeline() - .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); + TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get())); } }
