This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 56159e2855e [HUDI-8610] Fix clustering/replacecommit action inference
(#12375)
56159e2855e is described below
commit 56159e2855e7165e81caec2f5492b2e364e35b0e
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Nov 30 19:31:51 2024 +0530
[HUDI-8610] Fix clustering/replacecommit action inference (#12375)
At a few places, the completed instant is used to derive the requested
instant
for clustering. Now the pending clustering instants can have different
actions in
tv 6 and 8. This patch infers the action based on timeline layout version.
---
.../hudi/table/action/cluster/ClusteringPlanActionExecutor.java | 5 ++++-
.../org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java | 3 ---
.../common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java | 4 ++++
.../src/main/java/org/apache/hudi/common/util/ClusteringUtils.java | 4 +++-
4 files changed, 11 insertions(+), 5 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 549c382d404..d698053d7b5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -90,8 +91,10 @@ public class ClusteringPlanActionExecutor<T, I, K, O>
extends BaseActionExecutor
public Option<HoodieClusteringPlan> execute() {
Option<HoodieClusteringPlan> planOption = createClusteringPlan();
if (planOption.isPresent()) {
+ // To support writing and reading with table version SIX, we need to
allow instant action to be REPLACE_COMMIT_ACTION
+ String action =
TimelineLayoutVersion.LAYOUT_VERSION_2.equals(table.getMetaClient().getTimelineLayoutVersion())
? HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
HoodieInstant clusteringInstant =
- instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION, instantTime);
+ instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED,
action, instantTime);
try {
HoodieRequestedReplaceMetadata requestedReplaceMetadata =
HoodieRequestedReplaceMetadata.newBuilder()
.setOperationType(WriteOperationType.CLUSTER.name())
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 5b79ce11b79..48294c96741 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -150,9 +150,6 @@ public class TimelineMetadataUtils {
public static Option<byte[]> serializeCommitMetadata(CommitMetadataSerDe
commitMetadataSerDe,
org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata) throws
IOException {
- if (commitMetadata instanceof
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
- return
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
HoodieReplaceCommitMetadata.class);
- }
return commitMetadataSerDe.serialize(commitMetadata);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
index 5d5b5c29384..2fbb9e42485 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/CommitMetadataSerDeV2.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.timeline.versioning.v2;
import org.apache.hudi.avro.model.HoodieCommitMetadata;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.MetadataConversionUtils;
@@ -71,6 +72,9 @@ public class CommitMetadataSerDeV2 implements
CommitMetadataSerDe {
@Override
public Option<byte[]>
serialize(org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata)
throws IOException {
+ if (commitMetadata instanceof
org.apache.hudi.common.model.HoodieReplaceCommitMetadata) {
+ return
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
HoodieReplaceCommitMetadata.class);
+ }
return
serializeAvroMetadata(MetadataConversionUtils.convertCommitMetadata(commitMetadata),
HoodieCommitMetadata.class);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 4b2f19a2327..021486dff7f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantGeneratorV2;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -180,7 +181,8 @@ public class ClusteringUtils {
} else if (pendingReplaceOrClusterInstant.isRequested()) {
requestedInstant = pendingReplaceOrClusterInstant;
} else {
- requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION,
pendingReplaceOrClusterInstant.requestedTime());
+ String action = factory instanceof InstantGeneratorV2 ?
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
+ requestedInstant =
factory.createNewInstant(HoodieInstant.State.REQUESTED, action,
pendingReplaceOrClusterInstant.requestedTime());
}
Option<byte[]> content = Option.empty();
try {