This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new 035453f [FLINK-15790][k8s] Make some interfaces in FlinkKubeClient asynchronous which potentially blocks the execution of RpcEndpoint's main thread 035453f is described below commit 035453fb1472b917972daafcd552b7648b34eb31 Author: wangyang0918 <danrtsey...@alibaba-inc.com> AuthorDate: Tue Apr 7 16:23:38 2020 +0800 [FLINK-15790][k8s] Make some interfaces in FlinkKubeClient asynchronous which potentially blocks the execution of RpcEndpoint's main thread The interfaces in FlinkKubeClient will be called both in Client and ResourceManager. To avoid potentially blocking the execution of RpcEndpoint's main thread, these interfaces #createTaskManagerPod, #stopPod should be implemented asynchronously. This closes #11733. --- .../kubernetes/KubernetesClusterDescriptor.java | 10 +- .../kubernetes/KubernetesResourceManager.java | 55 ++++++--- .../flink/kubernetes/cli/KubernetesSessionCli.java | 2 +- .../KubernetesResourceManagerConfiguration.java | 42 +++++++ .../KubernetesResourceManagerFactory.java | 14 ++- .../kubeclient/Fabric8FlinkKubeClient.java | 130 +++++++++++++-------- .../kubernetes/kubeclient/FlinkKubeClient.java | 28 ++--- .../kubernetes/kubeclient/KubeClientFactory.java | 9 +- .../kubernetes/KubernetesResourceManagerTest.java | 80 ++++++++++--- .../flink/kubernetes/KubernetesTestBase.java | 6 +- .../kubernetes/kubeclient/Fabric8ClientTest.java | 34 +++--- 11 files changed, 293 insertions(+), 117 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java index 0888bdb..8976d3e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -49,6 +49,8 @@ import org.apache.flink.util.FlinkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -83,11 +85,11 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> { return () -> { final Configuration configuration = new Configuration(flinkConfig); - final Endpoint restEndpoint = client.getRestEndpoint(clusterId); + final Optional<Endpoint> restEndpoint = client.getRestEndpoint(clusterId); - if (restEndpoint != null) { - configuration.setString(RestOptions.ADDRESS, restEndpoint.getAddress()); - configuration.setInteger(RestOptions.PORT, restEndpoint.getPort()); + if (restEndpoint.isPresent()) { + configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress()); + configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort()); } else { throw new RuntimeException( new ClusterRetrieveException( diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java index fdf3afc..04ac00b 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java @@ -22,8 +22,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; -import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner; @@ -91,6 +91,8 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW private final List<String> taskManagerStartCommand; + private final KubernetesResourceManagerConfiguration configuration; + /** The number of pods requested, but not yet granted. */ private int numPendingPodRequests = 0; @@ -105,7 +107,9 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, - ResourceManagerMetricGroup resourceManagerMetricGroup) { + ResourceManagerMetricGroup resourceManagerMetricGroup, + FlinkKubeClient kubeClient, + KubernetesResourceManagerConfiguration configuration) { super( flinkConfig, System.getenv(), @@ -119,15 +123,16 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW clusterInformation, fatalErrorHandler, resourceManagerMetricGroup); - this.clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID); + this.clusterId = configuration.getClusterId(); this.defaultCpus = taskExecutorProcessSpec.getCpuCores().getValue().doubleValue(); - this.kubeClient = createFlinkKubeClient(); + this.kubeClient = kubeClient; this.taskManagerParameters = ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec, numSlotsPerTaskManager); this.taskManagerStartCommand = getTaskManagerStartCommand(); + this.configuration = configuration; } @Override @@ -184,12 +189,7 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW public boolean stopWorker(final KubernetesWorkerNode worker) { LOG.info("Stopping Worker {}.", worker.getResourceID()); workerNodes.remove(worker.getResourceID()); - try { - kubeClient.stopPod(worker.getResourceID().toString()); - } catch (Exception e) { - kubeClient.handleException(e); - return false; - } + internalStopPod(worker.getResourceID().toString()); return true; } @@ -271,7 +271,23 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW env); log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(parameter); + kubeClient.createTaskManagerPod(parameter) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + log.error("Could not start TaskManager in pod {}.", podName, throwable); + scheduleRunAsync( + this::decreasePendingAndRequestKubernetesPodIfRequired, + configuration.getPodCreationRetryInterval()); + } + } + ); + } + + private void decreasePendingAndRequestKubernetesPodIfRequired() { + validateRunsInMainThread(); + numPendingPodRequests--; + requestKubernetesPodIfRequired(); } /** @@ -288,7 +304,7 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { - kubeClient.stopPod(pod.getName()); + internalStopPod(pod.getName()); final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(new ResourceID(pod.getName())); if (kubernetesWorkerNode != null) { requestKubernetesPodIfRequired(); @@ -333,12 +349,19 @@ public class KubernetesResourceManager extends ActiveResourceManager<KubernetesW return labels; } - protected FlinkKubeClient createFlinkKubeClient() { - return KubeClientFactory.fromConfiguration(flinkConfig); - } - @Override protected double getCpuCores(Configuration configuration) { return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU); } + + private void internalStopPod(String podName) { + kubeClient.stopPod(podName) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + log.error("Could not stop TaskManager in pod {}.", podName, throwable); + } + } + ); + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java index 568ff2e..746e207 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java @@ -101,7 +101,7 @@ public class KubernetesSessionCli { final FlinkKubeClient kubeClient = KubeClientFactory.fromConfiguration(configuration); // Retrieve or create a session cluster. - if (clusterId != null && kubeClient.getInternalService(clusterId) != null) { + if (clusterId != null && kubeClient.getInternalService(clusterId).isPresent()) { clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient(); } else { clusterClient = kubernetesClusterDescriptor diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesResourceManagerConfiguration.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesResourceManagerConfiguration.java new file mode 100644 index 0000000..c79b1f8 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesResourceManagerConfiguration.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.configuration; + +import org.apache.flink.api.common.time.Time; + +/** + * Configuration specific to {@link org.apache.flink.kubernetes.KubernetesResourceManager}. + */ +public class KubernetesResourceManagerConfiguration { + private final String clusterId; + private final Time podCreationRetryInterval; + + public KubernetesResourceManagerConfiguration(String clusterId, Time podCreationRetryInterval) { + this.clusterId = clusterId; + this.podCreationRetryInterval = podCreationRetryInterval; + } + + public String getClusterId() { + return clusterId; + } + + public Time getPodCreationRetryInterval() { + return podCreationRetryInterval; + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java index 7d01db5..16dbd43 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java @@ -18,9 +18,13 @@ package org.apache.flink.kubernetes.entrypoint; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.KubernetesResourceManager; import org.apache.flink.kubernetes.KubernetesWorkerNode; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration; +import org.apache.flink.kubernetes.kubeclient.KubeClientFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -43,6 +47,8 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto private static final KubernetesResourceManagerFactory INSTANCE = new KubernetesResourceManagerFactory(); + private static final Time POD_CREATION_RETRY_INTERVAL = Time.seconds(3L); + private KubernetesResourceManagerFactory() {} public static KubernetesResourceManagerFactory getInstance() { @@ -66,6 +72,10 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto rmServicesConfiguration, highAvailabilityServices, rpcService.getScheduledExecutor()); + final KubernetesResourceManagerConfiguration kubernetesResourceManagerConfiguration = + new KubernetesResourceManagerConfiguration( + configuration.getString(KubernetesConfigOptions.CLUSTER_ID), + POD_CREATION_RETRY_INTERVAL); return new KubernetesResourceManager( rpcService, @@ -78,6 +88,8 @@ public class KubernetesResourceManagerFactory extends ActiveResourceManagerFacto rmRuntimeServices.getJobLeaderIdService(), clusterInformation, fatalErrorHandler, - resourceManagerMetricGroup); + resourceManagerMetricGroup, + KubeClientFactory.fromConfiguration(configuration), + kubernetesResourceManagerConfiguration); } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index 5d8d5e1..7051b95 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -36,10 +36,12 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesDeployment; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TimeUtils; import org.apache.flink.util.function.FunctionUtils; import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.LoadBalancerStatus; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.ServicePort; @@ -51,15 +53,16 @@ import io.fabric8.kubernetes.client.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -82,13 +85,20 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { private final List<Decorator<Deployment, KubernetesDeployment>> flinkMasterDeploymentDecorators = new ArrayList<>(); private final List<Decorator<Pod, KubernetesPod>> taskManagerPodDecorators = new ArrayList<>(); - public Fabric8FlinkKubeClient(Configuration flinkConfig, KubernetesClient client) { + private final ExecutorService kubeClientExecutorService; + + public Fabric8FlinkKubeClient( + Configuration flinkConfig, + KubernetesClient client, + Supplier<ExecutorService> asyncExecutorFactory) { this.flinkConfig = checkNotNull(flinkConfig); this.internalClient = checkNotNull(client); this.clusterId = checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID)); this.nameSpace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE); + this.kubeClientExecutorService = asyncExecutorFactory.get(); + initialize(); } @@ -158,69 +168,49 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { } @Override - public void createTaskManagerPod(TaskManagerPodParameter parameter) { - KubernetesPod pod = new KubernetesPod(this.flinkConfig); + public CompletableFuture<Void> createTaskManagerPod(TaskManagerPodParameter parameter) { + return CompletableFuture.runAsync( + () -> { + KubernetesPod pod = new KubernetesPod(this.flinkConfig); - for (Decorator<Pod, KubernetesPod> d : this.taskManagerPodDecorators) { - pod = d.decorate(pod); - } + for (Decorator<Pod, KubernetesPod> d : this.taskManagerPodDecorators) { + pod = d.decorate(pod); + } - pod = new TaskManagerPodDecorator(parameter).decorate(pod); + pod = new TaskManagerPodDecorator(parameter).decorate(pod); - LOG.debug("Create TaskManager pod with spec: {}", pod.getInternalResource().getSpec()); + LOG.debug("Create TaskManager pod with spec: {}", pod.getInternalResource().getSpec()); - this.internalClient.pods().inNamespace(this.nameSpace).create(pod.getInternalResource()); + this.internalClient.pods().inNamespace(this.nameSpace).create(pod.getInternalResource()); + }, + kubeClientExecutorService); } @Override - public void stopPod(String podName) { - this.internalClient.pods().withName(podName).delete(); + public CompletableFuture<Void> stopPod(String podName) { + return CompletableFuture.runAsync( + () -> this.internalClient.pods().withName(podName).delete(), + kubeClientExecutorService); } @Override - @Nullable - public Endpoint getRestEndpoint(String clusterId) { + public Optional<Endpoint> getRestEndpoint(String clusterId) { int restPort = this.flinkConfig.getInteger(RestOptions.PORT); String serviceExposedType = flinkConfig.getString(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); // Return the service.namespace directly when use ClusterIP. if (serviceExposedType.equals(KubernetesConfigOptions.ServiceExposedType.ClusterIP.toString())) { - return new Endpoint(clusterId + "." + nameSpace, restPort); - } - - KubernetesService restService = getRestService(clusterId); - if (restService == null) { - return null; - } - Service service = restService.getInternalResource(); - - String address = null; - - if (service.getStatus() != null && (service.getStatus().getLoadBalancer() != null || - service.getStatus().getLoadBalancer().getIngress() != null)) { - if (service.getStatus().getLoadBalancer().getIngress().size() > 0) { - address = service.getStatus().getLoadBalancer().getIngress().get(0).getIp(); - if (address == null || address.isEmpty()) { - address = service.getStatus().getLoadBalancer().getIngress().get(0).getHostname(); - } - } else { - address = this.internalClient.getMasterUrl().getHost(); - restPort = getServiceNodePort(service, RestOptions.PORT); - } - } else if (service.getSpec().getExternalIPs() != null && service.getSpec().getExternalIPs().size() > 0) { - address = service.getSpec().getExternalIPs().get(0); + return Optional.of(new Endpoint(clusterId + "." + nameSpace, restPort)); } - if (address == null || address.isEmpty()) { - return null; - } - return new Endpoint(address, restPort); + return getRestService(clusterId) + .flatMap(restService -> getRestEndPointFromService(restService.getInternalResource(), restPort)); } @Override public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) { final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems(); - if (podList == null || podList.size() < 1) { + if (podList == null || podList.isEmpty()) { return new ArrayList<>(); } @@ -241,14 +231,12 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { } @Override - @Nullable - public KubernetesService getInternalService(String clusterId) { + public Optional<KubernetesService> getInternalService(String clusterId) { return getService(clusterId); } @Override - @Nullable - public KubernetesService getRestService(String clusterId) { + public Optional<KubernetesService> getRestService(String clusterId) { return getService(clusterId + Constants.FLINK_REST_SERVICE_SUFFIX); } @@ -288,6 +276,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { @Override public void close() { this.internalClient.close(); + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.kubeClientExecutorService); } private CompletableFuture<KubernetesService> createService( @@ -324,7 +313,7 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { })); } - private KubernetesService getService(String serviceName) { + private Optional<KubernetesService> getService(String serviceName) { final Service service = this .internalClient .services() @@ -335,10 +324,10 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { if (service == null) { LOG.debug("Service {} does not exist", serviceName); - return null; + return Optional.empty(); } - return new KubernetesService(this.flinkConfig, service); + return Optional.of(new KubernetesService(flinkConfig, service)); } /** @@ -355,4 +344,43 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { } return port; } + + private Optional<Endpoint> getRestEndPointFromService(Service service, int restPort) { + if (service.getStatus() == null) { + return Optional.empty(); + } + + LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer(); + boolean hasExternalIP = service.getSpec() != null && + service.getSpec().getExternalIPs() != null && !service.getSpec().getExternalIPs().isEmpty(); + + if (loadBalancer != null) { + return getLoadBalancerRestEndpoint(loadBalancer, service, restPort); + } else if (hasExternalIP) { + final String address = service.getSpec().getExternalIPs().get(0); + if (address != null && !address.isEmpty()) { + return Optional.of(new Endpoint(address, restPort)); + } + } + return Optional.empty(); + } + + private Optional<Endpoint> getLoadBalancerRestEndpoint(LoadBalancerStatus loadBalancer, Service svc, int restPort) { + boolean hasIngress = loadBalancer.getIngress() != null && !loadBalancer.getIngress().isEmpty(); + String address; + int port = restPort; + if (hasIngress) { + address = loadBalancer.getIngress().get(0).getIp(); + // Use hostname when the ip address is null + if (address == null || address.isEmpty()) { + address = loadBalancer.getIngress().get(0).getHostname(); + } + } else { + // Use node port + address = this.internalClient.getMasterUrl().getHost(); + port = getServiceNodePort(svc, RestOptions.PORT); + } + boolean noAddress = address == null || address.isEmpty(); + return noAddress ? Optional.empty() : Optional.of(new Endpoint(address, port)); + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java index 6a84a3e..357f2af 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java @@ -22,14 +22,15 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; -import javax.annotation.Nullable; - import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; /** - * The client to talk with kubernetes. + * The client to talk with kubernetes. The interfaces will be called both in Client and ResourceManager. To avoid + * potentially blocking the execution of RpcEndpoint's main thread, these interfaces + * {@link #createTaskManagerPod(TaskManagerPodParameter)}, {@link #stopPod(String)} should be implemented asynchronously. */ public interface FlinkKubeClient extends AutoCloseable { @@ -66,15 +67,17 @@ public interface FlinkKubeClient extends AutoCloseable { * Create task manager pod. * * @param parameter {@link TaskManagerPodParameter} to create a taskmanager pod. + * @return Return the taskmanager pod creation future */ - void createTaskManagerPod(TaskManagerPodParameter parameter); + CompletableFuture<Void> createTaskManagerPod(TaskManagerPodParameter parameter); /** * Stop a specified pod by name. * * @param podName pod name + * @return Return the pod stop future */ - void stopPod(String podName); + CompletableFuture<Void> stopPod(String podName); /** * Stop cluster and clean up all resources, include services, auxiliary services and all running pods. @@ -87,28 +90,25 @@ public interface FlinkKubeClient extends AutoCloseable { * Get the kubernetes internal service of the given flink clusterId. * * @param clusterId cluster id - * @return Return the internal service of the specified cluster id. Return null if the service does not exist. + * @return Return the optional internal service of the specified cluster id. */ - @Nullable - KubernetesService getInternalService(String clusterId); + Optional<KubernetesService> getInternalService(String clusterId); /** * Get the kubernetes rest service of the given flink clusterId. * * @param clusterId cluster id - * @return Return the rest service of the specified cluster id. Return null if the service does not exist. + * @return Return the optional rest service of the specified cluster id. */ - @Nullable - KubernetesService getRestService(String clusterId); + Optional<KubernetesService> getRestService(String clusterId); /** * Get the rest endpoint for access outside cluster. * * @param clusterId cluster id - * @return Return null if the service does not exist or could not extract the Endpoint from the service. + * @return Return empty if the service does not exist or could not extract the Endpoint from the service. */ - @Nullable - Endpoint getRestEndpoint(String clusterId); + Optional<Endpoint> getRestEndpoint(String clusterId); /** * List the pods with specified labels. diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java index 7ada3c4..56200c8 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.kubeclient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.DefaultKubernetesClient; @@ -30,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Factory class to create {@link FlinkKubeClient}. @@ -58,6 +61,10 @@ public class KubeClientFactory { final KubernetesClient client = new DefaultKubernetesClient(config); - return new Fabric8FlinkKubeClient(flinkConfig, client); + return new Fabric8FlinkKubeClient(flinkConfig, client, KubeClientFactory::createThreadPoolForAsyncIO); + } + + private static ExecutorService createThreadPoolForAsyncIO() { + return Executors.newFixedThreadPool(2, new ExecutorThreadFactory("FlinkKubeClient-IO")); } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java index 08d8f51..c171402 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java @@ -23,7 +23,10 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration; +import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; @@ -32,6 +35,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -76,6 +81,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static junit.framework.TestCase.assertEquals; @@ -95,6 +102,8 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { private final String jobManagerHost = "jm-host1"; + private static final Time TESTING_POD_CREATION_RETRY_INTERVAL = Time.milliseconds(50L); + private Configuration flinkConfig; private TestingKubernetesResourceManager resourceManager; @@ -108,7 +117,7 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m")); flinkKubeClient = getFabric8FlinkKubeClient(); - resourceManager = createAndStartResourceManager(flinkConfig); + resourceManager = createAndStartResourceManager(flinkConfig, getFabric8FlinkKubeClient()); } @After @@ -134,7 +143,9 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, - ResourceManagerMetricGroup resourceManagerMetricGroup) { + ResourceManagerMetricGroup resourceManagerMetricGroup, + FlinkKubeClient flinkKubeClient, + KubernetesResourceManagerConfiguration configuration) { super( rpcService, resourceManagerEndpointId, @@ -146,7 +157,9 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { jobLeaderIdService, clusterInformation, fatalErrorHandler, - resourceManagerMetricGroup + resourceManagerMetricGroup, + flinkKubeClient, + configuration ); this.slotManager = slotManager; } @@ -160,11 +173,6 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { runnable.run(); } - @Override - protected FlinkKubeClient createFlinkKubeClient() { - return flinkKubeClient; - } - MainThreadExecutor getMainThreadExecutorForTesting() { return super.getMainThreadExecutor(); } @@ -266,7 +274,8 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { new ArrayList<>(), 1024, 1, - new HashMap<>())); + new HashMap<>())) + .get(); final KubernetesClient client = getKubeClient(); assertEquals(1, client.pods().list().getItems().size()); @@ -315,7 +324,30 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { assertThat(resourceManager.getCpuCores(configuration), is(3.0)); } - private TestingKubernetesResourceManager createAndStartResourceManager(Configuration configuration) throws Exception { + @Test + public void testCreateTaskManagerPodFailedAndRetry() throws Exception { + final AtomicInteger retries = new AtomicInteger(0); + final int numOfFailedRetries = 3; + final OneShotLatch podCreated = new OneShotLatch(); + final FlinkKubeClient flinkKubeClient = + createTestingFlinkKubeClientAllocatingPodsAfter(numOfFailedRetries, retries, podCreated); + final TestingKubernetesResourceManager testRM = createAndStartResourceManager(flinkConfig, flinkKubeClient); + registerSlotRequest(testRM); + + podCreated.await(); + // Creating taskmanager should retry 4 times (3 failed and then succeed) + assertThat( + "Creating taskmanager should fail " + numOfFailedRetries + " times and then succeed", + retries.get(), + is(numOfFailedRetries + 1)); + + testRM.close(); + flinkKubeClient.close(); + } + + private TestingKubernetesResourceManager createAndStartResourceManager( + Configuration configuration, + FlinkKubeClient flinkKubeClient) throws Exception { final TestingRpcService rpcService = new TestingRpcService(configuration); final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(rpcService, TIMEOUT); @@ -331,14 +363,15 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { rmServices.jobLeaderIdService, new ClusterInformation("localhost", 1234), testingFatalErrorHandler, - UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup() - ); + UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(), + flinkKubeClient, + new KubernetesResourceManagerConfiguration(CLUSTER_ID, TESTING_POD_CREATION_RETRY_INTERVAL)); kubernetesResourceManager.start(); rmServices.grantLeadership(); return kubernetesResourceManager; } - private void registerSlotRequest() throws Exception { + private void registerSlotRequest(TestingKubernetesResourceManager resourceManager) throws Exception { CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> { resourceManager.getSlotManager().registerSlotRequest( new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, jobManagerHost)); @@ -347,6 +380,10 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { registerSlotRequestFuture.get(); } + private void registerSlotRequest() throws Exception { + registerSlotRequest(resourceManager); + } + private void registerTaskExecutor(ResourceID resourceID) throws Exception { final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() .createTestingTaskExecutorGateway(); @@ -390,4 +427,21 @@ public class KubernetesResourceManagerTest extends KubernetesTestBase { .build()) .build()); } + + private FlinkKubeClient createTestingFlinkKubeClientAllocatingPodsAfter( + int numberOfRetries, + AtomicInteger retries, + OneShotLatch podCreated) { + ExecutorService kubeClientExecutorService = Executors.newDirectExecutorService(); + return new Fabric8FlinkKubeClient(flinkConfig, getKubeClient(), () -> kubeClientExecutorService) { + @Override + public CompletableFuture<Void> createTaskManagerPod(TaskManagerPodParameter parameter) { + if (retries.getAndIncrement() < numberOfRetries) { + return FutureUtils.completedExceptionally(new RuntimeException("Exception")); + } + podCreated.trigger(); + return super.createTaskManagerPod(parameter); + } + }; + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java index 32ce9e8..436067e 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java @@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.ServiceDecorator; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; @@ -117,7 +118,10 @@ public class KubernetesTestBase extends TestLogger { } protected FlinkKubeClient getFabric8FlinkKubeClient(Configuration flinkConfig){ - return new Fabric8FlinkKubeClient(flinkConfig, server.getClient().inNamespace(NAMESPACE)); + return new Fabric8FlinkKubeClient( + flinkConfig, + server.getClient().inNamespace(NAMESPACE), + Executors::newDirectExecutorService); } protected KubernetesClient getKubeClient() { diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java index 0a477b9..1e7c2f4 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8ClientTest.java @@ -44,12 +44,14 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -145,9 +147,10 @@ public class Fabric8ClientTest extends KubernetesTestBase { assertThat(service.getSpec().getPorts().stream().map(ServicePort::getPort).collect(Collectors.toList()), Matchers.hasItems(8081)); - final Endpoint endpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID); - assertEquals(MOCK_SERVICE_IP, endpoint.getAddress()); - assertEquals(8081, endpoint.getPort()); + final Optional<Endpoint> endpoint = flinkKubeClient.getRestEndpoint(CLUSTER_ID); + assertThat(endpoint.isPresent(), is(true)); + assertThat(endpoint.get().getAddress(), is(MOCK_SERVICE_IP)); + assertThat(endpoint.get().getPort(), is(8081)); } @Test @@ -197,7 +200,7 @@ public class Fabric8ClientTest extends KubernetesTestBase { } @Test - public void testCreateTaskManagerPod() { + public void testCreateTaskManagerPod() throws ExecutionException, InterruptedException { final String podName = "taskmanager-1"; final List<String> commands = Arrays.asList("/bin/bash", "-c", "start-command-of-taskmanager"); final int tmMem = 1234; @@ -210,7 +213,7 @@ public class Fabric8ClientTest extends KubernetesTestBase { tmMem, tmCpu, env); - flinkKubeClient.createTaskManagerPod(parameter); + flinkKubeClient.createTaskManagerPod(parameter).get(); final List<Pod> pods = kubeClient.pods().list().getItems(); assertEquals(1, pods.size()); @@ -251,28 +254,29 @@ public class Fabric8ClientTest extends KubernetesTestBase { assertEquals(FLINK_CONF_FILENAME, tmContainer.getVolumeMounts().get(0).getSubPath()); // Stop the pod - flinkKubeClient.stopPod(podName); + flinkKubeClient.stopPod(podName).get(); assertEquals(0, kubeClient.pods().list().getItems().size()); } @Test public void testServiceLoadBalancerWithNoIP() throws Exception { final String hostName = "test-host-name"; - final Endpoint endpoint = getRestEndpoint(hostName, ""); - assertEquals(hostName, endpoint.getAddress()); - assertEquals(8081, endpoint.getPort()); + final Optional<Endpoint> endpoint = getRestEndpoint(hostName, ""); + assertThat(endpoint.isPresent(), is(true)); + assertThat(endpoint.get().getAddress(), is(hostName)); + assertThat(endpoint.get().getPort(), is(8081)); } @Test public void testServiceLoadBalancerEmptyHostAndIP() throws Exception { - final Endpoint endpoint1 = getRestEndpoint("", ""); - assertNull(endpoint1); + final Optional<Endpoint> endpoint1 = getRestEndpoint("", ""); + assertThat(endpoint1.isPresent(), is(false)); - final Endpoint endpoint2 = getRestEndpoint(null, null); - assertNull(endpoint2); + final Optional<Endpoint> endpoint2 = getRestEndpoint(null, null); + assertThat(endpoint2.isPresent(), is(false)); } - private Endpoint getRestEndpoint(String hostName, String ip) throws Exception { + private Optional<Endpoint> getRestEndpoint(String hostName, String ip) throws Exception { final String clusterId = "flink-on-k8s-cluster-test"; mockRestServiceActionWatcher(clusterId); mockGetRestService(clusterId, hostName, ip);