xintongsong commented on code in PR #24163:
URL: https://github.com/apache/flink/pull/24163#discussion_r1494040971
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -412,11 +415,9 @@ private void stopPod(String podName) {
});
}
- private Optional<KubernetesWatch> watchTaskManagerPods() throws Exception {
- return Optional.of(
- flinkKubeClient.watchPodsAndDoCallback(
- KubernetesUtils.getTaskManagerSelectors(clusterId),
- new PodCallbackHandlerImpl()));
+ private CompletableFuture<KubernetesWatch> watchTaskManagerPods() throws Exception {
+ return flinkKubeClient.watchPodsAndDoCallback(
+ KubernetesUtils.getTaskManagerSelectors(clusterId), new PodCallbackHandlerImpl());
Review Comment:
It would be helpful to log a message when the future completes.
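
A minimal sketch of what such logging could look like, assuming a plain `CompletableFuture`. The class `WatchCompletionLogging`, the helper `logOnCompletion`, and the use of `java.util.logging` (to keep the snippet self-contained) are illustrative assumptions and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: logs how the watch future completed.
final class WatchCompletionLogging {
    private static final Logger LOG =
            Logger.getLogger(WatchCompletionLogging.class.getName());

    // Returns a future that completes with the same result or exception as
    // watchFuture, after one log line describing the outcome has been written.
    static <W> CompletableFuture<W> logOnCompletion(
            CompletableFuture<W> watchFuture, String clusterId) {
        return watchFuture.whenComplete(
                (watch, throwable) -> {
                    if (throwable != null) {
                        LOG.warning(
                                "Failed to create the TaskManager pod watch for cluster "
                                        + clusterId + ": " + throwable);
                    } else {
                        LOG.info(
                                "Created the TaskManager pod watch for cluster "
                                        + clusterId + ".");
                    }
                });
    }
}
```

In this sketch `watchTaskManagerPods()` would wrap its return value in `logOnCompletion(...)`, so both the success and the failure path become visible in the log.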
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -114,7 +116,8 @@ public KubernetesResourceManagerDriver(
@Override
protected void initializeInternal() throws Exception {
- podsWatchOpt = watchTaskManagerPods();
+ podsWatchOptFuture = watchTaskManagerPods();
+ podsWatchOptFuture.get();
Review Comment:
Why do we need to call `podsWatchOptFuture.get()` here? It blocks the RPC
main thread until the watch has been created.
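
One possible non-blocking shape, as a sketch only: keep the future and react to a failure in a callback instead of calling `get()`. The class `NonBlockingWatchInit`, the `watchFactory` supplier, and the `fatalErrorHandler` consumer are hypothetical stand-ins for the driver's real collaborators.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the driver; W stands in for KubernetesWatch.
final class NonBlockingWatchInit<W> {
    private final Supplier<CompletableFuture<W>> watchFactory;
    private final Executor mainThreadExecutor;
    private final Consumer<Throwable> fatalErrorHandler;
    private CompletableFuture<W> podsWatchFuture;

    NonBlockingWatchInit(
            Supplier<CompletableFuture<W>> watchFactory,
            Executor mainThreadExecutor,
            Consumer<Throwable> fatalErrorHandler) {
        this.watchFactory = watchFactory;
        this.mainThreadExecutor = mainThreadExecutor;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    // Starts the watch creation and returns immediately. A failure is
    // reported later through the executor instead of being thrown here.
    void initialize() {
        podsWatchFuture = watchFactory.get();
        podsWatchFuture.whenCompleteAsync(
                (watch, throwable) -> {
                    if (throwable != null) {
                        fatalErrorHandler.accept(throwable);
                    }
                },
                mainThreadExecutor);
    }
}
```

Whether a failed initial watch should be fatal is a design decision for the PR author; the sketch only shows that the decision can be made without blocking.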
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java:
##########
@@ -452,10 +453,22 @@ public void handleError(Throwable throwable) {
.execute(
() -> {
if (running) {
- podsWatchOpt.ifPresent(KubernetesWatch::close);
+ try {
+ if (podsWatchOptFuture.get() != null) {
Review Comment:
This blocks the main thread. Is it necessary to wait for the previous watch
to be closed before creating the new one? If yes, we should create the new
watch in `podsWatchOptFuture.whenCompleteAsync()` to avoid blocking the main
thread.
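
A rough sketch of the `whenCompleteAsync()` variant, under the assumption that the watch type is closeable and that the callback runs on the main-thread executor. `WatchRecreator`, `watchFactory`, and `currentWatchFuture()` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical stand-in for the driver; W stands in for KubernetesWatch.
final class WatchRecreator<W extends AutoCloseable> {
    private final Supplier<CompletableFuture<W>> watchFactory;
    private final Executor mainThreadExecutor;
    private CompletableFuture<W> podsWatchFuture;

    WatchRecreator(Supplier<CompletableFuture<W>> watchFactory, Executor mainThreadExecutor) {
        this.watchFactory = watchFactory;
        this.mainThreadExecutor = mainThreadExecutor;
        this.podsWatchFuture = watchFactory.get();
    }

    // Closes the previous watch (if it was ever created) and starts a new
    // one, all inside the callback, so the caller never blocks on get().
    void recreateWatch() {
        podsWatchFuture.whenCompleteAsync(
                (previousWatch, throwable) -> {
                    if (previousWatch != null) {
                        try {
                            previousWatch.close();
                        } catch (Exception e) {
                            // Sketch only: a real driver would log this.
                        }
                    }
                    podsWatchFuture = watchFactory.get();
                },
                mainThreadExecutor);
    }

    CompletableFuture<W> currentWatchFuture() {
        return podsWatchFuture;
    }
}
```

Because the field is only reassigned inside the callback, the sketch relies on `mainThreadExecutor` being single-threaded, as the RPC main thread is.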
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -454,6 +451,21 @@ void testWatchPodsAndDoCallback() throws Exception {
assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
}
+ @Test
+ void testWatchPodsAndDoCallbackFail() throws Exception {
+ mockWatchPodSuccessAfterFailTwoTimes(
+ NAMESPACE, KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+ TestingWatchCallbackHandler<KubernetesPod> watchCallbackHandler =
+ TestingWatchCallbackHandler.<KubernetesPod>builder().build();
+ // disable the retry of kubeClient
+ kubeClient.getConfiguration().setRequestRetryBackoffLimit(0);
+ assertThat(
+ flinkKubeClient
+ .watchPodsAndDoCallback(TESTING_LABELS, watchCallbackHandler)
+ .get())
Review Comment:
We should add a timeout, or the test case may hang for a long time.
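
For illustration, a bounded wait could look roughly like the following. `BoundedWait.getOrFail` is a hypothetical helper, and the choice to surface every failure as an `AssertionError` is an assumption made for test code, not something taken from the PR.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper: waits for a future, but never longer than timeout.
final class BoundedWait {
    static <T> T getOrFail(CompletableFuture<T> future, Duration timeout) {
        try {
            // get(long, TimeUnit) throws TimeoutException instead of hanging.
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete within " + timeout, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        } catch (ExecutionException e) {
            throw new AssertionError("Future completed exceptionally", e.getCause());
        }
    }
}
```

In the test above, the trailing `.get()` would then become a call with an explicit bound, so a watch that is never created fails the test quickly instead of stalling the build.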
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java:
##########
@@ -149,9 +149,10 @@ public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) {
}
@Override
- public KubernetesWatch watchPodsAndDoCallback(
+ public CompletableFuture<KubernetesWatch> watchPodsAndDoCallback(
Map<String, String> labels, WatchCallbackHandler<KubernetesPod> podCallbackHandler) {
- return watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler);
+ return CompletableFuture.supplyAsync(
+ () -> watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler));
Review Comment:
I think we should change the return type of
`watchPodsAndDoCallbackFunction` to align with the method signature.
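
As a sketch of that alignment, the testing function could itself return a `CompletableFuture`, so the method just delegates. `TestingClientSketch` and its type parameters `W` (for `KubernetesWatch`) and `H` (for the callback handler) are simplified, hypothetical stand-ins for the real testing client.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-in for the testing client.
final class TestingClientSketch<W, H> {
    // The function already returns a future, matching the method signature.
    private final BiFunction<Map<String, String>, H, CompletableFuture<W>>
            watchPodsAndDoCallbackFunction;

    TestingClientSketch(
            BiFunction<Map<String, String>, H, CompletableFuture<W>>
                    watchPodsAndDoCallbackFunction) {
        this.watchPodsAndDoCallbackFunction = watchPodsAndDoCallbackFunction;
    }

    // Plain delegation: no supplyAsync, so no extra thread hop in tests.
    CompletableFuture<W> watchPodsAndDoCallback(Map<String, String> labels, H podCallbackHandler) {
        return watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler);
    }
}
```

A side benefit of this shape is that a test can hand in an already-failed or never-completing future directly, which is awkward to express when the function returns the bare watch.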