TisonKun commented on a change in pull request #10346: [FLINK-14972] Make 
Remote(Stream)Environment use Executors.
URL: https://github.com/apache/flink/pull/10346#discussion_r352153355
 
 

 ##########
 File path: 
flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
 ##########
 @@ -111,58 +93,59 @@ public RemoteEnvironment(String host, int port, 
Configuration clientConfig, Stri
         *                 protocol (e.g. file://) and be accessible on all 
nodes (e.g. by means of a NFS share).
         *                 The protocol must be supported by the {@link 
java.net.URLClassLoader}.
         */
-       public RemoteEnvironment(String host, int port, Configuration 
clientConfig,
-                       String[] jarFiles, URL[] globalClasspaths) {
-               if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
-                       throw new InvalidProgramException(
-                                       "The RemoteEnvironment cannot be 
instantiated when running in a pre-defined context " +
-                                                       "(such as Command Line 
Client, Scala Shell, or TestEnvironment)");
-               }
-               if (host == null) {
-                       throw new NullPointerException("Host must not be 
null.");
-               }
-               if (port < 1 || port >= 0xffff) {
-                       throw new IllegalArgumentException("Port out of range");
-               }
-
-               this.host = host;
-               this.port = port;
-               this.clientConfiguration = clientConfig == null ? new 
Configuration() : clientConfig;
-               if (jarFiles != null) {
-                       this.jarFiles = new ArrayList<>(jarFiles.length);
-                       for (String jarFile : jarFiles) {
-                               try {
-                                       this.jarFiles.add(new 
File(jarFile).getAbsoluteFile().toURI().toURL());
-                               } catch (MalformedURLException e) {
-                                       throw new IllegalArgumentException("JAR 
file path invalid", e);
-                               }
-                       }
-               }
-               else {
-                       this.jarFiles = Collections.emptyList();
-               }
-
-               if (globalClasspaths == null) {
-                       this.globalClasspaths = Collections.emptyList();
-               } else {
-                       this.globalClasspaths = Arrays.asList(globalClasspaths);
-               }
+       public RemoteEnvironment(String host, int port, Configuration 
clientConfig, String[] jarFiles, URL[] globalClasspaths) {
+               super(validateAndGetEffectiveConfiguration(clientConfig, host, 
port, jarFiles, globalClasspaths));
        }
 
-       // 
------------------------------------------------------------------------
+       private static Configuration validateAndGetEffectiveConfiguration(
+                       final Configuration configuration,
+                       final String host,
+                       final int port,
+                       final String[] jarFiles,
+                       final URL[] globalClasspaths) {
+               RemoteEnvironmentConfigUtils.validate(host, port);
+               return getEffectiveConfiguration(
+                               getClientConfiguration(configuration),
+                               host,
+                               port,
+                               jarFiles,
+                               getClasspathURLs(globalClasspaths));
+       }
 
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               final Plan p = createProgramPlan(jobName);
+       private static Configuration getClientConfiguration(final Configuration 
configuration) {
+               return configuration == null ? new Configuration() : 
configuration;
+       }
 
-               final PlanExecutor executor = 
PlanExecutor.createRemoteExecutor(host, port, clientConfiguration);
-               lastJobExecutionResult = executor.executePlan(p, jarFiles, 
globalClasspaths);
-               return lastJobExecutionResult;
+       private static List<URL> getClasspathURLs(final URL[] classpaths) {
+               return classpaths == null ? Collections.emptyList() : 
Arrays.asList(classpaths);
+       }
+
+       private static Configuration getEffectiveConfiguration(
+                       final Configuration baseConfiguration,
+                       final String host,
+                       final int port,
+                       final String[] jars,
+                       final List<URL> classpaths) {
+
+               final Configuration effectiveConfiguration = new 
Configuration(baseConfiguration);
+
+               RemoteEnvironmentConfigUtils.setJobManagerAddressToConfig(host, 
port, effectiveConfiguration);
+               RemoteEnvironmentConfigUtils.setJarURLsToConfig(jars, 
effectiveConfiguration);
+               ConfigUtils.encodeCollectionToConfig(effectiveConfiguration, 
PipelineOptions.CLASSPATHS, classpaths, URL::toString);
+
+               // these should be set in the end to overwrite any values from 
the client config provided in the constructor.
+               effectiveConfiguration.setString(DeploymentOptions.TARGET, 
"remote-cluster");
 
 Review comment:
   TBH it is a bit brittle we use string literal here, instead of 
RemoteExecutor.NAME. I'm thinking what should a proper module dependency look 
like 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to