gemini-code-assist[bot] commented on code in PR #38786:
URL: https://github.com/apache/beam/pull/38786#discussion_r3348990984
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -39,6 +42,7 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
private JobClient jobClient;
private int jobCheckIntervalInSecs;
+ private @Nullable CompletableFuture<String> drainSavepointFuture;
Review Comment:

The `drainSavepointFuture` field is read in `getState()` without
synchronization, but written in the `synchronized` `drain()` method. To
guarantee that readers on other threads see the latest write, this field
should be declared `volatile`.
```suggestion
private volatile @Nullable CompletableFuture<String> drainSavepointFuture;
```
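A minimal sketch of the visibility pattern the comment describes, assuming a simplified class: `DrainHolder` is hypothetical, `String` states replace Beam's `PipelineResult.State`, and a bare `CompletableFuture` replaces the `jobClient.stopWithSavepoint(...)` call. The writer holds the monitor and the reader does not, so the `volatile` modifier is what gives the reader a happens-before edge with the write.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for FlinkDetachedRunnerResult that models only the
// field-visibility pattern; the Flink JobClient call is replaced by a plain future.
final class DrainHolder {
  // volatile: a write made inside the synchronized drain() is guaranteed to be
  // visible to threads that read the field in getState() without the lock.
  private volatile CompletableFuture<String> drainSavepointFuture;

  synchronized CompletableFuture<String> drain() {
    CompletableFuture<String> future = drainSavepointFuture;
    if (future == null) {
      // Placeholder for jobClient.stopWithSavepoint(...).
      future = new CompletableFuture<>();
      drainSavepointFuture = future;
    }
    return future;
  }

  // Unsynchronized reader: copies the volatile field into a local exactly once,
  // so the null check and the later use see the same reference.
  String getState() {
    CompletableFuture<String> future = drainSavepointFuture;
    if (future == null) {
      return "RUNNING";
    }
    return future.isDone() ? "DRAINED" : "DRAINING";
  }

  public static void main(String[] args) throws InterruptedException {
    DrainHolder holder = new DrainHolder();
    Thread drainer = new Thread(holder::drain);
    drainer.start();
    drainer.join();
    System.out.println(holder.getState()); // prints "DRAINING"
  }
}
```

Making `getState()` `synchronized` would also work; `volatile` keeps the reader lock-free, so a status poll is never queued behind a `drain()` call that holds the monitor.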
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -82,6 +90,31 @@ public State cancel() throws IOException {
return getState();
}
+ @Override
+ public synchronized State drain() throws IOException {
+ CompletableFuture<String> drainFuture = drainSavepointFuture;
+ if (drainFuture == null) {
+ drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
+ drainSavepointFuture = drainFuture;
+ }
+ return getDrainState(drainFuture);
+ }
+
+ private State getDrainState(CompletableFuture<String> drainFuture) {
+ if (!drainFuture.isDone()) {
+ return State.DRAINING;
+ }
+ try {
+ drainFuture.get();
+ return State.DRAINED;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Fail to drain flink job", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Fail to drain flink job", e);
+ }
+ }
Review Comment:

We can improve the robustness of the `drain()` method in several ways:
1. **Support Retries**: If a previous drain attempt failed (i.e.,
`drainFuture.isCompletedExceptionally()`), we should allow the user to retry
the drain operation rather than permanently returning the cached failure.
2. **Throw Checked Exceptions**: Since `drain()` is declared to throw
`IOException`, we should wrap and throw `IOException` instead of
`RuntimeException` when the future fails.
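3. **Simplify Code**: We can inline the `getDrainState` logic directly into
`drain()` and remove the helper method. Note that `getState()` also calls
`getDrainState()`, so the helper can only be removed if `getState()` is
updated as well (for example, as proposed in the `getState()` comment).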
```java
@Override
public synchronized State drain() throws IOException {
  CompletableFuture<String> drainFuture = drainSavepointFuture;
  if (drainFuture == null || drainFuture.isCompletedExceptionally()) {
    drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
    drainSavepointFuture = drainFuture;
  }
  if (!drainFuture.isDone()) {
    return State.DRAINING;
  }
  try {
    drainFuture.get();
    return State.DRAINED;
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new IOException("Failed to drain flink job", e);
  } catch (ExecutionException e) {
    throw new IOException("Failed to drain flink job", e.getCause());
  }
}
```
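The retry and exception-wrapping behavior proposed above can be exercised without a Flink cluster. In this sketch, which is hypothetical rather than Beam's code, `RetryableDrain` and its `Supplier` parameter stand in for `FlinkDetachedRunnerResult` and `jobClient.stopWithSavepoint(...)`. It also converts a cancelled future, which `get()` reports as an unchecked `CancellationException`, into an `IOException`; the suggestion above does not handle that case.

```java
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Hypothetical model of the suggested drain(). The Supplier stands in for
// jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT).
final class RetryableDrain {
  enum State { DRAINING, DRAINED }

  private final Supplier<CompletableFuture<String>> stopWithSavepoint;
  private volatile CompletableFuture<String> drainSavepointFuture;

  RetryableDrain(Supplier<CompletableFuture<String>> stopWithSavepoint) {
    this.stopWithSavepoint = stopWithSavepoint;
  }

  synchronized State drain() throws IOException {
    CompletableFuture<String> drainFuture = drainSavepointFuture;
    // Issue a new stop-with-savepoint on the first call, or retry if the
    // previous attempt failed or was cancelled.
    if (drainFuture == null || drainFuture.isCompletedExceptionally()) {
      drainFuture = stopWithSavepoint.get();
      drainSavepointFuture = drainFuture;
    }
    if (!drainFuture.isDone()) {
      return State.DRAINING;
    }
    try {
      drainFuture.get();
      return State.DRAINED;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Failed to drain flink job", e);
    } catch (ExecutionException e) {
      // Surface the savepoint failure itself instead of the ExecutionException wrapper.
      throw new IOException("Failed to drain flink job", e.getCause());
    } catch (CancellationException e) {
      throw new IOException("Drain of flink job was cancelled", e);
    }
  }
}
```

Because a failed future is replaced on the next call, a caller polling `drain()` sees one `IOException` for the failed attempt, after which the retry reports `DRAINING` until it completes and then `DRAINED`.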
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java:
##########
@@ -432,69 +433,82 @@ private Exception processJobMessages(
}
private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
+ private AtomicReference<FutureTask<State>> drainState = new AtomicReference<>();
@SuppressWarnings("Slf4jFormatShouldBeConst")
@Override
public State cancel() throws IOException {
- // Enforce that a cancel() call on the job is done at most once - as
- // a workaround for Dataflow service's current bugs with multiple
- // cancellation, where it may sometimes return an error when cancelling
- // a job that was already cancelled, but still report the job state as
- // RUNNING.
- // To partially work around these issues, we absorb duplicate cancel()
- // calls. This, of course, doesn't address the case when the job terminates
- // externally almost concurrently to calling cancel(), but at least it
- // makes it possible to safely call cancel() multiple times and from
- // multiple threads in one program.
- FutureTask<State> tentativeCancelTask =
+ return requestJobState(cancelState, "JOB_STATE_CANCELLED", "cancel", "Cancel");
+ }
+
+ @Override
+ public State drain() throws IOException {
+ return requestJobState(drainState, "JOB_STATE_DRAINED", "drain", "Drain");
+ }
+
+ @SuppressWarnings("Slf4jFormatShouldBeConst")
+ private State requestJobState(
+ AtomicReference<FutureTask<State>> requestedState,
+ String dataflowRequestedState,
+ String action,
+ String capitalizedAction)
+ throws IOException {
+ // Enforce that a lifecycle request on the job is done at most once. This preserves the
+ // existing cancel() behavior and keeps duplicate drain() calls idempotent from one client.
+ FutureTask<State> tentativeTask =
new FutureTask<>(
() -> {
Job content = new Job();
content.setProjectId(getProjectId());
String currentJobId = getJobId();
content.setId(currentJobId);
- content.setRequestedState("JOB_STATE_CANCELLED");
+ content.setRequestedState(dataflowRequestedState);
try {
Job job = dataflowClient.updateJob(currentJobId, content);
return MonitoringUtil.toState(job.getCurrentState());
} catch (IOException e) {
State state = getState();
if (state.isTerminal()) {
- LOG.warn("Cancel failed because job is already terminated. State is {}", state);
+ LOG.warn(
+ "{} failed because job is already terminated. State is {}",
+ capitalizedAction,
+ state);
return state;
} else if (e.getMessage().contains("has terminated")) {
Review Comment:

If `e.getMessage()` is `null` (which can happen with certain `IOException`
subclasses or in test environments), calling `contains()` on it will throw a
`NullPointerException`, masking the original exception. We should add a null
check before calling `contains()`.
```suggestion
} else if (e.getMessage() != null &&
e.getMessage().contains("has terminated")) {
```
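The diff shows only the construction of `tentativeTask`; the code that installs it into the `AtomicReference` falls outside the hunk. The sketch below assumes a `compareAndSet`-based install, consistent with the removed comment's goal of absorbing duplicate calls. `AtMostOnceRequest` and `sendUpdateJob` are hypothetical, and `String` replaces `State`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of an at-most-once lifecycle request. sendUpdateJob()
// stands in for dataflowClient.updateJob(...); it only counts invocations.
final class AtMostOnceRequest {
  final AtomicInteger requestsSent = new AtomicInteger();
  private final AtomicReference<FutureTask<String>> drainState = new AtomicReference<>();

  private String sendUpdateJob(String requestedState) {
    requestsSent.incrementAndGet();
    return requestedState; // pretend the service reports the requested state
  }

  String requestJobState(String requestedState)
      throws InterruptedException, ExecutionException {
    FutureTask<String> tentativeTask = new FutureTask<>(() -> sendUpdateJob(requestedState));
    // Only the first caller installs its task and runs it; every later caller
    // (from any thread) waits on, and reuses, the installed task's result.
    if (drainState.compareAndSet(null, tentativeTask)) {
      tentativeTask.run();
    }
    return drainState.get().get();
  }
}
```

Under this pattern, a later call receives the first call's result even if it asks for a different state, which is why `cancel()` and `drain()` need separate references (`cancelState` and `drainState`).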
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -47,6 +51,10 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
@Override
public State getState() {
+ CompletableFuture<String> drainFuture = drainSavepointFuture;
+ if (drainFuture != null) {
+ return getDrainState(drainFuture);
+ }
Review Comment:

If the `drainSavepointFuture` completes exceptionally (e.g., if the
stop-with-savepoint operation fails), calling `getState()` will currently throw
a `RuntimeException` and crash the caller. Instead, we should be more robust:
if the drain operation failed, the job is likely still running or in its
previous state. We can check `isCompletedExceptionally()` and, if true, fall
back to querying the actual Flink job status via `jobClient.getJobStatus()`.
```suggestion
CompletableFuture<String> drainFuture = drainSavepointFuture;
if (drainFuture != null && !drainFuture.isCompletedExceptionally()) {
return drainFuture.isDone() ? State.DRAINED : State.DRAINING;
}
```
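One caveat with the suggestion above: it calls `isCompletedExceptionally()` before `isDone()`, so if the future fails between the two calls, `getState()` reports `DRAINED` for a failed drain. Checking `isDone()` first closes that window, because ordinary completion methods cannot change a future that is already done. A sketch of that ordering follows, in which `DrainStateMapper`, its `State` enum, and the `queryJobStatus` supplier (standing in for the existing `jobClient.getJobStatus()` path) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of the suggested getState() logic. queryJobStatus stands
// in for the existing code path that asks Flink for the job's real status.
final class DrainStateMapper {
  enum State { RUNNING, DRAINING, DRAINED }

  static State getState(CompletableFuture<String> drainFuture, Supplier<State> queryJobStatus) {
    if (drainFuture != null) {
      // Check isDone() first: once the future is done, its exceptional status
      // is fixed, so the two checks below cannot disagree.
      if (!drainFuture.isDone()) {
        return State.DRAINING;
      }
      if (!drainFuture.isCompletedExceptionally()) {
        return State.DRAINED;
      }
    }
    // No drain requested, or the drain failed: report the job's actual status.
    return queryJobStatus.get();
  }
}
```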
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.