Repository: flink
Updated Branches:
  refs/heads/master 5017cb48c -> 831349a61


[FLINK-8705] [flip6] Add Flip-6 support to Remote(Stream)Environment

This commit enables the Remote(Stream)Environment to submit jobs to a Flip-6
based cluster. It achieves this by instantiating a RestClusterClient instead
of a StandaloneClusterClient.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/facf2ac6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/facf2ac6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/facf2ac6

Branch: refs/heads/master
Commit: facf2ac67ae3ffd4cffd7c6f8536fd7953795fb9
Parents: 5017cb4
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Feb 19 13:28:41 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Feb 21 22:49:04 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/client/RemoteExecutor.java  |  8 +++++++-
 .../org/apache/flink/configuration/CoreOptions.java   |  5 +++++
 .../api/environment/RemoteStreamEnvironment.java      | 14 +++++++++++---
 ...JobManagerHAProcessFailureBatchRecoveryITCase.java |  2 ++
 .../TaskManagerProcessFailureBatchRecoveryITCase.java |  6 +++++-
 ...kManagerProcessFailureStreamingRecoveryITCase.java |  9 +++++++--
 6 files changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index fcf8bab..7551da0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -25,7 +25,9 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -147,7 +149,11 @@ public class RemoteExecutor extends PlanExecutor {
        public void start() throws Exception {
                synchronized (lock) {
                        if (client == null) {
-                               client = new 
StandaloneClusterClient(clientConfiguration);
+                               if 
(CoreOptions.OLD_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
+                                       client = new 
StandaloneClusterClient(clientConfiguration);
+                               } else {
+                                       client = new 
RestClusterClient<>(clientConfiguration, "RemoteExecutor");
+                               }
                                
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 9bd7fab..30c0cd6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -235,6 +235,11 @@ public class CoreOptions {
        public static final String FLIP6_MODE = "flip6";
 
        /**
+        * Constant value for the old execution mode.
+        */
+       public static final String OLD_MODE = "old";
+
+       /**
         * Switch to select the execution mode. Possible values are 'flip6' and 
'old'.
         */
        public static final ConfigOption<String> MODE = ConfigOptions

http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index a0b4a40..036cf4d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -21,10 +21,13 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
@@ -198,15 +201,20 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
                configuration.setString(JobManagerOptions.ADDRESS, host);
                configuration.setInteger(JobManagerOptions.PORT, port);
 
-               StandaloneClusterClient client;
+               final ClusterClient<?> client;
                try {
-                       client = new StandaloneClusterClient(configuration);
-                       
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+                       if 
(CoreOptions.OLD_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+                               client = new 
StandaloneClusterClient(configuration);
+                       } else {
+                               client = new RestClusterClient<>(configuration, 
"RemoteStreamEnvironment");
+                       }
                }
                catch (Exception e) {
                        throw new ProgramInvocationException("Cannot establish 
connection to JobManager: " + e.getMessage(), e);
                }
 
+               
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+
                try {
                        return client.run(streamGraph, jarFiles, 
globalClasspaths, usercodeClassLoader).getJobExecutionResult();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 357f7af..2b97de8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -156,6 +157,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
         */
        public void testJobManagerFailure(String zkQuorum, final File 
coordinateDir) throws Exception {
                Configuration config = new Configuration();
+               config.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkQuorum);
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
FileStateBackendBasePath.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index f700fb8..69fe7d6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -65,7 +67,9 @@ public class TaskManagerProcessFailureBatchRecoveryITCase 
extends AbstractTaskMa
        @Override
        public void testTaskManagerFailure(int jobManagerPort, final File 
coordinateDir) throws Exception {
 
-               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
+               final Configuration configuration = new Configuration();
+               configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, 
configuration);
                env.setParallelism(PARALLELISM);
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
10000));
                env.getConfig().setExecutionMode(executionMode);

http://git-wip-us.apache.org/repos/asf/flink/blob/facf2ac6/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index e3fa987..1ecbff3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -65,8 +66,12 @@ public class 
TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTa
 
                final File tempCheckpointDir = tempFolder.newFolder();
 
-               StreamExecutionEnvironment env = StreamExecutionEnvironment
-                               .createRemoteEnvironment("localhost", 
jobManagerPort);
+               final Configuration configuration = new Configuration();
+               configuration.setString(CoreOptions.MODE, CoreOptions.OLD_MODE);
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+                       "localhost",
+                       jobManagerPort,
+                       configuration);
                env.setParallelism(PARALLELISM);
                env.getConfig().disableSysoutLogging();
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
1000));

Reply via email to