This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dbe16f3f965a76c94804e19063df0253dd6e69d7 Author: Sagar Sumit <[email protected]> AuthorDate: Fri Mar 8 23:34:53 2024 +0530 [HUDI-7411] Meta sync should consider cleaner commit (#10676) --- .../hudi/common/table/timeline/TimelineUtils.java | 27 ++++++++++--- .../hudi/common/table/TestTimelineUtils.java | 46 ++++++++++++++++++++-- .../sql/catalyst/catalog/HoodieCatalogTable.scala | 7 +--- .../apache/hudi/sync/common/HoodieSyncClient.java | 5 +-- 4 files changed, 67 insertions(+), 18 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 52788acc437..ca6d5b57907 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -81,13 +81,15 @@ public class TimelineUtils { } /** - * Returns partitions that have been deleted or marked for deletion in the given timeline. + * Returns partitions that have been deleted or marked for deletion in the timeline between given commit time range. * Does not include internal operations such as clean in the timeline. */ - public static List<String> getDroppedPartitions(HoodieTimeline timeline) { + public static List<String> getDroppedPartitions(HoodieTableMetaClient metaClient, Option<String> lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) { + HoodieTimeline timeline = lastCommitTimeSynced.isPresent() + ? TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced) + : metaClient.getActiveTimeline(); HoodieTimeline completedTimeline = timeline.getWriteTimeline().filterCompletedInstants(); HoodieTimeline replaceCommitTimeline = completedTimeline.getCompletedReplaceTimeline(); - Map<String, String> partitionToLatestDeleteTimestamp = replaceCommitTimeline.getInstantsAsStream() .map(instant -> { try { @@ -102,6 +104,21 @@ public class TimelineUtils { .flatMap(pair -> pair.getRight().getPartitionToReplaceFileIds().keySet().stream() .map(partition -> new AbstractMap.SimpleEntry<>(partition, pair.getLeft().getTimestamp())) ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (existing, replace) -> replace)); + // cleaner could delete a partition when there are no active filegroups in the partition + HoodieTimeline cleanerTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); + cleanerTimeline.getInstantsAsStream() + .forEach(instant -> { + try { + HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(cleanerTimeline.getInstantDetails(instant).get()); + cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { + if (partitionMetadata.getIsPartitionDeleted()) { + partitionToLatestDeleteTimestamp.put(partition, instant.getTimestamp()); + } + }); + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions cleaned at " + instant, e); + } + }); if (partitionToLatestDeleteTimestamp.isEmpty()) { // There is no dropped partitions @@ -244,7 +261,7 @@ public class TimelineUtils { return false; } catch (IOException e) { - throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e); + throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePathV2().toString(), e); } } @@ -440,7 +457,7 @@ public class TimelineUtils { } public enum HollowCommitHandling { - FAIL, BLOCK, USE_TRANSITION_TIME; + FAIL, BLOCK, USE_TRANSITION_TIME } public static boolean isDeletePartition(WriteOperationType operation) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 842366940da..eef515c6ada 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -158,7 +158,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts); activeTimeline.createNewInstant(cleanInstant); - activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, ts)); + activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, ts, false)); } metaClient.reloadActiveTimeline(); @@ -197,7 +197,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts); activeTimeline.createNewInstant(cleanInstant); - activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(partitionPath, ts)); + activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(partitionPath, ts, false)); } metaClient.reloadActiveTimeline(); @@ -553,7 +553,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { return getUTF8Bytes(commit.toJsonString()); } - private Option<byte[]> getCleanMetadata(String partition, String time) throws IOException { + private Option<byte[]> getCleanMetadata(String partition, String time, boolean isPartitionDeleted) throws IOException { Map<String, HoodieCleanPartitionMetadata> partitionToFilesCleaned = new HashMap<>(); List<String> filesDeleted = new ArrayList<>(); filesDeleted.add("file-" + partition + "-" + time + "1"); @@ -564,6 +564,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { .setFailedDeleteFiles(Collections.emptyList()) .setDeletePathPatterns(Collections.emptyList()) .setSuccessDeleteFiles(filesDeleted) + .setIsPartitionDeleted(isPartitionDeleted) .build(); partitionToFilesCleaned.putIfAbsent(partition, partitionMetadata); HoodieCleanMetadata cleanMetadata = HoodieCleanMetadata.newBuilder() @@ -611,4 +612,43 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { fail("should cover all handling mode."); } } + + @Test + public void testGetDroppedPartitions() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + assertTrue(activeCommitTimeline.empty()); + + String olderPartition = "p1"; // older partitions that will be deleted by clean commit + // first insert to the older partition + HoodieInstant instant1 = new HoodieInstant(true, COMMIT_ACTION, "00001"); + activeTimeline.createNewInstant(instant1); + activeTimeline.saveAsComplete(instant1, Option.of(getCommitMetadata(basePath, olderPartition, "00001", 2, Collections.emptyMap()))); + + metaClient.reloadActiveTimeline(); + List<String> droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty()); + // no dropped partitions + assertEquals(0, droppedPartitions.size()); + + // another commit inserts to new partition + HoodieInstant instant2 = new HoodieInstant(true, COMMIT_ACTION, "00002"); + activeTimeline.createNewInstant(instant2); + activeTimeline.saveAsComplete(instant2, Option.of(getCommitMetadata(basePath, "p2", "00002", 2, Collections.emptyMap()))); + + metaClient.reloadActiveTimeline(); + droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty()); + // no dropped partitions + assertEquals(0, droppedPartitions.size()); + + // clean commit deletes older partition + HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, "00003"); + activeTimeline.createNewInstant(cleanInstant); + activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, "00003", true)); + + metaClient.reloadActiveTimeline(); + droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty()); + // older partition is in the list dropped partitions + assertEquals(1, droppedPartitions.size()); + assertEquals(olderPartition, droppedPartitions.get(0)); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala index 5fcc750ac5b..b194be57f7a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala @@ -159,11 +159,6 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten StructType(tableSchema.filterNot(f => partitionFields.contains(f.name))) } - /** - * The schema of data fields not including hoodie meta fields - */ - lazy val dataSchemaWithoutMetaFields: StructType = removeMetaFields(dataSchema) - /** * The schema of partition fields */ @@ -173,7 +168,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten * All the partition paths, excludes lazily deleted partitions. */ def getPartitionPaths: Seq[String] = { - val droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient.getActiveTimeline) + val droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, org.apache.hudi.common.util.Option.empty(), org.apache.hudi.common.util.Option.empty()) getAllPartitionPaths(spark, table) .filter(!droppedPartitions.contains(_)) diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 2c2d77651cb..9078e9d0711 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -93,10 +93,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given. */ public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) { - HoodieTimeline timeline = lastCommitTimeSynced.isPresent() - ? TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced) - : metaClient.getActiveTimeline(); - return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline)); + return new HashSet<>(TimelineUtils.getDroppedPartitions(metaClient, lastCommitTimeSynced, lastCommitCompletionTimeSynced)); } @Override
