gemini-code-assist[bot] commented on code in PR #38921:
URL: https://github.com/apache/beam/pull/38921#discussion_r3396351364
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -215,19 +215,18 @@ public void scheduleWork(
Watermarks watermarks,
Work.ProcessingContext processingContext,
boolean drainMode,
+ ImmutableList<Long> appliedFinalizeIds,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) {
+ // Before any processing starts, call any pending OnCommit callbacks.
+
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
+ commitFinalizer.finalizeCommits(appliedFinalizeIds);
Review Comment:

In high-throughput streaming, the vast majority of work items will not have
any pending finalize IDs. Calling `commitFinalizer.finalizeCommits`
unconditionally for every work item introduces unnecessary overhead (such as
iterator allocation, method call overhead, and potential lock contention inside
the finalizer) on the critical path of the dispatch thread.

We should restore the empty checks (which were previously present in
`SingleSourceWorkerHarness`) and apply them to both lists before calling
`finalizeCommits`.

```java
if (!workItem.getSourceState().getFinalizeIdsList().isEmpty()) {
  commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
}
if (!appliedFinalizeIds.isEmpty()) {
  commitFinalizer.finalizeCommits(appliedFinalizeIds);
}
```
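
If repeating the guard at each call site feels noisy, it could be folded into a small helper. The following is only a standalone sketch of that idea: `RecordingFinalizer` and `finalizeIfPresent` are hypothetical names that exist only for illustration, and the stand-in takes a `List<Long>`, which may not match the real `finalizeCommits` signature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FinalizeGuardSketch {
  /** Hypothetical stand-in for the real finalizer; it only records what it is asked to do. */
  static final class RecordingFinalizer {
    int calls = 0;
    final List<Long> finalizedIds = new ArrayList<>();

    void finalizeCommits(List<Long> ids) {
      calls++;
      finalizedIds.addAll(ids);
    }
  }

  /** Hypothetical helper: skips the finalizer entirely when there is nothing to finalize. */
  static void finalizeIfPresent(RecordingFinalizer finalizer, List<Long> ids) {
    if (!ids.isEmpty()) {
      finalizer.finalizeCommits(ids);
    }
  }

  public static void main(String[] args) {
    RecordingFinalizer finalizer = new RecordingFinalizer();
    // Common case: no pending finalize IDs, so the finalizer is never touched.
    finalizeIfPresent(finalizer, Collections.emptyList());
    // Rare case: pending IDs are forwarded unchanged.
    finalizeIfPresent(finalizer, Arrays.asList(7L, 9L));
    System.out.println(finalizer.calls + " call(s), ids=" + finalizer.finalizedIds);
  }
}
```

With a helper like this, both lists in `scheduleWork` would go through the same guard, so the common empty case never reaches the finalizer.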