This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6825f803de17f87007e0531d82baf14bdbab2fd4
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Fri Dec 6 19:30:41 2019 +0100

    [FLINK-15116] Return a ClusterClientProvider from ClusterDescriptor methods
    
    This allows the consumer of the methods to create a new ClusterClient
    with a separate lifecycle whenever necessary.
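    
    A rough usage sketch (illustrative only, not part of this change; the
    descriptor, cluster specification and job graph variables are assumed
    to exist in the caller's scope):
    
        ClusterClientProvider<ApplicationId> provider =
                clusterDescriptor.deploySessionCluster(clusterSpecification);
    
        // Each getClusterClient() call creates a fresh client whose
        // lifecycle is owned by the caller, who must close it.
        try (ClusterClient<ApplicationId> client = provider.getClusterClient()) {
                client.submitJob(jobGraph);
        }
    
        // A second, independently managed client can be created later,
        // for example from a shutdown hook.
        try (ClusterClient<ApplicationId> shutdownClient = provider.getClusterClient()) {
                shutdownClient.getClusterId();
        }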
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  2 +-
 .../deployment/AbstractJobClusterExecutor.java     |  4 +-
 .../deployment/AbstractSessionClusterExecutor.java |  5 +-
 .../flink/client/deployment/ClusterDescriptor.java |  8 +--
 .../deployment/StandaloneClusterDescriptor.java    | 19 +++---
 .../client/program/ClusterClientProvider.java      | 35 ++++++++++
 .../apache/flink/client/cli/DefaultCLITest.java    |  2 +-
 .../client/cli/util/DummyClusterDescriptor.java    | 13 ++--
 .../client/program/rest/RestClusterClientTest.java |  4 +-
 .../kubernetes/KubernetesClusterDescriptor.java    | 75 +++++++++++++---------
 .../kubernetes/cli/FlinkKubernetesCustomCli.java   |  8 ++-
 .../KubernetesClusterDescriptorTest.java           |  5 +-
 .../org/apache/flink/api/scala/FlinkShell.scala    |  4 +-
 .../environment/StreamExecutionEnvironment.java    |  4 --
 .../table/client/gateway/local/LocalExecutor.java  |  2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java     | 13 ++--
 .../java/org/apache/flink/yarn/YARNITCase.java     | 10 +--
 .../apache/flink/yarn/YarnConfigurationITCase.java |  4 +-
 .../flink/yarn/YarnDistributedCacheITCase.java     | 10 +--
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 26 ++++++--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 24 +++----
 21 files changed, 179 insertions(+), 98 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2ef7c77..3566772 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -862,7 +862,7 @@ public class CliFrontend {
                                "you would like to connect.");
                } else {
                        try {
-                               final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId);
+                               final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient();
 
                                try {
                                        clusterAction.runAction(clusterClient);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
index 6a18c2f..fb603bc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -63,7 +63,9 @@ public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends Cluster
 
                        final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 
-                       final ClusterClient<ClusterID> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+                       final ClusterClient<ClusterID> clusterClient = clusterDescriptor
+                                       .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())
+                                       .getClusterClient();
                        LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
 
                        final boolean withShutdownHook = !configAccessor.getDetachedMode() && configAccessor.isShutdownOnAttachedExit();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
index 93b815a..784edcd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -56,7 +56,10 @@ public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends Clu
                        final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
                        checkState(clusterID != null);
 
-                       final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
+                       final ClusterClient<ClusterID> clusterClient = clusterDescriptor
+                                       .retrieve(clusterID)
+                                       .getClusterClient();
+
                        return clusterClient
                                        .submitJob(jobGraph)
                                        .thenApply(jobID -> new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobID) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index e6b3922..ca5db45 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
 
@@ -41,7 +41,7 @@ public interface ClusterDescriptor<T> extends AutoCloseable {
         * @return Client for the cluster
         * @throws ClusterRetrieveException if the cluster client could not be retrieved
         */
-       ClusterClient<T> retrieve(T clusterId) throws ClusterRetrieveException;
+       ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
 
        /**
         * Triggers deployment of a cluster.
@@ -49,7 +49,7 @@ public interface ClusterDescriptor<T> extends AutoCloseable {
         * @return Client for the cluster
         * @throws ClusterDeploymentException if the cluster could not be deployed
         */
-       ClusterClient<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException;
+       ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException;
 
        /**
         * Deploys a per-job cluster with the given job on the cluster.
@@ -61,7 +61,7 @@ public interface ClusterDescriptor<T> extends AutoCloseable {
         * @return Cluster client to talk to the Flink cluster
         * @throws ClusterDeploymentException if the cluster could not be deployed
         */
-       ClusterClient<T> deployJobCluster(
+       ClusterClientProvider<T> deployJobCluster(
                final ClusterSpecification clusterSpecification,
                final JobGraph jobGraph,
                final boolean detached) throws ClusterDeploymentException;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index bdf8faf..a79ff2f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -44,21 +45,23 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
        }
 
        @Override
-       public RestClusterClient<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
-               try {
-                       return new RestClusterClient<>(config, standaloneClusterId);
-               } catch (Exception e) {
-                       throw new ClusterRetrieveException("Couldn't retrieve standalone cluster", e);
-               }
+       public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
+               return () -> {
+                       try {
+                               return new RestClusterClient<>(config, standaloneClusterId);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Couldn't retrieve standalone cluster", e);
+                       }
+               };
        }
 
        @Override
-       public RestClusterClient<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) {
+       public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) {
                throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
        }
 
        @Override
-       public RestClusterClient<StandaloneClusterId> deployJobCluster(
+       public ClusterClientProvider<StandaloneClusterId> deployJobCluster(
                        ClusterSpecification clusterSpecification,
                        JobGraph jobGraph,
                        boolean detached) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClientProvider.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClientProvider.java
new file mode 100644
index 0000000..23d08ad
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClientProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Factory for {@link ClusterClient ClusterClients}.
+ */
+@Internal
+public interface ClusterClientProvider<T> {
+
+       /**
+        * Creates and returns a new {@link ClusterClient}. The returned client needs to be closed via
+        * {@link ClusterClient#close()} after use.
+        */
+       ClusterClient<T> getClusterClient();
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index ba15088..8a0a1f2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -109,6 +109,6 @@ public class DefaultCLITest extends CliFrontendTestBase {
                checkState(clusterFactory != null);
 
                final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = clusterFactory.createClusterDescriptor(executorConfig);
-               return clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig));
+               return clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig)).getClusterClient();
        }
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index a7af09b..a4c754f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.cli.util;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -41,21 +42,21 @@ public class DummyClusterDescriptor<T> implements ClusterDescriptor<T> {
        }
 
        @Override
-       public ClusterClient<T> retrieve(T clusterId) {
-               return clusterClient;
+       public ClusterClientProvider<T> retrieve(T clusterId) {
+               return () -> clusterClient;
        }
 
        @Override
-       public ClusterClient<T> deploySessionCluster(ClusterSpecification clusterSpecification) {
-               return clusterClient;
+       public ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) {
+               return () -> clusterClient;
        }
 
        @Override
-       public ClusterClient<T> deployJobCluster(
+       public ClusterClientProvider<T> deployJobCluster(
                        ClusterSpecification clusterSpecification,
                        JobGraph jobGraph,
                        boolean detached) {
-               return clusterClient;
+               return () -> clusterClient;
        }
 
        @Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index c0ffae6..ff1f5ff 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -555,7 +555,9 @@ public class RestClusterClientTest extends TestLogger {
                checkState(clusterFactory != null);
 
                final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = clusterFactory.createClusterDescriptor(executorConfig);
-               final RestClusterClient<?> clusterClient = (RestClusterClient<?>) clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig));
+               final RestClusterClient<?> clusterClient = (RestClusterClient<?>) clusterDescriptor
+                               .retrieve(clusterFactory.getClusterId(executorConfig))
+                               .getClusterClient();
 
                URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
                assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname));
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index 62086fd..ff04a59 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterRetrieveException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -74,60 +75,61 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
                return CLUSTER_DESCRIPTION;
        }
 
-       private ClusterClient<String> createClusterClient(String clusterId) throws Exception {
-               final Configuration configuration = new Configuration(flinkConfig);
+       private ClusterClientProvider<String> createClusterClientProvider(String clusterId) {
+               return () -> {
+                       final Configuration configuration = new Configuration(flinkConfig);
 
-               final Endpoint restEndpoint = client.getRestEndpoint(clusterId);
+                       final Endpoint restEndpoint = client.getRestEndpoint(clusterId);
 
-               if (restEndpoint != null) {
-                       configuration.setString(RestOptions.ADDRESS, restEndpoint.getAddress());
-                       configuration.setInteger(RestOptions.PORT, restEndpoint.getPort());
-               } else {
-                       throw new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId);
-               }
+                       if (restEndpoint != null) {
+                               configuration.setString(RestOptions.ADDRESS, restEndpoint.getAddress());
+                               configuration.setInteger(RestOptions.PORT, restEndpoint.getPort());
+                       } else {
+                               throw new RuntimeException(
+                                               new ClusterRetrieveException(
+                                                               "Could not get the rest endpoint of " + clusterId));
+                       }
 
-               return new RestClusterClient<>(configuration, clusterId);
+                       try {
+
+                               RestClusterClient<String> resultClient = new RestClusterClient<>(
+                                               configuration, clusterId);
+                               LOG.info(
+                                               "Succesfully retrieved cluster client for cluster {}, JobManager Web Interface : {}",
+                                               clusterId,
+                                               resultClient.getWebInterfaceURL());
+                               return resultClient;
+                       } catch (Exception e) {
+                               client.handleException(e);
+                               throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e));
+                       }
+               };
        }
 
        @Override
-       public ClusterClient<String> retrieve(String clusterId) throws ClusterRetrieveException {
-               try {
-                       final ClusterClient<String> retrievedClient = createClusterClient(clusterId);
-                       LOG.info(
-                               "Retrieve flink cluster {} successfully, JobManager Web Interface : {}",
-                               clusterId,
-                               retrievedClient.getWebInterfaceURL());
-                       return retrievedClient;
-               } catch (Exception e) {
-                       client.handleException(e);
-                       throw new ClusterRetrieveException("Could not create the RestClusterClient.", e);
-               }
+       public ClusterClientProvider<String> retrieve(String clusterId) {
+               return createClusterClientProvider(clusterId);
        }
 
        @Override
-       public ClusterClient<String> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
-               final ClusterClient<String> clusterClient = deployClusterInternal(
+       public ClusterClientProvider<String> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+               final ClusterClientProvider<String> clusterClient = deployClusterInternal(
                        KubernetesSessionClusterEntrypoint.class.getName(),
                        clusterSpecification,
                        false);
 
-               LOG.info(
-                       "Create flink session cluster {} successfully, JobManager Web Interface: {}",
-                       clusterId,
-                       clusterClient.getWebInterfaceURL());
-
                return clusterClient;
        }
 
        @Override
-       public ClusterClient<String> deployJobCluster(
+       public ClusterClientProvider<String> deployJobCluster(
                        ClusterSpecification clusterSpecification,
                        JobGraph jobGraph,
                        boolean detached) throws ClusterDeploymentException {
                throw new ClusterDeploymentException("Per job could not be supported now.");
        }
 
-       private ClusterClient<String> deployClusterInternal(
+       private ClusterClientProvider<String> deployClusterInternal(
                        String entryPoint,
                        ClusterSpecification clusterSpecification,
                        boolean detached) throws ClusterDeploymentException {
@@ -178,7 +180,16 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
                        client.createConfigMap();
                        client.createFlinkMasterDeployment(clusterSpecification);
 
-                       return createClusterClient(clusterId);
+                       ClusterClientProvider<String> clusterClientProvider = createClusterClientProvider(clusterId);
+
+                       try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+                               LOG.info(
+                                               "Create flink session cluster {} successfully, JobManager Web Interface: {}",
+                                               clusterId,
+                                               clusterClient.getWebInterfaceURL());
+                       }
+
+                       return clusterClientProvider;
                } catch (Exception e) {
                        client.handleException(e);
                        throw new ClusterDeploymentException("Could not create Kubernetes cluster " + clusterId, e);
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java
index 3f2f480..620abb1 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java
@@ -242,10 +242,12 @@ public class FlinkKubernetesCustomCli extends AbstractCustomCommandLine {
 
                        // Retrieve or create a session cluster.
                        if (clusterId != null && kubeClient.getInternalService(clusterId) != null) {
-                               clusterClient = kubernetesClusterDescriptor.retrieve(clusterId);
+                               clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
                        } else {
-                               clusterClient = kubernetesClusterDescriptor.deploySessionCluster(
-                                       kubernetesClusterClientFactory.getClusterSpecification(configuration));
+                               clusterClient = kubernetesClusterDescriptor
+                                               .deploySessionCluster(
+                                                               kubernetesClusterClientFactory.getClusterSpecification(configuration))
+                                               .getClusterClient();
                                clusterId = clusterClient.getClusterId();
                        }
 
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
index ae15265..c1bdc6d 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
@@ -55,7 +55,8 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
                        .setSlotsPerTaskManager(1)
                        .createClusterSpecification();
 
-               final ClusterClient<String> clusterClient = descriptor.deploySessionCluster(clusterSpecification);
+               final ClusterClient<String> clusterClient =
+                               descriptor.deploySessionCluster(clusterSpecification).getClusterClient();
 
                assertEquals(CLUSTER_ID, clusterClient.getClusterId());
                assertEquals(String.format("http://%s:8081", MOCK_SERVICE_IP), clusterClient.getWebInterfaceURL());
@@ -91,6 +92,8 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase {
                assertEquals(
                        clusterSpecification.getMasterMemoryMB() + Constants.RESOURCE_UNIT_MB,
                        jmContainer.getResources().getLimits().get(Constants.RESOURCE_NAME_MEMORY).getAmount());
+
+               clusterClient.close();
        }
 
        @Test
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index ef7d12d..9494502 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -251,7 +251,9 @@ object FlinkShell {
    val clusterSpecification = clientFactory.getClusterSpecification(executorConfig)
 
     val clusterClient = try {
-      clusterDescriptor.deploySessionCluster(clusterSpecification)
+      clusterDescriptor
+        .deploySessionCluster(clusterSpecification)
+        .getClusterClient
     } finally {
       clusterDescriptor.close()
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8654dd2..5d02355 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1671,10 +1671,6 @@ public class StreamExecutionEnvironment {
         * the program that have resulted in a "sink" operation. Sink operations are
         * for example printing results or forwarding them to a message queue.
         *
-        * <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
-        * the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
-        * its usage. In other case, there may be resource leaks depending on the JobClient implementation.
-        *
         * @param streamGraph the stream graph representing the transformations
         * @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
         * @throws Exception which occurs during job execution.
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 0e4c986..8e2b5de 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -547,7 +547,7 @@ public class LocalExecutor implements Executor {
                        ClusterClient<T> clusterClient = null;
                        try {
                                // retrieve existing cluster
-                               clusterClient = clusterDescriptor.retrieve(context.getClusterId());
+                               clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
                                try {
                                        clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
                                } catch (Throwable t) {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 2697b03..5ba1671 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -289,12 +289,13 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
        private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException {
                final int masterMemory = 256;
                final int taskManagerMemory = 1024;
-               final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor.deploySessionCluster(
-                       new ClusterSpecification.ClusterSpecificationBuilder()
-                               .setMasterMemoryMB(masterMemory)
-                               .setTaskManagerMemoryMB(taskManagerMemory)
-                               .setSlotsPerTaskManager(1)
-                               .createClusterSpecification());
+               final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor
+                               .deploySessionCluster(new ClusterSpecification.ClusterSpecificationBuilder()
+                                               .setMasterMemoryMB(masterMemory)
+                                               .setTaskManagerMemoryMB(taskManagerMemory)
+                                               .setSlotsPerTaskManager(1)
+                                               .createClusterSpecification())
+                               .getClusterClient();
 
                assertThat(yarnClusterClient, is(instanceOf(RestClusterClient.class)));
                return (RestClusterClient<ApplicationId>) yarnClusterClient;
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 43eca61..97ff389 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -112,10 +112,12 @@ public class YARNITCase extends YarnTestBase {
                        File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
 
                        jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
-                       try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
-                               clusterSpecification,
-                               jobGraph,
-                               false)) {
+                       try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor
+                                       .deployJobCluster(
+                                                       clusterSpecification,
+                                                       jobGraph,
+                                                       false)
+                                       .getClusterClient()) {
 
                                ApplicationId applicationId = clusterClient.getClusterId();
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 913c5ac..7a3cfd8 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -114,7 +114,9 @@ public class YarnConfigurationITCase extends YarnTestBase {
                                        .setSlotsPerTaskManager(slotsPerTaskManager)
                                        .createClusterSpecification();
 
-                               final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
+                               final ClusterClient<ApplicationId> clusterClient = clusterDescriptor
+                                               .deployJobCluster(clusterSpecification, jobGraph, true)
+                                               .getClusterClient();
 
                                final ApplicationId clusterId = clusterClient.getClusterId();
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java
index d69f34e..f9d39db 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java
@@ -87,10 +87,12 @@ public class YarnDistributedCacheITCase extends YarnTestBase {
 
                                jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-                               try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
-                                       clusterSpecification,
-                                       jobGraph,
-                                       false)) {
+                               try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor
+                                               .deployJobCluster(
+                                                               clusterSpecification,
+                                                               jobGraph,
+                                                               false)
+                                               .getClusterClient()) {
 
                                        ApplicationId applicationId = clusterClient.getClusterId();
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 35a2711..bc729c7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -25,7 +25,7 @@ import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterRetrieveException;
 import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigUtils;
@@ -364,7 +364,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
        // -------------------------------------------------------------
 
        @Override
-       public ClusterClient<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
+       public ClusterClientProvider<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
 
                try {
                        // check if required Hadoop environment variables are set. If not, warn user
@@ -386,14 +386,20 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
                        setClusterEntrypointInfoToConfig(report);
 
-                       return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
+                       return () -> {
+                               try {
+                                       return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
+                               } catch (Exception e) {
+                                       throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
+                               }
+                       };
                } catch (Exception e) {
                        throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
                }
        }
 
        @Override
-       public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+       public ClusterClientProvider<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
                try {
                        return deployInternal(
                                        clusterSpecification,
@@ -407,7 +413,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
        }
 
        @Override
-       public ClusterClient<ApplicationId> deployJobCluster(
+       public ClusterClientProvider<ApplicationId> deployJobCluster(
                ClusterSpecification clusterSpecification,
                JobGraph jobGraph,
                boolean detached) throws ClusterDeploymentException {
@@ -461,7 +467,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
         * @param detached True if the cluster should be started in detached mode
         */
-       private ClusterClient<ApplicationId> deployInternal(
+       private ClusterClientProvider<ApplicationId> deployInternal(
                        ClusterSpecification clusterSpecification,
                        String applicationName,
                        String yarnClusterEntrypoint,
@@ -555,7 +561,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 
                setClusterEntrypointInfoToConfig(report);
 
-               return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
+               return () -> {
+                       try {
+                               return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
+                       } catch (Exception e) {
+                               throw new RuntimeException("Error while creating RestClusterClient.", e);
+                       }
+               };
        }
 
        private ClusterSpecification validateClusterResources(
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fd97496..da5c68f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -27,6 +27,7 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -545,17 +546,18 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
                                System.out.println(description);
                                return 0;
                        } else {
-                               final ClusterClient<ApplicationId> clusterClient;
+                               final ClusterClientProvider<ApplicationId> clusterClientProvider;
                                final ApplicationId yarnApplicationId;
 
                                if (cmd.hasOption(applicationId.getOpt())) {
                                        yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
 
-                                       clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
+                                       clusterClientProvider = yarnClusterDescriptor.retrieve(yarnApplicationId);
                                } else {
                                        final ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(configuration);
 
-                                       clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
+                                       clusterClientProvider = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
+                                       ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
 
                                        //------------------ ClusterClient deployed, handle connection details
                                        yarnApplicationId = clusterClient.getClusterId();
@@ -597,20 +599,20 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
                                                new ScheduledExecutorServiceAdapter(scheduledExecutorService));
                                        Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
                                                () -> shutdownCluster(
-                                                       clusterClient,
-                                                       scheduledExecutorService,
-                                                       yarnApplicationStatusMonitor),
-                                               getClass().getSimpleName(),
-                                               LOG);
+                                                               clusterClientProvider.getClusterClient(),
+                                                               scheduledExecutorService,
+                                                               yarnApplicationStatusMonitor),
+                                                               getClass().getSimpleName(),
+                                                               LOG);
                                        try {
                                                runInteractiveCli(
                                                        yarnApplicationStatusMonitor,
                                                        acceptInteractiveInput);
                                        } finally {
                                                shutdownCluster(
-                                                       clusterClient,
-                                                       scheduledExecutorService,
-                                                       yarnApplicationStatusMonitor);
+                                                               clusterClientProvider.getClusterClient(),
+                                                               scheduledExecutorService,
+                                                               yarnApplicationStatusMonitor);
 
                                                if (shutdownHook != null) {
                                                        // we do not need the hook anymore as we have just tried to shutdown the cluster.
