Repository: flink Updated Branches: refs/heads/master 5017cb48c -> 831349a61
[FLINK-8705] [flip6] Add Flip-6 support to Remote(Stream)Environment This commit enables the Remote(Stream)Environment to submit jobs to a Flip-6 based cluster. It achieves this by instantiating a RestClusterClient instead of a StandaloneClusterClient. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/facf2ac6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/facf2ac6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/facf2ac6 Branch: refs/heads/master Commit: facf2ac67ae3ffd4cffd7c6f8536fd7953795fb9 Parents: 5017cb4 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Feb 19 13:28:41 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Feb 21 22:49:04 2018 +0100 ---------------------------------------------------------------------- .../java/org/apache/flink/client/RemoteExecutor.java | 8 +++++++- .../org/apache/flink/configuration/CoreOptions.java | 5 +++++ .../api/environment/RemoteStreamEnvironment.java | 14 +++++++++++--- ...JobManagerHAProcessFailureBatchRecoveryITCase.java | 2 ++ .../TaskManagerProcessFailureBatchRecoveryITCase.java | 6 +++++- ...kManagerProcessFailureStreamingRecoveryITCase.java | 9 +++++++-- 6 files changed, 37 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index fcf8bab..7551da0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -25,7 +25,9 @@ import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -147,7 +149,11 @@ public class RemoteExecutor extends PlanExecutor { public void start() throws Exception { synchronized (lock) { if (client == null) { - client = new StandaloneClusterClient(clientConfiguration); + if (CoreOptions.OLD_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) { + client = new StandaloneClusterClient(clientConfiguration); + } else { + client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor"); + } client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 9bd7fab..30c0cd6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -235,6 +235,11 @@ public class CoreOptions { public static final String FLIP6_MODE = "flip6"; /** + * Constant value for the old execution mode. + */ + public static final String OLD_MODE = "old"; + + /** * Switch to select the execution mode. Possible values are 'flip6' and 'old'. */ public static final ConfigOption<String> MODE = ConfigOptions http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index a0b4a40..036cf4d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -21,10 +21,13 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -198,15 +201,20 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { configuration.setString(JobManagerOptions.ADDRESS, host); configuration.setInteger(JobManagerOptions.PORT, port); - StandaloneClusterClient client; + final ClusterClient<?> client; try { - client = new StandaloneClusterClient(configuration); - client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); + if (CoreOptions.OLD_MODE.equals(configuration.getString(CoreOptions.MODE))) { + client = new StandaloneClusterClient(configuration); + } else { + client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment"); + } } catch (Exception e) { throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e); } + client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); + try { return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult(); } http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 357f7af..2b97de8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; @@ -156,6 +157,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { */ public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception { Configuration config = new Configuration(); + config.setString(CoreOptions.MODE, CoreOptions.OLD_MODE); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java index f700fb8..69fe7d6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java @@ -24,6 +24,8 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -65,7 +67,9 @@ public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskMa @Override public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); + final Configuration configuration = new Configuration(); + configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE); + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration); env.setParallelism(PARALLELISM); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10000)); env.getConfig().setExecutionMode(executionMode); http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java index e3fa987..1ecbff3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -65,8 +66,12 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa final File tempCheckpointDir = tempFolder.newFolder(); - StreamExecutionEnvironment env = StreamExecutionEnvironment - .createRemoteEnvironment("localhost", jobManagerPort); + final Configuration configuration = new Configuration(); + configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE); + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", + jobManagerPort, + configuration); env.setParallelism(PARALLELISM); env.getConfig().disableSysoutLogging(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));