This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2af83e2d9a8 [HUDI-7411] Meta sync should consider cleaner commit (#10676)
2af83e2d9a8 is described below
commit 2af83e2d9a8fbb6cc33fdf29e38b72684c2da4ca
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Mar 8 23:34:53 2024 +0530
[HUDI-7411] Meta sync should consider cleaner commit (#10676)
---
.../hudi/common/table/timeline/TimelineUtils.java | 27 ++++++++++---
.../hudi/common/table/TestTimelineUtils.java | 46 ++++++++++++++++++++--
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 7 +---
.../apache/hudi/sync/common/HoodieSyncClient.java | 5 +--
4 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 5e710800d6f..dbe8f83fdbe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -81,13 +81,15 @@ public class TimelineUtils {
}
/**
-   * Returns partitions that have been deleted or marked for deletion in the given timeline.
+   * Returns partitions that have been deleted or marked for deletion in the timeline between given commit time range.
* Does not include internal operations such as clean in the timeline.
*/
-  public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
+  public static List<String> getDroppedPartitions(HoodieTableMetaClient metaClient, Option<String> lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
+    HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
+        ? TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
+        : metaClient.getActiveTimeline();
HoodieTimeline completedTimeline =
timeline.getWriteTimeline().filterCompletedInstants();
HoodieTimeline replaceCommitTimeline =
completedTimeline.getCompletedReplaceTimeline();
-
Map<String, String> partitionToLatestDeleteTimestamp =
replaceCommitTimeline.getInstantsAsStream()
.map(instant -> {
try {
@@ -102,6 +104,21 @@ public class TimelineUtils {
.flatMap(pair ->
pair.getRight().getPartitionToReplaceFileIds().keySet().stream()
.map(partition -> new AbstractMap.SimpleEntry<>(partition,
pair.getLeft().getTimestamp()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
(existing, replace) -> replace));
+ // cleaner could delete a partition when there are no active filegroups in
the partition
+ HoodieTimeline cleanerTimeline =
metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants();
+ cleanerTimeline.getInstantsAsStream()
+ .forEach(instant -> {
+ try {
+ HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(cleanerTimeline.getInstantDetails(instant).get());
+ cleanMetadata.getPartitionMetadata().forEach((partition,
partitionMetadata) -> {
+ if (partitionMetadata.getIsPartitionDeleted()) {
+ partitionToLatestDeleteTimestamp.put(partition,
instant.getTimestamp());
+ }
+ });
+ } catch (IOException e) {
+          throw new HoodieIOException("Failed to get partitions cleaned at " + instant, e);
+ }
+ });
if (partitionToLatestDeleteTimestamp.isEmpty()) {
// There is no dropped partitions
@@ -244,7 +261,7 @@ public class TimelineUtils {
return false;
} catch (IOException e) {
-      throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
+      throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePathV2().toString(), e);
}
}
@@ -440,7 +457,7 @@ public class TimelineUtils {
}
public enum HollowCommitHandling {
- FAIL, BLOCK, USE_TRANSITION_TIME;
+ FAIL, BLOCK, USE_TRANSITION_TIME
}
/**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index c81a05b4c20..d258753c3a8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -158,7 +158,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
activeTimeline.createNewInstant(cleanInstant);
- activeTimeline.saveAsComplete(cleanInstant,
getCleanMetadata(olderPartition, ts));
+ activeTimeline.saveAsComplete(cleanInstant,
getCleanMetadata(olderPartition, ts, false));
}
metaClient.reloadActiveTimeline();
@@ -197,7 +197,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
activeTimeline.createNewInstant(cleanInstant);
- activeTimeline.saveAsComplete(cleanInstant,
getCleanMetadata(partitionPath, ts));
+ activeTimeline.saveAsComplete(cleanInstant,
getCleanMetadata(partitionPath, ts, false));
}
metaClient.reloadActiveTimeline();
@@ -553,7 +553,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
return serializeCommitMetadata(commit).get();
}
- private Option<byte[]> getCleanMetadata(String partition, String time)
throws IOException {
+ private Option<byte[]> getCleanMetadata(String partition, String time,
boolean isPartitionDeleted) throws IOException {
Map<String, HoodieCleanPartitionMetadata> partitionToFilesCleaned = new
HashMap<>();
List<String> filesDeleted = new ArrayList<>();
filesDeleted.add("file-" + partition + "-" + time + "1");
@@ -564,6 +564,7 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
.setFailedDeleteFiles(Collections.emptyList())
.setDeletePathPatterns(Collections.emptyList())
.setSuccessDeleteFiles(filesDeleted)
+ .setIsPartitionDeleted(isPartitionDeleted)
.build();
partitionToFilesCleaned.putIfAbsent(partition, partitionMetadata);
HoodieCleanMetadata cleanMetadata = HoodieCleanMetadata.newBuilder()
@@ -611,4 +612,43 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
fail("should cover all handling mode.");
}
}
+
+ @Test
+ public void testGetDroppedPartitions() throws Exception {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+ assertTrue(activeCommitTimeline.empty());
+
+ String olderPartition = "p1"; // older partitions that will be deleted by
clean commit
+ // first insert to the older partition
+ HoodieInstant instant1 = new HoodieInstant(true, COMMIT_ACTION, "00001");
+ activeTimeline.createNewInstant(instant1);
+ activeTimeline.saveAsComplete(instant1,
Option.of(getCommitMetadata(basePath, olderPartition, "00001", 2,
Collections.emptyMap())));
+
+ metaClient.reloadActiveTimeline();
+ List<String> droppedPartitions =
TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty());
+ // no dropped partitions
+ assertEquals(0, droppedPartitions.size());
+
+ // another commit inserts to new partition
+ HoodieInstant instant2 = new HoodieInstant(true, COMMIT_ACTION, "00002");
+ activeTimeline.createNewInstant(instant2);
+ activeTimeline.saveAsComplete(instant2,
Option.of(getCommitMetadata(basePath, "p2", "00002", 2,
Collections.emptyMap())));
+
+ metaClient.reloadActiveTimeline();
+ droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient,
Option.empty(), Option.empty());
+ // no dropped partitions
+ assertEquals(0, droppedPartitions.size());
+
+ // clean commit deletes older partition
+ HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION,
"00003");
+ activeTimeline.createNewInstant(cleanInstant);
+ activeTimeline.saveAsComplete(cleanInstant,
getCleanMetadata(olderPartition, "00003", true));
+
+ metaClient.reloadActiveTimeline();
+ droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient,
Option.empty(), Option.empty());
+ // older partition is in the list dropped partitions
+ assertEquals(1, droppedPartitions.size());
+ assertEquals(olderPartition, droppedPartitions.get(0));
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index c1414fe77fe..bfd5613feba 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -159,11 +159,6 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
StructType(tableSchema.filterNot(f => partitionFields.contains(f.name)))
}
- /**
- * The schema of data fields not including hoodie meta fields
- */
- lazy val dataSchemaWithoutMetaFields: StructType =
removeMetaFields(dataSchema)
-
/**
* The schema of partition fields
*/
@@ -173,7 +168,7 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
* All the partition paths, excludes lazily deleted partitions.
*/
def getPartitionPaths: Seq[String] = {
-    val droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient.getActiveTimeline)
+    val droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, org.apache.hudi.common.util.Option.empty(), org.apache.hudi.common.util.Option.empty())
getAllPartitionPaths(spark, table)
.filter(!droppedPartitions.contains(_))
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index aec0e484e6c..fc3e31164ac 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -92,10 +92,7 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
* Going through archive timeline is a costly operation, and it should be
avoided unless some start time is given.
*/
public Set<String> getDroppedPartitionsSince(Option<String>
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
-    HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
-        ? TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
-        : metaClient.getActiveTimeline();
-    return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
+    return new HashSet<>(TimelineUtils.getDroppedPartitions(metaClient, lastCommitTimeSynced, lastCommitCompletionTimeSynced));
}
@Override