This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 025a95b627faf8ec8b725a7784d1279b41e10ba7
Author: Mrart <[email protected]>
AuthorDate: Wed Jun 7 15:30:28 2023 +0800

    [FLINK-27925][kubernetes]  adding the resourceVersion=0 when doing the list 
pods.
---
 .../kubeclient/Fabric8FlinkKubeClient.java         | 14 ++++++-
 .../apache/flink/kubernetes/utils/Constants.java   |  5 +++
 .../flink/kubernetes/KubernetesClientTestBase.java | 40 ++++++++++++++++++++
 .../kubeclient/Fabric8FlinkKubeClientTest.java     | 43 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 1 deletion(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
index 6d3854230ce..df841779762 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
@@ -41,6 +41,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.IntOrString;
+import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
 import io.fabric8.kubernetes.api.model.OwnerReference;
 import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
@@ -66,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.kubernetes.utils.Constants.KUBERNETES_ZERO_RESOURCE_VERSION;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** The implementation of {@link FlinkKubeClient}. */
@@ -190,7 +192,15 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
 
     @Override
     public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
-        final List<Pod> podList = 
this.internalClient.pods().withLabels(labels).list().getItems();
+        final List<Pod> podList =
+                this.internalClient
+                        .pods()
+                        .withLabels(labels)
+                        .list(
+                                new ListOptionsBuilder()
+                                        
.withResourceVersion(KUBERNETES_ZERO_RESOURCE_VERSION)
+                                        .build())
+                        .getItems();
 
         if (podList == null || podList.isEmpty()) {
             return new ArrayList<>();
@@ -231,6 +241,8 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
                                                         this.internalClient
                                                                 .pods()
                                                                 
.withLabels(labels)
+                                                                
.withResourceVersion(
+                                                                        
KUBERNETES_ZERO_RESOURCE_VERSION)
                                                                 .watch(
                                                                         new 
KubernetesPodsWatcher(
                                                                                
 podCallbackHandler))),
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index 33959606bb7..257f4d7b0cc 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -115,4 +115,9 @@ public class Constants {
     public static final String KUBERNETES_TASK_MANAGER_SCRIPT_PATH = 
"kubernetes-taskmanager.sh";
 
     public static final String ENV_TM_JVM_MEM_OPTS = "FLINK_TM_JVM_MEM_OPTS";
+
+    // resourceVersion="0" means any resource version is acceptable. It saves the time needed to
+    // access etcd and improves performance.
+    // 
https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter
+    public static final String KUBERNETES_ZERO_RESOURCE_VERSION = "0";
 }
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
index a9d926c1924..8d9a4af663b 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
@@ -33,12 +33,15 @@ import io.fabric8.kubernetes.api.model.NodeBuilder;
 import io.fabric8.kubernetes.api.model.NodeListBuilder;
 import io.fabric8.kubernetes.api.model.NodeSpecBuilder;
 import io.fabric8.kubernetes.api.model.NodeStatusBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServiceBuilder;
 import io.fabric8.kubernetes.api.model.ServicePort;
 import io.fabric8.kubernetes.api.model.ServicePortBuilder;
 import io.fabric8.kubernetes.api.model.ServiceStatus;
 import io.fabric8.kubernetes.api.model.ServiceStatusBuilder;
+import io.fabric8.kubernetes.api.model.WatchEvent;
 import io.fabric8.mockwebserver.dsl.DelayPathable;
 import io.fabric8.mockwebserver.dsl.HttpMethodable;
 import io.fabric8.mockwebserver.dsl.MockServerExpectation;
@@ -51,7 +54,9 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Base class for {@link KubernetesClusterDescriptorTest} and {@link
@@ -61,6 +66,7 @@ public class KubernetesClientTestBase extends 
KubernetesTestBase {
 
     protected static final int REST_PORT = 9021;
     protected static final int NODE_PORT = 31234;
+    protected static final Long EVENT_WAIT_PERIOD_MS = 10L;
 
     protected void mockExpectedNodesFromServerSide(List<String> addresses) {
         final List<Node> nodes = new ArrayList<>();
@@ -143,6 +149,40 @@ public class KubernetesClientTestBase extends 
KubernetesTestBase {
         server.expect().get().withPath(path).andReturn(500, "Expected 
error").always();
     }
 
+    protected void mockPodEventWithLabels(
+            String namespace, String podName, String resourceVersion, 
Map<String, String> labels) {
+        final Pod pod =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withNamespace(namespace)
+                        .withName(podName)
+                        .withLabels(labels)
+                        .withResourceVersion("5668")
+                        .endMetadata()
+                        .build();
+        // mock three kinds of events: ADDED, MODIFIED and DELETED.
+        String mockPath =
+                String.format(
+                        
"/api/v1/namespaces/%s/pods?labelSelector=%s&resourceVersion=%s&allowWatchBookmarks=true&watch=true",
+                        namespace,
+                        labels.entrySet().stream()
+                                .map(entry -> entry.getKey() + "%3D" + 
entry.getValue())
+                                .collect(Collectors.joining("%2C")),
+                        resourceVersion);
+        server.expect()
+                .withPath(mockPath)
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(EVENT_WAIT_PERIOD_MS)
+                .andEmit(new WatchEvent(pod, "ADDED"))
+                .waitFor(EVENT_WAIT_PERIOD_MS)
+                .andEmit(new WatchEvent(pod, "MODIFIED"))
+                .waitFor(EVENT_WAIT_PERIOD_MS)
+                .andEmit(new WatchEvent(pod, "DELETED"))
+                .done()
+                .once();
+    }
+
     protected Service buildExternalServiceWithLoadBalancer(
             @Nullable String hostname, @Nullable String ip) {
         final ServicePort servicePort =
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
index 2e03924de55..36d182247b2 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
@@ -47,6 +47,7 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.Watcher.Action;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -55,6 +56,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -65,6 +67,7 @@ import static 
org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
+import static 
org.apache.flink.kubernetes.utils.Constants.KUBERNETES_ZERO_RESOURCE_VERSION;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
@@ -277,6 +280,25 @@ public class Fabric8FlinkKubeClientTest extends 
KubernetesClientTestBase {
         
assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get()).isNull();
     }
 
+    @Test
+    void testGetPodsWithLabels() {
+        final String podName = "pod-with-labels";
+        final Pod pod =
+                new PodBuilder()
+                        .editOrNewMetadata()
+                        .withName(podName)
+                        .withLabels(TESTING_LABELS)
+                        .endMetadata()
+                        .editOrNewSpec()
+                        .endSpec()
+                        .build();
+        this.kubeClient.pods().inNamespace(NAMESPACE).create(pod);
+        List<KubernetesPod> kubernetesPods = 
this.flinkKubeClient.getPodsWithLabels(TESTING_LABELS);
+        assertThat(kubernetesPods)
+                .satisfiesExactly(
+                        kubernetesPod -> 
assertThat(kubernetesPod.getName()).isEqualTo(podName));
+    }
+
     @Test
     void testServiceLoadBalancerWithNoIP() {
         final String hostName = "test-host-name";
@@ -411,6 +433,27 @@ public class Fabric8FlinkKubeClientTest extends 
KubernetesClientTestBase {
                 .isEmpty();
     }
 
+    @Test
+    void testWatchPodsAndDoCallback() throws Exception {
+        mockPodEventWithLabels(
+                NAMESPACE, TASKMANAGER_POD_NAME, 
KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+        // futures that are completed when the corresponding event callback fires.
+        CompletableFuture<Action> podAddedAction = new CompletableFuture();
+        CompletableFuture<Action> podDeletedAction = new CompletableFuture();
+        CompletableFuture<Action> podModifiedAction = new CompletableFuture();
+        TestingWatchCallbackHandler<KubernetesPod> watchCallbackHandler =
+                TestingWatchCallbackHandler.<KubernetesPod>builder()
+                        .setOnAddedConsumer((ignore) -> 
podAddedAction.complete(Action.ADDED))
+                        .setOnDeletedConsumer((ignore) -> 
podDeletedAction.complete(Action.DELETED))
+                        .setOnModifiedConsumer(
+                                (ignore) -> 
podModifiedAction.complete(Action.MODIFIED))
+                        .build();
+        this.flinkKubeClient.watchPodsAndDoCallback(TESTING_LABELS, 
watchCallbackHandler);
+        assertThat(podAddedAction.get()).isEqualTo(Action.ADDED);
+        assertThat(podDeletedAction.get()).isEqualTo(Action.DELETED);
+        assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
+    }
+
     @Test
     void testCreateConfigMap() throws Exception {
         final KubernetesConfigMap configMap = buildTestingConfigMap();

Reply via email to