suryaprasanna commented on code in PR #19023:
URL: https://github.com/apache/hudi/pull/19023#discussion_r3432176382
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -141,6 +144,28 @@ public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedT
return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption);
}
+ /**
+ * Get the last completed transaction hoodie instant before the given instant time.
+ * The returned instant has both requested time and completion time less than the given instant time,
+ * ensuring it was fully completed before the given instant was created.
+ *
+ * @param metaClient table meta client
+ * @param currentInstantTime the requested time of the current inflight instant
+ * @return the last completed instant before the given instant, with its extra metadata
+ */
+ public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
Review Comment:
Agreed, added three unit tests in TestTransactionUtils covering interleaved
completions, the completionTime exclusion filter, and the empty-result edge
case. Also added testCommittingMultipleInstantsWithOCC in TestWriteMergeOnRead
as the end-to-end guard.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -141,6 +145,28 @@ public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedT
return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption);
}
+ /**
+ * Get the last completed transaction hoodie instant before the given instant time.
+ * The returned instant has both requested time and completion time less than the given instant time,
+ * ensuring it was fully completed before the given instant was created.
+ *
+ * @param metaClient table meta client
+ * @param currentInstantTime the requested time of the current inflight instant
+ * @return the last completed instant before the given instant, with its extra metadata
+ */
+ public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
+ HoodieTableMetaClient metaClient, String currentInstantTime) {
+ Option<HoodieInstant> hoodieInstantOption = Option.fromJavaOptional(
+ metaClient.getActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants()
+ .findInstantsBefore(currentInstantTime)
+ .getInstantsAsStream()
+ .filter(instant -> instant.getCompletionTime() != null
+ && compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime))
Review Comment:
Good catch. Fixed -- switched to requestedTime for the max selection. The
completionTime filter on the preceding line is retained to ensure we only
consider fully completed instants.
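
For readers following the thread, here is a minimal sketch of the selection described above: filter on both requested time and completion time, then take the max by requested time. It does not use Hudi classes. `Instant`, `lastCompletedBefore`, and the three-digit timestamps are hypothetical stand-ins, and timestamps are assumed to be fixed-width strings that order lexicographically.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class LastCompletedSketch {
  // Hypothetical stand-in for HoodieInstant; completionTime stays null while inflight.
  static final class Instant {
    final String requestedTime;
    final String completionTime;

    Instant(String requestedTime, String completionTime) {
      this.requestedTime = requestedTime;
      this.completionTime = completionTime;
    }
  }

  // Keep instants requested AND completed before currentInstantTime,
  // then pick the one with the greatest requested time.
  static Optional<Instant> lastCompletedBefore(List<Instant> timeline, String currentInstantTime) {
    return timeline.stream()
        .filter(i -> i.requestedTime.compareTo(currentInstantTime) < 0)
        .filter(i -> i.completionTime != null
            && i.completionTime.compareTo(currentInstantTime) < 0)
        .max(Comparator.comparing((Instant i) -> i.requestedTime));
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("001", "003"),   // completed before 005 was requested
        new Instant("002", "006"),   // requested earlier, but completed after 005
        new Instant("004", null));   // still inflight
    // Only 001 passes both filters, so this prints 001.
    System.out.println(lastCompletedBefore(timeline, "005").get().requestedTime);
  }
}
```

In this sketch, instant 002 is excluded even though it was requested before 005, because it had not completed when 005 was created.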
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -547,6 +549,11 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
writeClient.getHeartbeatClient().start(instant);
}
+ // Initialize the transaction state so that OCC conflict resolution uses the correct
+ // baseline: the last completed instant before this inflight instant was created.
Review Comment:
The timeline is now reloaded before each recommit in restoreEvents(), so
when B is recommitted, A is already visible as completed on the refreshed
timeline. preTxnForRecommit then correctly sets A as the baseline, and
findInstantsAfter(A.requestedTime()) excludes A from B's candidate set.
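
A rough sketch of why the baseline matters, under simplifying assumptions: instants are reduced to their requested-time strings, and `conflictCandidates` is a hypothetical helper, not a Hudi API. With a baseline, only instants requested strictly after it are checked. With an empty baseline, every completed instant becomes a candidate, which is the false-conflict case discussed in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class CandidateSetSketch {
  // Hypothetical helper: completed instants that conflict resolution would inspect.
  // An empty baseline means nothing is excluded.
  static List<String> conflictCandidates(List<String> completedRequestedTimes,
                                         Optional<String> baseline) {
    return completedRequestedTimes.stream()
        .filter(t -> !baseline.isPresent() || t.compareTo(baseline.get()) > 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // A = 001 (already recommitted), 003 = a commit from another writer.
    List<String> completed = Arrays.asList("001", "003");
    // Baseline set to A: A is excluded, only 003 is checked.
    System.out.println(conflictCandidates(completed, Optional.of("001")));
    // No baseline: A is also checked and can be reported as a conflict.
    System.out.println(conflictCandidates(completed, Optional.empty()));
  }
}
```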
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -547,6 +547,11 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
writeClient.getHeartbeatClient().start(instant);
}
+ // Initialize the transaction state so that OCC conflict resolution uses the correct
+ // baseline: the last completed instant before this inflight instant was created.
+ // Without this, lastCompletedTxnAndMetadata is empty and conflict resolution checks
+ // against all completed instants on the timeline, causing false conflicts.
+ writeClient.preTxnForRecommit(tableState.operationType, this.metaClient, instant);
Review Comment:
Fixed. Moved reloadActiveTimeline() inside the loop in restoreEvents() so that
each iteration sees the previously committed instant. The same issue also
existed in commitInstants() (L611): when multiple buffered instants are
committed in sequence during checkpointComplete, the second commit sees the
first as a concurrent conflict. Fixed by refreshing the timeline and re-running
preTxn for subsequent instants (skipping the first to preserve multi-writer
conflict detection).
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -603,8 +610,19 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
*/
private boolean commitInstants(long checkpointId) {
// use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
+ boolean[] isFirstInstant = {true};
Review Comment:
Fair point. Refactored to collect the filtered entries first and use a plain
for loop.
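
To illustrate the shape of that refactor, here is a sketch that collects the filtered entries into a list and iterates with a plain indexed loop, so "is this the first instant" becomes `i > 0` rather than a mutable `boolean[]` captured by a lambda. The names (`commitBuffered`, the `Map<Long, String>` buffer, the logged step strings) are hypothetical and only model the ordering; this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class CommitLoopSketch {
  // Returns a log of the steps taken so the ordering is easy to inspect.
  static List<String> commitBuffered(Map<Long, String> buffered, long checkpointId) {
    // Collect first: only entries whose checkpoint id is strictly below the current one.
    List<Map.Entry<Long, String>> pending = buffered.entrySet().stream()
        .filter(e -> e.getKey() < checkpointId)
        .collect(Collectors.toList());

    List<String> log = new ArrayList<>();
    for (int i = 0; i < pending.size(); i++) {
      String instant = pending.get(i).getValue();
      if (i > 0) {
        // Subsequent instants: refresh so the previous commit is seen as completed.
        log.add("refresh+preTxn " + instant);
      }
      log.add("commit " + instant);
    }
    return log;
  }

  public static void main(String[] args) {
    Map<Long, String> buffered = new TreeMap<>();  // sorted by checkpoint id
    buffered.put(1L, "i1");
    buffered.put(2L, "i2");
    buffered.put(3L, "i3");
    // With checkpointId = 3, only i1 and i2 are committed; i2 is refreshed first.
    System.out.println(commitBuffered(buffered, 3L));
  }
}
```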