suryaprasanna commented on code in PR #19023:
URL: https://github.com/apache/hudi/pull/19023#discussion_r3432176382


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -141,6 +144,28 @@ public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedT
     return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption);
   }
 
+  /**
+   * Get the last completed transaction hoodie instant before the given instant time.
+   * The returned instant has both requested time and completion time less than the given instant time,
+   * ensuring it was fully completed before the given instant was created.
+   *
+   * @param metaClient table meta client
+   * @param currentInstantTime the requested time of the current inflight instant
+   * @return the last completed instant before the given instant, with its extra metadata
+   */
+  public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(

Review Comment:
   Agreed, added three unit tests in TestTransactionUtils covering interleaved 
completions, the completionTime exclusion filter, and the empty-result edge 
case. Also added testCommittingMultipleInstantsWithOCC in TestWriteMergeOnRead 
as the end-to-end guard.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -141,6 +145,28 @@ public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedT
     return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption);
   }
 
+  /**
+   * Get the last completed transaction hoodie instant before the given instant time.
+   * The returned instant has both requested time and completion time less than the given instant time,
+   * ensuring it was fully completed before the given instant was created.
+   *
+   * @param metaClient table meta client
+   * @param currentInstantTime the requested time of the current inflight instant
+   * @return the last completed instant before the given instant, with its extra metadata
+   */
+  public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
+      HoodieTableMetaClient metaClient, String currentInstantTime) {
+    Option<HoodieInstant> hoodieInstantOption = Option.fromJavaOptional(
+        metaClient.getActiveTimeline().getCommitsTimeline()
+            .filterCompletedInstants()
+            .findInstantsBefore(currentInstantTime)
+            .getInstantsAsStream()
+            .filter(instant -> instant.getCompletionTime() != null
+                && compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime))

Review Comment:
   Good catch. Fixed -- switched to requestedTime for the max selection. The 
completionTime filter on the preceding line is retained to ensure we only 
consider fully completed instants.
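
   To illustrate why the max selection uses requestedTime while the completionTime filter stays, here is a minimal, self-contained sketch. It does not use Hudi classes: `Instant`, `lastCompletedBefore`, and the three-digit timestamps are hypothetical stand-ins, and timestamps are assumed to be fixed-width strings that compare lexicographically.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class LastCompletedInstantSketch {

  // Hypothetical stand-in for HoodieInstant: only the two timestamps matter here.
  static final class Instant {
    final String requestedTime;
    final String completionTime; // null while the instant is still inflight

    Instant(String requestedTime, String completionTime) {
      this.requestedTime = requestedTime;
      this.completionTime = completionTime;
    }
  }

  // Keep instants that were requested AND completed before currentInstantTime,
  // then pick the one with the greatest requested time.
  static Optional<Instant> lastCompletedBefore(List<Instant> instants, String currentInstantTime) {
    return instants.stream()
        .filter(i -> i.requestedTime.compareTo(currentInstantTime) < 0)
        .filter(i -> i.completionTime != null
            && i.completionTime.compareTo(currentInstantTime) < 0)
        .max(Comparator.comparing((Instant i) -> i.requestedTime));
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("001", "004"),  // requested first, completed after "002"
        new Instant("002", "003"),
        new Instant("005", "009"),  // requested before "007" but completed after it
        new Instant("006", null));  // still inflight
    Optional<Instant> baseline = lastCompletedBefore(timeline, "007");
    System.out.println(baseline.map(i -> i.requestedTime).orElse("none")); // prints 002
  }
}
```

   In this toy timeline, selecting the max by completion time would return "001" (completed at "004"), whereas selecting by requested time returns "002". The completion-time filter is what excludes "005" and "006".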



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -547,6 +549,11 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin
       if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
         writeClient.getHeartbeatClient().start(instant);
       }
+      // Initialize the transaction state so that OCC conflict resolution uses the correct
+      // baseline: the last completed instant before this inflight instant was created.

Review Comment:
   The timeline is now reloaded before each recommit in restoreEvents(), so 
when B is recommitted, A is already visible as completed on the refreshed 
timeline. preTxnForRecommit then correctly sets A as the baseline, and 
findInstantsAfter(A.requestedTime()) excludes A from B's candidate set.
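
   A toy model of the A/B case described above, as a sketch only. It uses plain strings instead of Hudi timelines, and `baseline` and `candidates` are hypothetical helpers that loosely mimic "last completed instant before this one" and "completed instants after the baseline". It also assumes instants complete in request order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class RecommitBaselineSketch {

  // Baseline: the latest completed instant requested before the one being recommitted.
  static Optional<String> baseline(List<String> completed, String instant) {
    return completed.stream()
        .filter(t -> t.compareTo(instant) < 0)
        .max(String::compareTo);
  }

  // Conflict candidates: completed instants requested after the baseline.
  // With no baseline, every completed instant is a candidate.
  static List<String> candidates(List<String> completed, Optional<String> baseline) {
    return completed.stream()
        .filter(t -> !baseline.isPresent() || t.compareTo(baseline.get()) > 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> staleSnapshot = Arrays.asList("001");        // taken before A was recommitted
    List<String> refreshed = Arrays.asList("001", "002");     // A = "002" is now completed
    String b = "003";

    // Baseline from the stale snapshot: A shows up as a (false) conflict candidate.
    System.out.println(candidates(refreshed, baseline(staleSnapshot, b))); // prints [002]
    // Baseline from the refreshed timeline: A is the baseline and is excluded.
    System.out.println(candidates(refreshed, baseline(refreshed, b)));     // prints []
  }
}
```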



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -547,6 +547,11 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin
       if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
         writeClient.getHeartbeatClient().start(instant);
       }
+      // Initialize the transaction state so that OCC conflict resolution uses the correct
+      // baseline: the last completed instant before this inflight instant was created.
+      // Without this, lastCompletedTxnAndMetadata is empty and conflict resolution checks
+      // against all completed instants on the timeline, causing false conflicts.
+      writeClient.preTxnForRecommit(tableState.operationType, this.metaClient, instant);

Review Comment:
   Fixed. Moved reloadActiveTimeline() inside the loop in restoreEvents() so 
each iteration sees the previously committed instant. Also found the same issue 
in commitInstants() (L611): when multiple buffered instants are committed in 
sequence during checkpointComplete, the second commit sees the first as a 
concurrent conflict. Fixed by refreshing both the timeline and preTxn for every 
instant after the first (the first is skipped to preserve multi-writer conflict 
detection).



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -603,8 +610,19 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
    */
   private boolean commitInstants(long checkpointId) {
     // use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
+    boolean[] isFirstInstant = {true};

Review Comment:
   Fair point. Refactored to collect the filtered entries first and use a plain 
for loop.
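
   For readers following along, the shape of that refactor might look roughly like the sketch below. This is not the PR's code: `CommitLoopSketch`, `plan`, and the string "steps" are hypothetical, and the buffer is modelled as a simple map from checkpoint id to instant time. Collecting the filtered entries first lets the loop index identify the first instant, so no mutable `boolean[]` has to be captured by a lambda.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class CommitLoopSketch {

  // Returns the ordered steps that would run for the given checkpoint.
  static List<String> plan(Map<Long, String> buffer, long checkpointId) {
    // Collect the entries to commit first (strictly smaller checkpoint ids only).
    List<Map.Entry<Long, String>> ready = buffer.entrySet().stream()
        .filter(e -> e.getKey() < checkpointId)
        .collect(Collectors.toList());

    List<String> steps = new ArrayList<>();
    for (int i = 0; i < ready.size(); i++) {
      String instant = ready.get(i).getValue();
      if (i > 0) {
        // Only instants after the first get a refreshed timeline and baseline.
        steps.add("refresh before " + instant);
      }
      steps.add("commit " + instant);
    }
    return steps;
  }

  public static void main(String[] args) {
    Map<Long, String> buffer = new TreeMap<>();
    buffer.put(1L, "001");
    buffer.put(2L, "002");
    buffer.put(3L, "003");
    System.out.println(plan(buffer, 3L)); // prints [commit 001, refresh before 002, commit 002]
  }
}
```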



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
