hudi-agent commented on code in PR #19023:
URL: https://github.com/apache/hudi/pull/19023#discussion_r3432062643
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -547,6 +549,11 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
writeClient.getHeartbeatClient().start(instant);
}
+ // Initialize the transaction state so that OCC conflict resolution uses the correct
+ // baseline: the last completed instant before this inflight instant was created.
Review Comment:
🤖 @danny0405 could you sanity-check the multi-inflight restart scenario?
Suppose two inflight instants, A (req=T1) and B (req=T2, T1<T2), are
recommitted in order. When B reaches `preTxnForRecommit`,
`getLastCompletedTxnInstantAndMetadata` excludes A, because A's completion time
is `now`, which is later than T2. The baseline therefore falls back to some
pre-T1 instant (or is empty), and A then appears in B's OCC candidate set via
`findInstantsAfter(baseline.requestedTime())`. With streaming upserts touching
the same file groups, B would still throw the same false
`HoodieWriteConflictException` that this PR is trying to fix. Is multi-inflight
recovery considered out of scope here, or am I missing a guard somewhere?
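
To make the scenario concrete, here is a minimal standalone sketch. It is a
toy model, not Hudi code: `Instant`, `baselineFor` and `candidates` are
hypothetical stand-ins, and the baseline rule (latest instant whose completion
time is before the requested time) is my assumption about what
`getLastCompletedTxnInstantAndMetadata` does in this path.

```java
import java.util.ArrayList;
import java.util.List;

class MultiInflightRecommitSketch {

  /** Hypothetical stand-in for a timeline instant; not Hudi's HoodieInstant. */
  static final class Instant {
    final String name;
    final long requestedTime;
    final long completionTime;

    Instant(String name, long requestedTime, long completionTime) {
      this.name = name;
      this.requestedTime = requestedTime;
      this.completionTime = completionTime;
    }
  }

  /** Assumed rule: latest instant that completed before requestedTime, or null. */
  static Instant baselineFor(List<Instant> completed, long requestedTime) {
    Instant best = null;
    for (Instant i : completed) {
      boolean completedBefore = i.completionTime < requestedTime;
      if (completedBefore && (best == null || i.completionTime > best.completionTime)) {
        best = i;
      }
    }
    return best;
  }

  /** Completed instants requested after the baseline: the OCC candidate set. */
  static List<String> candidates(List<Instant> completed, Instant baseline) {
    long after = (baseline == null) ? Long.MIN_VALUE : baseline.requestedTime;
    List<String> names = new ArrayList<>();
    for (Instant i : completed) {
      if (i.requestedTime > after) {
        names.add(i.name);
      }
    }
    return names;
  }

  /** Replays the restart: A is recommitted first, then B's baseline is computed. */
  static List<String> candidatesForBAfterRecommittingA() {
    long t1 = 10, t2 = 20, now = 100;
    List<Instant> completed = new ArrayList<>();
    completed.add(new Instant("C0", 5, 8));    // completed before A was requested
    completed.add(new Instant("A", t1, now));  // recommitted on restart, completes "now"
    Instant baseline = baselineFor(completed, t2); // C0, because A completed after T2
    return candidates(completed, baseline);
  }

  public static void main(String[] args) {
    System.out.println(candidatesForBAfterRecommittingA()); // prints [A]
  }
}
```

In this model A ends up in B's candidate set, so if A and B touched the same
file groups the conflict check would flag them.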
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -603,8 +610,19 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) {
*/
private boolean commitInstants(long checkpointId) {
// use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
+ boolean[] isFirstInstant = {true};
Review Comment:
🤖 nit: the `boolean[] isFirstInstant = {true}` trick to fake a mutable flag
inside a `map` reads as a workaround. Could you collect the filtered entries
first and use a plain `for` loop (or iterate with an index) so the "skip the
refresh on the first instant" intent is expressed directly?
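
A rough sketch of the shape I have in mind, as a standalone toy rather than a
patch: `pending`, the string instants, and the "refresh" log entries are
hypothetical placeholders for the real event buffer and refresh call, which I
have not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class CommitLoopSketch {

  /**
   * Commits every pending entry whose checkpoint id is below checkpointId and
   * returns a log of the actions taken. The refresh step is skipped for the
   * first committed instant, expressed with a loop index instead of a flag.
   */
  static List<String> commitInstants(Map<Long, String> pending, long checkpointId) {
    // Step 1: collect the filtered entries up front.
    List<String> toCommit = new ArrayList<>();
    for (Map.Entry<Long, String> entry : pending.entrySet()) {
      if (entry.getKey() < checkpointId) {
        toCommit.add(entry.getValue());
      }
    }

    // Step 2: plain indexed loop; i > 0 means "not the first instant".
    List<String> log = new ArrayList<>();
    for (int i = 0; i < toCommit.size(); i++) {
      String instant = toCommit.get(i);
      if (i > 0) {
        log.add("refresh before " + instant); // placeholder for the real refresh
      }
      log.add("commit " + instant);
    }
    return log;
  }
}
```

The index check states the intent in one place, and no mutable state has to be
captured by a lambda.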
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>