TisonKun commented on a change in pull request #10346: [FLINK-14972] Make 
Remote(Stream)Environment use Executors.
URL: https://github.com/apache/flink/pull/10346#discussion_r352151318
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##########
 @@ -171,116 +142,82 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
         */
        @PublicEvolving
        public RemoteStreamEnvironment(String host, int port, Configuration 
clientConfiguration, String[] jarFiles, URL[] globalClasspaths, 
SavepointRestoreSettings savepointRestoreSettings) {
-               if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
-                       throw new InvalidProgramException(
-                                       "The RemoteEnvironment cannot be used 
when submitting a program through a client, " +
-                                                       "or running in a 
TestEnvironment context.");
-               }
-
-               if (host == null) {
-                       throw new NullPointerException("Host must not be 
null.");
-               }
-               if (port < 1 || port >= 0xffff) {
-                       throw new IllegalArgumentException("Port out of range");
-               }
-
-               this.host = host;
-               this.port = port;
-               this.clientConfiguration = clientConfiguration == null ? new 
Configuration() : clientConfiguration;
-               this.jarFiles = new ArrayList<>(jarFiles.length);
-               for (String jarFile : jarFiles) {
-                       try {
-                               URL jarFileUrl = new 
File(jarFile).getAbsoluteFile().toURI().toURL();
-                               this.jarFiles.add(jarFileUrl);
-                               ClientUtils.checkJarFile(jarFileUrl);
-                       } catch (MalformedURLException e) {
-                               throw new IllegalArgumentException("JAR file 
path is invalid '" + jarFile + "'", e);
-                       } catch (IOException e) {
-                               throw new RuntimeException("Problem with jar 
file " + jarFile, e);
-                       }
-               }
-               if (globalClasspaths == null) {
-                       this.globalClasspaths = Collections.emptyList();
-               }
-               else {
-                       this.globalClasspaths = Arrays.asList(globalClasspaths);
-               }
-               this.savepointRestoreSettings = savepointRestoreSettings;
+               this(DefaultExecutorServiceLoader.INSTANCE, host, port, 
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
        }
 
-       /**
-        * Executes the job remotely.
-        *
-        * <p>This method can be used independent of the {@link 
StreamExecutionEnvironment} type.
-        * @return The result of the job execution, containing elapsed time and 
accumulators.
-        */
        @PublicEvolving
-       public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
-               List<URL> jarFiles,
-               String host,
-               int port,
-               Configuration clientConfiguration,
-               List<URL> globalClasspaths,
-               String jobName,
-               SavepointRestoreSettings savepointRestoreSettings
-       ) throws ProgramInvocationException {
-               StreamGraph streamGraph = 
streamExecutionEnvironment.getStreamGraph(jobName);
-               return executeRemotely(streamGraph,
-                       streamExecutionEnvironment.getConfig(),
-                       jarFiles,
-                       host,
-                       port,
-                       clientConfiguration,
-                       globalClasspaths,
-                       savepointRestoreSettings);
+       public RemoteStreamEnvironment(
+                       final ExecutorServiceLoader executorServiceLoader,
+                       final String host,
+                       final int port,
+                       final Configuration clientConfiguration,
+                       final String[] jarFiles,
+                       final URL[] globalClasspaths,
+                       final SavepointRestoreSettings 
savepointRestoreSettings) {
+               super(
+                               executorServiceLoader,
+                               
validateAndGetEffectiveConfiguration(clientConfiguration, host, port, jarFiles, 
globalClasspaths, savepointRestoreSettings),
+                               null
+               );
        }
 
-       /**
-        * Execute the given stream graph remotely.
-        *
-        * <p>Method for internal use since it exposes stream graph and other 
implementation details that are subject to change.
-        * @throws ProgramInvocationException
-        */
-       private static JobExecutionResult executeRemotely(StreamGraph 
streamGraph,
-               ExecutionConfig executionConfig,
-               List<URL> jarFiles,
-               String host,
-               int port,
-               Configuration clientConfiguration,
-               List<URL> globalClasspaths,
-               SavepointRestoreSettings savepointRestoreSettings
-       ) throws ProgramInvocationException {
-               if (LOG.isInfoEnabled()) {
-                       LOG.info("Running remotely at {}:{}", host, port);
-               }
+       private static Configuration getClientConfiguration(final Configuration 
configuration) {
+               return configuration == null ? new Configuration() : 
configuration;
+       }
 
-               Configuration configuration = new Configuration();
-               configuration.addAll(clientConfiguration);
+       private static List<URL> getClasspathURLs(final URL[] classpaths) {
+               return classpaths == null ? Collections.emptyList() : 
Arrays.asList(classpaths);
+       }
 
-               configuration.setString(JobManagerOptions.ADDRESS, host);
-               configuration.setInteger(JobManagerOptions.PORT, port);
+       private static Configuration validateAndGetEffectiveConfiguration(
+                       final Configuration configuration,
+                       final String host,
+                       final int port,
+                       final String[] jarFiles,
+                       final URL[] classpaths,
+                       final SavepointRestoreSettings 
savepointRestoreSettings) {
+               RemoteEnvironmentConfigUtils.validate(host, port);
+               return getEffectiveConfiguration(
+                               getClientConfiguration(configuration),
+                               host,
+                               port,
+                               jarFiles,
+                               getClasspathURLs(classpaths),
+                               savepointRestoreSettings);
+       }
 
-               configuration.setInteger(RestOptions.PORT, port);
+       private static Configuration getEffectiveConfiguration(
+                       final Configuration baseConfiguration,
+                       final String host,
+                       final int port,
+                       final String[] jars,
+                       final List<URL> classpaths,
+                       final SavepointRestoreSettings 
savepointRestoreSettings) {
 
-               final ClusterClient<?> client;
-               try {
-                       client = new RestClusterClient<>(configuration, 
"RemoteStreamEnvironment");
-               }
-               catch (Exception e) {
-                       throw new ProgramInvocationException("Cannot establish 
connection to JobManager: " + e.getMessage(), e);
-               }
+               final Configuration effectiveConfiguration = new 
Configuration(baseConfiguration);
+
+               RemoteEnvironmentConfigUtils.setJobManagerAddressToConfig(host, 
port, effectiveConfiguration);
+               RemoteEnvironmentConfigUtils.setJarURLsToConfig(jars, 
effectiveConfiguration);
+               ConfigUtils.encodeCollectionToConfig(effectiveConfiguration, 
PipelineOptions.CLASSPATHS, classpaths, URL::toString);
 
                if (savepointRestoreSettings != null) {
-                       
streamGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+                       
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, 
effectiveConfiguration);
+               } else {
+                       
SavepointRestoreSettings.toConfiguration(SavepointRestoreSettings.none(), 
effectiveConfiguration);
                }
 
-               try {
-                       final PlanExecutor executor = 
PlanExecutor.createRemoteExecutor(
-                                       host,
-                                       port,
-                                       clientConfiguration);
+               // these should be set in the end to overwrite any values from 
the client config provided in the constructor.
+               effectiveConfiguration.setString(DeploymentOptions.TARGET, 
"remote-cluster");
+               effectiveConfiguration.setBoolean(DeploymentOptions.ATTACHED, 
true);
+
+               return effectiveConfiguration;
+       }
 
-                       return executor.executePlan(streamGraph, jarFiles, 
globalClasspaths).getJobExecutionResult();
+       @Override
+       public JobExecutionResult execute(StreamGraph streamGraph) throws 
Exception {
+               transformations.clear();
+               try {
+                       return super.execute(streamGraph);
                }
                catch (ProgramInvocationException e) {
 
 Review comment:
   Maybe we could just write
   
   ```java
   return super.execute(streamGraph);
   ```
   
   which keeps this the same as the batch code path. The concern here is that 
`ProgramInvocationException` is inside flink-client, and in the near future we 
hope to reverse the dependency from flink-streaming-java to flink-client.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to