This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 025a95b627faf8ec8b725a7784d1279b41e10ba7 Author: Mrart <[email protected]> AuthorDate: Wed Jun 7 15:30:28 2023 +0800 [FLINK-27925][kubernetes] adding the resourceVersion=0 when doing the list pods. --- .../kubeclient/Fabric8FlinkKubeClient.java | 14 ++++++- .../apache/flink/kubernetes/utils/Constants.java | 5 +++ .../flink/kubernetes/KubernetesClientTestBase.java | 40 ++++++++++++++++++++ .../kubeclient/Fabric8FlinkKubeClientTest.java | 43 ++++++++++++++++++++++ 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java index 6d3854230ce..df841779762 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java @@ -41,6 +41,7 @@ import org.apache.flink.util.concurrent.FutureUtils; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.IntOrString; +import io.fabric8.kubernetes.api.model.ListOptionsBuilder; import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import io.fabric8.kubernetes.api.model.Pod; @@ -66,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.utils.Constants.KUBERNETES_ZERO_RESOURCE_VERSION; import static org.apache.flink.util.Preconditions.checkNotNull; /** The implementation of {@link FlinkKubeClient}. 
*/ @@ -190,7 +192,15 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { @Override public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) { - final List<Pod> podList = this.internalClient.pods().withLabels(labels).list().getItems(); + final List<Pod> podList = + this.internalClient + .pods() + .withLabels(labels) + .list( + new ListOptionsBuilder() + .withResourceVersion(KUBERNETES_ZERO_RESOURCE_VERSION) + .build()) + .getItems(); if (podList == null || podList.isEmpty()) { return new ArrayList<>(); @@ -231,6 +241,8 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient { this.internalClient .pods() .withLabels(labels) + .withResourceVersion( + KUBERNETES_ZERO_RESOURCE_VERSION) .watch( new KubernetesPodsWatcher( podCallbackHandler))), diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java index 33959606bb7..257f4d7b0cc 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java @@ -115,4 +115,9 @@ public class Constants { public static final String KUBERNETES_TASK_MANAGER_SCRIPT_PATH = "kubernetes-taskmanager.sh"; public static final String ENV_TM_JVM_MEM_OPTS = "FLINK_TM_JVM_MEM_OPTS"; + + // resourceVersion="0" means any resource version. It saves time to access etcd and improves + // performance. 
+ // https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter + public static final String KUBERNETES_ZERO_RESOURCE_VERSION = "0"; } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java index a9d926c1924..8d9a4af663b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java @@ -33,12 +33,15 @@ import io.fabric8.kubernetes.api.model.NodeBuilder; import io.fabric8.kubernetes.api.model.NodeListBuilder; import io.fabric8.kubernetes.api.model.NodeSpecBuilder; import io.fabric8.kubernetes.api.model.NodeStatusBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.ServiceBuilder; import io.fabric8.kubernetes.api.model.ServicePort; import io.fabric8.kubernetes.api.model.ServicePortBuilder; import io.fabric8.kubernetes.api.model.ServiceStatus; import io.fabric8.kubernetes.api.model.ServiceStatusBuilder; +import io.fabric8.kubernetes.api.model.WatchEvent; import io.fabric8.mockwebserver.dsl.DelayPathable; import io.fabric8.mockwebserver.dsl.HttpMethodable; import io.fabric8.mockwebserver.dsl.MockServerExpectation; @@ -51,7 +54,9 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; /** * Base class for {@link KubernetesClusterDescriptorTest} and {@link @@ -61,6 +66,7 @@ public class KubernetesClientTestBase extends KubernetesTestBase { protected static final int REST_PORT = 9021; protected static final int NODE_PORT = 31234; + protected static final Long EVENT_WAIT_PERIOD_MS = 
10L; protected void mockExpectedNodesFromServerSide(List<String> addresses) { final List<Node> nodes = new ArrayList<>(); @@ -143,6 +149,40 @@ public class KubernetesClientTestBase extends KubernetesTestBase { server.expect().get().withPath(path).andReturn(500, "Expected error").always(); } + protected void mockPodEventWithLabels( + String namespace, String podName, String resourceVersion, Map<String, String> labels) { + final Pod pod = + new PodBuilder() + .withNewMetadata() + .withNamespace(namespace) + .withName(podName) + .withLabels(labels) + .withResourceVersion("5668") + .endMetadata() + .build(); + // mock three kinds of events: ADDED, MODIFIED and DELETED. + String mockPath = + String.format( + "/api/v1/namespaces/%s/pods?labelSelector=%s&resourceVersion=%s&allowWatchBookmarks=true&watch=true", + namespace, + labels.entrySet().stream() + .map(entry -> entry.getKey() + "%3D" + entry.getValue()) + .collect(Collectors.joining("%2C")), + resourceVersion); + server.expect() + .withPath(mockPath) + .andUpgradeToWebSocket() + .open() + .waitFor(EVENT_WAIT_PERIOD_MS) + .andEmit(new WatchEvent(pod, "ADDED")) + .waitFor(EVENT_WAIT_PERIOD_MS) + .andEmit(new WatchEvent(pod, "MODIFIED")) + .waitFor(EVENT_WAIT_PERIOD_MS) + .andEmit(new WatchEvent(pod, "DELETED")) + .done() + .once(); + } + protected Service buildExternalServiceWithLoadBalancer( @Nullable String hostname, @Nullable String ip) { final ServicePort servicePort = diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java index 2e03924de55..36d182247b2 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java @@ -47,6 +47,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; 
import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.Watcher.Action; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -55,6 +56,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -65,6 +67,7 @@ import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME; import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME; +import static org.apache.flink.kubernetes.utils.Constants.KUBERNETES_ZERO_RESOURCE_VERSION; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; @@ -277,6 +280,25 @@ public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase { assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get()).isNull(); } + @Test + void testGetPodsWithLabels() { + final String podName = "pod-with-labels"; + final Pod pod = + new PodBuilder() + .editOrNewMetadata() + .withName(podName) + .withLabels(TESTING_LABELS) + .endMetadata() + .editOrNewSpec() + .endSpec() + .build(); + this.kubeClient.pods().inNamespace(NAMESPACE).create(pod); + List<KubernetesPod> kubernetesPods = this.flinkKubeClient.getPodsWithLabels(TESTING_LABELS); + assertThat(kubernetesPods) + .satisfiesExactly( + kubernetesPod -> assertThat(kubernetesPod.getName()).isEqualTo(podName)); + } + @Test void testServiceLoadBalancerWithNoIP() { final String hostName = "test-host-name"; @@ -411,6 +433,27 @@ public class 
Fabric8FlinkKubeClientTest extends KubernetesClientTestBase { .isEmpty(); } + @Test + void testWatchPodsAndDoCallback() throws Exception { + mockPodEventWithLabels( + NAMESPACE, TASKMANAGER_POD_NAME, KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS); + // futures completed when the corresponding pod event is received. + CompletableFuture<Action> podAddedAction = new CompletableFuture(); + CompletableFuture<Action> podDeletedAction = new CompletableFuture(); + CompletableFuture<Action> podModifiedAction = new CompletableFuture(); + TestingWatchCallbackHandler<KubernetesPod> watchCallbackHandler = + TestingWatchCallbackHandler.<KubernetesPod>builder() + .setOnAddedConsumer((ignore) -> podAddedAction.complete(Action.ADDED)) + .setOnDeletedConsumer((ignore) -> podDeletedAction.complete(Action.DELETED)) + .setOnModifiedConsumer( + (ignore) -> podModifiedAction.complete(Action.MODIFIED)) + .build(); + this.flinkKubeClient.watchPodsAndDoCallback(TESTING_LABELS, watchCallbackHandler); + assertThat(podAddedAction.get()).isEqualTo(Action.ADDED); + assertThat(podDeletedAction.get()).isEqualTo(Action.DELETED); + assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED); + } + @Test void testCreateConfigMap() throws Exception { final KubernetesConfigMap configMap = buildTestingConfigMap();
