nsivabalan commented on code in PR #18302:
URL: https://github.com/apache/hudi/pull/18302#discussion_r2977881497
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -235,14 +237,45 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Columns to sort the data by when clustering");
+ static final String SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME =
+
"org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy";
+
+ static final String SPARK_REJECT_UPDATE_STRATEGY_CLASS_NAME =
+
"org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy";
+
public static final ConfigProperty<String> UPDATES_STRATEGY = ConfigProperty
.key("hoodie.clustering.updates.strategy")
-
.defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
+ .noDefaultValue()
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME);
+ }
+ return Option.of(SPARK_REJECT_UPDATE_STRATEGY_CLASS_NAME);
+ })
.markAdvanced()
.sinceVersion("0.7.0")
.withDocumentation("Determines how to handle updates, deletes to file
groups that are under clustering."
+ " Default strategy just rejects the update");
+ public static final ConfigProperty<Boolean> ENABLE_EXPIRATIONS =
ConfigProperty
+ .key("hoodie.clustering.enable.expirations")
+ .defaultValue(false)
+ .markAdvanced()
+ .withDocumentation("When enabled, rollback of failed writes (under LAZY
cleaning policy) will also attempt to rollback "
+ + "clustering replacecommit instants whose heartbeat has expired.
Clustering jobs will start a heartbeat before "
+ + "scheduling a plan, so that other writers can detect stale/failed
clustering attempts. Note that the same "
+ + "client must be used to schedule, execute, and commit the
clustering instant. And a clustering plan cannot be "
+ + "re-attempted");
+
+ public static final ConfigProperty<Long> EXPIRATION_TIME_MINS =
ConfigProperty
Review Comment:
Sorry, my mistake. Let's name it `hoodie.clustering.expiration.threshold.mins`.
Using `time` might suggest an absolute value, but here we are
referring to an interval, i.e. a threshold.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java:
##########
@@ -242,6 +251,231 @@ void
cleanerPlanIsCalledWithInflightCleanAndAllowMultipleCleans() throws IOExcep
verify(mockMetaClient).reloadActiveTimeline();
}
+ // --- Tests for clustering expiration logic ---
+
+ @Test
+ void isClusteringInstantEligibleForRollback_returnsFalseWhenConfigDisabled()
throws IOException {
Review Comment:
Can we parameterize these tests for both v6 and v9? The requested and
inflight instants have different actions in the timeline in these table versions.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java:
##########
@@ -242,6 +251,231 @@ void
cleanerPlanIsCalledWithInflightCleanAndAllowMultipleCleans() throws IOExcep
verify(mockMetaClient).reloadActiveTimeline();
}
+ // --- Tests for clustering expiration logic ---
+
+ @Test
+ void isClusteringInstantEligibleForRollback_returnsFalseWhenConfigDisabled()
throws IOException {
Review Comment:
Similarly, can we parameterize some of the tests added here for v6 and v9?
Let's be tactical: we don't need to parameterize every test, just enough of them
to give us good coverage.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
Review Comment:
Can we make it static, then?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1123,13 +1138,33 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
Review Comment:
Sounds good.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isExpirationOfClusteringEnabled()
+ && hasInstantExpired(metaClient, instant.requestedTime(),
config.getClusteringExpirationTimeMins())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean hasInstantExpired(HoodieTableMetaClient metaClient,
String instantTime, long expirationMins) {
Review Comment:
Sounds good.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1155,13 +1178,37 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isExpirationOfClusteringEnabled()
+ && hasInstantExpired(metaClient, instant.requestedTime(),
config.getClusteringExpirationTimeMins())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean hasInstantExpired(HoodieTableMetaClient metaClient,
String instantTime, long expirationMins) {
Review Comment:
ok
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]