This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new dcd7574  [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
dcd7574 is described below

commit dcd7574daccfbbacf99d101a3f0f852e332e92a7
Author: Kostas Kloudas <[email protected]>
AuthorDate: Mon Jun 22 16:05:01 2020 +0200

    [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  |  2 +-
 .../java/org/apache/flink/client/cli/ExecutorCLI.java  |  2 +-
 .../deployment/DefaultClusterClientServiceLoader.java  |  9 ++++++---
 .../core/execution/DefaultExecutorServiceLoader.java   | 18 ++++++++----------
 .../apache/flink/api/java/ExecutionEnvironment.java    |  2 +-
 .../api/environment/RemoteStreamEnvironment.java       |  2 +-
 .../api/environment/StreamExecutionEnvironment.java    |  2 +-
 .../table/client/gateway/local/ProgramDeployer.java    |  2 +-
 8 files changed, 20 insertions(+), 19 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 57b7a30..5e24b45 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -667,7 +667,7 @@ public class CliFrontend {
        // 
--------------------------------------------------------------------------------------------
 
        protected void executeProgram(final Configuration configuration, final 
PackagedProgram program) throws ProgramInvocationException {
-               
ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, 
configuration, program);
+               ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), 
configuration, program);
        }
 
        /**
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
index 4b808a8..a9af6af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
@@ -127,7 +127,7 @@ public class ExecutorCLI implements CustomCommandLine {
        }
 
        private static String getExecutorFactoryNames() {
-               return DefaultExecutorServiceLoader.INSTANCE.getExecutorNames()
+               return new DefaultExecutorServiceLoader().getExecutorNames()
                                .map(name -> String.format("\"%s\"", name))
                                .collect(Collectors.joining(", "));
        }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index b2c43a2..0ba773b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.Logger;
@@ -34,18 +35,20 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A service provider for {@link ClusterClientFactory cluster client 
factories}.
  */
+@Internal
 public class DefaultClusterClientServiceLoader implements 
ClusterClientServiceLoader {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
 
-       private static final ServiceLoader<ClusterClientFactory> defaultLoader 
= ServiceLoader.load(ClusterClientFactory.class);
-
        @Override
        public <ClusterID> ClusterClientFactory<ClusterID> 
getClusterClientFactory(final Configuration configuration) {
                checkNotNull(configuration);
 
+               final ServiceLoader<ClusterClientFactory> loader =
+                               ServiceLoader.load(ClusterClientFactory.class);
+
                final List<ClusterClientFactory> compatibleFactories = new 
ArrayList<>();
-               final Iterator<ClusterClientFactory> factories = 
defaultLoader.iterator();
+               final Iterator<ClusterClientFactory> factories = 
loader.iterator();
                while (factories.hasNext()) {
                        try {
                                final ClusterClientFactory factory = 
factories.next();
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 6b07f7f..b5e9f08 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -47,16 +47,15 @@ public class DefaultExecutorServiceLoader implements 
PipelineExecutorServiceLoad
 
        private static final Logger LOG = 
LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);
 
-       private static final ServiceLoader<PipelineExecutorFactory> 
defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
-
-       public static final DefaultExecutorServiceLoader INSTANCE = new 
DefaultExecutorServiceLoader();
-
        @Override
        public PipelineExecutorFactory getExecutorFactory(final Configuration 
configuration) {
                checkNotNull(configuration);
 
+               final ServiceLoader<PipelineExecutorFactory> loader =
+                               
ServiceLoader.load(PipelineExecutorFactory.class);
+
                final List<PipelineExecutorFactory> compatibleFactories = new 
ArrayList<>();
-               final Iterator<PipelineExecutorFactory> factories = 
defaultLoader.iterator();
+               final Iterator<PipelineExecutorFactory> factories = 
loader.iterator();
                while (factories.hasNext()) {
                        try {
                                final PipelineExecutorFactory factory = 
factories.next();
@@ -90,11 +89,10 @@ public class DefaultExecutorServiceLoader implements 
PipelineExecutorServiceLoad
 
        @Override
        public Stream<String> getExecutorNames() {
-               return StreamSupport.stream(defaultLoader.spliterator(), false)
-                               .map(PipelineExecutorFactory::getName);
-       }
+               final ServiceLoader<PipelineExecutorFactory> loader =
+                               
ServiceLoader.load(PipelineExecutorFactory.class);
 
-       private DefaultExecutorServiceLoader() {
-               // make sure nobody instantiates us explicitly.
+               return StreamSupport.stream(loader.spliterator(), false)
+                               .map(PipelineExecutorFactory::getName);
        }
 }
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 07e6516..f7ddce1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -160,7 +160,7 @@ public class ExecutionEnvironment {
         */
        @PublicEvolving
        public ExecutionEnvironment(final Configuration configuration, final 
ClassLoader userClassloader) {
-               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
userClassloader);
+               this(new DefaultExecutorServiceLoader(), configuration, 
userClassloader);
        }
 
        /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 82800fa..2a9a306 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -142,7 +142,7 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
         */
        @PublicEvolving
        public RemoteStreamEnvironment(String host, int port, Configuration 
clientConfiguration, String[] jarFiles, URL[] globalClasspaths, 
SavepointRestoreSettings savepointRestoreSettings) {
-               this(DefaultExecutorServiceLoader.INSTANCE, host, port, 
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
+               this(new DefaultExecutorServiceLoader(), host, port, 
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
        }
 
        @PublicEvolving
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a8de8ac..70f2cc1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -187,7 +187,7 @@ public class StreamExecutionEnvironment {
         */
        @PublicEvolving
        public StreamExecutionEnvironment(final Configuration configuration) {
-               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
null);
+               this(new DefaultExecutorServiceLoader(), configuration, null);
        }
 
        /**
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index be86a7a..b9a48ef 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -68,7 +68,7 @@ public class ProgramDeployer {
                        throw new RuntimeException("No execution.target 
specified in your configuration file.");
                }
 
-               PipelineExecutorServiceLoader executorServiceLoader = 
DefaultExecutorServiceLoader.INSTANCE;
+               PipelineExecutorServiceLoader executorServiceLoader = new 
DefaultExecutorServiceLoader();
                final PipelineExecutorFactory executorFactory;
                try {
                        executorFactory = 
executorServiceLoader.getExecutorFactory(configuration);

Reply via email to