gemini-code-assist[bot] commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3285138426
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,32 +228,54 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ private void executeMonitorHeld(ExecutableWork work, long workBytes) {
bytesOutstanding += workBytes;
++elementsOutstanding;
monitor.leave();
+ executor.execute(
+ () -> {
+ // Any execution exception thrown by work.run() propagates uncaught,
triggering
+ // the default JVM UncaughtExceptionHandler which immediately
crashes/terminates
+ // the JVM. Since the process exits immediately, reclaiming resource
budgets in
+ // this JVM is unnecessary. Furthermore, since a failed execution
does not return
+ // a WorkResult, we do not have a good/accurate fallback value to
decrement.
+ WorkResult result = work.run();
+ decrementCounters(result);
+ });
Review Comment:

The `executeMonitorHeld` method for `ExecutableWork` lacks error handling
for task submission and execution failures, which can lead to resource leaks in
`elementsOutstanding` and `bytesOutstanding`.
1. If `executor.execute()` throws a `RejectedExecutionException` (e.g.,
during worker shutdown), the counters incremented just before will never be
decremented, leading to a permanent capacity leak in the JVM.
2. If `work.run()` throws an exception and the JVM does not immediately
terminate (e.g., if the exception is caught by the thread pool's uncaught
exception handler but the process continues), the counters will also leak.
This implementation should be made consistent with the `Runnable` version of
`executeMonitorHeld` (lines 248-272), which correctly handles these scenarios.
Even if a JVM crash is expected on work failure, providing a defensive
decrement (using the initial `workBytes` as a fallback) is safer.
```java
try {
executor.execute(
() -> {
WorkResult result = null;
try {
result = work.run();
} finally {
// Any execution exception thrown by work.run() propagates
uncaught, triggering
// the default JVM UncaughtExceptionHandler which immediately
crashes/terminates
// the JVM. Since the process exits immediately, reclaiming
resource budgets in
// this JVM is unnecessary. Furthermore, since a failed
execution does not return
// a WorkResult, we fallback to decrementing the initial
budget to prevent
// resource leaks in case the JVM does not crash.
decrementCounters(result != null ? result :
WorkResult.create(1, workBytes));
}
});
} catch (Throwable e) {
// If the execute() call threw an exception, decrement counters here.
decrementCounters(WorkResult.create(1, workBytes));
throw ExceptionUtils.propagate(e);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]