reswqa commented on code in PR #21527:
URL: https://github.com/apache/flink/pull/21527#discussion_r1160799424
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String
clusterId) {
@Override
public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
- final List<Pod> podList =
this.internalClient.pods().withLabels(labels).list().getItems();
+ final List<Pod> podList =
+ this.internalClient
+ .pods()
+ .withLabels(labels)
+ .list(new
ListOptionsBuilder().withResourceVersion("0").build())
Review Comment:
```suggestion
.list(new
ListOptionsBuilder().withResourceVersion("0").build())
```
We'd better add some comments for this magic number.
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -233,6 +239,7 @@ public KubernetesWatch watchPodsAndDoCallback(
this.internalClient
.pods()
.withLabels(labels)
+
.withResourceVersion("0")
Review Comment:
Refer to previous comments.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected
error").always();
}
+ protected void mockPodEventWithLabels(Map<String, String> labels) {
+ final Pod pod1 =
+ new PodBuilder()
+ .withNewMetadata()
+ .withNamespace("test")
+ .withName("tm_pod1")
+ .withLabels(labels)
+ .withResourceVersion("5668")
+ .endMetadata()
+ .build();
+ // mock four kinds of events.
+ server.expect()
+ .withPath(
+
"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+ .andUpgradeToWebSocket()
+ .open()
+ .waitFor(1000)
Review Comment:
I'm not very familiar with this api. Can anyone tell me if this means we
will definitely wait for `1` second or at most `1` second. If it is the former,
it should definitely be prohibited because it will seriously slow down the
execution time of `AZP`.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
.isEmpty();
}
+ @Test
+ void testWatchPodsAndDoCallback() throws Exception {
+ mockPodEventWithLabels(TESTING_LABELS);
+ // the count latch for events.
+ final CountDownLatch eventLatch = new CountDownLatch(4);
+ this.flinkKubeClient.watchPodsAndDoCallback(
Review Comment:
It seems that we have too many class implements
`FlinkKubeClient.WatchCallbackHandler<T>` only for testing purpose. We'd better
introduce a more general and reusable `TestingKubernetesPodCallbackHandler` as
the first commit and rewrite all other impls like `NoOpWatchCallbackHandler` &
`TestingCallbackHandler`.
Don't worry too much, if you don't know how to do this refactor, I can push
this commit as example.
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -191,7 +192,12 @@ public Optional<Endpoint> getRestEndpoint(String
clusterId) {
@Override
public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
- final List<Pod> podList =
this.internalClient.pods().withLabels(labels).list().getItems();
+ final List<Pod> podList =
+ this.internalClient
+ .pods()
+ .withLabels(labels)
+ .list(new
ListOptionsBuilder().withResourceVersion("0").build())
Review Comment:
+1 for this.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -143,6 +147,34 @@ protected void mockGetDeploymentWithError() {
server.expect().get().withPath(path).andReturn(500, "Expected
error").always();
}
+ protected void mockPodEventWithLabels(Map<String, String> labels) {
+ final Pod pod1 =
+ new PodBuilder()
+ .withNewMetadata()
+ .withNamespace("test")
+ .withName("tm_pod1")
+ .withLabels(labels)
+ .withResourceVersion("5668")
+ .endMetadata()
+ .build();
+ // mock four kinds of events.
+ server.expect()
+ .withPath(
+
"/api/v1/namespaces/test/pods?labelSelector=label1%3Dvalue1%2Clabel2%3Dvalue2&resourceVersion=0&allowWatchBookmarks=true&watch=true")
+ .andUpgradeToWebSocket()
+ .open()
+ .waitFor(1000)
Review Comment:
This time also should be extracted to a constant value.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -68,9 +70,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.assertTrue;
/** Tests for Fabric implementation of {@link FlinkKubeClient}. */
public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
+
Review Comment:
Why add this new line?
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -411,6 +432,16 @@ void testStopAndCleanupCluster() throws Exception {
.isEmpty();
}
+ @Test
+ void testWatchPodsAndDoCallback() throws Exception {
+ mockPodEventWithLabels(TESTING_LABELS);
+ // the count latch for events.
+ final CountDownLatch eventLatch = new CountDownLatch(4);
+ this.flinkKubeClient.watchPodsAndDoCallback(
Review Comment:
I agreed that we should also check all received events. IMH, We can
introduce three `CompletableFuture<Watch.Action>` to handle this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]