This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors-clean in repository https://gitbox.apache.org/repos/asf/flink.git
commit 719bf99de9655c8c47b6e5c6effdad1b8a93ee14 Author: Kostas Kloudas <[email protected]> AuthorDate: Mon Nov 18 14:22:03 2019 +0100 [hotfix] Annotate all the Executor-related interfaces as @Internal --- .../apache/flink/core/execution/DefaultExecutorServiceLoader.java | 6 +++++- .../src/main/java/org/apache/flink/core/execution/Executor.java | 7 ++++++- .../java/org/apache/flink/core/execution/ExecutorFactory.java | 8 ++++++-- .../org/apache/flink/core/execution/ExecutorServiceLoader.java | 6 +++++- .../java/org/apache/flink/api/java/ExecutorDiscoveryTest.java | 6 ++++-- .../apache/flink/streaming/environment/ExecutorDiscoveryTest.java | 6 ++++-- 6 files changed, 30 insertions(+), 9 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java index 8bde967..a088f8d 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java @@ -18,11 +18,14 @@ package org.apache.flink.core.execution; +import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -35,6 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses * Java service discovery to find the available {@link ExecutorFactory executor factories}. */ +@Internal public class DefaultExecutorServiceLoader implements ExecutorServiceLoader { // TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation. @@ -48,7 +52,7 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader { public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader(); @Override - public ExecutorFactory getExecutorFactory(final Configuration configuration) { + public ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) { checkNotNull(configuration); final List<ExecutorFactory> compatibleFactories = new ArrayList<>(); diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java index 3476742..7069e70 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java @@ -18,20 +18,25 @@ package org.apache.flink.core.execution; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; +import javax.annotation.Nonnull; + /** * The entity responsible for executing a {@link Pipeline}, i.e. a user job. */ +@Internal public interface Executor { /** * Executes a {@link Pipeline} based on the provided configuration. + * * @param pipeline the {@link Pipeline} to execute * @param configuration the {@link Configuration} with the required execution parameters * @return the {@link JobExecutionResult} corresponding to the pipeline execution. */ - JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception; + JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception; } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java index 8d6687b..9d6860a 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java @@ -18,23 +18,27 @@ package org.apache.flink.core.execution; +import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; +import javax.annotation.Nonnull; + /** * A factory for selecting and instantiating the adequate {@link Executor} * based on a provided {@link Configuration}. */ +@Internal public interface ExecutorFactory { /** * Returns {@code true} if this factory is compatible with the options in the * provided configuration, {@code false} otherwise. */ - boolean isCompatibleWith(Configuration configuration); + boolean isCompatibleWith(@Nonnull final Configuration configuration); /** * Instantiates an {@link Executor} compatible with the provided configuration. * @return the executor instance. */ - Executor getExecutor(Configuration configuration); + Executor getExecutor(@Nonnull Configuration configuration); } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java index 0b959eb7..5aee4ee 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java @@ -18,12 +18,16 @@ package org.apache.flink.core.execution; +import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; +import javax.annotation.Nonnull; + /** * An interface to be implemented by the entity responsible for finding the correct {@link Executor} to * execute a given {@link org.apache.flink.api.dag.Pipeline}. */ +@Internal public interface ExecutorServiceLoader { /** @@ -35,5 +39,5 @@ public interface ExecutorServiceLoader { * @throws Exception if there is more than one compatible factories, or something went wrong when * loading the registered factories. */ - ExecutorFactory getExecutorFactory(Configuration configuration) throws Exception; + ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) throws Exception; } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java index 49013b8..2d46915 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java @@ -29,6 +29,8 @@ import org.apache.flink.util.OptionalFailure; import org.junit.Test; +import javax.annotation.Nonnull; + import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -69,12 +71,12 @@ public class ExecutorDiscoveryTest { public static final String ID = "test-executor-A"; @Override - public boolean isCompatibleWith(Configuration configuration) { + public boolean isCompatibleWith(@Nonnull Configuration configuration) { return ID.equals(configuration.get(DeploymentOptions.TARGET)); } @Override - public Executor getExecutor(Configuration configuration) { + public Executor getExecutor(@Nonnull Configuration configuration) { return (pipeline, executionConfig) -> { final Map<String, OptionalFailure<Object>> res = new HashMap<>(); res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java index 9c11fdf..2a1bb4a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java @@ -30,6 +30,8 @@ import org.apache.flink.util.OptionalFailure; import org.junit.Test; +import javax.annotation.Nonnull; + import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -70,12 +72,12 @@ public class ExecutorDiscoveryTest { public static final String ID = "test-executor-A"; @Override - public boolean isCompatibleWith(Configuration configuration) { + public boolean isCompatibleWith(@Nonnull Configuration configuration) { return ID.equals(configuration.get(DeploymentOptions.TARGET)); } @Override - public Executor getExecutor(Configuration configuration) { + public Executor getExecutor(@Nonnull Configuration configuration) { return (pipeline, executionConfig) -> { final Map<String, OptionalFailure<Object>> res = new HashMap<>(); res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
