prashantwason commented on code in PR #18089:
URL: https://github.com/apache/hudi/pull/18089#discussion_r2955335974
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java:
##########
@@ -54,12 +54,24 @@ public Stream<HoodieInstant>
getCandidateInstants(HoodieTableMetaClient metaClie
HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
if (ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant,
metaClient.getInstantGenerator())
|| COMPACTION_ACTION.equals(currentInstant.getAction())) {
- return getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant);
+ return
Stream.concat(getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant),
+ getCandidateInstantsForRollbackConflict(activeTimeline,
currentInstant));
Review Comment:
Good point. Removed `getCandidateInstantsForRollbackConflict` from the table
services path. Table service rollbacks are done by table service jobs/writers
only, not by ingestion threads, so rollback conflict detection is only needed
for ingestion commits.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/PreferWriterConflictResolutionStrategy.java:
##########
@@ -54,12 +54,24 @@ public Stream<HoodieInstant>
getCandidateInstants(HoodieTableMetaClient metaClie
HoodieActiveTimeline activeTimeline = metaClient.reloadActiveTimeline();
if (ClusteringUtils.isClusteringInstant(activeTimeline, currentInstant,
metaClient.getInstantGenerator())
|| COMPACTION_ACTION.equals(currentInstant.getAction())) {
- return getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant);
+ return
Stream.concat(getCandidateInstantsForTableServicesCommits(activeTimeline,
currentInstant),
+ getCandidateInstantsForRollbackConflict(activeTimeline,
currentInstant));
} else {
- return getCandidateInstantsForNonTableServicesCommits(activeTimeline,
currentInstant);
+ return
Stream.concat(getCandidateInstantsForNonTableServicesCommits(activeTimeline,
currentInstant),
+ getCandidateInstantsForRollbackConflict(activeTimeline,
currentInstant));
}
}
+ private Stream<HoodieInstant>
getCandidateInstantsForRollbackConflict(HoodieActiveTimeline activeTimeline,
HoodieInstant currentInstant) {
+ // Add Requested rollback action instants that were created after the
current instant.
+ List<HoodieInstant> pendingRollbacks = activeTimeline
+ .findInstantsAfter(currentInstant.requestedTime())
+ .filterPendingRollbackTimeline()
Review Comment:
No, the ingestion write will NOT complete successfully in that case.
`getCandidateInstantsForRollbackConflict` uses
`filterPendingRollbackTimeline()` which includes both REQUESTED and INFLIGHT
rollback instants. For both states, `ConcurrentOperation.init()` reads the
rollback plan from the requested instant (line 143-145 in
`ConcurrentOperation.java`) to extract the target commit time. So even if the
rollback is inflight when the ingestion writer runs conflict resolution, the
`isRollbackConflict()` check will detect that the rollback targets the current
commit and throw `HoodieWriteConflictException`, aborting the ingestion write.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]