This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b1fd1fcc80657215ad325a9191ae5e3e77fffda
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 18 14:20:46 2019 +0100

    [hotfix] Make the DefaultExecutorServiceLoader a singleton
---
 .../flink/core/execution/DefaultExecutorServiceLoader.java   | 12 +++++++++---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java |  2 +-
 .../api/environment/StreamExecutionEnvironment.java          |  2 +-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 241feab..8bde967 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -45,6 +45,8 @@ public class DefaultExecutorServiceLoader implements 
ExecutorServiceLoader {
 
        private static final ServiceLoader<ExecutorFactory> defaultLoader = 
ServiceLoader.load(ExecutorFactory.class);
 
+       public static final DefaultExecutorServiceLoader INSTANCE = new 
DefaultExecutorServiceLoader();
+
        @Override
        public ExecutorFactory getExecutorFactory(final Configuration 
configuration) {
                checkNotNull(configuration);
@@ -67,14 +69,18 @@ public class DefaultExecutorServiceLoader implements 
ExecutorServiceLoader {
                }
 
                if (compatibleFactories.size() > 1) {
-                       final List<String> configStr =
+                       final String configStr =
                                        
configuration.toMap().entrySet().stream()
                                                        .map(e -> e.getKey() + 
"=" + e.getValue())
-                                                       
.collect(Collectors.toList());
+                                                       
.collect(Collectors.joining("\n"));
 
-                       throw new IllegalStateException("Multiple compatible 
client factories found for:\n" + String.join("\n", configStr) + ".");
+                       throw new IllegalStateException("Multiple compatible 
client factories found for:\n" + configStr + ".");
                }
 
                return compatibleFactories.isEmpty() ? null : 
compatibleFactories.get(0);
        }
+
+       private DefaultExecutorServiceLoader() {
+               // make sure nobody instantiates us explicitly.
+       }
 }
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..26632e6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -140,7 +140,7 @@ public class ExecutionEnvironment {
        }
 
        protected ExecutionEnvironment(final Configuration 
executorConfiguration) {
-               this(new DefaultExecutorServiceLoader(), executorConfiguration);
+               this(DefaultExecutorServiceLoader.INSTANCE, 
executorConfiguration);
        }
 
        protected ExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..ba702ea 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -163,7 +163,7 @@ public class StreamExecutionEnvironment {
        }
 
        public StreamExecutionEnvironment(final Configuration 
executorConfiguration) {
-               this(new DefaultExecutorServiceLoader(), executorConfiguration);
+               this(DefaultExecutorServiceLoader.INSTANCE, 
executorConfiguration);
        }
 
        public StreamExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {

Reply via email to