gemini-code-assist[bot] commented on code in PR #39096:
URL: https://github.com/apache/beam/pull/39096#discussion_r3469930505
##########
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java:
##########
@@ -359,6 +359,9 @@ private void shutdownIfNecessary(State newState) {
.map(Exception::getMessage)
.collect(Collectors.joining("\n- ", "- ", "")));
visibleUpdates.failed(exception);
+ }
+ pipelineState.compareAndSet(State.RUNNING, newState); // ensure we hit a
terminal node
+ if (exception != null) {
throw exception;
}
Review Comment:

### Potential Pipeline Hang due to Unhandled Exceptions during Shutdown
By moving `pipelineState.compareAndSet(State.RUNNING, newState)` to the end
of the method (after the exception is constructed and
`visibleUpdates.failed(exception)` is called), any exception thrown during this
block will prevent the pipeline state from transitioning to a terminal state.
#### Why this is a critical issue:
1. **Null Messages in Errors**: If any exception in `errors` has a `null`
message (e.g., a `NullPointerException` or a custom exception without a
message), `Collectors.joining` will throw a `NullPointerException` because
`StringJoiner` does not allow null elements.
2. **Indefinite Hang**: In the original code, `pipelineState.compareAndSet`
was called first, so even if an NPE occurred during string joining, the
pipeline state was already terminal. In the new code, the NPE will be thrown
*before* the state transition, leaving the pipeline in the `RUNNING` state
forever. Any thread waiting on `waitUntilFinish()` will hang indefinitely.
#### Recommendation:
Wrap the error aggregation and `visibleUpdates.failed` call in a `try`
block, and ensure `pipelineState.compareAndSet` is executed in a `finally`
block. Additionally, safely handle potential `null` messages in the stream.
```java
IllegalStateException exception = null;
try {
if (!errors.isEmpty()) {
exception =
new IllegalStateException(
"Error"
+ (errors.size() == 1 ? "" : "s")
+ " occurred during pipeline execution:\\n"
+ errors.stream()
.map(e -> e.getMessage() == null ?
e.getClass().getName() : e.getMessage())
.collect(Collectors.joining("\\n- ", "- ", "")));
visibleUpdates.failed(exception);
}
} finally {
pipelineState.compareAndSet(State.RUNNING, newState); // ensure we hit
a terminal node
}
if (exception != null) {
throw exception;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]