This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6825f803de17f87007e0531d82baf14bdbab2fd4 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Fri Dec 6 19:30:41 2019 +0100 [FLINK-15116] Return a ClusterClientProvider from ClusterDescriptor methods This allows the consumer of the methods to create a new ClusterClient with a separate lifecycle whenever necessary. --- .../org/apache/flink/client/cli/CliFrontend.java | 2 +- .../deployment/AbstractJobClusterExecutor.java | 4 +- .../deployment/AbstractSessionClusterExecutor.java | 5 +- .../flink/client/deployment/ClusterDescriptor.java | 8 +-- .../deployment/StandaloneClusterDescriptor.java | 19 +++--- .../client/program/ClusterClientProvider.java | 35 ++++++++++ .../apache/flink/client/cli/DefaultCLITest.java | 2 +- .../client/cli/util/DummyClusterDescriptor.java | 13 ++-- .../client/program/rest/RestClusterClientTest.java | 4 +- .../kubernetes/KubernetesClusterDescriptor.java | 75 +++++++++++++--------- .../kubernetes/cli/FlinkKubernetesCustomCli.java | 8 ++- .../KubernetesClusterDescriptorTest.java | 5 +- .../org/apache/flink/api/scala/FlinkShell.scala | 4 +- .../environment/StreamExecutionEnvironment.java | 4 -- .../table/client/gateway/local/LocalExecutor.java | 2 +- .../flink/yarn/YARNHighAvailabilityITCase.java | 13 ++-- .../java/org/apache/flink/yarn/YARNITCase.java | 10 +-- .../apache/flink/yarn/YarnConfigurationITCase.java | 4 +- .../flink/yarn/YarnDistributedCacheITCase.java | 10 +-- .../apache/flink/yarn/YarnClusterDescriptor.java | 26 ++++++-- .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 24 +++---- 21 files changed, 179 insertions(+), 98 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 2ef7c77..3566772 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -862,7 +862,7 @@ public class CliFrontend { "you would like to connect."); } else { try { - final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId); + final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient(); try { clusterAction.runAction(clusterClient); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java index 6a18c2f..fb603bc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java @@ -63,7 +63,9 @@ public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends Cluster final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); - final ClusterClient<ClusterID> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()); + final ClusterClient<ClusterID> clusterClient = clusterDescriptor + .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode()) + .getClusterClient(); LOG.info("Job has been submitted with JobID " + jobGraph.getJobID()); final boolean withShutdownHook = !configAccessor.getDetachedMode() && configAccessor.isShutdownOnAttachedExit(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java index 93b815a..784edcd 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java @@ -56,7 +56,10 @@ public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends Clu final ClusterID clusterID = clusterClientFactory.getClusterId(configuration); checkState(clusterID != null); - final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID); + final ClusterClient<ClusterID> clusterClient = clusterDescriptor + .retrieve(clusterID) + .getClusterClient(); + return clusterClient .submitJob(jobGraph) .thenApply(jobID -> new ClusterClientJobClientAdapter<ClusterID>(clusterClient, jobID) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java index e6b3922..ca5db45 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java @@ -18,7 +18,7 @@ package org.apache.flink.client.deployment; -import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.FlinkException; @@ -41,7 +41,7 @@ public interface ClusterDescriptor<T> extends AutoCloseable { * @return Client for the cluster * @throws ClusterRetrieveException if the cluster client could not be retrieved */ - ClusterClient<T> retrieve(T clusterId) throws ClusterRetrieveException; + ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException; /** * Triggers deployment of a cluster. @@ -49,7 +49,7 @@ public interface ClusterDescriptor<T> extends AutoCloseable { * @return Client for the cluster * @throws ClusterDeploymentException if the cluster could not be deployed */ - ClusterClient<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException; + ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException; /** * Deploys a per-job cluster with the given job on the cluster. @@ -61,7 +61,7 @@ public interface ClusterDescriptor<T> extends AutoCloseable { * @return Cluster client to talk to the Flink cluster * @throws ClusterDeploymentException if the cluster could not be deployed */ - ClusterClient<T> deployJobCluster( + ClusterClientProvider<T> deployJobCluster( final ClusterSpecification clusterSpecification, final JobGraph jobGraph, final boolean detached) throws ClusterDeploymentException; diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java index bdf8faf..a79ff2f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java @@ -18,6 +18,7 @@ package org.apache.flink.client.deployment; +import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; @@ -44,21 +45,23 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone } @Override - public RestClusterClient<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException { - try { - return new RestClusterClient<>(config, standaloneClusterId); - } catch (Exception e) { - throw new ClusterRetrieveException("Couldn't retrieve standalone cluster", e); - } + public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException { + return () -> { + try { + return new RestClusterClient<>(config, standaloneClusterId); + } catch (Exception e) { + throw new RuntimeException("Couldn't retrieve standalone cluster", e); + } + }; } @Override - public RestClusterClient<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) { + public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) { throw new UnsupportedOperationException("Can't deploy a standalone cluster."); } @Override - public RestClusterClient<StandaloneClusterId> deployJobCluster( + public ClusterClientProvider<StandaloneClusterId> deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClientProvider.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClientProvider.java new file mode 100644 index 0000000..23d08ad --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClientProvider.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.client.program; + +import org.apache.flink.annotation.Internal; + +/** + * Factory for {@link ClusterClient ClusterClients}. + */ +@Internal +public interface ClusterClientProvider<T> { + + /** + * Creates and returns a new {@link ClusterClient}. The returned client needs to be closed via + * {@link ClusterClient#close()} after use. + */ + ClusterClient<T> getClusterClient(); +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java index ba15088..8a0a1f2 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java @@ -109,6 +109,6 @@ public class DefaultCLITest extends CliFrontendTestBase { checkState(clusterFactory != null); final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = clusterFactory.createClusterDescriptor(executorConfig); - return clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig)); + return clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig)).getClusterClient(); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java index a7af09b..a4c754f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java @@ -21,6 +21,7 @@ package org.apache.flink.client.cli.util; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.Preconditions; @@ -41,21 +42,21 @@ public class DummyClusterDescriptor<T> implements ClusterDescriptor<T> { } @Override - public ClusterClient<T> retrieve(T clusterId) { - return clusterClient; + public ClusterClientProvider<T> retrieve(T clusterId) { + return () -> clusterClient; } @Override - public ClusterClient<T> deploySessionCluster(ClusterSpecification clusterSpecification) { - return clusterClient; + public ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) { + return () -> clusterClient; } @Override - public ClusterClient<T> deployJobCluster( + public ClusterClientProvider<T> deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) { - return clusterClient; + return () -> clusterClient; } @Override diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index c0ffae6..ff1f5ff 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -555,7 +555,9 @@ public class RestClusterClientTest extends TestLogger { checkState(clusterFactory != null); final ClusterDescriptor<StandaloneClusterId> clusterDescriptor = clusterFactory.createClusterDescriptor(executorConfig); - final RestClusterClient<?> clusterClient = (RestClusterClient<?>) clusterDescriptor.retrieve(clusterFactory.getClusterId(executorConfig)); + final RestClusterClient<?> clusterClient = (RestClusterClient<?>) clusterDescriptor + .retrieve(clusterFactory.getClusterId(executorConfig)) + .getClusterClient(); URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get(); assertThat(webMonitorBaseUrl.getHost(), equalTo(manualHostname)); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java index 62086fd..ff04a59 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -23,6 +23,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterRetrieveException; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; @@ -74,60 +75,61 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> { return CLUSTER_DESCRIPTION; } - private ClusterClient<String> createClusterClient(String clusterId) throws Exception { - final Configuration configuration = new Configuration(flinkConfig); + private ClusterClientProvider<String> createClusterClientProvider(String clusterId) { + return () -> { + final Configuration configuration = new Configuration(flinkConfig); - final Endpoint restEndpoint = client.getRestEndpoint(clusterId); + final Endpoint restEndpoint = client.getRestEndpoint(clusterId); - if (restEndpoint != null) { - configuration.setString(RestOptions.ADDRESS, restEndpoint.getAddress()); - configuration.setInteger(RestOptions.PORT, restEndpoint.getPort()); - } else { - throw new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId); - } + if (restEndpoint != null) { + configuration.setString(RestOptions.ADDRESS, restEndpoint.getAddress()); + configuration.setInteger(RestOptions.PORT, restEndpoint.getPort()); + } else { + throw new RuntimeException( + new ClusterRetrieveException( + "Could not get the rest endpoint of " + clusterId)); + } - return new RestClusterClient<>(configuration, clusterId); + try { + + RestClusterClient<String> resultClient = new RestClusterClient<>( + configuration, clusterId); + LOG.info( + "Succesfully retrieved cluster client for cluster {}, JobManager Web Interface : {}", + clusterId, + resultClient.getWebInterfaceURL()); + return resultClient; + } catch (Exception e) { + client.handleException(e); + throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + }; } @Override - public ClusterClient<String> retrieve(String clusterId) throws ClusterRetrieveException { - try { - final ClusterClient<String> retrievedClient = createClusterClient(clusterId); - LOG.info( - "Retrieve flink cluster {} successfully, JobManager Web Interface : {}", - clusterId, - retrievedClient.getWebInterfaceURL()); - return retrievedClient; - } catch (Exception e) { - client.handleException(e); - throw new ClusterRetrieveException("Could not create the RestClusterClient.", e); - } + public ClusterClientProvider<String> retrieve(String clusterId) { + return createClusterClientProvider(clusterId); } @Override - public ClusterClient<String> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { - final ClusterClient<String> clusterClient = deployClusterInternal( + public ClusterClientProvider<String> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { + final ClusterClientProvider<String> clusterClient = deployClusterInternal( KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false); - LOG.info( - "Create flink session cluster {} successfully, JobManager Web Interface: {}", - clusterId, - clusterClient.getWebInterfaceURL()); - return clusterClient; } @Override - public ClusterClient<String> deployJobCluster( + public ClusterClientProvider<String> deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) throws ClusterDeploymentException { throw new ClusterDeploymentException("Per job could not be supported now."); } - private ClusterClient<String> deployClusterInternal( + private ClusterClientProvider<String> deployClusterInternal( String entryPoint, ClusterSpecification clusterSpecification, boolean detached) throws ClusterDeploymentException { @@ -178,7 +180,16 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> { client.createConfigMap(); client.createFlinkMasterDeployment(clusterSpecification); - return createClusterClient(clusterId); + ClusterClientProvider<String> clusterClientProvider = createClusterClientProvider(clusterId); + + try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + + return clusterClientProvider; } catch (Exception e) { client.handleException(e); throw new ClusterDeploymentException("Could not create Kubernetes cluster " + clusterId, e); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java index 3f2f480..620abb1 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/FlinkKubernetesCustomCli.java @@ -242,10 +242,12 @@ public class FlinkKubernetesCustomCli extends AbstractCustomCommandLine { // Retrieve or create a session cluster. if (clusterId != null && kubeClient.getInternalService(clusterId) != null) { - clusterClient = kubernetesClusterDescriptor.retrieve(clusterId); + clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient(); } else { - clusterClient = kubernetesClusterDescriptor.deploySessionCluster( - kubernetesClusterClientFactory.getClusterSpecification(configuration)); + clusterClient = kubernetesClusterDescriptor + .deploySessionCluster( + kubernetesClusterClientFactory.getClusterSpecification(configuration)) + .getClusterClient(); clusterId = clusterClient.getClusterId(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java index ae15265..c1bdc6d 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java @@ -55,7 +55,8 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase { .setSlotsPerTaskManager(1) .createClusterSpecification(); - final ClusterClient<String> clusterClient = descriptor.deploySessionCluster(clusterSpecification); + final ClusterClient<String> clusterClient = + descriptor.deploySessionCluster(clusterSpecification).getClusterClient(); assertEquals(CLUSTER_ID, clusterClient.getClusterId()); assertEquals(String.format("http://%s:8081", MOCK_SERVICE_IP), clusterClient.getWebInterfaceURL()); @@ -91,6 +92,8 @@ public class KubernetesClusterDescriptorTest extends KubernetesTestBase { assertEquals( clusterSpecification.getMasterMemoryMB() + Constants.RESOURCE_UNIT_MB, jmContainer.getResources().getLimits().get(Constants.RESOURCE_NAME_MEMORY).getAmount()); + + clusterClient.close(); } @Test diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index ef7d12d..9494502 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -251,7 +251,9 @@ object FlinkShell { val clusterSpecification = clientFactory.getClusterSpecification(executorConfig) val clusterClient = try { - clusterDescriptor.deploySessionCluster(clusterSpecification) + clusterDescriptor + .deploySessionCluster(clusterSpecification) + .getClusterClient } finally { clusterDescriptor.close() } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 8654dd2..5d02355 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -1671,10 +1671,6 @@ public class StreamExecutionEnvironment { * the program that have resulted in a "sink" operation. Sink operations are * for example printing results or forwarding them to a message queue. * - * <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of - * the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of - * its usage. In other case, there may be resource leaks depending on the JobClient implementation. - * * @param streamGraph the stream graph representing the transformations * @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded. * @throws Exception which occurs during job execution. diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 0e4c986..8e2b5de 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -547,7 +547,7 @@ public class LocalExecutor implements Executor { ClusterClient<T> clusterClient = null; try { // retrieve existing cluster - clusterClient = clusterDescriptor.retrieve(context.getClusterId()); + clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient(); try { clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get(); } catch (Throwable t) { diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 2697b03..5ba1671 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -289,12 +289,13 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException { final int masterMemory = 256; final int taskManagerMemory = 1024; - final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor.deploySessionCluster( - new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(masterMemory) - .setTaskManagerMemoryMB(taskManagerMemory) - .setSlotsPerTaskManager(1) - .createClusterSpecification()); + final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor + .deploySessionCluster(new ClusterSpecification.ClusterSpecificationBuilder() + .setMasterMemoryMB(masterMemory) + .setTaskManagerMemoryMB(taskManagerMemory) + .setSlotsPerTaskManager(1) + .createClusterSpecification()) + .getClusterClient(); assertThat(yarnClusterClient, is(instanceOf(RestClusterClient.class))); return (RestClusterClient<ApplicationId>) yarnClusterClient; diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index 43eca61..97ff389 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -112,10 +112,12 @@ public class YARNITCase extends YarnTestBase { File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests")); jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); - try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster( - clusterSpecification, - jobGraph, - false)) { + try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor + .deployJobCluster( + clusterSpecification, + jobGraph, + false) + .getClusterClient()) { ApplicationId applicationId = clusterClient.getClusterId(); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index 913c5ac..7a3cfd8 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -114,7 +114,9 @@ public class YarnConfigurationITCase extends YarnTestBase { .setSlotsPerTaskManager(slotsPerTaskManager) .createClusterSpecification(); - final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true); + final ClusterClient<ApplicationId> clusterClient = clusterDescriptor + .deployJobCluster(clusterSpecification, jobGraph, true) + .getClusterClient(); final ApplicationId clusterId = clusterClient.getClusterId(); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java index d69f34e..f9d39db 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java @@ -87,10 +87,12 @@ public class YarnDistributedCacheITCase extends YarnTestBase { jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI())); - try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster( - clusterSpecification, - jobGraph, - false)) { + try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor + .deployJobCluster( + clusterSpecification, + jobGraph, + false) + .getClusterClient()) { ApplicationId applicationId = clusterClient.getClusterId(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 35a2711..bc729c7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -25,7 +25,7 @@ import org.apache.flink.client.deployment.ClusterDeploymentException; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterRetrieveException; import org.apache.flink.client.deployment.ClusterSpecification; -import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigUtils; @@ -364,7 +364,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { // ------------------------------------------------------------- @Override - public ClusterClient<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException { + public ClusterClientProvider<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException { try { // check if required Hadoop environment variables are set. If not, warn user @@ -386,14 +386,20 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { setClusterEntrypointInfoToConfig(report); - return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()); + return () -> { + try { + return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()); + } catch (Exception e) { + throw new RuntimeException("Couldn't retrieve Yarn cluster", e); + } + }; } catch (Exception e) { throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e); } } @Override - public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { + public ClusterClientProvider<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { try { return deployInternal( clusterSpecification, @@ -407,7 +413,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { } @Override - public ClusterClient<ApplicationId> deployJobCluster( + public ClusterClientProvider<ApplicationId> deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) throws ClusterDeploymentException { @@ -461,7 +467,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { * @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none * @param detached True if the cluster should be started in detached mode */ - private ClusterClient<ApplicationId> deployInternal( + private ClusterClientProvider<ApplicationId> deployInternal( ClusterSpecification clusterSpecification, String applicationName, String yarnClusterEntrypoint, @@ -555,7 +561,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { setClusterEntrypointInfoToConfig(report); - return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()); + return () -> { + try { + return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()); + } catch (Exception e) { + throw new RuntimeException("Error while creating RestClusterClient.", e); + } + }; } private ClusterSpecification validateClusterResources( diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index fd97496..da5c68f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -27,6 +27,7 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -545,17 +546,18 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { System.out.println(description); return 0; } else { - final ClusterClient<ApplicationId> clusterClient; + final ClusterClientProvider<ApplicationId> clusterClientProvider; final ApplicationId yarnApplicationId; if (cmd.hasOption(applicationId.getOpt())) { yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt())); - clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId); + clusterClientProvider = yarnClusterDescriptor.retrieve(yarnApplicationId); } else { final ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(configuration); - clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification); + clusterClientProvider = yarnClusterDescriptor.deploySessionCluster(clusterSpecification); + ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient(); //------------------ ClusterClient deployed, handle connection details yarnApplicationId = clusterClient.getClusterId(); @@ -597,20 +599,20 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { new ScheduledExecutorServiceAdapter(scheduledExecutorService)); Thread shutdownHook = ShutdownHookUtil.addShutdownHook( () -> shutdownCluster( - clusterClient, - scheduledExecutorService, - yarnApplicationStatusMonitor), - getClass().getSimpleName(), - LOG); + clusterClientProvider.getClusterClient(), + scheduledExecutorService, + yarnApplicationStatusMonitor), + getClass().getSimpleName(), + LOG); try { runInteractiveCli( yarnApplicationStatusMonitor, acceptInteractiveInput); } finally { shutdownCluster( - clusterClient, - scheduledExecutorService, - yarnApplicationStatusMonitor); + clusterClientProvider.getClusterClient(), + scheduledExecutorService, + yarnApplicationStatusMonitor); if (shutdownHook != null) { // we do not need the hook anymore as we have just tried to shutdown the cluster.