This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors-clean in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98e93d522dba8218b44024290862e5424fc35b39 Author: Kostas Kloudas <[email protected]> AuthorDate: Mon Nov 18 16:31:04 2019 +0100 [FLINK-XXXXX] Update the Executor interface and introduce the JobClient --- .../deployment/AbstractJobClusterExecutor.java | 4 ++ .../deployment/AbstractSessionClusterExecutor.java | 4 ++ .../flink/client/deployment/ExecutorUtils.java | 4 ++ .../flink/client/deployment/JobClientImpl.java | 4 ++ .../org/apache/flink/core/execution/Executor.java | 13 +++-- .../execution/{Executor.java => JobClient.java} | 31 ++++++++---- .../flink/api/java/ExecutionEnvironment.java | 19 ++++++- ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++---- ...org.apache.flink.core.execution.ExecutorFactory | 2 +- .../environment/StreamExecutionEnvironment.java | 20 +++++++- ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++---- ...org.apache.flink.core.execution.ExecutorFactory | 2 +- 12 files changed, 180 insertions(+), 41 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java new file mode 100644 index 0000000..14a93bc --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java @@ -0,0 +1,4 @@ +package org.apache.flink.client.deployment; + +public class AbstractJobClusterExecutor { +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java new file mode 100644 index 0000000..ab8cc01 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java @@ -0,0 +1,4 @@ +package org.apache.flink.client.deployment; + +public class AbstractSessionClusterExecutor { +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java new file mode 100644 index 0000000..e134206 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java @@ -0,0 +1,4 @@ +package org.apache.flink.client.deployment; + +public class ExecutorUtils { +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java new file mode 100644 index 0000000..e811fc8 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java @@ -0,0 +1,4 @@ +package org.apache.flink.client.deployment; + +public class JobClientImpl { +} diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java index 7069e70..1d606e8 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java @@ -19,12 +19,13 @@ package org.apache.flink.core.execution; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.configuration.Configuration; import javax.annotation.Nonnull; +import java.util.concurrent.CompletableFuture; + /** * The entity responsible for executing a {@link Pipeline}, i.e. a user job. */ @@ -32,11 +33,15 @@ import javax.annotation.Nonnull; public interface Executor { /** - * Executes a {@link Pipeline} based on the provided configuration. + * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows to + * interact with the job being executed, e.g. cancel it or take a savepoint. + * + * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This + * means that e.g. {@code close()} should be called explicitly at the call-site. * * @param pipeline the {@link Pipeline} to execute * @param configuration the {@link Configuration} with the required execution parameters - * @return the {@link JobExecutionResult} corresponding to the pipeline execution. + * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline. */ - JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception; + CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception; } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java similarity index 53% copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java index 7069e70..06310bd 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java @@ -18,25 +18,34 @@ package org.apache.flink.core.execution; -import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.JobID; import javax.annotation.Nonnull; +import java.util.concurrent.CompletableFuture; + /** - * The entity responsible for executing a {@link Pipeline}, i.e. a user job. + * A client that is scoped to a specific job. */ -@Internal -public interface Executor { +@PublicEvolving +public interface JobClient extends AutoCloseable { + + /** + * Returns the {@link JobID} that uniquely identifies the job this client is scoped to. + */ + JobID getJobID(); + + /** + * Returns the result of the job submission which will also contain the job id of the submitted job. + */ + CompletableFuture<JobExecutionResult> getJobSubmissionResult(); /** - * Executes a {@link Pipeline} based on the provided configuration. + * Returns the {@link JobExecutionResult result of the job execution} of the submitted job. * - * @param pipeline the {@link Pipeline} to execute - * @param configuration the {@link Configuration} with the required execution parameters - * @return the {@link JobExecutionResult} corresponding to the pipeline execution. + * @param userClassloader the classloader used to de-serialize the accumulators of the job. */ - JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception; + CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 26632e6..d485105 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; import org.apache.flink.core.execution.ExecutorServiceLoader; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; @@ -132,6 +133,8 @@ public class ExecutionEnvironment { private final Configuration configuration; + private ClassLoader userClassloader; + /** * Creates a new Execution Environment. */ @@ -146,6 +149,11 @@ public class ExecutionEnvironment { protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { this.executorServiceLoader = checkNotNull(executorServiceLoader); this.configuration = checkNotNull(executorConfiguration); + this.userClassloader = getClass().getClassLoader(); + } + + protected void setUserClassloader(final ClassLoader userClassloader) { + this.userClassloader = checkNotNull(userClassloader); } protected Configuration getConfiguration() { @@ -796,8 +804,15 @@ public class ExecutionEnvironment { executorServiceLoader.getExecutorFactory(configuration); final Executor executor = executorFactory.getExecutor(configuration); - lastJobExecutionResult = executor.execute(plan, configuration); - return lastJobExecutionResult; + + try (final JobClient jobClient = executor.execute(plan, configuration).get()) { + + lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED) + ? jobClient.getJobExecutionResult(userClassloader).get() + : jobClient.getJobSubmissionResult().get(); + + return lastJobExecutionResult; + } } private void consolidateParallelismDefinitionsInConfiguration() { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java similarity index 60% rename from flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java rename to flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java index 2d46915..c674c7e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.util.OptionalFailure; import org.junit.Test; @@ -34,25 +35,43 @@ import javax.annotation.Nonnull; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; /** - * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment}. + * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment} and the calls of the {@link JobClient}. */ -public class ExecutorDiscoveryTest { +public class ExecutorDiscoveryAndJobClientTest { + + private static final String EXEC_NAME = "test-executor"; + private static final long ATTACHED_RUNTIME = 42L; + private static final long DETACHED_RUNTIME = 11L; + + @Test + public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception { + testHelper(true, ATTACHED_RUNTIME); + } @Test - public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception { + public void jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws Exception { + testHelper(false, DETACHED_RUNTIME); + } + + private void testHelper(final boolean attached, final long expectedRuntime) throws Exception { final Configuration configuration = new Configuration(); - configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID); + configuration.set(DeploymentOptions.TARGET, EXEC_NAME); + configuration.set(DeploymentOptions.ATTACHED, attached); final JobExecutionResult result = executeTestJobBasedOnConfig(configuration); final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString(); - assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID))); + assertThat(executorName, is(equalTo(EXEC_NAME))); + + final long runtime = result.getNetRuntime(); + assertThat(runtime, is(equalTo(expectedRuntime))); } private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception { @@ -68,19 +87,39 @@ public class ExecutorDiscoveryTest { */ public static class IDReportingExecutorFactory implements ExecutorFactory { - public static final String ID = "test-executor-A"; - @Override public boolean isCompatibleWith(@Nonnull Configuration configuration) { - return ID.equals(configuration.get(DeploymentOptions.TARGET)); + return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET)); } @Override public Executor getExecutor(@Nonnull Configuration configuration) { return (pipeline, executionConfig) -> { final Map<String, OptionalFailure<Object>> res = new HashMap<>(); - res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID)); - return new JobExecutionResult(new JobID(), 12L, res); + res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME)); + + return CompletableFuture.completedFuture(new JobClient(){ + + @Override + public JobID getJobID() { + return new JobID(); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobSubmissionResult() { + return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res)); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) { + return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res)); + } + + @Override + public void close() { + + } + }); }; } } diff --git a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory index fcfaa55..c09254a 100644 --- a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory +++ b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.api.java.ExecutorDiscoveryTest$IDReportingExecutorFactory \ No newline at end of file +org.apache.flink.api.java.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory \ No newline at end of file diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index ba702ea..176a7ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; import org.apache.flink.core.execution.ExecutorServiceLoader; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -154,6 +155,8 @@ public class StreamExecutionEnvironment { private final Configuration configuration; + private ClassLoader userClassloader; + // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- @@ -166,9 +169,16 @@ public class StreamExecutionEnvironment { this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration); } - public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { + public StreamExecutionEnvironment( + final ExecutorServiceLoader executorServiceLoader, + final Configuration executorConfiguration) { this.executorServiceLoader = checkNotNull(executorServiceLoader); this.configuration = checkNotNull(executorConfiguration); + this.userClassloader = getClass().getClassLoader(); + } + + protected void setUserClassloader(final ClassLoader userClassloader) { + this.userClassloader = checkNotNull(userClassloader); } protected Configuration getConfiguration() { @@ -1552,7 +1562,13 @@ public class StreamExecutionEnvironment { executorServiceLoader.getExecutorFactory(configuration); final Executor executor = executorFactory.getExecutor(configuration); - return executor.execute(streamGraph, configuration); + + try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) { + + return configuration.getBoolean(DeploymentOptions.ATTACHED) + ? jobClient.getJobExecutionResult(userClassloader).get() + : jobClient.getJobSubmissionResult().get(); + } } private void consolidateParallelismDefinitionsInConfiguration() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java similarity index 60% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java index 2a1bb4a..7caf531 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.Executor; import org.apache.flink.core.execution.ExecutorFactory; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.util.OptionalFailure; @@ -35,25 +36,43 @@ import javax.annotation.Nonnull; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; /** - * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment}. + * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment} and the calls of the {@link JobClient}. */ -public class ExecutorDiscoveryTest { +public class ExecutorDiscoveryAndJobClientTest { + + private static final String EXEC_NAME = "test-executor"; + private static final long ATTACHED_RUNTIME = 42L; + private static final long DETACHED_RUNTIME = 11L; + + @Test + public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception { + testHelper(true, ATTACHED_RUNTIME); + } @Test - public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception { + public void jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws Exception { + testHelper(false, DETACHED_RUNTIME); + } + + private void testHelper(final boolean attached, final long expectedRuntime) throws Exception { final Configuration configuration = new Configuration(); - configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID); + configuration.set(DeploymentOptions.TARGET, EXEC_NAME); + configuration.set(DeploymentOptions.ATTACHED, attached); final JobExecutionResult result = executeTestJobBasedOnConfig(configuration); final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString(); - assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID))); + assertThat(executorName, is(equalTo(EXEC_NAME))); + + final long runtime = result.getNetRuntime(); + assertThat(runtime, is(equalTo(expectedRuntime))); } private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception { @@ -69,19 +88,39 @@ public class ExecutorDiscoveryTest { */ public static class IDReportingExecutorFactory implements ExecutorFactory { - public static final String ID = "test-executor-A"; - @Override public boolean isCompatibleWith(@Nonnull Configuration configuration) { - return ID.equals(configuration.get(DeploymentOptions.TARGET)); + return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET)); } @Override public Executor getExecutor(@Nonnull Configuration configuration) { return (pipeline, executionConfig) -> { final Map<String, OptionalFailure<Object>> res = new HashMap<>(); - res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID)); - return new JobExecutionResult(new JobID(), 12L, res); + res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME)); + + return CompletableFuture.completedFuture(new JobClient(){ + + @Override + public JobID getJobID() { + return new JobID(); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobSubmissionResult() { + return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res)); + } + + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) { + return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res)); + } + + @Override + public void close() { + + } + }); }; } } diff --git a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory index 68ddbcb..a5186ae 100644 --- a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory +++ b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.streaming.environment.ExecutorDiscoveryTest$IDReportingExecutorFactory \ No newline at end of file +org.apache.flink.streaming.environment.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory \ No newline at end of file
