This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2af83e2d9a8 [HUDI-7411] Meta sync should consider cleaner commit (#10676)
2af83e2d9a8 is described below

commit 2af83e2d9a8fbb6cc33fdf29e38b72684c2da4ca
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Mar 8 23:34:53 2024 +0530

    [HUDI-7411] Meta sync should consider cleaner commit (#10676)
---
 .../hudi/common/table/timeline/TimelineUtils.java  | 27 ++++++++++---
 .../hudi/common/table/TestTimelineUtils.java       | 46 ++++++++++++++++++++--
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  7 +---
 .../apache/hudi/sync/common/HoodieSyncClient.java  |  5 +--
 4 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 5e710800d6f..dbe8f83fdbe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -81,13 +81,15 @@ public class TimelineUtils {
   }
 
   /**
-   * Returns partitions that have been deleted or marked for deletion in the 
given timeline.
+   * Returns partitions that have been deleted or marked for deletion in the 
timeline between given commit time range.
    * Does not include internal operations such as clean in the timeline.
    */
-  public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
+  public static List<String> getDroppedPartitions(HoodieTableMetaClient 
metaClient, Option<String> lastCommitTimeSynced, Option<String> 
lastCommitCompletionTimeSynced) {
+    HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
+        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
+        : metaClient.getActiveTimeline();
     HoodieTimeline completedTimeline = 
timeline.getWriteTimeline().filterCompletedInstants();
     HoodieTimeline replaceCommitTimeline = 
completedTimeline.getCompletedReplaceTimeline();
-
     Map<String, String> partitionToLatestDeleteTimestamp = 
replaceCommitTimeline.getInstantsAsStream()
         .map(instant -> {
           try {
@@ -102,6 +104,21 @@ public class TimelineUtils {
         .flatMap(pair -> 
pair.getRight().getPartitionToReplaceFileIds().keySet().stream()
             .map(partition -> new AbstractMap.SimpleEntry<>(partition, 
pair.getLeft().getTimestamp()))
         ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, 
(existing, replace) -> replace));
+    // cleaner could delete a partition when there are no active filegroups in 
the partition
+    HoodieTimeline cleanerTimeline = 
metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants();
+    cleanerTimeline.getInstantsAsStream()
+        .forEach(instant -> {
+          try {
+            HoodieCleanMetadata cleanMetadata = 
TimelineMetadataUtils.deserializeHoodieCleanMetadata(cleanerTimeline.getInstantDetails(instant).get());
+            cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
+              if (partitionMetadata.getIsPartitionDeleted()) {
+                partitionToLatestDeleteTimestamp.put(partition, 
instant.getTimestamp());
+              }
+            });
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get partitions cleaned at " 
+ instant, e);
+          }
+        });
 
     if (partitionToLatestDeleteTimestamp.isEmpty()) {
       // There is no dropped partitions
@@ -244,7 +261,7 @@ public class TimelineUtils {
 
       return false;
     } catch (IOException e) {
-      throw new HoodieIOException("Unable to read instant information: " + 
instant + " for " + metaClient.getBasePath(), e);
+      throw new HoodieIOException("Unable to read instant information: " + 
instant + " for " + metaClient.getBasePathV2().toString(), e);
     }
   }
 
@@ -440,7 +457,7 @@ public class TimelineUtils {
   }
 
   public enum HollowCommitHandling {
-    FAIL, BLOCK, USE_TRANSITION_TIME;
+    FAIL, BLOCK, USE_TRANSITION_TIME
   }
 
   /**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index c81a05b4c20..d258753c3a8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -158,7 +158,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
 
       HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
-      activeTimeline.saveAsComplete(cleanInstant, 
getCleanMetadata(olderPartition, ts));
+      activeTimeline.saveAsComplete(cleanInstant, 
getCleanMetadata(olderPartition, ts, false));
     }
 
     metaClient.reloadActiveTimeline();
@@ -197,7 +197,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
 
       HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
-      activeTimeline.saveAsComplete(cleanInstant, 
getCleanMetadata(partitionPath, ts));
+      activeTimeline.saveAsComplete(cleanInstant, 
getCleanMetadata(partitionPath, ts, false));
     }
 
     metaClient.reloadActiveTimeline();
@@ -553,7 +553,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     return serializeCommitMetadata(commit).get();
   }
 
-  private Option<byte[]> getCleanMetadata(String partition, String time) 
throws IOException {
+  private Option<byte[]> getCleanMetadata(String partition, String time, 
boolean isPartitionDeleted) throws IOException {
     Map<String, HoodieCleanPartitionMetadata> partitionToFilesCleaned = new 
HashMap<>();
     List<String> filesDeleted = new ArrayList<>();
     filesDeleted.add("file-" + partition + "-" + time + "1");
@@ -564,6 +564,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
         .setFailedDeleteFiles(Collections.emptyList())
         .setDeletePathPatterns(Collections.emptyList())
         .setSuccessDeleteFiles(filesDeleted)
+        .setIsPartitionDeleted(isPartitionDeleted)
         .build();
     partitionToFilesCleaned.putIfAbsent(partition, partitionMetadata);
     HoodieCleanMetadata cleanMetadata = HoodieCleanMetadata.newBuilder()
@@ -611,4 +612,43 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
         fail("should cover all handling mode.");
     }
   }
+
+  @Test
+  public void testGetDroppedPartitions() throws Exception {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
+    assertTrue(activeCommitTimeline.empty());
+
+    String olderPartition = "p1"; // older partitions that will be deleted by 
clean commit
+    // first insert to the older partition
+    HoodieInstant instant1 = new HoodieInstant(true, COMMIT_ACTION, "00001");
+    activeTimeline.createNewInstant(instant1);
+    activeTimeline.saveAsComplete(instant1, 
Option.of(getCommitMetadata(basePath, olderPartition, "00001", 2, 
Collections.emptyMap())));
+
+    metaClient.reloadActiveTimeline();
+    List<String> droppedPartitions = 
TimelineUtils.getDroppedPartitions(metaClient, Option.empty(), Option.empty());
+    // no dropped partitions
+    assertEquals(0, droppedPartitions.size());
+
+    // another commit inserts to new partition
+    HoodieInstant instant2 = new HoodieInstant(true, COMMIT_ACTION, "00002");
+    activeTimeline.createNewInstant(instant2);
+    activeTimeline.saveAsComplete(instant2, 
Option.of(getCommitMetadata(basePath, "p2", "00002", 2, 
Collections.emptyMap())));
+
+    metaClient.reloadActiveTimeline();
+    droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, 
Option.empty(), Option.empty());
+    // no dropped partitions
+    assertEquals(0, droppedPartitions.size());
+
+    // clean commit deletes older partition
+    HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, 
"00003");
+    activeTimeline.createNewInstant(cleanInstant);
+    activeTimeline.saveAsComplete(cleanInstant, 
getCleanMetadata(olderPartition, "00003", true));
+
+    metaClient.reloadActiveTimeline();
+    droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, 
Option.empty(), Option.empty());
+    // older partition is in the list dropped partitions
+    assertEquals(1, droppedPartitions.size());
+    assertEquals(olderPartition, droppedPartitions.get(0));
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index c1414fe77fe..bfd5613feba 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -159,11 +159,6 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     StructType(tableSchema.filterNot(f => partitionFields.contains(f.name)))
   }
 
-  /**
-   * The schema of data fields not including hoodie meta fields
-   */
-  lazy val dataSchemaWithoutMetaFields: StructType = 
removeMetaFields(dataSchema)
-
   /**
    * The schema of partition fields
    */
@@ -173,7 +168,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
    * All the partition paths, excludes lazily deleted partitions.
    */
   def getPartitionPaths: Seq[String] = {
-    val droppedPartitions = 
TimelineUtils.getDroppedPartitions(metaClient.getActiveTimeline)
+    val droppedPartitions = TimelineUtils.getDroppedPartitions(metaClient, 
org.apache.hudi.common.util.Option.empty(), 
org.apache.hudi.common.util.Option.empty())
 
     getAllPartitionPaths(spark, table)
       .filter(!droppedPartitions.contains(_))
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index aec0e484e6c..fc3e31164ac 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -92,10 +92,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
    * Going through archive timeline is a costly operation, and it should be 
avoided unless some start time is given.
    */
   public Set<String> getDroppedPartitionsSince(Option<String> 
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
-    HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
-        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
-        : metaClient.getActiveTimeline();
-    return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
+    return new HashSet<>(TimelineUtils.getDroppedPartitions(metaClient, 
lastCommitTimeSynced, lastCommitCompletionTimeSynced));
   }
 
   @Override

Reply via email to