Copilot commented on code in PR #38786:
URL: https://github.com/apache/beam/pull/38786#discussion_r3348985703
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -39,6 +42,7 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
private JobClient jobClient;
private int jobCheckIntervalInSecs;
+ private @Nullable CompletableFuture<String> drainSavepointFuture;
Review Comment:
`drainSavepointFuture` is written under `synchronized` in `drain()` but read
without synchronization (and without `volatile`) in `getState()`, so other
threads may not observe the updated future and can keep returning the
underlying job status instead of `DRAINING`/`DRAINED`. Make
`drainSavepointFuture` `volatile`, or guard reads in `getState()` with the same
lock (or switch to `AtomicReference`).
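A minimal sketch of the `volatile` option follows. It assumes a hypothetical `DrainTracker` class in place of `FlinkDetachedRunnerResult`, a `Supplier` in place of `jobClient.stopWithSavepoint(...)` so it compiles without Flink, and a trimmed local `State` enum instead of Beam's `PipelineResult.State`. Writes stay under `synchronized`, so the savepoint is still requested at most once, and the `volatile` field makes the published future visible to `getState()` callers on other threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for FlinkDetachedRunnerResult; not the PR's code.
class DrainTracker {
  // Hypothetical subset of Beam's PipelineResult.State.
  enum State { RUNNING, DRAINING, DRAINED }

  // Stands in for jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT).
  private final Supplier<CompletableFuture<String>> stopWithSavepoint;

  // volatile: the write in drain() is visible to later unsynchronized reads.
  private volatile CompletableFuture<String> drainSavepointFuture;

  DrainTracker(Supplier<CompletableFuture<String>> stopWithSavepoint) {
    this.stopWithSavepoint = stopWithSavepoint;
  }

  // The monitor still guarantees the savepoint is requested at most once.
  public synchronized State drain() {
    CompletableFuture<String> drainFuture = drainSavepointFuture;
    if (drainFuture == null) {
      drainFuture = stopWithSavepoint.get();
      drainSavepointFuture = drainFuture;
    }
    return stateOf(drainFuture);
  }

  // No lock needed: reading the volatile field sees the latest published future.
  public State getState() {
    CompletableFuture<String> drainFuture = drainSavepointFuture;
    // RUNNING is a placeholder for querying the Flink job status.
    return drainFuture == null ? State.RUNNING : stateOf(drainFuture);
  }

  // Failure handling is omitted to keep the visibility fix in focus.
  private static State stateOf(CompletableFuture<String> drainFuture) {
    return drainFuture.isDone() ? State.DRAINED : State.DRAINING;
  }
}
```

Keeping `drain()` synchronized while making only the field `volatile` preserves the at-most-once request and avoids taking the lock on every `getState()` poll.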
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -82,6 +90,31 @@ public State cancel() throws IOException {
return getState();
}
+ @Override
+ public synchronized State drain() throws IOException {
+ CompletableFuture<String> drainFuture = drainSavepointFuture;
+ if (drainFuture == null) {
+ drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
+ drainSavepointFuture = drainFuture;
+ }
+ return getDrainState(drainFuture);
+ }
+
+ private State getDrainState(CompletableFuture<String> drainFuture) {
+ if (!drainFuture.isDone()) {
+ return State.DRAINING;
+ }
+ try {
+ drainFuture.get();
+ return State.DRAINED;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Fail to drain flink job", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Fail to drain flink job", e);
+ }
Review Comment:
The message "Fail to drain flink job" is grammatically incorrect and the
exception type is a generic `RuntimeException` even though the
`PipelineResult#drain()` contract is `throws IOException`. Prefer a clearer
message (e.g., "Failed to drain Flink job") and wrap failures in an
`IOException` (or a Beam-specific checked exception, if used elsewhere) to keep
lifecycle operations consistent for callers.
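A sketch of `getDrainState` rewritten along those lines, assuming a hypothetical `DrainStates` holder and a trimmed local `State` enum rather than Beam's `PipelineResult.State`. It unwraps `ExecutionException` so callers see the actual savepoint failure, restores the interrupt flag, and also maps `CancellationException`, which `CompletableFuture#get()` throws unchecked when the future was cancelled.

```java
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical holder for the helper; not the PR's code.
class DrainStates {
  // Hypothetical subset of Beam's PipelineResult.State.
  enum State { DRAINING, DRAINED }

  // Maps the savepoint future to a state, surfacing failures as the checked
  // IOException that PipelineResult#drain() already declares.
  static State getDrainState(CompletableFuture<String> drainFuture) throws IOException {
    if (!drainFuture.isDone()) {
      return State.DRAINING;
    }
    try {
      drainFuture.get();
      return State.DRAINED;
    } catch (InterruptedException e) {
      // Restore the interrupt flag before translating the exception.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while draining Flink job", e);
    } catch (ExecutionException e) {
      // Report the underlying savepoint failure rather than the wrapper.
      throw new IOException("Failed to drain Flink job", e.getCause());
    } catch (CancellationException e) {
      throw new IOException("Drain savepoint of Flink job was cancelled", e);
    }
  }
}
```

One caveat: `PipelineResult#getState()` does not declare `IOException`, so the `getState()` path that also calls `getDrainState` would still need to handle or translate the checked exception.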
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -47,6 +51,10 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
@Override
public State getState() {
+ CompletableFuture<String> drainFuture = drainSavepointFuture;
+ if (drainFuture != null) {
+ return getDrainState(drainFuture);
+ }
Review Comment:
`drainSavepointFuture` is written under `synchronized` in `drain()` but read
without synchronization (and without `volatile`) in `getState()`, so other
threads may not observe the updated future and can keep returning the
underlying job status instead of `DRAINING`/`DRAINED`. Make
`drainSavepointFuture` `volatile`, or guard reads in `getState()` with the same
lock (or switch to `AtomicReference`).
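For the `AtomicReference` option, one lock-free sketch publishes a placeholder future with `compareAndSet`, so only one caller triggers the savepoint even under contention. The same kind of hypothetical stand-ins apply: an `AtomicDrainTracker` class, a `Supplier` in place of `stopWithSavepoint(...)`, and a local `State` enum.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for FlinkDetachedRunnerResult; not the PR's code.
class AtomicDrainTracker {
  // Hypothetical subset of Beam's PipelineResult.State.
  enum State { RUNNING, DRAINING, DRAINED }

  // Stands in for jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT).
  private final Supplier<CompletableFuture<String>> stopWithSavepoint;

  // AtomicReference gives the same visibility guarantee as a volatile field.
  private final AtomicReference<CompletableFuture<String>> drainSavepointFuture =
      new AtomicReference<>();

  AtomicDrainTracker(Supplier<CompletableFuture<String>> stopWithSavepoint) {
    this.stopWithSavepoint = stopWithSavepoint;
  }

  public State drain() {
    // Publish a placeholder first; only the thread that wins the CAS triggers the savepoint.
    CompletableFuture<String> placeholder = new CompletableFuture<>();
    if (drainSavepointFuture.compareAndSet(null, placeholder)) {
      try {
        // Forward the real savepoint result into the published placeholder.
        stopWithSavepoint.get().whenComplete((path, error) -> {
          if (error != null) {
            placeholder.completeExceptionally(error);
          } else {
            placeholder.complete(path);
          }
        });
      } catch (RuntimeException e) {
        // A synchronous failure must not leave the placeholder pending forever.
        placeholder.completeExceptionally(e);
      }
    }
    return getState();
  }

  public State getState() {
    CompletableFuture<String> drainFuture = drainSavepointFuture.get();
    if (drainFuture == null) {
      return State.RUNNING; // placeholder for querying the Flink job status
    }
    return drainFuture.isDone() ? State.DRAINED : State.DRAINING;
  }
}
```

Compared with `volatile` plus `synchronized`, no thread holds a lock while calling into the Flink client, at the cost of an extra future and a forwarding callback.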
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -82,6 +90,31 @@ public State cancel() throws IOException {
return getState();
}
+ @Override
+ public synchronized State drain() throws IOException {
+ CompletableFuture<String> drainFuture = drainSavepointFuture;
+ if (drainFuture == null) {
+ drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
+ drainSavepointFuture = drainFuture;
+ }
+ return getDrainState(drainFuture);
+ }
Review Comment:
`drainSavepointFuture` is written under `synchronized` in `drain()` but read
without synchronization (and without `volatile`) in `getState()`, so other
threads may not observe the updated future and can keep returning the
underlying job status instead of `DRAINING`/`DRAINED`. Make
`drainSavepointFuture` `volatile`, or guard reads in `getState()` with the same
lock (or switch to `AtomicReference`).
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java:
##########
@@ -432,69 +433,82 @@ private Exception processJobMessages(
}
private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
+ private AtomicReference<FutureTask<State>> drainState = new AtomicReference<>();
@SuppressWarnings("Slf4jFormatShouldBeConst")
@Override
public State cancel() throws IOException {
- // Enforce that a cancel() call on the job is done at most once - as
- // a workaround for Dataflow service's current bugs with multiple
- // cancellation, where it may sometimes return an error when cancelling
- // a job that was already cancelled, but still report the job state as
- // RUNNING.
- // To partially work around these issues, we absorb duplicate cancel()
- // calls. This, of course, doesn't address the case when the job terminates
- // externally almost concurrently to calling cancel(), but at least it
- // makes it possible to safely call cancel() multiple times and from
- // multiple threads in one program.
- FutureTask<State> tentativeCancelTask =
+ return requestJobState(cancelState, "JOB_STATE_CANCELLED", "cancel", "Cancel");
+ }
+
+ @Override
+ public State drain() throws IOException {
+ return requestJobState(drainState, "JOB_STATE_DRAINED", "drain", "Drain");
+ }
+
+ @SuppressWarnings("Slf4jFormatShouldBeConst")
+ private State requestJobState(
+ AtomicReference<FutureTask<State>> requestedState,
+ String dataflowRequestedState,
+ String action,
+ String capitalizedAction)
+ throws IOException {
+ // Enforce that a lifecycle request on the job is done at most once. This preserves the
+ // existing cancel() behavior and keeps duplicate drain() calls idempotent from one client.
+ FutureTask<State> tentativeTask =
new FutureTask<>(
() -> {
Job content = new Job();
content.setProjectId(getProjectId());
String currentJobId = getJobId();
content.setId(currentJobId);
- content.setRequestedState("JOB_STATE_CANCELLED");
+ content.setRequestedState(dataflowRequestedState);
try {
Job job = dataflowClient.updateJob(currentJobId, content);
return MonitoringUtil.toState(job.getCurrentState());
} catch (IOException e) {
State state = getState();
if (state.isTerminal()) {
- LOG.warn("Cancel failed because job is already terminated. State is {}", state);
+ LOG.warn(
+ "{} failed because job is already terminated. State is {}",
+ capitalizedAction,
+ state);
return state;
} else if (e.getMessage().contains("has terminated")) {
Review Comment:
`IOException#getMessage()` can be `null`, so `e.getMessage().contains(...)`
can throw a `NullPointerException` while handling an `IOException` (masking the
original failure). Use a null-safe check (e.g., store the message in a local
variable and check it for null before calling `contains`).
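A null-safe version of the check, written as a hypothetical helper `mentionsTermination` (not part of the PR), reads the message once and tests it for `null` before calling `contains`:

```java
import java.io.IOException;

// Hypothetical helper; not the PR's code.
class TerminationChecks {
  // Returns true only when the exception has a message that mentions termination.
  static boolean mentionsTermination(IOException e) {
    String message = e.getMessage();
    return message != null && message.contains("has terminated");
  }
}
```

`java.util.Objects.toString(e.getMessage(), "").contains("has terminated")` would be an equivalent one-liner.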
--
This is an automated message from the Apache Git Service.