This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 698694a  [HUDI-1498] Read clustering plan from requested file for 
inflight instant (#2389)
698694a is described below

commit 698694a1571cdcc9848fc79aa34c8cbbf9662bc4
Author: satishkotha <[email protected]>
AuthorDate: Mon Jan 4 10:36:44 2021 -0800

    [HUDI-1498] Read clustering plan from requested file for inflight instant 
(#2389)
---
 .../org/apache/hudi/common/util/ClusteringUtils.java | 20 +++++++++++++++-----
 .../common/table/view/TestIncrementalFSViewSync.java | 15 +++++++++++++--
 .../apache/hudi/common/util/TestClusteringUtils.java | 16 ++++++++++++++++
 3 files changed, 44 insertions(+), 7 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 5cdb6fc..fcc3274 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
@@ -68,21 +69,30 @@ public class ClusteringUtils {
         .filter(Option::isPresent).map(Option::get);
   }
 
-  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> 
getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant 
requestedReplaceInstant) {
+  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> 
getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant 
pendingReplaceInstant) {
     try {
-      Option<byte[]> content = 
metaClient.getActiveTimeline().getInstantDetails(requestedReplaceInstant);
+      final HoodieInstant requestedInstant;
+      if (!pendingReplaceInstant.isRequested()) {
+        // inflight replacecommit files don't have clustering plan.
+        // This is because replacecommit inflight can have workload profile 
for 'insert_overwrite'.
+        // Get the plan from corresponding requested instant.
+        requestedInstant = 
HoodieTimeline.getReplaceCommitRequestedInstant(pendingReplaceInstant.getTimestamp());
+      } else {
+        requestedInstant = pendingReplaceInstant;
+      }
+      Option<byte[]> content = 
metaClient.getActiveTimeline().getInstantDetails(requestedInstant);
       if (!content.isPresent() || content.get().length == 0) {
         // few operations create requested file without any content. Assume 
these are not clustering
-        LOG.warn("No content found in requested file for instant " + 
requestedReplaceInstant);
+        LOG.warn("No content found in requested file for instant " + 
pendingReplaceInstant);
         return Option.empty();
       }
       HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
       if 
(WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType()))
 {
-        return Option.of(Pair.of(requestedReplaceInstant, 
requestedReplaceMetadata.getClusteringPlan()));
+        return Option.of(Pair.of(pendingReplaceInstant, 
requestedReplaceMetadata.getClusteringPlan()));
       }
       return Option.empty();
     } catch (IOException e) {
-      throw new HoodieIOException("Error reading clustering plan " + 
requestedReplaceInstant.getTimestamp(), e);
+      throw new HoodieIOException("Error reading clustering plan " + 
pendingReplaceInstant.getTimestamp(), e);
     }
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index e4933cf..5400dc4 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.view;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.HoodieCleanStat;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -857,11 +859,20 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
   private List<String> addReplaceInstant(HoodieTableMetaClient metaClient, 
String instant,
                                  List<Pair<String, HoodieWriteStat>> 
writeStats,
                                  Map<String, List<String>> 
partitionToReplaceFileIds) throws IOException {
+    // created requested
+    HoodieInstant newRequestedInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, instant);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
HoodieRequestedReplaceMetadata.newBuilder()
+        .setOperationType(WriteOperationType.UNKNOWN.name()).build();
+    
metaClient.getActiveTimeline().saveToPendingReplaceCommit(newRequestedInstant,
+        
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+    
+    metaClient.reloadActiveTimeline();
+    // transition to inflight
+    HoodieInstant inflightInstant = 
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(newRequestedInstant,
 Option.empty());
+    // transition to replacecommit
     HoodieReplaceCommitMetadata replaceCommitMetadata = new 
HoodieReplaceCommitMetadata();
     writeStats.forEach(e -> replaceCommitMetadata.addWriteStat(e.getKey(), 
e.getValue()));
     
replaceCommitMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
-    HoodieInstant inflightInstant = new HoodieInstant(true, 
HoodieTimeline.REPLACE_COMMIT_ACTION, instant);
-    metaClient.getActiveTimeline().createNewInstant(inflightInstant);
     metaClient.getActiveTimeline().saveAsComplete(inflightInstant,
         
Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     return writeStats.stream().map(e -> 
e.getValue().getPath()).collect(Collectors.toList());
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 5d82bbc..54ca072 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -98,6 +98,22 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     validateClusteringInstant(fileIds3, partitionPath1, clusterTime, 
fileGroupToInstantMap);
   }
 
+  // replacecommit.inflight doesn't have clustering plan.
+  // Verify that getClusteringPlan fetches content from corresponding 
requested file.
+  @Test
+  public void testClusteringPlanInflight() throws Exception {
+    String partitionPath1 = "partition1";
+    List<String> fileIds1 = new ArrayList<>();
+    fileIds1.add(UUID.randomUUID().toString());
+    fileIds1.add(UUID.randomUUID().toString());
+    String clusterTime1 = "1";
+    HoodieInstant requestedInstant = 
createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+    HoodieInstant inflightInstant = 
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant,
 Option.empty());
+    HoodieClusteringPlan requestedClusteringPlan = 
ClusteringUtils.getClusteringPlan(metaClient, 
requestedInstant).get().getRight();
+    HoodieClusteringPlan inflightClusteringPlan = 
ClusteringUtils.getClusteringPlan(metaClient, inflightInstant).get().getRight();
+    assertEquals(requestedClusteringPlan, inflightClusteringPlan);
+  }
+
   private void validateClusteringInstant(List<String> fileIds, String 
partitionPath,
                                          String expectedInstantTime, 
Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
     for (String fileId : fileIds) {

Reply via email to