This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d29867f427f132f46ebedee61702c2969048f7de
Author: Kostas Kloudas <[email protected]>
AuthorDate: Mon Nov 18 11:27:35 2019 +0100

    [FLINK-XXXXX] Make DefaultExecutorServiceLoader a singleton.
---
 .../src/main/java/org/apache/flink/client/cli/CliFrontend.java   | 2 +-
 .../flink/core/execution/DefaultExecutorServiceLoader.java       | 9 ++++++++-
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java     | 2 +-
 .../streaming/api/environment/StreamExecutionEnvironment.java    | 2 +-
 4 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 8dfe306..408d478 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -669,7 +669,7 @@ public class CliFrontend {
 
                logAndSysout("Starting execution of program");
 
-               final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
+               final ExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
                final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program);
 
                if (result.isJobExecutionResult()) {
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 297b17e..7122167 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.Logger;
@@ -34,8 +35,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
- * MAKE IT A SINGLETON.
  */
+@Internal
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
        // TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation.
@@ -46,6 +47,12 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
        private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class);
 
+       public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
+
+       private DefaultExecutorServiceLoader() {
+               // make sure nobody instantiates us explicitly.
+       }
+
        @Override
        public ExecutorFactory getExecutorFactory(final Configuration configuration) {
                checkNotNull(configuration);
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index c600eb9..df2cb0c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -143,7 +143,7 @@ public class ExecutionEnvironment {
        }
 
        protected ExecutionEnvironment(final Configuration executorConfiguration) {
-               this(new DefaultExecutorServiceLoader(), executorConfiguration);
+               this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
        }
 
        protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index fdaaae0..c51064f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -166,7 +166,7 @@ public class StreamExecutionEnvironment {
        }
 
        public StreamExecutionEnvironment(final Configuration executorConfiguration) {
-               this(new DefaultExecutorServiceLoader(), executorConfiguration);
+               this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
        }
 
        public StreamExecutionEnvironment(

Reply via email to