TisonKun commented on a change in pull request #10346: [FLINK-14972] Make
Remote(Stream)Environment use Executors.
URL: https://github.com/apache/flink/pull/10346#discussion_r352153355
##########
File path:
flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
##########
@@ -111,58 +93,59 @@ public RemoteEnvironment(String host, int port,
Configuration clientConfig, Stri
* protocol (e.g. file://) and be accessible on all
nodes (e.g. by means of a NFS share).
* The protocol must be supported by the {@link
java.net.URLClassLoader}.
*/
- public RemoteEnvironment(String host, int port, Configuration
clientConfig,
- String[] jarFiles, URL[] globalClasspaths) {
- if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
- throw new InvalidProgramException(
- "The RemoteEnvironment cannot be
instantiated when running in a pre-defined context " +
- "(such as Command Line
Client, Scala Shell, or TestEnvironment)");
- }
- if (host == null) {
- throw new NullPointerException("Host must not be
null.");
- }
- if (port < 1 || port >= 0xffff) {
- throw new IllegalArgumentException("Port out of range");
- }
-
- this.host = host;
- this.port = port;
- this.clientConfiguration = clientConfig == null ? new
Configuration() : clientConfig;
- if (jarFiles != null) {
- this.jarFiles = new ArrayList<>(jarFiles.length);
- for (String jarFile : jarFiles) {
- try {
- this.jarFiles.add(new
File(jarFile).getAbsoluteFile().toURI().toURL());
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("JAR
file path invalid", e);
- }
- }
- }
- else {
- this.jarFiles = Collections.emptyList();
- }
-
- if (globalClasspaths == null) {
- this.globalClasspaths = Collections.emptyList();
- } else {
- this.globalClasspaths = Arrays.asList(globalClasspaths);
- }
+ public RemoteEnvironment(String host, int port, Configuration
clientConfig, String[] jarFiles, URL[] globalClasspaths) {
+ super(validateAndGetEffectiveConfiguration(clientConfig, host,
port, jarFiles, globalClasspaths));
}
- //
------------------------------------------------------------------------
+ private static Configuration validateAndGetEffectiveConfiguration(
+ final Configuration configuration,
+ final String host,
+ final int port,
+ final String[] jarFiles,
+ final URL[] globalClasspaths) {
+ RemoteEnvironmentConfigUtils.validate(host, port);
+ return getEffectiveConfiguration(
+ getClientConfiguration(configuration),
+ host,
+ port,
+ jarFiles,
+ getClasspathURLs(globalClasspaths));
+ }
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- final Plan p = createProgramPlan(jobName);
+ private static Configuration getClientConfiguration(final Configuration
configuration) {
+ return configuration == null ? new Configuration() :
configuration;
+ }
- final PlanExecutor executor =
PlanExecutor.createRemoteExecutor(host, port, clientConfiguration);
- lastJobExecutionResult = executor.executePlan(p, jarFiles,
globalClasspaths);
- return lastJobExecutionResult;
+ private static List<URL> getClasspathURLs(final URL[] classpaths) {
+ return classpaths == null ? Collections.emptyList() :
Arrays.asList(classpaths);
+ }
+
+ private static Configuration getEffectiveConfiguration(
+ final Configuration baseConfiguration,
+ final String host,
+ final int port,
+ final String[] jars,
+ final List<URL> classpaths) {
+
+ final Configuration effectiveConfiguration = new
Configuration(baseConfiguration);
+
+ RemoteEnvironmentConfigUtils.setJobManagerAddressToConfig(host,
port, effectiveConfiguration);
+ RemoteEnvironmentConfigUtils.setJarURLsToConfig(jars,
effectiveConfiguration);
+ ConfigUtils.encodeCollectionToConfig(effectiveConfiguration,
PipelineOptions.CLASSPATHS, classpaths, URL::toString);
+
+ // these should be set in the end to overwrite any values from
the client config provided in the constructor.
+ effectiveConfiguration.setString(DeploymentOptions.TARGET,
"remote-cluster");
Review comment:
TBH it is a bit brittle we use string literal here, instead of
RemoteExecutor.NAME. I'm thinking what should a proper module dependency look
like
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services