nsivabalan commented on code in PR #11553:
URL: https://github.com/apache/hudi/pull/11553#discussion_r1676772962
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -70,11 +70,67 @@ public class ClusteringUtils {
public static Stream<Pair<HoodieInstant, HoodieClusteringPlan>>
getAllPendingClusteringPlans(
HoodieTableMetaClient metaClient) {
List<HoodieInstant> pendingClusterInstants =
-
metaClient.getActiveTimeline().filterPendingClusterTimeline().getInstants();
+
metaClient.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants();
return pendingClusterInstants.stream().map(instant ->
getClusteringPlan(metaClient, instant))
.filter(Option::isPresent).map(Option::get);
}
+ /**
+ * Returns the pending clustering instant. This can be older pending replace
commit or a new
+ * clustering inflight commit. After HUDI-7905, all the requested and
inflight clustering instants
+ * use clustering action instead of replacecommit.
+ */
+ public static Option<HoodieInstant> getInflightClusteringInstant(String
timestamp, HoodieActiveTimeline activeTimeline) {
+ HoodieTimeline pendingReplaceOrClusterTimeline =
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+ HoodieInstant inflightInstant =
HoodieTimeline.getClusteringCommitInflightInstant(timestamp);
+ if (pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)) {
+ return Option.of(inflightInstant);
+ }
+ inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(timestamp);
+ return
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)
? inflightInstant : null);
+ }
+
+ /**
+ * Returns the pending clustering instant. This can be older requested
replace commit or a new
+ * clustering requested commit. After HUDI-7905, all the requested and
inflight clustering instants
+ * use clustering action instead of replacecommit.
+ */
+ public static Option<HoodieInstant> getRequestedClusteringInstant(String
timestamp, HoodieActiveTimeline activeTimeline) {
+ HoodieTimeline pendingReplaceOrClusterTimeline =
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+ HoodieInstant requestedInstant =
HoodieTimeline.getClusteringCommitRequestedInstant(timestamp);
+ if (pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)) {
+ return Option.of(requestedInstant);
+ }
+ requestedInstant =
HoodieTimeline.getReplaceCommitRequestedInstant(timestamp);
+ return
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)
? requestedInstant : null);
+ }
+
+ /**
+ * Transitions the provided clustering instant from inflight to complete
based on the clustering
+ * action type. After HUDI-7905, the new clustering commits are written with
clustering action.
+ */
+ public static void transitionClusterInflightToComplete(boolean shouldLock,
HoodieInstant clusteringInstant,
Review Comment:
Oh, one thing: if there is a pending replacecommit (requested only) in the
timeline just before the upgrade, then we need to use replacecommit (and not
clustering) for the inflight transition, right?
Are we handling this case?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -70,11 +70,67 @@ public class ClusteringUtils {
public static Stream<Pair<HoodieInstant, HoodieClusteringPlan>>
getAllPendingClusteringPlans(
HoodieTableMetaClient metaClient) {
List<HoodieInstant> pendingClusterInstants =
-
metaClient.getActiveTimeline().filterPendingClusterTimeline().getInstants();
+
metaClient.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants();
return pendingClusterInstants.stream().map(instant ->
getClusteringPlan(metaClient, instant))
.filter(Option::isPresent).map(Option::get);
}
+ /**
+ * Returns the pending clustering instant. This can be older pending replace
commit or a new
+ * clustering inflight commit. After HUDI-7905, all the requested and
inflight clustering instants
+ * use clustering action instead of replacecommit.
+ */
+ public static Option<HoodieInstant> getInflightClusteringInstant(String
timestamp, HoodieActiveTimeline activeTimeline) {
+ HoodieTimeline pendingReplaceOrClusterTimeline =
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+ HoodieInstant inflightInstant =
HoodieTimeline.getClusteringCommitInflightInstant(timestamp);
+ if (pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)) {
+ return Option.of(inflightInstant);
+ }
+ inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(timestamp);
+ return
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(inflightInstant)
? inflightInstant : null);
+ }
+
+ /**
+ * Returns the pending clustering instant. This can be older requested
replace commit or a new
+ * clustering requested commit. After HUDI-7905, all the requested and
inflight clustering instants
+ * use clustering action instead of replacecommit.
+ */
+ public static Option<HoodieInstant> getRequestedClusteringInstant(String
timestamp, HoodieActiveTimeline activeTimeline) {
+ HoodieTimeline pendingReplaceOrClusterTimeline =
activeTimeline.filterPendingReplaceOrClusteringTimeline();
+ HoodieInstant requestedInstant =
HoodieTimeline.getClusteringCommitRequestedInstant(timestamp);
+ if (pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)) {
+ return Option.of(requestedInstant);
+ }
+ requestedInstant =
HoodieTimeline.getReplaceCommitRequestedInstant(timestamp);
+ return
Option.ofNullable(pendingReplaceOrClusterTimeline.containsInstant(requestedInstant)
? requestedInstant : null);
+ }
+
+ /**
+ * Transitions the provided clustering instant from inflight to complete
based on the clustering
+ * action type. After HUDI-7905, the new clustering commits are written with
clustering action.
+ */
+ public static void transitionClusterInflightToComplete(boolean shouldLock,
HoodieInstant clusteringInstant,
+ Option<byte[]>
commitMetadata, HoodieActiveTimeline activeTimeline) {
+ if
(clusteringInstant.getAction().equals(HoodieTimeline.CLUSTERING_ACTION)) {
+ activeTimeline.transitionClusterInflightToComplete(shouldLock,
clusteringInstant, commitMetadata);
+ } else {
+ activeTimeline.transitionReplaceInflightToComplete(shouldLock,
clusteringInstant, commitMetadata);
+ }
+ }
+
+ /**
+ * Transitions the provided clustering instant from requested to inflight
based on the clustering
+ * action type. After HUDI-7905, the new clustering commits are written with
clustering action.
+ */
+ public static void transitionClusterRequestedToInflight(HoodieInstant
requestedClusteringInstant, Option<byte[]> data,
+ HoodieActiveTimeline
activeTimeline) {
+ if
(requestedClusteringInstant.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
{
+
activeTimeline.transitionClusterRequestedToInflight(requestedClusteringInstant,
data);
Review Comment:
OK, I see it here: depending on the requested instant's action type, the
inflight transition uses the same action.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]