yihua commented on code in PR #18282:
URL: https://github.com/apache/hudi/pull/18282#discussion_r3035846818
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java:
##########
Review Comment:
**Line 142:** 🤖 The method name `awaitAllInstantsToCompleteIfNecessary`
implies it blocks until **all** pending instants are committed, but it calls
`blockFor(String)` which only does a single `condition.await()` — meaning it
wakes up on the first `unblock()` signal regardless of whether other instants
remain pending.
In the current design, this works because the coordinator serializes instant
creation (single-threaded executor) and `restoreEvents()` handles restored
instants synchronously before any new requests. But if multiple pending
instants ever exist when this is called (e.g., due to future changes in the
restore path), this would create a new instant while prior ones are still
in-flight.
Consider using the `blockFor(Supplier<List<String>>)` overload which loops
until the predicate is satisfied, or add a comment documenting the
single-pending-instant invariant.
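Below is a minimal sketch of the difference described above. It assumes the guard is built on a `java.util.concurrent.locks.Condition`, as the `condition.await()` call suggests. The class `InstantCommitGuard` and its methods `blockForOneSignal`, `blockForAll` and `unblock` are hypothetical names for illustration. They are not Hudi's `EventBuffers` or `CommitGuard` API. The loop variant re-evaluates the pending-instant supplier after every wake-up. As a result, a signal for one committed instant does not release the waiter while others are still in-flight, and spurious wake-ups are tolerated.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical guard illustrating the review suggestion; not Hudi's actual implementation. */
final class InstantCommitGuard {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition condition = lock.newCondition();

  /**
   * Single-await variant, analogous to the blockFor(String) behavior described in the comment:
   * returns after the first signal (true) or the timeout (false), even if other instants are still pending.
   */
  boolean blockForOneSignal(long timeout, TimeUnit unit) throws InterruptedException {
    lock.lock();
    try {
      return condition.await(timeout, unit);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Loop variant: re-checks the predicate after every wake-up and returns true only once
   * no instants are pending, or false if the deadline passes first.
   */
  boolean blockForAll(Supplier<List<String>> pendingInstants, long timeout, TimeUnit unit)
      throws InterruptedException {
    long remainingNanos = unit.toNanos(timeout);
    lock.lock();
    try {
      while (!pendingInstants.get().isEmpty()) {
        if (remainingNanos <= 0L) {
          return false; // timed out with instants still in-flight
        }
        remainingNanos = condition.awaitNanos(remainingNanos);
      }
      return true;
    } finally {
      lock.unlock();
    }
  }

  /** Called after an instant commits; wakes all waiters so each re-checks its predicate. */
  void unblock() {
    lock.lock();
    try {
      condition.signalAll();
    } finally {
      lock.unlock();
    }
  }
}
```

For this to work, the pending list must be updated before `unblock()` is called. Because `unblock()` takes the same lock, a waiter cannot miss the signal in the gap between checking the supplier and calling `awaitNanos`.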
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -555,10 +560,12 @@ private void recommitInstant(HoodieTimeline completedTimeline, long checkpointId
private void handleBootstrapEvent(WriteMetadataEvent event) {
if (event.getInstantTime().equals(WriteMetadataEvent.BOOTSTRAP_INSTANT)) {
this.eventBuffers.cleanLegacyEvents(event);
+ this.eventBuffers.notifyWriterBootstrapEventReceived();
return;
}
EventBuffer eventBuffer =
this.eventBuffers.getOrCreateBootstrapBuffer(event);
eventBuffer.addBootstrapEvent(event);
+ this.eventBuffers.notifyWriterBootstrapEventReceived();
Review Comment:
🤖 When pending instants are recommitted and RLI (record-level index) bootstrap
is enabled, the coordinator calls `context.failJob()` to force a global
failover, so that the next attempt re-runs RLI bootstrap with the newly
committed data. This means every restart with pending instants costs one extra
restart cycle.
Is the double-restart design intentional? An alternative would be to block
the RLI bootstrap operator until the coordinator confirms all pending instants
are committed (e.g., via the `CommitGuard` mechanism already in place),
avoiding the extra restart. Was this considered and rejected for simplicity?
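Below is a minimal sketch of the alternative proposed above, modeled in a single JVM. The coordinator completes a barrier after recommitting every restored pending instant. The RLI bootstrap task waits on that barrier before loading the index, so the job does not have to be failed and restarted. `RecommitBarrier` and its methods are hypothetical. In an actual Flink deployment the coordinator runs on the JobManager, so the signal would have to reach subtasks through operator events (or the existing `CommitGuard` mechanism mentioned above), not through a shared `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical barrier: completed by the coordinator once all restored pending instants
 * are recommitted; awaited by the RLI bootstrap side instead of forcing a failover.
 */
final class RecommitBarrier {
  private final CompletableFuture<Void> recommitted = new CompletableFuture<>();

  /** Coordinator side: call after every restored pending instant has been committed. */
  void markRecommitted() {
    recommitted.complete(null);
  }

  /** Coordinator side: propagate a recommit failure so waiters fail fast instead of hanging. */
  void markFailed(Throwable cause) {
    recommitted.completeExceptionally(cause);
  }

  /**
   * Bootstrap side: returns true once recommit has finished, false on timeout.
   * Throws ExecutionException if the coordinator reported a failure.
   */
  boolean awaitRecommit(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
    try {
      recommitted.get(timeout, unit);
      return true;
    } catch (TimeoutException e) {
      return false;
    }
  }
}
```

The sketch makes the trade-off in the question visible. Waiting couples bootstrap to recommit and needs both a timeout and a failure path. `failJob()` instead reuses the ordinary restart path, at the cost of the extra restart cycle.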
--
This is an automated message from the Apache Git Service.