gemini-code-assist[bot] commented on code in PR #38786:
URL: https://github.com/apache/beam/pull/38786#discussion_r3413410535


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java:
##########
@@ -432,69 +432,83 @@ private Exception processJobMessages(
   }
 
   private AtomicReference<FutureTask<State>> cancelState = new AtomicReference<>();
+  private AtomicReference<FutureTask<State>> drainState = new AtomicReference<>();
 
   @SuppressWarnings("Slf4jFormatShouldBeConst")
   @Override
   public State cancel() throws IOException {
-    // Enforce that a cancel() call on the job is done at most once - as
-    // a workaround for Dataflow service's current bugs with multiple
-    // cancellation, where it may sometimes return an error when cancelling
-    // a job that was already cancelled, but still report the job state as
-    // RUNNING.
-    // To partially work around these issues, we absorb duplicate cancel()
-    // calls. This, of course, doesn't address the case when the job terminates
-    // externally almost concurrently to calling cancel(), but at least it
-    // makes it possible to safely call cancel() multiple times and from
-    // multiple threads in one program.
-    FutureTask<State> tentativeCancelTask =
+    return requestJobState(cancelState, "JOB_STATE_CANCELLED", "cancel", "Cancel");
+  }
+
+  @Override
+  public State drain() throws IOException {
+    return requestJobState(drainState, "JOB_STATE_DRAINED", "drain", "Drain");
+  }
+
+  @SuppressWarnings("Slf4jFormatShouldBeConst")
+  private State requestJobState(
+      AtomicReference<FutureTask<State>> requestedState,
+      String dataflowRequestedState,
+      String action,
+      String capitalizedAction)
+      throws IOException {
+    // Enforce that a lifecycle request on the job is done at most once. This preserves the
+    // existing cancel() behavior and keeps duplicate drain() calls idempotent from one client.
+    FutureTask<State> tentativeTask =
         new FutureTask<>(
             () -> {
               Job content = new Job();
               content.setProjectId(getProjectId());
               String currentJobId = getJobId();
               content.setId(currentJobId);
-              content.setRequestedState("JOB_STATE_CANCELLED");
+              content.setRequestedState(dataflowRequestedState);
               try {
                 Job job = dataflowClient.updateJob(currentJobId, content);
                 return MonitoringUtil.toState(job.getCurrentState());
               } catch (IOException e) {
                 State state = getState();
+                String message = e.getMessage();
                 if (state.isTerminal()) {
-                  LOG.warn("Cancel failed because job is already terminated. State is {}", state);
+                  LOG.warn(
+                      "{} failed because job is already terminated. State is {}",
+                      capitalizedAction,
+                      state);
                   return state;
-                } else if (e.getMessage().contains("has terminated")) {
+                } else if (message != null && message.contains("has terminated")) {
                   // This handles the case where the getState() call above returns RUNNING but the
-                  // cancel was rejected because the job is in fact done. Hopefully, someday we can
+                  // request was rejected because the job is in fact done. Hopefully, someday we can
                   // delete this code if there is better consistency between the State and whether
-                  // Cancel succeeds.
+                  // lifecycle requests succeed.
                   //
                   // Example message:
                   //    Workflow modification failed. Causes: (7603adc9e9bff51e): Cannot perform
                   //    operation 'cancel' on Job: 2017-04-01_22_50_59-9269855660514862348. Job has
                   //    terminated in state SUCCESS: Workflow job:
                   //    2017-04-01_22_50_59-9269855660514862348 succeeded.
-                  LOG.warn("Cancel failed because job is already terminated.", e);
+                  LOG.warn("{} failed because job is already terminated.", capitalizedAction, e);
                   return state;
                 } else {
                   String errorMsg =
                       String.format(
-                          "Failed to cancel job in state %s, "
-                              + "please go to the Developers Console to cancel it manually: %s",
+                          "Failed to %s job in state %s, "
+                              + "please go to the Developers Console to %s it manually: %s",
+                          action,
                           state,
+                          action,
                           MonitoringUtil.getJobMonitoringPageURL(
                               getProjectId(), getRegion(), getJobId()));
                   LOG.warn(errorMsg);
                   throw new IOException(errorMsg, e);
                 }
               }
             });
-    if (cancelState.compareAndSet(null, tentativeCancelTask)) {
-      // This thread should perform cancellation, while others will
-      // only wait for the result.
-      cancelState.get().run();
+    if (requestedState.compareAndSet(null, tentativeTask)) {
+      // This thread should perform the lifecycle request, while others will only wait for the
+      // result.
+      requestedState.get().run();
     }
     try {
-      return cancelState.get().get();
+      return requestedState.get().get();
     } catch (InterruptedException | ExecutionException e) {
       throw new IOException(e);
     }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   In `requestJobState()`, catching `InterruptedException` and wrapping it in an `IOException` without restoring the thread's interrupted status loses the interruption: code further up the call stack can no longer tell that the thread was asked to stop. Call `Thread.currentThread().interrupt()` before throwing the `IOException`.
   
   ```java
       try {
         return requestedState.get().get();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException(e);
       } catch (ExecutionException e) {
         throw new IOException(e);
       }
   ```
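
To illustrate why the suggestion above matters, here is a minimal, self-contained sketch. It is not code from this PR: `InterruptRestoreDemo`, `await`, and `flagSurvives` are hypothetical names, and `await` only mimics the `try`/`catch` shape of `requestJobState()`. It relies on the documented behavior that `FutureTask.get()` throws `InterruptedException` and clears the interrupted status when the calling thread is interrupted.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class InterruptRestoreDemo {
  // Hypothetical stand-in for the wait at the end of requestJobState().
  // When 'restore' is true, the interrupted status is re-asserted before wrapping.
  static String await(FutureTask<String> task, boolean restore) throws IOException {
    try {
      return task.get();
    } catch (InterruptedException e) {
      if (restore) {
        Thread.currentThread().interrupt();
      }
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
  }

  // Reports whether the caller can still observe the interruption after await() fails.
  static boolean flagSurvives(boolean restore) {
    // The task is never run, so get() can only finish by being interrupted.
    FutureTask<String> neverRun = new FutureTask<>(() -> "done");
    Thread.currentThread().interrupt(); // simulate a pending interrupt
    try {
      await(neverRun, restore);
    } catch (IOException expected) {
      // The InterruptedException arrives wrapped in an IOException.
    }
    // Thread.interrupted() reads the flag and clears it, leaving the thread clean.
    return Thread.interrupted();
  }

  public static void main(String[] args) {
    System.out.println("without restore: " + flagSurvives(false)); // false
    System.out.println("with restore:    " + flagSurvives(true));  // true
  }
}
```

Without the restore call, the only trace of the interruption is the cause chain of the `IOException`, which callers rarely inspect.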



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -47,10 +51,22 @@ public class FlinkDetachedRunnerResult implements PipelineResult {
 
   @Override
   public State getState() {
+    CompletableFuture<String> drainFuture = drainSavepointFuture;
+    if (drainFuture != null) {
+      try {
+        return getDrainState(drainFuture);
+      } catch (IOException e) {
+        LOG.warn("Failed to drain Flink job. Querying Flink job state instead.", e);
+      }
+    }
+    return getFlinkJobState();
+  }
+
+  private State getFlinkJobState() {
     try {
       return toBeamJobState(jobClient.getJobStatus().get());
     } catch (InterruptedException | ExecutionException e) {
-      throw new RuntimeException("Fail to get flink job state", e);
+      throw new RuntimeException("Failed to get Flink job state", e);
     }
   }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   In `getFlinkJobState()`, the `catch` block for `InterruptedException` rethrows a `RuntimeException` without restoring the current thread's interrupted status, so the interruption is swallowed. Restore it by calling `Thread.currentThread().interrupt()` so that callers higher in the stack, or the thread pool running this code, can see the interruption.
   
   ```java
     private State getFlinkJobState() {
       try {
         return toBeamJobState(jobClient.getJobStatus().get());
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException("Failed to get Flink job state", e);
       } catch (ExecutionException e) {
         throw new RuntimeException("Failed to get Flink job state", e);
       }
     }
   ```
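
As a rough sketch of what "higher-level call stacks" means here, the example below shows a polling loop that retries on failure and exits only when it sees the interrupted status. It is illustrative only and not part of this PR: `PollingLoopDemo`, `queryState`, and `pollsBeforeStop` are hypothetical names, and a never-completed `CompletableFuture` stands in for `jobClient.getJobStatus()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingLoopDemo {
  // Hypothetical stand-in for getFlinkJobState(), with the suggested fix applied.
  static String queryState(CompletableFuture<String> status) {
    try {
      return status.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interruption visible to callers
      throw new RuntimeException("Failed to get job state", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Failed to get job state", e);
    }
  }

  // Runs a retrying poll loop on a worker thread, interrupts it once, and
  // returns how many polls ran before the loop stopped (-1 if it never stopped).
  static int pollsBeforeStop() {
    CompletableFuture<String> neverCompletes = new CompletableFuture<>();
    CountDownLatch polling = new CountDownLatch(1);
    AtomicInteger polls = new AtomicInteger();
    Thread worker = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        polls.incrementAndGet();
        polling.countDown();
        try {
          queryState(neverCompletes);
        } catch (RuntimeException e) {
          // A real caller might log and retry; the loop condition decides.
        }
      }
    });
    worker.setDaemon(true); // do not keep the JVM alive if the loop hangs
    worker.start();
    try {
      polling.await();    // wait until the worker has started its first poll
      worker.interrupt(); // ask the worker to stop
      worker.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Demo thread was interrupted", e);
    }
    return worker.isAlive() ? -1 : polls.get();
  }

  public static void main(String[] args) {
    System.out.println("polls before stop: " + pollsBeforeStop()); // 1
  }
}
```

If `queryState` dropped the `interrupt()` call, the loop condition would stay true after the first failure and the worker would block again on the next poll instead of exiting.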



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
