zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1271184006


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableRunnerResult.java:
##########
@@ -44,13 +50,44 @@ public JobApi.MetricResults portableMetrics() throws 
UnsupportedOperationExcepti
         .build();
   }
 
-  static class Detached extends FlinkDetachedRunnerResult implements 
PortablePipelineResult {
+  static class Detached implements PortablePipelineResult {
+
+    Detached() {
+      super();
+    }
 
     @Override
     public JobApi.MetricResults portableMetrics() throws 
UnsupportedOperationException {
       LOG.warn(
           "Collecting monitoring infos is not implemented yet in Flink 
portable runner (detached mode).");
       return JobApi.MetricResults.newBuilder().build();
     }
+
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized State getState() {

Review Comment:
   Fixed



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,120 @@
 package org.apache.beam.runners.flink;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} 
with Flink. In detached
  * execution, results and job execution are currently unavailable.
  */
 public class FlinkDetachedRunnerResult implements PipelineResult {
 
-  FlinkDetachedRunnerResult() {}
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+  private static final int TEN_YEAR_DAYS = 365 * 10;
+
+  private JobClient jobClient;
+  private int jobCheckIntervalInSecs;
+
+  FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+    this.jobClient = jobClient;
+    this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+  }
 
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    try {
+      return toBeamJobState(jobClient.getJobStatus().get());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to get flink job state", e);
+    }
   }
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not 
currently support metrics.");
+    return 
MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+  }
+
+  private MetricsContainerStepMap getMetricsContainerStepMap() {
+    try {
+      return (MetricsContainerStepMap)
+          jobClient
+              .getAccumulators()
+              .get()
+              .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new 
MetricsContainerStepMap());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.warn("Fail to get flink job accumulators", e);
+      return new MetricsContainerStepMap();
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet 
supported.");
+    try {
+      this.jobClient.cancel().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to cancel flink job", e);
+    }
+    return getState();
   }
 
   @Override
   public State waitUntilFinish() {
-    return State.UNKNOWN;
+    return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to