This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98e93d522dba8218b44024290862e5424fc35b39
Author: Kostas Kloudas <[email protected]>
AuthorDate: Mon Nov 18 16:31:04 2019 +0100

    [FLINK-XXXXX] Update the Executor interface and introduce the JobClient
---
 .../deployment/AbstractJobClusterExecutor.java     |  4 ++
 .../deployment/AbstractSessionClusterExecutor.java |  4 ++
 .../flink/client/deployment/ExecutorUtils.java     |  4 ++
 .../flink/client/deployment/JobClientImpl.java     |  4 ++
 .../org/apache/flink/core/execution/Executor.java  | 13 +++--
 .../execution/{Executor.java => JobClient.java}    | 31 ++++++++----
 .../flink/api/java/ExecutionEnvironment.java       | 19 ++++++-
 ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
 ...org.apache.flink.core.execution.ExecutorFactory |  2 +-
 .../environment/StreamExecutionEnvironment.java    | 20 +++++++-
 ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
 ...org.apache.flink.core.execution.ExecutorFactory |  2 +-
 12 files changed, 180 insertions(+), 41 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
new file mode 100644
index 0000000..14a93bc
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractJobClusterExecutor {
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
new file mode 100644
index 0000000..ab8cc01
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractSessionClusterExecutor {
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
new file mode 100644
index 0000000..e134206
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class ExecutorUtils {
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
new file mode 100644
index 0000000..e811fc8
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class JobClientImpl {
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 7069e70..1d606e8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -19,12 +19,13 @@
 package org.apache.flink.core.execution;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nonnull;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
@@ -32,11 +33,15 @@ import javax.annotation.Nonnull;
 public interface Executor {
 
        /**
-        * Executes a {@link Pipeline} based on the provided configuration.
+        * Executes a {@link Pipeline} based on the provided configuration and 
returns a {@link JobClient} which allows to
+        * interact with the job being executed, e.g. cancel it or take a 
savepoint.
+        *
+        * <p><b>ATTENTION:</b> The caller is responsible for managing the 
lifecycle of the returned {@link JobClient}. This
+        * means that e.g. {@code close()} should be called explicitly at the 
call-site.
         *
         * @param pipeline the {@link Pipeline} to execute
         * @param configuration the {@link Configuration} with the required 
execution parameters
-        * @return the {@link JobExecutionResult} corresponding to the pipeline 
execution.
+        * @return a {@link CompletableFuture} with the {@link JobClient} 
corresponding to the pipeline.
         */
-       JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull 
final Configuration configuration) throws Exception;
+       CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, 
@Nonnull final Configuration configuration) throws Exception;
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 53%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 7069e70..06310bd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,25 +18,34 @@
 
 package org.apache.flink.core.execution;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.JobID;
 
 import javax.annotation.Nonnull;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
  */
-@Internal
-public interface Executor {
+@PublicEvolving
+public interface JobClient extends AutoCloseable {
+
+       /**
+        * Returns the {@link JobID} that uniquely identifies the job this 
client is scoped to.
+        */
+       JobID getJobID();
+
+       /**
+        * Returns the result of the job submission which will also contain the 
job id of the submitted job.
+        */
+       CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
        /**
-        * Executes a {@link Pipeline} based on the provided configuration.
+        * Returns the {@link JobExecutionResult result of the job execution} 
of the submitted job.
         *
-        * @param pipeline the {@link Pipeline} to execute
-        * @param configuration the {@link Configuration} with the required 
execution parameters
-        * @return the {@link JobExecutionResult} corresponding to the pipeline 
execution.
+        * @param userClassloader the classloader used to de-serialize the 
accumulators of the job.
         */
-       JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull 
final Configuration configuration) throws Exception;
+       CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
final ClassLoader userClassloader);
 }
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 26632e6..d485105 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import 
org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
 
        private final Configuration configuration;
 
+       private ClassLoader userClassloader;
+
        /**
         * Creates a new Execution Environment.
         */
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
        protected ExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {
                this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
                this.configuration = checkNotNull(executorConfiguration);
+               this.userClassloader = getClass().getClassLoader();
+       }
+
+       protected void setUserClassloader(final ClassLoader userClassloader) {
+               this.userClassloader = checkNotNull(userClassloader);
        }
 
        protected Configuration getConfiguration() {
@@ -796,8 +804,15 @@ public class ExecutionEnvironment {
                                
executorServiceLoader.getExecutorFactory(configuration);
 
                final Executor executor = 
executorFactory.getExecutor(configuration);
-               lastJobExecutionResult = executor.execute(plan, configuration);
-               return lastJobExecutionResult;
+
+               try (final JobClient jobClient = executor.execute(plan, 
configuration).get()) {
+
+                       lastJobExecutionResult = 
configuration.getBoolean(DeploymentOptions.ATTACHED)
+                                       ? 
jobClient.getJobExecutionResult(userClassloader).get()
+                                       : 
jobClient.getJobSubmissionResult().get();
+
+                       return lastJobExecutionResult;
+               }
        }
 
        private void consolidateParallelismDefinitionsInConfiguration() {
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from 
flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
rename to 
flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
index 2d46915..c674c7e 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
@@ -34,25 +35,43 @@ import javax.annotation.Nonnull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 /**
- * Tests the {@link ExecutorFactory} discovery in the {@link 
ExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link 
ExecutionEnvironment} and the calls of the {@link JobClient}.
  */
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+       private static final String EXEC_NAME = "test-executor";
+       private static final long ATTACHED_RUNTIME = 42L;
+       private static final long DETACHED_RUNTIME = 11L;
+
+       @Test
+       public void 
jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws 
Exception {
+               testHelper(true, ATTACHED_RUNTIME);
+       }
 
        @Test
-       public void 
correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws 
Exception {
+       public void 
jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws 
Exception {
+               testHelper(false, DETACHED_RUNTIME);
+       }
+
+       private void testHelper(final boolean attached, final long 
expectedRuntime) throws Exception {
                final Configuration configuration = new Configuration();
-               configuration.set(DeploymentOptions.TARGET, 
IDReportingExecutorFactory.ID);
+               configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+               configuration.set(DeploymentOptions.ATTACHED, attached);
 
                final JobExecutionResult result = 
executeTestJobBasedOnConfig(configuration);
 
                final String executorName = 
result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
-               assertThat(executorName, 
is(equalTo(IDReportingExecutorFactory.ID)));
+               assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+               final long runtime = result.getNetRuntime();
+               assertThat(runtime, is(equalTo(expectedRuntime)));
        }
 
        private JobExecutionResult executeTestJobBasedOnConfig(final 
Configuration configuration) throws Exception {
@@ -68,19 +87,39 @@ public class ExecutorDiscoveryTest {
         */
        public static class IDReportingExecutorFactory implements 
ExecutorFactory {
 
-               public static final String ID = "test-executor-A";
-
                @Override
                public boolean isCompatibleWith(@Nonnull Configuration 
configuration) {
-                       return 
ID.equals(configuration.get(DeploymentOptions.TARGET));
+                       return 
EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
                }
 
                @Override
                public Executor getExecutor(@Nonnull Configuration 
configuration) {
                        return (pipeline, executionConfig) -> {
                                final Map<String, OptionalFailure<Object>> res 
= new HashMap<>();
-                               res.put(DeploymentOptions.TARGET.key(), 
OptionalFailure.of(ID));
-                               return new JobExecutionResult(new JobID(), 12L, 
res);
+                               res.put(DeploymentOptions.TARGET.key(), 
OptionalFailure.of(EXEC_NAME));
+
+                               return CompletableFuture.completedFuture(new 
JobClient(){
+
+                                       @Override
+                                       public JobID getJobID() {
+                                               return new JobID();
+                                       }
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+                                               return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 
DETACHED_RUNTIME, res));
+                                       }
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
ClassLoader userClassloader) {
+                                               return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 
ATTACHED_RUNTIME, res));
+                                       }
+
+                                       @Override
+                                       public void close() {
+
+                                       }
+                               });
                        };
                }
        }
diff --git 
a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
 
b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index fcfaa55..c09254a 100644
--- 
a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ 
b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.api.java.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.api.java.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ba702ea..176a7ab 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
 
        private final Configuration configuration;
 
+       private ClassLoader userClassloader;
+
        // 
--------------------------------------------------------------------------------------------
        // Constructor and Properties
        // 
--------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
                this(DefaultExecutorServiceLoader.INSTANCE, 
executorConfiguration);
        }
 
-       public StreamExecutionEnvironment(final ExecutorServiceLoader 
executorServiceLoader, final Configuration executorConfiguration) {
+       public StreamExecutionEnvironment(
+                       final ExecutorServiceLoader executorServiceLoader,
+                       final Configuration executorConfiguration) {
                this.executorServiceLoader = 
checkNotNull(executorServiceLoader);
                this.configuration = checkNotNull(executorConfiguration);
+               this.userClassloader = getClass().getClassLoader();
+       }
+
+       protected void setUserClassloader(final ClassLoader userClassloader) {
+               this.userClassloader = checkNotNull(userClassloader);
        }
 
        protected Configuration getConfiguration() {
@@ -1552,7 +1562,13 @@ public class StreamExecutionEnvironment {
                                
executorServiceLoader.getExecutorFactory(configuration);
 
                final Executor executor = 
executorFactory.getExecutor(configuration);
-               return executor.execute(streamGraph, configuration);
+
+               try (final JobClient jobClient = executor.execute(streamGraph, 
configuration).get()) {
+
+                       return 
configuration.getBoolean(DeploymentOptions.ATTACHED)
+                                       ? 
jobClient.getJobExecutionResult(userClassloader).get()
+                                       : 
jobClient.getJobSubmissionResult().get();
+               }
        }
 
        private void consolidateParallelismDefinitionsInConfiguration() {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index 2a1bb4a..7caf531 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.OptionalFailure;
@@ -35,25 +36,43 @@ import javax.annotation.Nonnull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 /**
- * Tests the {@link ExecutorFactory} discovery in the {@link 
StreamExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link 
StreamExecutionEnvironment} and the calls of the {@link JobClient}.
  */
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+       private static final String EXEC_NAME = "test-executor";
+       private static final long ATTACHED_RUNTIME = 42L;
+       private static final long DETACHED_RUNTIME = 11L;
+
+       @Test
+       public void 
jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws 
Exception {
+               testHelper(true, ATTACHED_RUNTIME);
+       }
 
        @Test
-       public void 
correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws 
Exception {
+       public void 
jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws 
Exception {
+               testHelper(false, DETACHED_RUNTIME);
+       }
+
+       private void testHelper(final boolean attached, final long 
expectedRuntime) throws Exception {
                final Configuration configuration = new Configuration();
-               configuration.set(DeploymentOptions.TARGET, 
IDReportingExecutorFactory.ID);
+               configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+               configuration.set(DeploymentOptions.ATTACHED, attached);
 
                final JobExecutionResult result = 
executeTestJobBasedOnConfig(configuration);
 
                final String executorName = 
result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
-               assertThat(executorName, 
is(equalTo(IDReportingExecutorFactory.ID)));
+               assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+               final long runtime = result.getNetRuntime();
+               assertThat(runtime, is(equalTo(expectedRuntime)));
        }
 
        private JobExecutionResult executeTestJobBasedOnConfig(final 
Configuration configuration) throws Exception {
@@ -69,19 +88,39 @@ public class ExecutorDiscoveryTest {
         */
        public static class IDReportingExecutorFactory implements 
ExecutorFactory {
 
-               public static final String ID = "test-executor-A";
-
                @Override
                public boolean isCompatibleWith(@Nonnull Configuration 
configuration) {
-                       return 
ID.equals(configuration.get(DeploymentOptions.TARGET));
+                       return 
EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
                }
 
                @Override
                public Executor getExecutor(@Nonnull Configuration 
configuration) {
                        return (pipeline, executionConfig) -> {
                                final Map<String, OptionalFailure<Object>> res 
= new HashMap<>();
-                               res.put(DeploymentOptions.TARGET.key(), 
OptionalFailure.of(ID));
-                               return new JobExecutionResult(new JobID(), 12L, 
res);
+                               res.put(DeploymentOptions.TARGET.key(), 
OptionalFailure.of(EXEC_NAME));
+
+                               return CompletableFuture.completedFuture(new 
JobClient(){
+
+                                       @Override
+                                       public JobID getJobID() {
+                                               return new JobID();
+                                       }
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+                                               return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 
DETACHED_RUNTIME, res));
+                                       }
+
+                                       @Override
+                                       public 
CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull 
ClassLoader userClassloader) {
+                                               return 
CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 
ATTACHED_RUNTIME, res));
+                                       }
+
+                                       @Override
+                                       public void close() {
+
+                                       }
+                               });
                        };
                }
        }
diff --git 
a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
 
b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 68ddbcb..a5186ae 100644
--- 
a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ 
b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.environment.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.streaming.environment.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file

Reply via email to