TisonKun commented on a change in pull request #10346: [FLINK-14972] Make
Remote(Stream)Environment use Executors.
URL: https://github.com/apache/flink/pull/10346#discussion_r352151318
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
##########
@@ -171,116 +142,82 @@ public RemoteStreamEnvironment(String host, int port,
Configuration clientConfig
*/
@PublicEvolving
public RemoteStreamEnvironment(String host, int port, Configuration
clientConfiguration, String[] jarFiles, URL[] globalClasspaths,
SavepointRestoreSettings savepointRestoreSettings) {
- if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
- throw new InvalidProgramException(
- "The RemoteEnvironment cannot be used
when submitting a program through a client, " +
- "or running in a
TestEnvironment context.");
- }
-
- if (host == null) {
- throw new NullPointerException("Host must not be
null.");
- }
- if (port < 1 || port >= 0xffff) {
- throw new IllegalArgumentException("Port out of range");
- }
-
- this.host = host;
- this.port = port;
- this.clientConfiguration = clientConfiguration == null ? new
Configuration() : clientConfiguration;
- this.jarFiles = new ArrayList<>(jarFiles.length);
- for (String jarFile : jarFiles) {
- try {
- URL jarFileUrl = new
File(jarFile).getAbsoluteFile().toURI().toURL();
- this.jarFiles.add(jarFileUrl);
- ClientUtils.checkJarFile(jarFileUrl);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("JAR file
path is invalid '" + jarFile + "'", e);
- } catch (IOException e) {
- throw new RuntimeException("Problem with jar
file " + jarFile, e);
- }
- }
- if (globalClasspaths == null) {
- this.globalClasspaths = Collections.emptyList();
- }
- else {
- this.globalClasspaths = Arrays.asList(globalClasspaths);
- }
- this.savepointRestoreSettings = savepointRestoreSettings;
+ this(DefaultExecutorServiceLoader.INSTANCE, host, port,
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
}
- /**
- * Executes the job remotely.
- *
- * <p>This method can be used independent of the {@link
StreamExecutionEnvironment} type.
- * @return The result of the job execution, containing elapsed time and
accumulators.
- */
@PublicEvolving
- public static JobExecutionResult
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
- List<URL> jarFiles,
- String host,
- int port,
- Configuration clientConfiguration,
- List<URL> globalClasspaths,
- String jobName,
- SavepointRestoreSettings savepointRestoreSettings
- ) throws ProgramInvocationException {
- StreamGraph streamGraph =
streamExecutionEnvironment.getStreamGraph(jobName);
- return executeRemotely(streamGraph,
- streamExecutionEnvironment.getConfig(),
- jarFiles,
- host,
- port,
- clientConfiguration,
- globalClasspaths,
- savepointRestoreSettings);
+ public RemoteStreamEnvironment(
+ final ExecutorServiceLoader executorServiceLoader,
+ final String host,
+ final int port,
+ final Configuration clientConfiguration,
+ final String[] jarFiles,
+ final URL[] globalClasspaths,
+ final SavepointRestoreSettings
savepointRestoreSettings) {
+ super(
+ executorServiceLoader,
+
validateAndGetEffectiveConfiguration(clientConfiguration, host, port, jarFiles,
globalClasspaths, savepointRestoreSettings),
+ null
+ );
}
- /**
- * Execute the given stream graph remotely.
- *
- * <p>Method for internal use since it exposes stream graph and other
implementation details that are subject to change.
- * @throws ProgramInvocationException
- */
- private static JobExecutionResult executeRemotely(StreamGraph
streamGraph,
- ExecutionConfig executionConfig,
- List<URL> jarFiles,
- String host,
- int port,
- Configuration clientConfiguration,
- List<URL> globalClasspaths,
- SavepointRestoreSettings savepointRestoreSettings
- ) throws ProgramInvocationException {
- if (LOG.isInfoEnabled()) {
- LOG.info("Running remotely at {}:{}", host, port);
- }
+ private static Configuration getClientConfiguration(final Configuration
configuration) {
+ return configuration == null ? new Configuration() :
configuration;
+ }
- Configuration configuration = new Configuration();
- configuration.addAll(clientConfiguration);
+ private static List<URL> getClasspathURLs(final URL[] classpaths) {
+ return classpaths == null ? Collections.emptyList() :
Arrays.asList(classpaths);
+ }
- configuration.setString(JobManagerOptions.ADDRESS, host);
- configuration.setInteger(JobManagerOptions.PORT, port);
+ private static Configuration validateAndGetEffectiveConfiguration(
+ final Configuration configuration,
+ final String host,
+ final int port,
+ final String[] jarFiles,
+ final URL[] classpaths,
+ final SavepointRestoreSettings
savepointRestoreSettings) {
+ RemoteEnvironmentConfigUtils.validate(host, port);
+ return getEffectiveConfiguration(
+ getClientConfiguration(configuration),
+ host,
+ port,
+ jarFiles,
+ getClasspathURLs(classpaths),
+ savepointRestoreSettings);
+ }
- configuration.setInteger(RestOptions.PORT, port);
+ private static Configuration getEffectiveConfiguration(
+ final Configuration baseConfiguration,
+ final String host,
+ final int port,
+ final String[] jars,
+ final List<URL> classpaths,
+ final SavepointRestoreSettings
savepointRestoreSettings) {
- final ClusterClient<?> client;
- try {
- client = new RestClusterClient<>(configuration,
"RemoteStreamEnvironment");
- }
- catch (Exception e) {
- throw new ProgramInvocationException("Cannot establish
connection to JobManager: " + e.getMessage(), e);
- }
+ final Configuration effectiveConfiguration = new
Configuration(baseConfiguration);
+
+ RemoteEnvironmentConfigUtils.setJobManagerAddressToConfig(host,
port, effectiveConfiguration);
+ RemoteEnvironmentConfigUtils.setJarURLsToConfig(jars,
effectiveConfiguration);
+ ConfigUtils.encodeCollectionToConfig(effectiveConfiguration,
PipelineOptions.CLASSPATHS, classpaths, URL::toString);
if (savepointRestoreSettings != null) {
-
streamGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
effectiveConfiguration);
+ } else {
+
SavepointRestoreSettings.toConfiguration(SavepointRestoreSettings.none(),
effectiveConfiguration);
}
- try {
- final PlanExecutor executor =
PlanExecutor.createRemoteExecutor(
- host,
- port,
- clientConfiguration);
+ // these should be set in the end to overwrite any values from
the client config provided in the constructor.
+ effectiveConfiguration.setString(DeploymentOptions.TARGET,
"remote-cluster");
+ effectiveConfiguration.setBoolean(DeploymentOptions.ATTACHED,
true);
+
+ return effectiveConfiguration;
+ }
- return executor.executePlan(streamGraph, jarFiles,
globalClasspaths).getJobExecutionResult();
+ @Override
+ public JobExecutionResult execute(StreamGraph streamGraph) throws
Exception {
+ transformations.clear();
+ try {
+ return super.execute(streamGraph);
}
catch (ProgramInvocationException e) {
Review comment:
Maybe we should just
```java
return super.execute(streamGraph);
```
which keeps this consistent with the batch code path. The concern here is that
`ProgramInvocationException` lives in flink-clients, and in the near future we
hope to reverse the dependency from flink-streaming-java to flink-clients.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services