huwh commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160572180
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String
clusterId) {
@Override
public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
- final List<Pod> podList =
this.internalClient.pods().withLabels(labels).list().getItems();
+ final List<Pod> podList =
+ this.internalClient
+ .pods()
+ .withLabels(labels)
+ .list(new
ListOptionsBuilder().withResourceVersion("0").build())
Review Comment:
This "0" could be extracted as a static final variable
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected
error").always();
}
+ protected void mockPodEventWithLabels(Map<String, String> labels) {
+ final Pod pod1 =
Review Comment:
maybe "pod" is enough, since there is only one pod here
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected
error").always();
}
+ protected void mockPodEventWithLabels(Map<String, String> labels) {
Review Comment:
It's better to move the namespace, name, and resource version to function
arguments.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -277,6 +281,23 @@ void testStopPod() throws ExecutionException,
InterruptedException {
assertThat(this.kubeClient.pods().inNamespace(NAMESPACE).withName(podName).get()).isNull();
}
+ @Test
+ void testGetPodsWithLabels() {
+ final String podName = "pod-with-labels";
+ final Pod pod =
+ new PodBuilder()
+ .editOrNewMetadata()
+ .withName(podName)
+ .withLabels(TESTING_LABELS)
+ .endMetadata()
+ .editOrNewSpec()
+ .endSpec()
+ .build();
+ this.kubeClient.pods().inNamespace(NAMESPACE).create(pod);
+ List<KubernetesPod> kubernetesPods =
this.flinkKubeClient.getPodsWithLabels(TESTING_LABELS);
+ assertThat(kubernetesPods.size()).isEqualTo(1);
Review Comment:
We should check that the listed pod is the expected one:
assertThat(kubernetesPods)
.satisfiesExactly(
kubernetesPod ->
assertThat(kubernetesPod.getName()).isEqualTo(podName));
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
.isEmpty();
}
+ @Test
+ void testWatchPodsAndDoCallback() throws Exception {
+ mockPodEventWithLabels(TESTING_LABELS);
+ // the count latch for events.
+ final CountDownLatch eventLatch = new CountDownLatch(4);
+ this.flinkKubeClient.watchPodsAndDoCallback(
Review Comment:
It's better to check that each event is received.
Maybe you can use three CountDownLatches and an anonymous class here.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -630,4 +661,39 @@ private KubernetesConfigMap buildTestingConfigMap() {
.withData(data)
.build());
}
+
+ private class TestingKubernetesPodCallbackHandler
+ implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
+
+ private final CountDownLatch eventLatch;
+
+ public TestingKubernetesPodCallbackHandler(CountDownLatch eventLatch) {
+ this.eventLatch = eventLatch;
+ }
+
+ @Override
+ public void onAdded(List<KubernetesPod> resources) {
+ this.eventLatch.countDown();
+ }
+
+ @Override
+ public void onModified(List<KubernetesPod> resources) {
+ this.eventLatch.countDown();
+ }
+
+ @Override
+ public void onDeleted(List<KubernetesPod> resources) {
+ this.eventLatch.countDown();
+ }
+
+ @Override
+ public void onError(List<KubernetesPod> resources) {
Review Comment:
An ERROR event will not trigger Watcher#eventReceived, so this unit test will
fail.
Maybe we can just skip this in the unit test.
ref:
https://github.com/fabric8io/kubernetes-client/blob/master/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java#L325
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected
error").always();
}
+ protected void mockPodEventWithLabels(Map<String, String> labels) {
+ final Pod pod1 =
+ new PodBuilder()
+ .withNewMetadata()
+ .withNamespace("test")
+ .withName("tm_pod1")
+ .withLabels(labels)
+ .withResourceVersion("5668")
+ .endMetadata()
+ .build();
+ // mock four kinds of events.
+ server.expect()
+ .withPath(
+
"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
Review Comment:
This path can be formatted using String.format.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected
error").always();
}
+ protected void mockPodEventWithLabels(Map<String, String> labels) {
+ final Pod pod1 =
+ new PodBuilder()
+ .withNewMetadata()
+ .withNamespace("test")
+ .withName("tm_pod1")
+ .withLabels(labels)
+ .withResourceVersion("5668")
+ .endMetadata()
+ .build();
+ // mock four kinds of events.
+ server.expect()
+ .withPath(
+
"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+ .andUpgradeToWebSocket()
+ .open()
+ .waitFor(1000)
Review Comment:
1000 is a bit long
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
.isEmpty();
}
+ @Test
+ void testWatchPodsAndDoCallback() throws Exception {
+ mockPodEventWithLabels(TESTING_LABELS);
+ // the count latch for events.
+ final CountDownLatch eventLatch = new CountDownLatch(4);
+ this.flinkKubeClient.watchPodsAndDoCallback(
+ TESTING_LABELS, new
TestingKubernetesPodCallbackHandler(eventLatch));
+ assertTrue(eventLatch.await(10, TimeUnit.SECONDS));
Review Comment:
We need to use JUnit 5 and AssertJ in unit tests:
assertThat(eventLatch.await(1000, TimeUnit.MILLISECONDS)).isTrue();
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]