This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 11ad4ed  [HUDI-1661] Exclude clustering commits from getExtraMetadataFromLatest API (#2632)
11ad4ed is described below

commit 11ad4ed26b6046201945f0e14449e1cbc5b6f1f2
Author: satishkotha <[email protected]>
AuthorDate: Fri Mar 5 13:42:19 2021 -0800

    [HUDI-1661] Exclude clustering commits from getExtraMetadataFromLatest API (#2632)
---
 .../hudi/common/table/timeline/TimelineUtils.java  | 39 ++++++++++++++++++-
 .../hudi/common/table/TestTimelineUtils.java       | 45 ++++++++++++++++++----
 2 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index f9dacf0..de8c582 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -22,9 +22,12 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -43,6 +46,7 @@ import java.util.stream.Stream;
  * 2) Incremental reads - InputFormats can use this API to query
  */
 public class TimelineUtils {
+  private static final Logger LOG = LogManager.getLogger(TimelineUtils.class);
 
   /**
    * Returns partitions that have new data strictly after commitTime.
@@ -117,13 +121,27 @@ public class TimelineUtils {
   }
 
   /**
-   * Get extra metadata for specified key from latest commit/deltacommit 
instant.
+   * Get extra metadata for specified key from latest 
commit/deltacommit/replacecommit(eg. insert_overwrite) instant.
    */
   public static Option<String> 
getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String 
extraMetadataKey) {
-    return 
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst().map(instant
 ->
+    return 
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
       
+        // exclude clustering commits for returning user stored extra metadata 
+        .filter(instant -> !isClusteringCommit(metaClient, instant))
+        .findFirst().map(instant ->
         getMetadataValue(metaClient, extraMetadataKey, 
instant)).orElse(Option.empty());
   }
 
+
+  /**
+   * Get extra metadata for specified key from latest 
commit/deltacommit/replacecommit instant including internal commits
+   * such as clustering.
+   */
+  public static Option<String> 
getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient, 
String extraMetadataKey) {
+    return 
metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+        .findFirst().map(instant ->
+            getMetadataValue(metaClient, extraMetadataKey, 
instant)).orElse(Option.empty());
+  }
+
   /**
    * Get extra metadata for specified key from all active commit/deltacommit 
instants.
    */
@@ -134,6 +152,7 @@ public class TimelineUtils {
 
   private static Option<String> getMetadataValue(HoodieTableMetaClient 
metaClient, String extraMetadataKey, HoodieInstant instant) {
     try {
+      LOG.info("reading checkpoint info for:"  + instant + " key: " + 
extraMetadataKey);
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
           metaClient.getCommitsTimeline().getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
 
@@ -142,4 +161,20 @@ public class TimelineUtils {
       throw new HoodieIOException("Unable to parse instant metadata " + 
instant, e);
     }
   }
+  
+  private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, 
HoodieInstant instant) {
+    try {
+      if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+        // replacecommit is used for multiple operations: 
insert_overwrite/cluster etc. 
+        // Check operation type to see if this instant is related to 
clustering.
+        HoodieReplaceCommitMetadata replaceMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
+        return 
WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
+      }
+      
+      return false;
+    } catch (IOException e) {
+      throw new HoodieIOException("Unable to read instant information: " + 
instant + " for " + metaClient.getBasePath(), e);
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 18c0d3f..cf7f6d8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -73,7 +74,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     activeTimeline.createNewInstant(instant1);
     // create replace metadata only with replaced file Ids (no new files 
created)
     activeTimeline.saveAsComplete(instant1,
-        Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, 
newFilePartition, 0, Collections.emptyMap())));
+        Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, 
+            newFilePartition, 0, Collections.emptyMap(), 
WriteOperationType.CLUSTER)));
     metaClient.reloadActiveTimeline();
 
     List<String> partitions = 
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0",
 10));
@@ -85,7 +87,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     activeTimeline.createNewInstant(instant2);
     // create replace metadata only with replaced file Ids (no new files 
created)
     activeTimeline.saveAsComplete(instant2,
-        Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, 
newFilePartition, 3, Collections.emptyMap())));
+        Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0,
+            newFilePartition, 3, Collections.emptyMap(), 
WriteOperationType.CLUSTER)));
     metaClient.reloadActiveTimeline();
     partitions = 
TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1",
 10));
     assertEquals(1, partitions.size());
@@ -211,16 +214,42 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
     metaClient.reloadActiveTimeline();
 
     // verify modified partitions included cleaned data
-    Option<String> extraLatestValue = 
TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
-    assertTrue(extraLatestValue.isPresent());
-    assertEquals(extraMetadataValue1, extraLatestValue.get());
+    verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, 
false);
+    assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, 
"unknownKey").isPresent());
+    
+    // verify adding clustering commit doesnt change behavior of 
getExtraMetadataFromLatest
+    String ts2 = "2";
+    HoodieInstant instant2 = new HoodieInstant(true, 
HoodieTimeline.REPLACE_COMMIT_ACTION, ts2);
+    activeTimeline.createNewInstant(instant2);
+    String newValueForMetadata = "newValue2";
+    extraMetadata.put(extraMetadataKey, newValueForMetadata);
+    activeTimeline.saveAsComplete(instant2,
+        Option.of(getReplaceCommitMetadata(basePath, ts2, "p2", 0,
+            "p2", 3, extraMetadata, WriteOperationType.CLUSTER)));
+    metaClient.reloadActiveTimeline();
+    
+    verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, 
false);
+    verifyExtraMetadataLatestValue(extraMetadataKey, newValueForMetadata, 
true);
     assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, 
"unknownKey").isPresent());
 
     Map<String, Option<String>> extraMetadataEntries = 
TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey);
-    assertEquals(2, extraMetadataEntries.size());
+    assertEquals(3, extraMetadataEntries.size());
     assertFalse(extraMetadataEntries.get("0").isPresent());
     assertTrue(extraMetadataEntries.get("1").isPresent());
     assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get());
+    assertTrue(extraMetadataEntries.get("2").isPresent());
+    assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
+  }
+  
+  private void verifyExtraMetadataLatestValue(String extraMetadataKey, String 
expected, boolean includeClustering) {
+    final Option<String> extraLatestValue;
+    if (includeClustering) {       
+      extraLatestValue = 
TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient, 
extraMetadataKey);
+    } else {
+      extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, 
extraMetadataKey);
+    }
+    assertTrue(extraLatestValue.isPresent());
+    assertEquals(expected, extraLatestValue.get());
   }
 
   private byte[] getRestoreMetadata(String basePath, String partition, String 
commitTs, int count, String actionType) throws IOException {
@@ -265,9 +294,11 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
   }
 
   private byte[] getReplaceCommitMetadata(String basePath, String commitTs, 
String replacePartition, int replaceCount,
-      String newFilePartition, int newFileCount, Map<String, String> 
extraMetadata)
+                                          String newFilePartition, int 
newFileCount, Map<String, String> extraMetadata,
+                                          WriteOperationType operationType)
       throws IOException {
     HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
+    commit.setOperationType(operationType);
     for (int i = 1; i <= newFileCount; i++) {
       HoodieWriteStat stat = new HoodieWriteStat();
       stat.setFileId(i + "");

Reply via email to