zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1256809704


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,119 @@
 package org.apache.beam.runners.flink;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} 
with Flink. In detached
  * execution, results and job execution are currently unavailable.
  */
 public class FlinkDetachedRunnerResult implements PipelineResult {
 
-  FlinkDetachedRunnerResult() {}
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+  private static final int TEN_YEAR_DAYS = 365 * 10;
+
+  private JobClient jobClient;
+  private int jobCheckIntervalInSecs;
+
+  FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+    this.jobClient = jobClient;
+    this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+  }
 
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    try {
+      return toBeamJobState(jobClient.getJobStatus().get());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to get flink job state", e);
+    }
   }
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not 
currently support metrics.");
+    return 
MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+  }
+
+  private MetricsContainerStepMap getMetricsContainerStepMap() {
+    try {
+      return (MetricsContainerStepMap)
+          jobClient
+              .getAccumulators()
+              .get()
+              .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new 
MetricsContainerStepMap());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.warn("Fail to get flink job accumulators", e);
+      return new MetricsContainerStepMap();
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet 
supported.");
+    try {
+      this.jobClient.cancel().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to cancel flink job", e);
+    }
+    return getState();
   }
 
   @Override
   public State waitUntilFinish() {
-    return State.UNKNOWN;
+    return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    return State.UNKNOWN;
+    long start = System.currentTimeMillis();
+    long durationInMillis = duration.getMillis();
+    State state = State.UNKNOWN;
+    while ((System.currentTimeMillis() - start) < durationInMillis) {
+      state = getState();
+      if (state.isTerminal()) {
+        return state;
+      }
+      try {
+        Thread.sleep(jobCheckIntervalInSecs * 1000);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (state != null && !state.isTerminal()) {
+      LOG.warn("Job is not finished in {} seconds", 
duration.getStandardSeconds());
+    }
+    return state;
+  }
+
+  private State toBeamJobState(JobStatus flinkJobStatus) {
+    switch (flinkJobStatus) {
+      case CANCELLING:
+      case CREATED:
+      case INITIALIZING:
+      case FAILING:
+      case RECONCILING:
+      case RESTARTING:
+      case RUNNING:
+        return State.RUNNING;
+      case FINISHED:
+        return State.DONE;
+      case CANCELED:
+        return State.CANCELLED;
+      case FAILED:
+        return State.FAILED;
+      case SUSPENDED:

Review Comment:
   Thanks for the review, @kkdoon I have updated this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to