Abacn commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1258725656
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,120 @@
package org.apache.beam.runners.flink;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline}
with Flink. In detached
* execution, results and job execution are currently unavailable.
*/
public class FlinkDetachedRunnerResult implements PipelineResult {
- FlinkDetachedRunnerResult() {}
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+ private static final int TEN_YEAR_DAYS = 365 * 10;
+
+ private JobClient jobClient;
+ private int jobCheckIntervalInSecs;
+
+ FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+ this.jobClient = jobClient;
+ this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+ }
@Override
public State getState() {
- return State.UNKNOWN;
+ try {
+ return toBeamJobState(jobClient.getJobStatus().get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Fail to get flink job state", e);
+ }
}
@Override
public MetricResults metrics() {
- throw new UnsupportedOperationException("The FlinkRunner does not
currently support metrics.");
+ return
MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+ }
+
+ private MetricsContainerStepMap getMetricsContainerStepMap() {
+ try {
+ return (MetricsContainerStepMap)
+ jobClient
+ .getAccumulators()
+ .get()
+ .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new
MetricsContainerStepMap());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Fail to get flink job accumulators", e);
+ return new MetricsContainerStepMap();
+ }
}
@Override
public State cancel() throws IOException {
- throw new UnsupportedOperationException("Cancelling is not yet
supported.");
+ try {
+ this.jobClient.cancel().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Fail to cancel flink job", e);
+ }
+ return getState();
}
@Override
public State waitUntilFinish() {
- return State.UNKNOWN;
+ return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));
Review Comment:
Consider using `Duration.millis(Long.MAX_VALUE)` here instead of
`Duration.standardDays(TEN_YEAR_DAYS)`; it is the value conventionally used
throughout Beam.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]