kkdoon commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1253904078
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,119 @@
package org.apache.beam.runners.flink;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline}
with Flink. In detached
* execution, results and job execution are currently unavailable.
*/
public class FlinkDetachedRunnerResult implements PipelineResult {
- FlinkDetachedRunnerResult() {}
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+ private static final int TEN_YEAR_DAYS = 365 * 10;
+
+ private JobClient jobClient;
+ private int jobCheckIntervalInSecs;
+
+ FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+ this.jobClient = jobClient;
+ this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+ }
@Override
public State getState() {
- return State.UNKNOWN;
+ try {
+ return toBeamJobState(jobClient.getJobStatus().get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Fail to get flink job state", e);
+ }
}
@Override
public MetricResults metrics() {
- throw new UnsupportedOperationException("The FlinkRunner does not
currently support metrics.");
+ return
MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+ }
+
+ private MetricsContainerStepMap getMetricsContainerStepMap() {
+ try {
+ return (MetricsContainerStepMap)
+ jobClient
+ .getAccumulators()
+ .get()
+ .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new
MetricsContainerStepMap());
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Fail to get flink job accumulators", e);
+ return new MetricsContainerStepMap();
+ }
}
@Override
public State cancel() throws IOException {
- throw new UnsupportedOperationException("Cancelling is not yet
supported.");
+ try {
+ this.jobClient.cancel().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Fail to cancel flink job", e);
+ }
+ return getState();
}
@Override
public State waitUntilFinish() {
- return State.UNKNOWN;
+ return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));
}
@Override
public State waitUntilFinish(Duration duration) {
- return State.UNKNOWN;
+ long start = System.currentTimeMillis();
+ long durationInMillis = duration.getMillis();
+ State state = State.UNKNOWN;
+ while ((System.currentTimeMillis() - start) < durationInMillis) {
+ state = getState();
+ if (state.isTerminal()) {
+ return state;
+ }
+ try {
+ Thread.sleep(jobCheckIntervalInSecs * 1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (state != null && !state.isTerminal()) {
+ LOG.warn("Job is not finished in {} seconds",
duration.getStandardSeconds());
+ }
+ return state;
+ }
+
+ private State toBeamJobState(JobStatus flinkJobStatus) {
+ switch (flinkJobStatus) {
+ case CANCELLING:
+ case CREATED:
+ case INITIALIZING:
+ case FAILING:
+ case RECONCILING:
+ case RESTARTING:
+ case RUNNING:
+ return State.RUNNING;
+ case FINISHED:
+ return State.DONE;
+ case CANCELED:
+ return State.CANCELLED;
+ case FAILED:
+ return State.FAILED;
+ case SUSPENDED:
Review Comment:
Should this return the Beam
[STOPPED](https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/PipelineResult.State.html#STOPPED)
state instead, since the job is in a paused state and not in a terminal
state? Flink's
[suspended](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/JobStatus.html#SUSPENDED)
state also corresponds to a paused state, from which the job may transition to the
[restarting
state](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/internals/job_scheduling/).
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@ public interface FlinkPipelineOptions
void setNumberOfExecutionRetries(Integer retries);
+ @Description(
+ "Set job check interval in seconds under detached mode in method
waitUntilFinish, "
+ + "by default it is 5 seconds")
+ @Default.Integer(5)
+ int getJobCheckIntervalInSecs();
+
+ void setJobCheckIntervalInSecs(int seconds);
+
+ @Description("Set the attached mode")
Review Comment:
nit: the description could probably be rephrased as "Specifies if the pipeline is
submitted in attached or detached mode".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]