This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 56159e2855e [HUDI-8610] Fix clustering/replacecommit action inference 
(#12375)
56159e2855e is described below

commit 56159e2855e7165e81caec2f5492b2e364e35b0e
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Nov 30 19:31:51 2024 +0530

    [HUDI-8610] Fix clustering/replacecommit action inference (#12375)
    
    At a few places, the completed instant is used to derive the requested 
instant
    for clustering. Now the pending clustering instants can have different 
actions in
    tv 6 and 8. This patch infers the action based on timeline layout version.
---
 .../hudi/table/action/cluster/ClusteringPlanActionExecutor.java      | 5 ++++-
 .../org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java | 3 ---
 .../common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java   | 4 ++++
 .../src/main/java/org/apache/hudi/common/util/ClusteringUtils.java   | 4 +++-
 4 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 549c382d404..d698053d7b5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -90,8 +91,10 @@ public class ClusteringPlanActionExecutor<T, I, K, O> 
extends BaseActionExecutor
   public Option<HoodieClusteringPlan> execute() {
     Option<HoodieClusteringPlan> planOption = createClusteringPlan();
     if (planOption.isPresent()) {
+      // To support writing and reading with table version SIX, we need to 
allow instant action to be REPLACE_COMMIT_ACTION
+      String action =  
TimelineLayoutVersion.LAYOUT_VERSION_2.equals(table.getMetaClient().getTimelineLayoutVersion())
 ? HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
       HoodieInstant clusteringInstant =
-          instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.CLUSTERING_ACTION, instantTime);
+          instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, 
action, instantTime);
       try {
         HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
HoodieRequestedReplaceMetadata.newBuilder()
             .setOperationType(WriteOperationType.CLUSTER.name())
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 5b79ce11b79..48294c96741 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -150,9 +150,6 @@ public class TimelineMetadataUtils {
 
   public static Option<byte[]> serializeCommitMetadata(CommitMetadataSerDe 
commitMetadataSerDe,
                                                        
org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata) throws 
IOException {
-    if (commitMetadata instanceof 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
-      return 
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
 HoodieReplaceCommitMetadata.class);
-    }
     return commitMetadataSerDe.serialize(commitMetadata);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
index 5d5b5c29384..2fbb9e42485 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.timeline.versioning.v2;
 
 import org.apache.hudi.avro.model.HoodieCommitMetadata;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
@@ -71,6 +72,9 @@ public class CommitMetadataSerDeV2 implements 
CommitMetadataSerDe {
 
   @Override
   public Option<byte[]> 
serialize(org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata) 
throws IOException {
+    if (commitMetadata instanceof 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
+      return 
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
 HoodieReplaceCommitMetadata.class);
+    }
     return 
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
 HoodieCommitMetadata.class);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 4b2f19a2327..021486dff7f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantGeneratorV2;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -180,7 +181,8 @@ public class ClusteringUtils {
     } else if (pendingReplaceOrClusterInstant.isRequested()) {
       requestedInstant = pendingReplaceOrClusterInstant;
     } else {
-      requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.CLUSTERING_ACTION, 
pendingReplaceOrClusterInstant.requestedTime());
+      String action = factory instanceof InstantGeneratorV2 ? 
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
+      requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, action, 
pendingReplaceOrClusterInstant.requestedTime());
     }
     Option<byte[]> content = Option.empty();
     try {

Reply via email to