This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e9a2c5f0a51 [HUDI-7284] Fix cluster stream sync check (#10501)
e9a2c5f0a51 is described below
commit e9a2c5f0a5131f82c9020e6f43a42f65136bc65f
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jan 19 15:26:08 2024 -0500
[HUDI-7284] Fix cluster stream sync check (#10501)
Co-authored-by: Jonathan Vexler <=>
---
.../table/timeline/HoodieDefaultTimeline.java | 17 +++---------
.../apache/hudi/common/util/ClusteringUtils.java | 30 ++++++++++++++++------
.../hudi/common/util/TestClusteringUtils.java | 15 +++++++++++
3 files changed, 40 insertions(+), 22 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index ac0cb83cf15..1918a944fea 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.timeline;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -517,21 +518,9 @@ public class HoodieDefaultTimeline implements
HoodieTimeline {
@Override
public Option<HoodieInstant> getLastPendingClusterCommit() {
- return Option.fromJavaOptional(getCommitsTimeline().filter(s ->
s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+ return Option.fromJavaOptional(filterPendingReplaceTimeline()
.getReverseOrderedInstants()
- .filter(i -> {
- try {
- if (!i.isCompleted()) {
- HoodieCommitMetadata metadata =
TimelineUtils.getCommitMetadata(i, this);
- return
metadata.getOperationType().equals(WriteOperationType.CLUSTER);
- } else {
- return false;
- }
- } catch (IOException e) {
- LOG.warn("Unable to read commit metadata for " + i + " due to " +
e.getMessage());
- return false;
- }
- }).findFirst());
+ .filter(i -> ClusteringUtils.isPendingClusteringInstant(this,
i)).findFirst());
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index eea644a0bbc..b7ed15a07af 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -82,12 +82,12 @@ public class ClusteringUtils {
/**
* Get requested replace metadata from timeline.
- * @param metaClient
- * @param pendingReplaceInstant
- * @return
+ * @param timeline used to get the bytes stored in the requested replace
instant in the timeline
+ * @param pendingReplaceInstant can be in any state, because it will always
be converted to requested state
+ * @return option of the replace metadata if present, else empty
* @throws IOException
*/
- private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant
pendingReplaceInstant) throws IOException {
+ private static Option<HoodieRequestedReplaceMetadata>
getRequestedReplaceMetadata(HoodieTimeline timeline, HoodieInstant
pendingReplaceInstant) throws IOException {
final HoodieInstant requestedInstant;
if (!pendingReplaceInstant.isRequested()) {
// inflight replacecommit files don't have clustering plan.
@@ -97,7 +97,7 @@ public class ClusteringUtils {
} else {
requestedInstant = pendingReplaceInstant;
}
- Option<byte[]> content =
metaClient.getActiveTimeline().getInstantDetails(requestedInstant);
+ Option<byte[]> content = timeline.getInstantDetails(requestedInstant);
if (!content.isPresent() || content.get().length == 0) {
// few operations create requested file without any content. Assume
these are not clustering
return Option.empty();
@@ -107,13 +107,23 @@ public class ClusteringUtils {
/**
* Get Clustering plan from timeline.
- * @param metaClient
+ * @param metaClient used to get the active timeline
+ * @param pendingReplaceInstant can be in any state, because it will always
be converted to requested state
+ * @return option of the instant paired with its clustering plan if the requested replace metadata is a clustering operation, else empty
+ */
+ public static Option<Pair<HoodieInstant, HoodieClusteringPlan>>
getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant
pendingReplaceInstant) {
+ return getClusteringPlan(metaClient.getActiveTimeline(),
pendingReplaceInstant);
+ }
+
+ /**
+ * Get Clustering plan from timeline.
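+ * @param timeline used to read the details of the requested replace instant
+ * @param pendingReplaceInstant can be in any state, because it will always be converted to requested state
+ * @return option of the instant paired with its clustering plan if the requested replace metadata is a clustering operation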
*/
- public static Option<Pair<HoodieInstant, HoodieClusteringPlan>>
getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant
pendingReplaceInstant) {
+ public static Option<Pair<HoodieInstant, HoodieClusteringPlan>>
getClusteringPlan(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant)
{
try {
- Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
getRequestedReplaceMetadata(metaClient, pendingReplaceInstant);
+ Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata =
getRequestedReplaceMetadata(timeline, pendingReplaceInstant);
if (requestedReplaceMetadata.isPresent() &&
WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.get().getOperationType()))
{
return Option.of(Pair.of(pendingReplaceInstant,
requestedReplaceMetadata.get().getClusteringPlan()));
}
@@ -235,6 +245,10 @@ public class ClusteringUtils {
return getClusteringPlan(metaClient, instant).isPresent();
}
+ public static boolean isPendingClusteringInstant(HoodieTimeline timeline,
HoodieInstant instant) {
+ return getClusteringPlan(timeline, instant).isPresent();
+ }
+
/**
* Returns the earliest instant to retain.
* Make sure the clustering instant won't be archived before cleaned, and
the earliest inflight clustering instant has a previous commit.
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index fb3adaf1c01..bd8dc707651 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -104,6 +104,21 @@ public class TestClusteringUtils extends
HoodieCommonTestHarness {
validateClusteringInstant(fileIds1, partitionPath1, clusterTime1,
fileGroupToInstantMap);
validateClusteringInstant(fileIds2, partitionPath1, clusterTime,
fileGroupToInstantMap);
validateClusteringInstant(fileIds3, partitionPath1, clusterTime,
fileGroupToInstantMap);
+ Option<HoodieInstant> lastPendingClustering =
metaClient.getActiveTimeline().getLastPendingClusterCommit();
+ assertTrue(lastPendingClustering.isPresent());
+ assertEquals("2", lastPendingClustering.get().getTimestamp());
+
+ //check that it still gets picked if it is inflight
+ HoodieInstant inflight =
metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(lastPendingClustering.get(),
Option.empty());
+ assertEquals(HoodieInstant.State.INFLIGHT, inflight.getState());
+ lastPendingClustering =
metaClient.reloadActiveTimeline().getLastPendingClusterCommit();
+ assertEquals("2", lastPendingClustering.get().getTimestamp());
+
+ //now that it is complete, the first instant should be picked
+ HoodieInstant complete =
metaClient.getActiveTimeline().transitionReplaceInflightToComplete(false,
inflight, Option.empty());
+ assertEquals(HoodieInstant.State.COMPLETED, complete.getState());
+ lastPendingClustering =
metaClient.reloadActiveTimeline().getLastPendingClusterCommit();
+ assertEquals("1", lastPendingClustering.get().getTimestamp());
}
// replacecommit.inflight doesn't have clustering plan.