Copilot commented on code in PR #38786:
URL: https://github.com/apache/beam/pull/38786#discussion_r3348985703


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -39,6 +42,7 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
 
   private JobClient jobClient;
   private int jobCheckIntervalInSecs;
+  private @Nullable CompletableFuture<String> drainSavepointFuture;

Review Comment:
   `drainSavepointFuture` is written under `synchronized` in `drain()` but read 
without synchronization (and without `volatile`) in `getState()`, so other 
threads may not observe the updated future and can keep returning the 
underlying job status instead of `DRAINING`/`DRAINED`. Make 
`drainSavepointFuture` `volatile`, or guard reads in `getState()` with the same 
lock (or switch to `AtomicReference`).
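
A minimal sketch of the `volatile` option follows. `DrainTracker`, its `State` enum, and the `Supplier` standing in for `jobClient.stopWithSavepoint(...)` are hypothetical names used only for illustration; they are not part of the PR or of Beam.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for FlinkDetachedRunnerResult; only the drain bookkeeping is modeled. */
class DrainTracker {
  enum State { RUNNING, DRAINING, DRAINED }

  // Assumption: stands in for jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT).
  private final Supplier<CompletableFuture<String>> stopWithSavepoint;

  // volatile: the write in drain() is visible to later reads in getState(),
  // even though getState() never takes the monitor lock.
  private volatile CompletableFuture<String> drainSavepointFuture;

  DrainTracker(Supplier<CompletableFuture<String>> stopWithSavepoint) {
    this.stopWithSavepoint = stopWithSavepoint;
  }

  // synchronized keeps the check-then-act atomic, so the stop request is issued at most once.
  synchronized State drain() {
    CompletableFuture<String> future = drainSavepointFuture;
    if (future == null) {
      future = stopWithSavepoint.get();
      drainSavepointFuture = future;
    }
    return toState(future);
  }

  State getState() {
    // Read the volatile field once into a local so the null check and the use agree.
    CompletableFuture<String> future = drainSavepointFuture;
    return future == null ? State.RUNNING : toState(future);
  }

  // Simplification for this sketch: failure of the future is not distinguished from success.
  private static State toState(CompletableFuture<String> future) {
    return future.isDone() ? State.DRAINED : State.DRAINING;
  }
}
```

Keeping `drain()` synchronized while making the field `volatile` leaves `getState()` lock-free. An `AtomicReference` with `compareAndSet` would give the same visibility guarantee, but it could issue more than one stop request under contention unless the request itself is also guarded.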



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -82,6 +90,31 @@ public State cancel() throws IOException {
     return getState();
   }
 
+  @Override
+  public synchronized State drain() throws IOException {
+    CompletableFuture<String> drainFuture = drainSavepointFuture;
+    if (drainFuture == null) {
+      drainFuture = this.jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT);
+      drainSavepointFuture = drainFuture;
+    }
+    return getDrainState(drainFuture);
+  }
+
+  private State getDrainState(CompletableFuture<String> drainFuture) {
+    if (!drainFuture.isDone()) {
+      return State.DRAINING;
+    }
+    try {
+      drainFuture.get();
+      return State.DRAINED;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Fail to drain flink job", e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Fail to drain flink job", e);
+    }

Review Comment:
   The message "Fail to drain flink job" is grammatically incorrect and the 
exception type is a generic `RuntimeException` even though the 
`PipelineResult#drain()` contract is `throws IOException`. Prefer a clearer 
message (e.g., "Failed to drain Flink job") and wrap failures in an 
`IOException` (or a Beam-specific checked exception, if used elsewhere) to keep 
lifecycle operations consistent for callers.
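
One way the suggested wrapping could look is sketched below. `DrainFailures` and its `State` enum are hypothetical stand-ins so the example compiles on its own; only the `CompletableFuture` handling is meant to carry over.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical holder for the helper; State stands in for PipelineResult.State. */
class DrainFailures {
  enum State { DRAINING, DRAINED }

  static State getDrainState(CompletableFuture<String> drainFuture) throws IOException {
    if (!drainFuture.isDone()) {
      return State.DRAINING;
    }
    try {
      drainFuture.get();
      return State.DRAINED;
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the failure to the caller.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while draining Flink job", e);
    } catch (ExecutionException e) {
      // Attach the underlying cause rather than the ExecutionException wrapper.
      throw new IOException("Failed to drain Flink job", e.getCause());
    }
  }
}
```

Note that `getState()` in the hunk above does not declare `IOException`, so a helper shared between `drain()` and `getState()` would still need a separate decision on how `getState()` reports a failed drain.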



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -47,6 +51,10 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
 
   @Override
   public State getState() {
+    CompletableFuture<String> drainFuture = drainSavepointFuture;
+    if (drainFuture != null) {
+      return getDrainState(drainFuture);
+    }

Review Comment:
   `drainSavepointFuture` is written under `synchronized` in `drain()` but read 
without synchronization (and without `volatile`) in `getState()`, so other 
threads may not observe the updated future and can keep returning the 
underlying job status instead of `DRAINING`/`DRAINED`. Make 
`drainSavepointFuture` `volatile`, or guard reads in `getState()` with the same 
lock (or switch to `AtomicReference`).



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java:
##########
@@ -432,69 +433,82 @@ private Exception processJobMessages(
   }
 
   private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
+  private AtomicReference<FutureTask<State>> drainState = new AtomicReference<>();
 
   @SuppressWarnings("Slf4jFormatShouldBeConst")
   @Override
   public State cancel() throws IOException {
-    // Enforce that a cancel() call on the job is done at most once - as
-    // a workaround for Dataflow service's current bugs with multiple
-    // cancellation, where it may sometimes return an error when cancelling
-    // a job that was already cancelled, but still report the job state as
-    // RUNNING.
-    // To partially work around these issues, we absorb duplicate cancel()
-    // calls. This, of course, doesn't address the case when the job terminates
-    // externally almost concurrently to calling cancel(), but at least it
-    // makes it possible to safely call cancel() multiple times and from
-    // multiple threads in one program.
-    FutureTask<State> tentativeCancelTask =
+    return requestJobState(cancelState, "JOB_STATE_CANCELLED", "cancel", "Cancel");
+  }
+
+  @Override
+  public State drain() throws IOException {
+    return requestJobState(drainState, "JOB_STATE_DRAINED", "drain", "Drain");
+  }
+
+  @SuppressWarnings("Slf4jFormatShouldBeConst")
+  private State requestJobState(
+      AtomicReference<FutureTask<State>> requestedState,
+      String dataflowRequestedState,
+      String action,
+      String capitalizedAction)
+      throws IOException {
+    // Enforce that a lifecycle request on the job is done at most once. This preserves the
+    // existing cancel() behavior and keeps duplicate drain() calls idempotent from one client.
+    FutureTask<State> tentativeTask =
         new FutureTask<>(
             () -> {
               Job content = new Job();
               content.setProjectId(getProjectId());
               String currentJobId = getJobId();
               content.setId(currentJobId);
-              content.setRequestedState("JOB_STATE_CANCELLED");
+              content.setRequestedState(dataflowRequestedState);
               try {
                 Job job = dataflowClient.updateJob(currentJobId, content);
                 return MonitoringUtil.toState(job.getCurrentState());
               } catch (IOException e) {
                 State state = getState();
                 if (state.isTerminal()) {
-                  LOG.warn("Cancel failed because job is already terminated. State is {}", state);
+                  LOG.warn(
+                      "{} failed because job is already terminated. State is {}",
+                      capitalizedAction,
+                      state);
                   return state;
                 } else if (e.getMessage().contains("has terminated")) {

Review Comment:
   `IOException#getMessage()` can be `null`, so `e.getMessage().contains(...)` 
can throw a `NullPointerException` while handling an `IOException` (masking the 
original failure). Use a null-safe check (e.g., store the message in a local 
variable and check it for null before calling `contains`).
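
The null-safe check could be as small as the sketch below. `NullSafeMessage` and `indicatesTermination` are hypothetical names for illustration; in the PR the check would sit inline in the `catch (IOException e)` branch.

```java
import java.io.IOException;

/** Hypothetical helper showing a null-safe version of e.getMessage().contains(...). */
class NullSafeMessage {
  static boolean indicatesTermination(IOException e) {
    // getMessage() returns null when the exception was created without a detail message.
    String message = e.getMessage();
    return message != null && message.contains("has terminated");
  }
}
```

With this guard an `IOException` that carries no message falls through to the remaining error handling instead of raising a `NullPointerException` inside the catch block.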



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
