This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit d29867f427f132f46ebedee61702c2969048f7de Author: Kostas Kloudas <[email protected]> AuthorDate: Mon Nov 18 11:27:35 2019 +0100 [FLINK-XXXXX] Make DefaultExecutorServiceLoader a singleton. --- .../src/main/java/org/apache/flink/client/cli/CliFrontend.java | 2 +- .../flink/core/execution/DefaultExecutorServiceLoader.java | 9 ++++++++- .../java/org/apache/flink/api/java/ExecutionEnvironment.java | 2 +- .../streaming/api/environment/StreamExecutionEnvironment.java | 2 +- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 8dfe306..408d478 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -669,7 +669,7 @@ public class CliFrontend { logAndSysout("Starting execution of program"); - final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader(); + final ExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE; final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program); if (result.isJobExecutionResult()) { diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java index 297b17e..7122167 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java @@ -18,6 +18,7 @@ package org.apache.flink.core.execution; +import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; @@ -34,8 +35,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses * Java service discovery to find the available {@link ExecutorFactory executor factories}. - * MAKE IT A SINGLETON. */ +@Internal public class DefaultExecutorServiceLoader implements ExecutorServiceLoader { // TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation. @@ -46,6 +47,12 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader { private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class); + public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader(); + + private DefaultExecutorServiceLoader() { + // make sure nobody instantiates us explicitly. + } + @Override public ExecutorFactory getExecutorFactory(final Configuration configuration) { checkNotNull(configuration); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index c600eb9..df2cb0c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -143,7 +143,7 @@ public class ExecutionEnvironment { } protected ExecutionEnvironment(final Configuration executorConfiguration) { - this(new DefaultExecutorServiceLoader(), executorConfiguration); + this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration); } protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index fdaaae0..c51064f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -166,7 +166,7 @@ public class StreamExecutionEnvironment { } public StreamExecutionEnvironment(final Configuration executorConfiguration) { - this(new DefaultExecutorServiceLoader(), executorConfiguration); + this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration); } public StreamExecutionEnvironment(
