This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 035453f  [FLINK-15790][k8s] Make some interfaces in FlinkKubeClient asynchronous to avoid potentially blocking the execution of RpcEndpoint's main thread
035453f is described below

commit 035453fb1472b917972daafcd552b7648b34eb31
Author: wangyang0918 <danrtsey...@alibaba-inc.com>
AuthorDate: Tue Apr 7 16:23:38 2020 +0800

    [FLINK-15790][k8s] Make some interfaces in FlinkKubeClient asynchronous to avoid potentially blocking the execution of RpcEndpoint's main thread
    
    The interfaces in FlinkKubeClient are called both in the Client and in the ResourceManager.
    To avoid potentially blocking the execution of the RpcEndpoint's main thread,
    #createTaskManagerPod and #stopPod should be implemented asynchronously.
    
    This closes #11733.
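    
    For context on the patch below: the change wraps each blocking Kubernetes call in a
    CompletableFuture that runs on a dedicated executor, and the caller reacts in
    whenComplete instead of waiting on the result. What follows is a minimal,
    self-contained sketch of that pattern using only JDK types; the class and method
    names (AsyncKubeCallSketch, createPodAsync) are illustrative only and are not Flink APIs.
    
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;
    
        // Illustrative sketch only; not part of the Flink code base.
        public class AsyncKubeCallSketch {
    
            // Dedicated I/O pool, mirroring the kubeClientExecutorService introduced below.
            private final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
    
            // Run the blocking call on the I/O pool and hand the caller a future,
            // so the caller's main thread is never blocked.
            public CompletableFuture<Void> createPodAsync(String podName) {
                return CompletableFuture.runAsync(
                    () -> simulateBlockingApiCall(podName),
                    ioExecutor);
            }
    
            private void simulateBlockingApiCall(String podName) {
                // Stand-in for a blocking HTTP call to the Kubernetes API server.
                System.out.println("creating pod " + podName);
            }
    
            public void close() {
                ioExecutor.shutdown();
            }
    
            public static void main(String[] args) throws Exception {
                AsyncKubeCallSketch client = new AsyncKubeCallSketch();
                // The caller attaches a completion handler instead of blocking on the call.
                client.createPodAsync("taskmanager-1")
                    .whenComplete((ignore, throwable) -> {
                        if (throwable != null) {
                            System.err.println("Could not create pod: " + throwable);
                        }
                    })
                    .get(10, TimeUnit.SECONDS); // block only in this demo
                client.close();
            }
        }
    
    In the patch itself, KubernetesResourceManager attaches such a handler to
    createTaskManagerPod: on failure it logs the error and reschedules
    decreasePendingAndRequestKubernetesPodIfRequired on its main thread after the
    configured pod creation retry interval.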
---
 .../kubernetes/KubernetesClusterDescriptor.java    |  10 +-
 .../kubernetes/KubernetesResourceManager.java      |  55 ++++++---
 .../flink/kubernetes/cli/KubernetesSessionCli.java |   2 +-
 .../KubernetesResourceManagerConfiguration.java    |  42 +++++++
 .../KubernetesResourceManagerFactory.java          |  14 ++-
 .../kubeclient/Fabric8FlinkKubeClient.java         | 130 +++++++++++++--------
 .../kubernetes/kubeclient/FlinkKubeClient.java     |  28 ++---
 .../kubernetes/kubeclient/KubeClientFactory.java   |   9 +-
 .../kubernetes/KubernetesResourceManagerTest.java  |  80 ++++++++++---
 .../flink/kubernetes/KubernetesTestBase.java       |   6 +-
 .../kubernetes/kubeclient/Fabric8ClientTest.java   |  34 +++---
 11 files changed, 293 insertions(+), 117 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index 0888bdb..8976d3e 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -49,6 +49,8 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -83,11 +85,11 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
                return () -> {
                        final Configuration configuration = new Configuration(flinkConfig);
 
-                       final Endpoint restEndpoint = client.getRestEndpoint(clusterId);
+                       final Optional<Endpoint> restEndpoint = client.getRestEndpoint(clusterId);
 
-                       if (restEndpoint != null) {
-                               configuration.setString(RestOptions.ADDRESS, restEndpoint.getAddress());
-                               configuration.setInteger(RestOptions.PORT, restEndpoint.getPort());
+                       if (restEndpoint.isPresent()) {
+                               configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress());
+                               configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort());
                        } else {
                                throw new RuntimeException(
                                                new ClusterRetrieveException(
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
index fdf3afc..04ac00b 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
@@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
-import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
 import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
 import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
@@ -91,6 +91,8 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 
        private final List<String> taskManagerStartCommand;
 
+       private final KubernetesResourceManagerConfiguration configuration;
+
        /** The number of pods requested, but not yet granted. */
        private int numPendingPodRequests = 0;
 
@@ -105,7 +107,9 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
                        JobLeaderIdService jobLeaderIdService,
                        ClusterInformation clusterInformation,
                        FatalErrorHandler fatalErrorHandler,
-                       ResourceManagerMetricGroup resourceManagerMetricGroup) {
+                       ResourceManagerMetricGroup resourceManagerMetricGroup,
+                       FlinkKubeClient kubeClient,
+                       KubernetesResourceManagerConfiguration configuration) {
                super(
                        flinkConfig,
                        System.getenv(),
@@ -119,15 +123,16 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
                        clusterInformation,
                        fatalErrorHandler,
                        resourceManagerMetricGroup);
-               this.clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
+               this.clusterId = configuration.getClusterId();
                this.defaultCpus = taskExecutorProcessSpec.getCpuCores().getValue().doubleValue();
 
-               this.kubeClient = createFlinkKubeClient();
+               this.kubeClient = kubeClient;
 
                this.taskManagerParameters =
                        ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec, numSlotsPerTaskManager);
 
                this.taskManagerStartCommand = getTaskManagerStartCommand();
+               this.configuration = configuration;
        }
 
        @Override
@@ -184,12 +189,7 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
        public boolean stopWorker(final KubernetesWorkerNode worker) {
                LOG.info("Stopping Worker {}.", worker.getResourceID());
                workerNodes.remove(worker.getResourceID());
-               try {
-                       kubeClient.stopPod(worker.getResourceID().toString());
-               } catch (Exception e) {
-                       kubeClient.handleException(e);
-                       return false;
-               }
+               internalStopPod(worker.getResourceID().toString());
                return true;
        }
 
@@ -271,7 +271,23 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
                        env);
 
                log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-               kubeClient.createTaskManagerPod(parameter);
+               kubeClient.createTaskManagerPod(parameter)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               log.error("Could not start TaskManager in pod {}.", podName, throwable);
+                                               scheduleRunAsync(
+                                                       this::decreasePendingAndRequestKubernetesPodIfRequired,
+                                                       configuration.getPodCreationRetryInterval());
+                                       }
+                               }
+                       );
+       }
+
+       private void decreasePendingAndRequestKubernetesPodIfRequired() {
+               validateRunsInMainThread();
+               numPendingPodRequests--;
+               requestKubernetesPodIfRequired();
        }
 
        /**
@@ -288,7 +304,7 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
 
        private void removePodIfTerminated(KubernetesPod pod) {
                if (pod.isTerminated()) {
-                       kubeClient.stopPod(pod.getName());
+                       internalStopPod(pod.getName());
                        final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(new ResourceID(pod.getName()));
                        if (kubernetesWorkerNode != null) {
                                requestKubernetesPodIfRequired();
@@ -333,12 +349,19 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW
                return labels;
        }
 
-       protected FlinkKubeClient createFlinkKubeClient() {
-               return KubeClientFactory.fromConfiguration(flinkConfig);
-       }
-
        @Override
        protected double getCpuCores(Configuration configuration) {
                return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
        }
+
+       private void internalStopPod(String podName) {
+               kubeClient.stopPod(podName)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               log.error("Could not stop TaskManager in pod {}.", podName, throwable);
+                                       }
+                               }
+                       );
+       }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
index 568ff2e..746e207 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
@@ -101,7 +101,7 @@ public class KubernetesSessionCli {
                        final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration);
 
                        // Retrieve or create a session cluster.
-                       if (clusterId != null && kubeClient.getInternalService(clusterId) != null) {
+                       if (clusterId != null && kubeClient.getInternalService(clusterId).isPresent()) {
                                clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
                        } else {
                                clusterClient = kubernetesClusterDescriptor
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesResourceManagerConfiguration.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesResourceManagerConfiguration.java
new file mode 100644
index 0000000..c79b1f8
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesResourceManagerConfiguration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.api.common.time.Time;
+
+/**
+ * Configuration specific to {@link org.apache.flink.kubernetes.KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerConfiguration {
+       private final String clusterId;
+       private final Time podCreationRetryInterval;
+
+       public KubernetesResourceManagerConfiguration(String clusterId, Time podCreationRetryInterval) {
+               this.clusterId = clusterId;
+               this.podCreationRetryInterval = podCreationRetryInterval;
+       }
+
+       public String getClusterId() {
+               return clusterId;
+       }
+
+       public Time getPodCreationRetryInterval() {
+               return podCreationRetryInterval;
+       }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java
index 7d01db5..16dbd43 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.kubernetes.entrypoint;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.KubernetesResourceManager;
 import org.apache.flink.kubernetes.KubernetesWorkerNode;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -43,6 +47,8 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto
 
        private static final KubernetesResourceManagerFactory INSTANCE = new KubernetesResourceManagerFactory();
 
+       private static final Time POD_CREATION_RETRY_INTERVAL = Time.seconds(3L);
+
        private KubernetesResourceManagerFactory() {}
 
        public static KubernetesResourceManagerFactory getInstance() {
@@ -66,6 +72,10 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto
                        rmServicesConfiguration,
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor());
+               final KubernetesResourceManagerConfiguration kubernetesResourceManagerConfiguration =
+                       new KubernetesResourceManagerConfiguration(
+                               configuration.getString(KubernetesConfigOptions.CLUSTER_ID),
+                               POD_CREATION_RETRY_INTERVAL);
 
                return new KubernetesResourceManager(
                        rpcService,
@@ -78,6 +88,8 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto
                        rmRuntimeServices.getJobLeaderIdService(),
                        clusterInformation,
                        fatalErrorHandler,
-                       resourceManagerMetricGroup);
+                       resourceManagerMetricGroup,
+                       KubeClientFactory.fromConfiguration(configuration),
+                       kubernetesResourceManagerConfiguration);
        }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
index 5d8d5e1..7051b95 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
@@ -36,10 +36,12 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesDeployment;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
 import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.function.FunctionUtils;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServicePort;
@@ -51,15 +53,16 @@ import io.fabric8.kubernetes.client.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -82,13 +85,20 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
        private final List<Decorator<Deployment, KubernetesDeployment>> flinkMasterDeploymentDecorators = new ArrayList<>();
        private final List<Decorator<Pod, KubernetesPod>> taskManagerPodDecorators = new ArrayList<>();
 
-       public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client) {
+       private final ExecutorService kubeClientExecutorService;
+
+       public Fabric8FlinkKubeClient(
+                       Configuration flinkConfig,
+                       KubernetesClient client,
+                       Supplier<ExecutorService> asyncExecutorFactory) {
                this.flinkConfig = checkNotNull(flinkConfig);
                this.internalClient = checkNotNull(client);
                this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
 
                this.nameSpace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
 
+               this.kubeClientExecutorService = asyncExecutorFactory.get();
+
                initialize();
        }
 
@@ -158,69 +168,49 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
        }
 
        @Override
-       public void createTaskManagerPod(TaskManagerPodParameter parameter) {
-               KubernetesPod pod = new KubernetesPod(this.flinkConfig);
+       public CompletableFuture<Void> createTaskManagerPod(TaskManagerPodParameter parameter) {
+               return CompletableFuture.runAsync(
+                       () -> {
+                               KubernetesPod pod = new KubernetesPod(this.flinkConfig);
 
-               for (Decorator<Pod, KubernetesPod> d : this.taskManagerPodDecorators) {
-                       pod = d.decorate(pod);
-               }
+                               for (Decorator<Pod, KubernetesPod> d : this.taskManagerPodDecorators) {
+                                       pod = d.decorate(pod);
+                               }
 
-               pod = new TaskManagerPodDecorator(parameter).decorate(pod);
+                               pod = new TaskManagerPodDecorator(parameter).decorate(pod);
 
-               LOG.debug("Create TaskManager pod with spec: {}", pod.getInternalResource().getSpec());
+                               LOG.debug("Create TaskManager pod with spec: {}", pod.getInternalResource().getSpec());
 
-               this.internalClient.pods().inNamespace(this.nameSpace).create(pod.getInternalResource());
+                               this.internalClient.pods().inNamespace(this.nameSpace).create(pod.getInternalResource());
+                       },
+                       kubeClientExecutorService);
        }
 
        @Override
-       public void stopPod(String podName) {
-               this.internalClient.pods().withName(podName).delete();
+       public CompletableFuture<Void> stopPod(String podName) {
+               return CompletableFuture.runAsync(
+                       () -> this.internalClient.pods().withName(podName).delete(),
+                       kubeClientExecutorService);
        }
 
        @Override
-       @Nullable
-       public Endpoint getRestEndpoint(String clusterId) {
+       public Optional<Endpoint> getRestEndpoint(String clusterId) {
                int restPort = this.flinkConfig.getInteger(RestOptions.PORT);
                String serviceExposedType = 
flinkConfig.getString(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
 
                // Return the service.namespace directly when use ClusterIP.
                if 
(serviceExposedType.equals(KubernetesConfigOptions.ServiceExposedType.ClusterIP.toString()))
 {
-                       return new Endpoint(clusterId + "." + nameSpace, 
restPort);
-               }
-
-               KubernetesService restService = getRestService(clusterId);
-               if (restService == null) {
-                       return null;
-               }
-               Service service = restService.getInternalResource();
-
-               String address = null;
-
-               if (service.getStatus() != null && 
(service.getStatus().getLoadBalancer() != null ||
-                       service.getStatus().getLoadBalancer().getIngress() != 
null)) {
-                       if 
(service.getStatus().getLoadBalancer().getIngress().size() > 0) {
-                               address = 
service.getStatus().getLoadBalancer().getIngress().get(0).getIp();
-                               if (address == null || address.isEmpty()) {
-                                       address = 
service.getStatus().getLoadBalancer().getIngress().get(0).getHostname();
-                               }
-                       } else {
-                               address = 
this.internalClient.getMasterUrl().getHost();
-                               restPort = getServiceNodePort(service, 
RestOptions.PORT);
-                       }
-               } else if (service.getSpec().getExternalIPs() != null && 
service.getSpec().getExternalIPs().size() > 0) {
-                       address = service.getSpec().getExternalIPs().get(0);
+                       return Optional.of(new Endpoint(clusterId + "." + 
nameSpace, restPort));
                }
-               if (address == null || address.isEmpty()) {
-                       return null;
-               }
-               return new Endpoint(address, restPort);
+               return getRestService(clusterId)
+                       .flatMap(restService -> 
getRestEndPointFromService(restService.getInternalResource(), restPort));
        }
 
        @Override
        public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
                final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems();
 
-               if (podList == null || podList.size() < 1) {
+               if (podList == null || podList.isEmpty()) {
                        return new ArrayList<>();
                }
 
@@ -241,14 +231,12 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
        }
 
        @Override
-       @Nullable
-       public KubernetesService getInternalService(String clusterId) {
+       public Optional<KubernetesService> getInternalService(String clusterId) {
                return getService(clusterId);
        }
 
        @Override
-       @Nullable
-       public KubernetesService getRestService(String clusterId) {
+       public Optional<KubernetesService> getRestService(String clusterId) {
                return getService(clusterId + Constants.FLINK_REST_SERVICE_SUFFIX);
        }
 
@@ -288,6 +276,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
        @Override
        public void close() {
                this.internalClient.close();
+               ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.kubeClientExecutorService);
        }
 
        private CompletableFuture<KubernetesService> createService(
@@ -324,7 +313,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
                        }));
        }
 
-       private KubernetesService getService(String serviceName) {
+       private Optional<KubernetesService> getService(String serviceName) {
                final Service service = this
                        .internalClient
                        .services()
@@ -335,10 +324,10 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
 
                if (service == null) {
                        LOG.debug("Service {} does not exist", serviceName);
-                       return null;
+                       return Optional.empty();
                }
 
-               return new KubernetesService(this.flinkConfig, service);
+               return Optional.of(new KubernetesService(flinkConfig, service));
        }
 
        /**
@@ -355,4 +344,43 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
                }
                return port;
        }
+
+       private Optional<Endpoint> getRestEndPointFromService(Service service, int restPort) {
+               if (service.getStatus() == null) {
+                       return Optional.empty();
+               }
+
+               LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer();
+               boolean hasExternalIP = service.getSpec() != null &&
+                       service.getSpec().getExternalIPs() != null && !service.getSpec().getExternalIPs().isEmpty();
+
+               if (loadBalancer != null) {
+                       return getLoadBalancerRestEndpoint(loadBalancer, service, restPort);
+               } else if (hasExternalIP) {
+                       final String address = service.getSpec().getExternalIPs().get(0);
+                       if (address != null && !address.isEmpty()) {
+                               return Optional.of(new Endpoint(address, restPort));
+                       }
+               }
+               return Optional.empty();
+       }
+
+       private Optional<Endpoint> getLoadBalancerRestEndpoint(LoadBalancerStatus loadBalancer, Service svc, int restPort) {
+               boolean hasIngress = loadBalancer.getIngress() != null && !loadBalancer.getIngress().isEmpty();
+               String address;
+               int port = restPort;
+               if (hasIngress) {
+                       address = loadBalancer.getIngress().get(0).getIp();
+                       // Use hostname when the ip address is null
+                       if (address == null || address.isEmpty()) {
+                               address = loadBalancer.getIngress().get(0).getHostname();
+                       }
+               } else {
+                       // Use node port
+                       address = this.internalClient.getMasterUrl().getHost();
+                       port = getServiceNodePort(svc, RestOptions.PORT);
+               }
+               boolean noAddress = address == null || address.isEmpty();
+               return noAddress ? Optional.empty() : Optional.of(new Endpoint(address, port));
+       }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
index 6a84a3e..357f2af 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
@@ -22,14 +22,15 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * The client to talk with kubernetes.
+ * The client to talk with kubernetes. The interfaces will be called both in Client and ResourceManager. To avoid
+ * potentially blocking the execution of RpcEndpoint's main thread, these interfaces
+ * {@link #createTaskManagerPod(TaskManagerPodParameter)}, {@link #stopPod(String)} should be implemented asynchronously.
  */
 public interface FlinkKubeClient extends AutoCloseable {
 
@@ -66,15 +67,17 @@ public interface FlinkKubeClient extends AutoCloseable {
         * Create task manager pod.
         *
         * @param parameter {@link TaskManagerPodParameter} to create a taskmanager pod.
+        * @return  Return the taskmanager pod creation future
         */
-       void createTaskManagerPod(TaskManagerPodParameter parameter);
+       CompletableFuture<Void> createTaskManagerPod(TaskManagerPodParameter parameter);
 
        /**
         * Stop a specified pod by name.
         *
         * @param podName pod name
+        * @return  Return the pod stop future
         */
-       void stopPod(String podName);
+       CompletableFuture<Void> stopPod(String podName);
 
        /**
         * Stop cluster and clean up all resources, include services, auxiliary services and all running pods.
@@ -87,28 +90,25 @@ public interface FlinkKubeClient extends AutoCloseable {
         * Get the kubernetes internal service of the given flink clusterId.
         *
         * @param clusterId cluster id
-        * @return Return the internal service of the specified cluster id. Return null if the service does not exist.
+        * @return Return the optional internal service of the specified cluster id.
         */
-       @Nullable
-       KubernetesService getInternalService(String clusterId);
+       Optional<KubernetesService> getInternalService(String clusterId);
 
        /**
         * Get the kubernetes rest service of the given flink clusterId.
         *
         * @param clusterId cluster id
-        * @return Return the rest service of the specified cluster id. Return null if the service does not exist.
+        * @return Return the optional rest service of the specified cluster id.
         */
-       @Nullable
-       KubernetesService getRestService(String clusterId);
+       Optional<KubernetesService> getRestService(String clusterId);
 
        /**
         * Get the rest endpoint for access outside cluster.
         *
         * @param clusterId cluster id
-        * @return Return null if the service does not exist or could not extract the Endpoint from the service.
+        * @return Return empty if the service does not exist or could not extract the Endpoint from the service.
         */
-       @Nullable
-       Endpoint getRestEndpoint(String clusterId);
+       Optional<Endpoint> getRestEndpoint(String clusterId);
 
        /**
         * List the pods with specified labels.
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java
index 7ada3c4..56200c8 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.kubeclient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
@@ -30,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Factory class to create {@link FlinkKubeClient}.
@@ -58,6 +61,10 @@ public class KubeClientFactory {
 
                final KubernetesClient client = new DefaultKubernetesClient(config);
 
-               return new Fabric8FlinkKubeClient(flinkConfig, client);
+               return new Fabric8FlinkKubeClient(flinkConfig, client, KubeClientFactory::createThreadPoolForAsyncIO);
+       }
+
+       private static ExecutorService createThreadPoolForAsyncIO() {
+               return Executors.newFixedThreadPool(2, new ExecutorThreadFactory("FlinkKubeClient-IO"));
        }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
index 08d8f51..c171402 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
@@ -23,7 +23,10 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
+import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
@@ -32,6 +35,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -76,6 +81,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static junit.framework.TestCase.assertEquals;
@@ -95,6 +102,8 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
 
        private final String jobManagerHost = "jm-host1";
 
+       private static final Time TESTING_POD_CREATION_RETRY_INTERVAL = Time.milliseconds(50L);
+
        private Configuration flinkConfig;
 
        private TestingKubernetesResourceManager resourceManager;
@@ -108,7 +117,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
 
                flinkKubeClient = getFabric8FlinkKubeClient();
-               resourceManager = createAndStartResourceManager(flinkConfig);
+               resourceManager = createAndStartResourceManager(flinkConfig, getFabric8FlinkKubeClient());
        }
 
        @After
@@ -134,7 +143,9 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                                JobLeaderIdService jobLeaderIdService,
                                ClusterInformation clusterInformation,
                                FatalErrorHandler fatalErrorHandler,
-                               ResourceManagerMetricGroup resourceManagerMetricGroup) {
+                               ResourceManagerMetricGroup resourceManagerMetricGroup,
+                               FlinkKubeClient flinkKubeClient,
+                               KubernetesResourceManagerConfiguration configuration) {
                        super(
                                rpcService,
                                resourceManagerEndpointId,
@@ -146,7 +157,9 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                                jobLeaderIdService,
                                clusterInformation,
                                fatalErrorHandler,
-                               resourceManagerMetricGroup
+                               resourceManagerMetricGroup,
+                               flinkKubeClient,
+                               configuration
                        );
                        this.slotManager = slotManager;
                }
@@ -160,11 +173,6 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                        runnable.run();
                }
 
-               @Override
-               protected FlinkKubeClient createFlinkKubeClient() {
-                       return flinkKubeClient;
-               }
-
                MainThreadExecutor getMainThreadExecutorForTesting() {
                        return super.getMainThreadExecutor();
                }
@@ -266,7 +274,8 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                        new ArrayList<>(),
                        1024,
                        1,
-                       new HashMap<>()));
+                       new HashMap<>()))
+                       .get();
                final KubernetesClient client = getKubeClient();
                assertEquals(1, client.pods().list().getItems().size());
 
@@ -315,7 +324,30 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                assertThat(resourceManager.getCpuCores(configuration), is(3.0));
        }
 
-       private TestingKubernetesResourceManager createAndStartResourceManager(Configuration configuration) throws Exception {
+       @Test
+       public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
+               final AtomicInteger retries = new AtomicInteger(0);
+               final int numOfFailedRetries = 3;
+               final OneShotLatch podCreated = new OneShotLatch();
+               final FlinkKubeClient flinkKubeClient =
+                       createTestingFlinkKubeClientAllocatingPodsAfter(numOfFailedRetries, retries, podCreated);
+               final TestingKubernetesResourceManager testRM = createAndStartResourceManager(flinkConfig, flinkKubeClient);
+               registerSlotRequest(testRM);
+
+               podCreated.await();
+               // Creating taskmanager should retry 4 times (3 failed and then succeed)
+               assertThat(
+                       "Creating taskmanager should fail " + numOfFailedRetries + " times and then succeed",
+                       retries.get(),
+                       is(numOfFailedRetries + 1));
+
+               testRM.close();
+               flinkKubeClient.close();
+       }
+
+       private TestingKubernetesResourceManager createAndStartResourceManager(
+                       Configuration configuration,
+                       FlinkKubeClient flinkKubeClient) throws Exception {
 
                final TestingRpcService rpcService = new TestingRpcService(configuration);
                final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(rpcService, TIMEOUT);
@@ -331,14 +363,15 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                        rmServices.jobLeaderIdService,
                        new ClusterInformation("localhost", 1234),
                        testingFatalErrorHandler,
-                       UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup()
-               );
+                       UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
+                       flinkKubeClient,
+                       new KubernetesResourceManagerConfiguration(CLUSTER_ID, TESTING_POD_CREATION_RETRY_INTERVAL));
                kubernetesResourceManager.start();
                rmServices.grantLeadership();
                return kubernetesResourceManager;
        }
 
-       private void registerSlotRequest() throws Exception {
+       private void registerSlotRequest(TestingKubernetesResourceManager resourceManager) throws Exception {
                CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
                        resourceManager.getSlotManager().registerSlotRequest(
                                new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, jobManagerHost));
@@ -347,6 +380,10 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                registerSlotRequestFuture.get();
        }
 
+       private void registerSlotRequest() throws Exception {
+               registerSlotRequest(resourceManager);
+       }
+
        private void registerTaskExecutor(ResourceID resourceID) throws Exception {
                final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                        .createTestingTaskExecutorGateway();
@@ -390,4 +427,21 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase {
                                .build())
                        .build());
        }
+
+       private FlinkKubeClient createTestingFlinkKubeClientAllocatingPodsAfter(
+                       int numberOfRetries,
+                       AtomicInteger retries,
+                       OneShotLatch podCreated) {
+               ExecutorService kubeClientExecutorService = Executors.newDirectExecutorService();
+               return new Fabric8FlinkKubeClient(flinkConfig, getKubeClient(), () -> kubeClientExecutorService) {
+                       @Override
+                       public CompletableFuture<Void> createTaskManagerPod(TaskManagerPodParameter parameter) {
+                               if (retries.getAndIncrement() < numberOfRetries) {
+                                       return FutureUtils.completedExceptionally(new RuntimeException("Exception"));
+                               }
+                               podCreated.trigger();
+                               return super.createTaskManagerPod(parameter);
+                       }
+               };
+       }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
index 32ce9e8..436067e 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.ServiceDecorator;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -117,7 +118,10 @@ public class KubernetesTestBase extends TestLogger {
        }
 
        protected FlinkKubeClient getFabric8FlinkKubeClient(Configuration flinkConfig){
-               return new Fabric8FlinkKubeClient(flinkConfig, server.getClient().inNamespace(NAMESPACE));
+               return new Fabric8FlinkKubeClient(
+                       flinkConfig,
+                       server.getClient().inNamespace(NAMESPACE),
+                       Executors::newDirectExecutorService);
        }
 
        protected KubernetesClient getKubeClient() {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java
index 0a477b9..1e7c2f4 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java
@@ -44,12 +44,14 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -145,9 +147,10 @@ public class Fabric8ClientTest extends KubernetesTestBase {
                assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()),
                        Matchers.hasItems(8081));
 
-               final Endpoint endpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
-               assertEquals(MOCK_SERVICE_IP, endpoint.getAddress());
-               assertEquals(8081, endpoint.getPort());
+               final Optional<Endpoint> endpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID);
+               assertThat(endpoint.isPresent(), is(true));
+               assertThat(endpoint.get().getAddress(), is(MOCK_SERVICE_IP));
+               assertThat(endpoint.get().getPort(), is(8081));
        }
 
        @Test
@@ -197,7 +200,7 @@ public class Fabric8ClientTest extends KubernetesTestBase {
        }
 
        @Test
-       public void testCreateTaskManagerPod() {
+       public void testCreateTaskManagerPod() throws ExecutionException, InterruptedException {
                final String podName = "taskmanager-1";
                final List<String> commands = Arrays.asList("/bin/bash", "-c", "start-command-of-taskmanager");
                final int tmMem = 1234;
@@ -210,7 +213,7 @@ public class Fabric8ClientTest extends KubernetesTestBase {
                        tmMem,
                        tmCpu,
                        env);
-               flinkKubeClient.createTaskManagerPod(parameter);
+               flinkKubeClient.createTaskManagerPod(parameter).get();
 
                final List<Pod> pods = kubeClient.pods().list().getItems();
                assertEquals(1, pods.size());
@@ -251,28 +254,29 @@ public class Fabric8ClientTest extends KubernetesTestBase {
                assertEquals(FLINK_CONF_FILENAME, tmContainer.getVolumeMounts().get(0).getSubPath());
 
                // Stop the pod
-               flinkKubeClient.stopPod(podName);
+               flinkKubeClient.stopPod(podName).get();
                assertEquals(0, kubeClient.pods().list().getItems().size());
        }
 
        @Test
        public void testServiceLoadBalancerWithNoIP() throws Exception {
                final String hostName = "test-host-name";
-               final Endpoint endpoint = getRestEndpoint(hostName, "");
-               assertEquals(hostName, endpoint.getAddress());
-               assertEquals(8081, endpoint.getPort());
+               final Optional<Endpoint> endpoint = getRestEndpoint(hostName, "");
+               assertThat(endpoint.isPresent(), is(true));
+               assertThat(endpoint.get().getAddress(), is(hostName));
+               assertThat(endpoint.get().getPort(), is(8081));
        }
 
        @Test
        public void testServiceLoadBalancerEmptyHostAndIP() throws Exception {
-               final Endpoint endpoint1 = getRestEndpoint("", "");
-               assertNull(endpoint1);
+               final Optional<Endpoint> endpoint1 = getRestEndpoint("", "");
+               assertThat(endpoint1.isPresent(), is(false));
 
-               final Endpoint endpoint2 = getRestEndpoint(null, null);
-               assertNull(endpoint2);
+               final Optional<Endpoint> endpoint2 = getRestEndpoint(null, null);
+               assertThat(endpoint2.isPresent(), is(false));
        }
 
-       private Endpoint getRestEndpoint(String hostName, String ip) throws Exception {
+       private Optional<Endpoint> getRestEndpoint(String hostName, String ip) throws Exception {
                final String clusterId = "flink-on-k8s-cluster-test";
                mockRestServiceActionWatcher(clusterId);
                mockGetRestService(clusterId, hostName, ip);
