xintongsong commented on code in PR #24163:
URL: https://github.com/apache/flink/pull/24163#discussion_r1469101401
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -454,6 +451,22 @@ void testWatchPodsAndDoCallback() throws Exception {
assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
}
+ @Test
+ void testWatchPodsAndDoCallbackFail() throws Exception {
+ mockPodEventWithLabelsFail(
+ NAMESPACE, TASKMANAGER_POD_NAME, KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+ TestingWatchCallbackHandler<KubernetesPod> watchCallbackHandler =
+ TestingWatchCallbackHandler.<KubernetesPod>builder().build();
+ Long watchStartTime = System.currentTimeMillis();
+ assertThatThrownBy(
+ () ->
+ this.flinkKubeClient.watchPodsAndDoCallback(
+ TESTING_LABELS, watchCallbackHandler));
Review Comment:
It is unclear to me whether `watchPodsAndDoCallback` is supposed to succeed or
fail. There's no verification of the error being thrown.
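To illustrate the kind of verification meant here: in an AssertJ test this would normally be a chained check such as `.isInstanceOf(...)` on `assertThatThrownBy(...)`. The dependency-free sketch below shows the same idea. `ThrownBySketch` and `expectThrown` are hypothetical names used only for illustration and are not part of the PR.

```java
// Hypothetical helper, not part of Flink or AssertJ.
final class ThrownBySketch {

    /**
     * Runs the action and returns the exception it threw, but only if that
     * exception has the expected type. Fails if nothing was thrown or if the
     * thrown exception has a different type.
     */
    static <T extends Throwable> T expectThrown(Runnable action, Class<T> expectedType) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expectedType.isInstance(t)) {
                return expectedType.cast(t);
            }
            throw new AssertionError("unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("expected " + expectedType.getName() + " but nothing was thrown");
    }
}
```

With a check like this, the test states explicitly that the call is expected to fail, and with which exception type.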
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -183,6 +183,29 @@ protected void mockPodEventWithLabels(
.once();
}
+ protected void mockPodEventWithLabelsFail(
+ String namespace, String podName, String resourceVersion, Map<String, String> labels) {
Review Comment:
`podName` is never used.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -454,6 +451,22 @@ void testWatchPodsAndDoCallback() throws Exception {
assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
}
+ @Test
+ void testWatchPodsAndDoCallbackFail() throws Exception {
+ mockPodEventWithLabelsFail(
+ NAMESPACE, TASKMANAGER_POD_NAME, KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+ TestingWatchCallbackHandler<KubernetesPod> watchCallbackHandler =
+ TestingWatchCallbackHandler.<KubernetesPod>builder().build();
+ Long watchStartTime = System.currentTimeMillis();
+ assertThatThrownBy(
+ () ->
+ this.flinkKubeClient.watchPodsAndDoCallback(
+ TESTING_LABELS, watchCallbackHandler));
+ Long watchEndTime = System.currentTimeMillis();
+ Long watchDuration = watchEndTime - watchStartTime;
+ assertThat(watchDuration > 100);
Review Comment:
Relying on the duration to decide whether the retry has happened can be
unstable. I'd suggest mocking the server so that it replies with failure
messages a certain number of times, followed by a success message.
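A minimal sketch of that idea, assuming a stand-in for the mocked server: the call fails a fixed number of times and then succeeds, so the test can assert on the attempt count instead of on elapsed time. `FlakyWatchSketch`, `failThenSucceed`, and `retry` are hypothetical names. The retry loop here is only a placeholder for the retry logic under test.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a mocked server and a retrying caller.
final class FlakyWatchSketch {

    /** Returns a call that throws on its first failuresBeforeSuccess invocations, then succeeds. */
    static Supplier<String> failThenSucceed(int failuresBeforeSuccess, AtomicInteger attempts) {
        return () -> {
            int attempt = attempts.incrementAndGet();
            if (attempt <= failuresBeforeSuccess) {
                throw new IllegalStateException("simulated failure #" + attempt);
            }
            return "watch-created";
        };
    }

    /** Placeholder retry loop: retries on any RuntimeException, up to maxAttempts calls. */
    static <T> T retry(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

A test built this way can check that exactly `failuresBeforeSuccess + 1` attempts were made, which does not depend on timing.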
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:
##########
@@ -247,11 +259,12 @@ public KubernetesWatch watchPodsAndDoCallback(
new KubernetesPodsWatcher(
podCallbackHandler))),
kubeClientExecutorService),
- maxRetryAttempts,
+ new ExponentialBackoffRetryStrategy(
+ maxRetryAttempts, initialRetryInterval, maxRetryInterval),
t ->
ExceptionUtils.findThrowable(t, KubernetesClientException.class)
.isPresent(),
- kubeClientExecutorService)
+ new ScheduledExecutorServiceAdapter(kubeClientExecutorService))
.get();
Review Comment:
This means `watchPodsAndDoCallback` is blocking and can wait for minutes to
return. It should no longer be called on the RPC main thread.
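One possible way to keep a slow call off the calling thread is sketched below, using only `java.util.concurrent`. `AsyncWatchSketch` and `createWatchAsync` are hypothetical names, and this is not necessarily how the PR should resolve the issue. The blocking work runs on a separate executor and the caller gets a future back immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch, not Flink code.
final class AsyncWatchSketch {

    /**
     * Submits the potentially slow call to ioExecutor and returns at once.
     * The calling thread is not blocked while retries are in progress.
     */
    static <T> CompletableFuture<T> createWatchAsync(Supplier<T> blockingCall, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(blockingCall, ioExecutor);
    }
}
```

The caller could then attach a continuation with `thenAcceptAsync(...)` on its own executor, so the result is handled on the main thread without that thread ever waiting.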
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -454,6 +451,22 @@ void testWatchPodsAndDoCallback() throws Exception {
assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
}
+ @Test
+ void testWatchPodsAndDoCallbackFail() throws Exception {
Review Comment:
`Exception` is never thrown.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -183,6 +183,29 @@ protected void mockPodEventWithLabels(
.once();
}
+ protected void mockPodEventWithLabelsFail(
+ String namespace, String podName, String resourceVersion, Map<String, String> labels) {
+ final Pod pod =
+ new PodBuilder()
+ .withNewMetadata()
+ .withNamespace(namespace)
+ .withName(podName)
+ .withLabels(labels)
+ .withResourceVersion("5668")
+ .endMetadata()
+ .build();
Review Comment:
It seems there's no need to create this `pod`.
##########
docs/layouts/shortcodes/generated/kubernetes_config_configuration.html:
##########
@@ -290,5 +290,17 @@
<td>Integer</td>
<td>Defines the number of Kubernetes transactional operation retries before the client gives up. For example, <code class="highlighter-rouge">FlinkKubeClient#checkAndUpdateConfigMap</code>.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.transactional-operation.initial-retry-delay</h5></td>
+ <td style="word-wrap: break-word;">5</td>
+ <td>Duration</td>
+ <td>Defines the initial duration of Kubernetes transactional operation retries after fail.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.transactional-operation.max-retry-delay</h5></td>
+ <td style="word-wrap: break-word;">5</td>
+ <td>Duration</td>
+ <td>Defines the max duration of Kubernetes transactional operation retries after fail.</td>
+ </tr>
Review Comment:
We need to re-generate this document. The CI failure is related to this.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java:
##########
@@ -183,6 +183,29 @@ protected void mockPodEventWithLabels(
.once();
}
+ protected void mockPodEventWithLabelsFail(
Review Comment:
This name does not reflect what this method really does. I'd suggest
`mockWatchPodFailure`.
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java:
##########
@@ -454,6 +451,22 @@ void testWatchPodsAndDoCallback() throws Exception {
assertThat(podModifiedAction.get()).isEqualTo(Action.MODIFIED);
}
+ @Test
+ void testWatchPodsAndDoCallbackFail() throws Exception {
+ mockPodEventWithLabelsFail(
+ NAMESPACE, TASKMANAGER_POD_NAME, KUBERNETES_ZERO_RESOURCE_VERSION, TESTING_LABELS);
+ TestingWatchCallbackHandler<KubernetesPod> watchCallbackHandler =
+ TestingWatchCallbackHandler.<KubernetesPod>builder().build();
+ Long watchStartTime = System.currentTimeMillis();
Review Comment:
I'd suggest using `System.nanoTime()`.
`System.currentTimeMillis()` can be updated at a coarser granularity and is
not suitable for ms-level time measurement.
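As a rough sketch of the suggested measurement, the helper below times an action with `System.nanoTime()` and converts the difference to milliseconds. `ElapsedTimeSketch` and `elapsedMillis` are hypothetical names for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR.
final class ElapsedTimeSketch {

    /** Measures how long the action takes, using the monotonic nanoTime clock. */
    static long elapsedMillis(Runnable action) {
        long start = System.nanoTime();
        action.run();
        // Only the difference between two nanoTime readings is meaningful.
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

Because `System.nanoTime()` is intended for measuring elapsed time, the result is not affected by wall-clock adjustments the way a `currentTimeMillis()` difference can be.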
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.