This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new dcd7574 [FLINK-18352] Make
DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
dcd7574 is described below
commit dcd7574daccfbbacf99d101a3f0f852e332e92a7
Author: Kostas Kloudas <[email protected]>
AuthorDate: Mon Jun 22 16:05:01 2020 +0200
[FLINK-18352] Make
DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe
---
.../java/org/apache/flink/client/cli/CliFrontend.java | 2 +-
.../java/org/apache/flink/client/cli/ExecutorCLI.java | 2 +-
.../deployment/DefaultClusterClientServiceLoader.java | 9 ++++++---
.../core/execution/DefaultExecutorServiceLoader.java | 18 ++++++++----------
.../apache/flink/api/java/ExecutionEnvironment.java | 2 +-
.../api/environment/RemoteStreamEnvironment.java | 2 +-
.../api/environment/StreamExecutionEnvironment.java | 2 +-
.../table/client/gateway/local/ProgramDeployer.java | 2 +-
8 files changed, 20 insertions(+), 19 deletions(-)
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 57b7a30..5e24b45 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -667,7 +667,7 @@ public class CliFrontend {
//
--------------------------------------------------------------------------------------------
protected void executeProgram(final Configuration configuration, final
PackagedProgram program) throws ProgramInvocationException {
-
ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE,
configuration, program);
+ ClientUtils.executeProgram(new DefaultExecutorServiceLoader(),
configuration, program);
}
/**
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
index 4b808a8..a9af6af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
@@ -127,7 +127,7 @@ public class ExecutorCLI implements CustomCommandLine {
}
private static String getExecutorFactoryNames() {
- return DefaultExecutorServiceLoader.INSTANCE.getExecutorNames()
+ return new DefaultExecutorServiceLoader().getExecutorNames()
.map(name -> String.format("\"%s\"", name))
.collect(Collectors.joining(", "));
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
index b2c43a2..0ba773b 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java
@@ -18,6 +18,7 @@
package org.apache.flink.client.deployment;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
@@ -34,18 +35,20 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
/**
* A service provider for {@link ClusterClientFactory cluster client
factories}.
*/
+@Internal
public class DefaultClusterClientServiceLoader implements
ClusterClientServiceLoader {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class);
- private static final ServiceLoader<ClusterClientFactory> defaultLoader
= ServiceLoader.load(ClusterClientFactory.class);
-
@Override
public <ClusterID> ClusterClientFactory<ClusterID>
getClusterClientFactory(final Configuration configuration) {
checkNotNull(configuration);
+ final ServiceLoader<ClusterClientFactory> loader =
+ ServiceLoader.load(ClusterClientFactory.class);
+
final List<ClusterClientFactory> compatibleFactories = new
ArrayList<>();
- final Iterator<ClusterClientFactory> factories =
defaultLoader.iterator();
+ final Iterator<ClusterClientFactory> factories =
loader.iterator();
while (factories.hasNext()) {
try {
final ClusterClientFactory factory =
factories.next();
diff --git
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 6b07f7f..b5e9f08 100644
---
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -47,16 +47,15 @@ public class DefaultExecutorServiceLoader implements
PipelineExecutorServiceLoad
private static final Logger LOG =
LoggerFactory.getLogger(DefaultExecutorServiceLoader.class);
- private static final ServiceLoader<PipelineExecutorFactory>
defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
-
- public static final DefaultExecutorServiceLoader INSTANCE = new
DefaultExecutorServiceLoader();
-
@Override
public PipelineExecutorFactory getExecutorFactory(final Configuration
configuration) {
checkNotNull(configuration);
+ final ServiceLoader<PipelineExecutorFactory> loader =
+
ServiceLoader.load(PipelineExecutorFactory.class);
+
final List<PipelineExecutorFactory> compatibleFactories = new
ArrayList<>();
- final Iterator<PipelineExecutorFactory> factories =
defaultLoader.iterator();
+ final Iterator<PipelineExecutorFactory> factories =
loader.iterator();
while (factories.hasNext()) {
try {
final PipelineExecutorFactory factory =
factories.next();
@@ -90,11 +89,10 @@ public class DefaultExecutorServiceLoader implements
PipelineExecutorServiceLoad
@Override
public Stream<String> getExecutorNames() {
- return StreamSupport.stream(defaultLoader.spliterator(), false)
- .map(PipelineExecutorFactory::getName);
- }
+ final ServiceLoader<PipelineExecutorFactory> loader =
+
ServiceLoader.load(PipelineExecutorFactory.class);
- private DefaultExecutorServiceLoader() {
- // make sure nobody instantiates us explicitly.
+ return StreamSupport.stream(loader.spliterator(), false)
+ .map(PipelineExecutorFactory::getName);
}
}
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 07e6516..f7ddce1 100644
---
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -160,7 +160,7 @@ public class ExecutionEnvironment {
*/
@PublicEvolving
public ExecutionEnvironment(final Configuration configuration, final
ClassLoader userClassloader) {
- this(DefaultExecutorServiceLoader.INSTANCE, configuration,
userClassloader);
+ this(new DefaultExecutorServiceLoader(), configuration,
userClassloader);
}
/**
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 82800fa..2a9a306 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -142,7 +142,7 @@ public class RemoteStreamEnvironment extends
StreamExecutionEnvironment {
*/
@PublicEvolving
public RemoteStreamEnvironment(String host, int port, Configuration
clientConfiguration, String[] jarFiles, URL[] globalClasspaths,
SavepointRestoreSettings savepointRestoreSettings) {
- this(DefaultExecutorServiceLoader.INSTANCE, host, port,
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
+ this(new DefaultExecutorServiceLoader(), host, port,
clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings);
}
@PublicEvolving
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a8de8ac..70f2cc1 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -187,7 +187,7 @@ public class StreamExecutionEnvironment {
*/
@PublicEvolving
public StreamExecutionEnvironment(final Configuration configuration) {
- this(DefaultExecutorServiceLoader.INSTANCE, configuration,
null);
+ this(new DefaultExecutorServiceLoader(), configuration, null);
}
/**
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index be86a7a..b9a48ef 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -68,7 +68,7 @@ public class ProgramDeployer {
throw new RuntimeException("No execution.target
specified in your configuration file.");
}
- PipelineExecutorServiceLoader executorServiceLoader =
DefaultExecutorServiceLoader.INSTANCE;
+ PipelineExecutorServiceLoader executorServiceLoader = new
DefaultExecutorServiceLoader();
final PipelineExecutorFactory executorFactory;
try {
executorFactory =
executorServiceLoader.getExecutorFactory(configuration);