This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b05ab524836c1c07d128610248fe372c8d697974
Author: Kostas Kloudas <[email protected]>
AuthorDate: Fri Jun 19 14:10:02 2020 +0200

    [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
    
    This closes #12719.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  |  2 +-
 .../java/org/apache/flink/client/cli/GenericCLI.java   |  2 +-
 .../deployment/DefaultClusterClientServiceLoader.java  |  7 ++++---
 .../core/execution/DefaultExecutorServiceLoader.java   | 18 ++++++++----------
 .../apache/flink/api/java/ExecutionEnvironment.java    |  2 +-
 .../api/environment/RemoteStreamEnvironment.java       |  2 +-
 .../api/environment/StreamExecutionEnvironment.java    |  2 +-
 .../table/client/gateway/local/ProgramDeployer.java    |  2 +-
 8 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 159507e..ef7d7c9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -696,7 +696,7 @@ public class CliFrontend {
        // 
--------------------------------------------------------------------------------------------
 
        protected void executeProgram(final Configuration configuration, final 
PackagedProgram program) throws ProgramInvocationException {
-               
ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, 
configuration, program, false, false);
+               ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), 
configuration, program, false, false);
        }
 
        /**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
index 945779b..37628b3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java
@@ -146,7 +146,7 @@ public class GenericCLI implements CustomCommandLine {
        }
 
        private static String getExecutorFactoryNames() {
-               return DefaultExecutorServiceLoader.INSTANCE.getExecutorNames()
+               return new DefaultExecutorServiceLoader().getExecutorNames()
                                .map(name -> String.format("\"%s\"", name))
                                .collect(Collectors.joining(", "));
        }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index b2c43a2..ec56abe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -38,14 +38,15 @@ public class DefaultClusterClientServiceLoader implements 
ClusterClientServiceLo
 
        private static final Logger LOG = 
LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
 
-       private static final ServiceLoader<ClusterClientFactory> defaultLoader 
= ServiceLoader.load(ClusterClientFactory.class);
-
        @Override
        public <ClusterID> ClusterClientFactory<ClusterID> 
getClusterClientFactory(final Configuration configuration) {
                checkNotNull(configuration);
 
+               final ServiceLoader<ClusterClientFactory> loader =
+                               ServiceLoader.load(ClusterClientFactory.class);
+
                final List<ClusterClientFactory> compatibleFactories = new 
ArrayList<>();
-               final Iterator<ClusterClientFactory> factories = 
defaultLoader.iterator();
+               final Iterator<ClusterClientFactory> factories = 
loader.iterator();
                while (factories.hasNext()) {
                        try {
                                final ClusterClientFactory factory = 
factories.next();
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 6b07f7f..b5e9f08 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -47,16 +47,15 @@ public class DefaultExecutorServiceLoader implements 
PipelineExecutorServiceLoad
 
        private static final Logger LOG = 
LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);
 
-       private static final ServiceLoader<PipelineExecutorFactory> 
defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
-
-       public static final DefaultExecutorServiceLoader INSTANCE = new 
DefaultExecutorServiceLoader();
-
        @Override
        public PipelineExecutorFactory getExecutorFactory(final Configuration 
configuration) {
                checkNotNull(configuration);
 
+               final ServiceLoader<PipelineExecutorFactory> loader =
+                               
ServiceLoader.load(PipelineExecutorFactory.class);
+
                final List<PipelineExecutorFactory> compatibleFactories = new 
ArrayList<>();
-               final Iterator<PipelineExecutorFactory> factories = 
defaultLoader.iterator();
+               final Iterator<PipelineExecutorFactory> factories = 
loader.iterator();
                while (factories.hasNext()) {
                        try {
                                final PipelineExecutorFactory factory = 
factories.next();
@@ -90,11 +89,10 @@ public class DefaultExecutorServiceLoader implements 
PipelineExecutorServiceLoad
 
        @Override
        public Stream<String> getExecutorNames() {
-               return StreamSupport.stream(defaultLoader.spliterator(), false)
-                               .map(PipelineExecutorFactory::getName);
-       }
+               final ServiceLoader<PipelineExecutorFactory> loader =
+                               
ServiceLoader.load(PipelineExecutorFactory.class);
 
-       private DefaultExecutorServiceLoader() {
-               // make sure nobody instantiates us explicitly.
+               return StreamSupport.stream(loader.spliterator(), false)
+                               .map(PipelineExecutorFactory::getName);
        }
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9f0089d..910c419 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -157,7 +157,7 @@ public class ExecutionEnvironment {
         */
        @PublicEvolving
        public ExecutionEnvironment(final Configuration configuration, final 
ClassLoader userClassloader) {
-               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
userClassloader);
+               this(new DefaultExecutorServiceLoader(), configuration, 
userClassloader);
        }
 
        /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 560633e..8dd09f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -141,7 +141,7 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
         */
        @PublicEvolving
        public RemoteStreamEnvironment(String host, int port, Configuration 
clientConfiguration, String[] jarFiles, URL[] globalClasspaths, 
SavepointRestoreSettings savepointRestoreSettings) {
-               this(DefaultExecutorServiceLoader.INSTANCE, host, port, 
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
+               this(new DefaultExecutorServiceLoader(), host, port, 
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
        }
 
        @PublicEvolving
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c688506..7de2e97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -203,7 +203,7 @@ public class StreamExecutionEnvironment {
        public StreamExecutionEnvironment(
                        final Configuration configuration,
                        final ClassLoader userClassloader) {
-               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
userClassloader);
+               this(new DefaultExecutorServiceLoader(), configuration, 
userClassloader);
        }
 
        /**
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 0070b1f..31b5143 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -68,7 +68,7 @@ public class ProgramDeployer {
                        throw new RuntimeException("No execution.target 
specified in your configuration file.");
                }
 
-               PipelineExecutorServiceLoader executorServiceLoader = 
DefaultExecutorServiceLoader.INSTANCE;
+               PipelineExecutorServiceLoader executorServiceLoader = new 
DefaultExecutorServiceLoader();
                final PipelineExecutorFactory executorFactory;
                try {
                        executorFactory = 
executorServiceLoader.getExecutorFactory(configuration);

Reply via email to