This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit eeeff7a5fa8ec0acb67bac8eb7dbd9a4165a91e7 Author: Kostas Kloudas <[email protected]> AuthorDate: Fri Jun 19 14:10:02 2020 +0200 [FLINK-18352] Make DefaultClusterClientServiceLoader/DefaultExecutorServiceLoader thread-safe --- .../java/org/apache/flink/client/cli/CliFrontend.java | 2 +- .../java/org/apache/flink/client/cli/GenericCLI.java | 2 +- .../deployment/DefaultClusterClientServiceLoader.java | 7 ++++--- .../core/execution/DefaultExecutorServiceLoader.java | 18 ++++++++---------- .../apache/flink/api/java/ExecutionEnvironment.java | 2 +- .../api/environment/RemoteStreamEnvironment.java | 2 +- .../api/environment/StreamExecutionEnvironment.java | 2 +- .../table/client/gateway/local/ProgramDeployer.java | 2 +- 8 files changed, 18 insertions(+), 19 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 159507e..ef7d7c9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -696,7 +696,7 @@ public class CliFrontend { // -------------------------------------------------------------------------------------------- protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException { - ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program, false, false); + ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false); } /** diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java index 945779b..37628b3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java @@ -146,7 +146,7 @@ public class GenericCLI implements CustomCommandLine { } private static String getExecutorFactoryNames() { - return DefaultExecutorServiceLoader.INSTANCE.getExecutorNames() + return new DefaultExecutorServiceLoader().getExecutorNames() .map(name -> String.format("\"%s\"", name)) .collect(Collectors.joining(", ")); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java index b2c43a2..ec56abe 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/DefaultClusterClientServiceLoader.java @@ -38,14 +38,15 @@ public class DefaultClusterClientServiceLoader implements ClusterClientServiceLo private static final Logger LOG = LoggerFactory.getLogger(DefaultClusterClientServiceLoader.class); - private static final ServiceLoader<ClusterClientFactory> defaultLoader = ServiceLoader.load(ClusterClientFactory.class); - @Override public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) { checkNotNull(configuration); + final ServiceLoader<ClusterClientFactory> loader = + ServiceLoader.load(ClusterClientFactory.class); + final List<ClusterClientFactory> compatibleFactories = new ArrayList<>(); - final Iterator<ClusterClientFactory> factories = defaultLoader.iterator(); + final Iterator<ClusterClientFactory> factories = loader.iterator(); while (factories.hasNext()) { try { final ClusterClientFactory factory = factories.next(); diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java index 6b07f7f..b5e9f08 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java @@ -47,16 +47,15 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceLoader.class); - private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class); - - public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader(); - @Override public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) { checkNotNull(configuration); + final ServiceLoader<PipelineExecutorFactory> loader = + ServiceLoader.load(PipelineExecutorFactory.class); + final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>(); - final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator(); + final Iterator<PipelineExecutorFactory> factories = loader.iterator(); while (factories.hasNext()) { try { final PipelineExecutorFactory factory = factories.next(); @@ -90,11 +89,10 @@ public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoad @Override public Stream<String> getExecutorNames() { - return StreamSupport.stream(defaultLoader.spliterator(), false) - .map(PipelineExecutorFactory::getName); - } + final ServiceLoader<PipelineExecutorFactory> loader = + ServiceLoader.load(PipelineExecutorFactory.class); - private DefaultExecutorServiceLoader() { - // make sure nobody instantiates us explicitly. + return StreamSupport.stream(loader.spliterator(), false) + .map(PipelineExecutorFactory::getName); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 9f0089d..910c419 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -157,7 +157,7 @@ public class ExecutionEnvironment { */ @PublicEvolving public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) { - this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader); + this(new DefaultExecutorServiceLoader(), configuration, userClassloader); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 560633e..8dd09f9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -141,7 +141,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { */ @PublicEvolving public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths, SavepointRestoreSettings savepointRestoreSettings) { - this(DefaultExecutorServiceLoader.INSTANCE, host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings); + this(new DefaultExecutorServiceLoader(), host, port, clientConfiguration, jarFiles, globalClasspaths, savepointRestoreSettings); } @PublicEvolving diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index c688506..7de2e97 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -203,7 +203,7 @@ public class StreamExecutionEnvironment { public StreamExecutionEnvironment( final Configuration configuration, final ClassLoader userClassloader) { - this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader); + this(new DefaultExecutorServiceLoader(), configuration, userClassloader); } /** diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java index 0070b1f..31b5143 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java @@ -68,7 +68,7 @@ public class ProgramDeployer { throw new RuntimeException("No execution.target specified in your configuration file."); } - PipelineExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE; + PipelineExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader(); final PipelineExecutorFactory executorFactory; try { executorFactory = executorServiceLoader.getExecutorFactory(configuration);
