This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2c49edc43ff5d3f4d2a5e90729dd0cb2232df6c
Author: Mrart <[email protected]>
AuthorDate: Wed Jun 7 15:29:59 2023 +0800

    [FLINK-27925][kubernetes] refactor TestingWatchCallbackHandler for watch tm pod.
---
 .../kubeclient/TestingWatchCallbackHandler.java    | 128 ++++++++++++++
 .../resources/KubernetesPodsWatcherTest.java       |  72 +++-----
 .../resources/KubernetesSharedInformerITCase.java  | 194 +++++++++++----------
 .../resources/NoOpWatchCallbackHandler.java        |  55 ------
 4 files changed, 255 insertions(+), 194 deletions(-)

diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingWatchCallbackHandler.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingWatchCallbackHandler.java
new file mode 100644
index 00000000000..c51a545f387
--- /dev/null
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingWatchCallbackHandler.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+/** Mock {@link WatchCallbackHandler} for testing. */
+public class TestingWatchCallbackHandler<T> implements WatchCallbackHandler<T> 
{
+    private final Consumer<List<T>> onAddedConsumer;
+
+    private final Consumer<List<T>> onModifiedConsumer;
+
+    private final Consumer<List<T>> onDeletedConsumer;
+
+    private final Consumer<List<T>> onErrorConsumer;
+
+    private final Consumer<Throwable> handleErrorConsumer;
+
+    private TestingWatchCallbackHandler(
+            Consumer<List<T>> onAddedConsumer,
+            Consumer<List<T>> onModifiedConsumer,
+            Consumer<List<T>> onDeletedConsumer,
+            Consumer<List<T>> onErrorConsumer,
+            Consumer<Throwable> handleErrorConsumer) {
+        this.onAddedConsumer = onAddedConsumer;
+        this.onModifiedConsumer = onModifiedConsumer;
+        this.onDeletedConsumer = onDeletedConsumer;
+        this.onErrorConsumer = onErrorConsumer;
+        this.handleErrorConsumer = handleErrorConsumer;
+    }
+
+    @Override
+    public void onAdded(List<T> resources) {
+        onAddedConsumer.accept(resources);
+    }
+
+    @Override
+    public void onModified(List<T> resources) {
+        onModifiedConsumer.accept(resources);
+    }
+
+    @Override
+    public void onDeleted(List<T> resources) {
+        onDeletedConsumer.accept(resources);
+    }
+
+    @Override
+    public void onError(List<T> resources) {
+        onErrorConsumer.accept(resources);
+    }
+
+    @Override
+    public void handleError(Throwable throwable) {
+        handleErrorConsumer.accept(throwable);
+    }
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /** Builder for {@link TestingWatchCallbackHandler}. */
+    public static class Builder<T> {
+        private Consumer<List<T>> onAddedConsumer = (ignore) -> {};
+
+        private Consumer<List<T>> onModifiedConsumer = (ignore) -> {};
+
+        private Consumer<List<T>> onDeletedConsumer = (ignore) -> {};
+
+        private Consumer<List<T>> onErrorConsumer = (ignore) -> {};
+
+        private Consumer<Throwable> handleErrorConsumer = (ignore) -> {};
+
+        private Builder() {}
+
+        public Builder<T> setOnAddedConsumer(Consumer<List<T>> 
onAddedConsumer) {
+            this.onAddedConsumer = onAddedConsumer;
+            return this;
+        }
+
+        public Builder<T> setOnModifiedConsumer(Consumer<List<T>> 
onModifiedConsumer) {
+            this.onModifiedConsumer = onModifiedConsumer;
+            return this;
+        }
+
+        public Builder<T> setOnDeletedConsumer(Consumer<List<T>> 
onDeletedConsumer) {
+            this.onDeletedConsumer = onDeletedConsumer;
+            return this;
+        }
+
+        public Builder<T> setOnErrorConsumer(Consumer<List<T>> 
onErrorConsumer) {
+            this.onErrorConsumer = onErrorConsumer;
+            return this;
+        }
+
+        public Builder<T> setHandleErrorConsumer(Consumer<Throwable> 
handleErrorConsumer) {
+            this.handleErrorConsumer = handleErrorConsumer;
+            return this;
+        }
+
+        public TestingWatchCallbackHandler<T> build() {
+            return new TestingWatchCallbackHandler<>(
+                    onAddedConsumer,
+                    onModifiedConsumer,
+                    onDeletedConsumer,
+                    onErrorConsumer,
+                    handleErrorConsumer);
+        }
+    }
+}
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
index f94caf85a04..905d236ebf2 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.resources;
 
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.TestingWatchCallbackHandler;
 
 import io.fabric8.kubernetes.api.model.StatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClientException;
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 
 import static java.net.HttpURLConnection.HTTP_GONE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -48,7 +47,9 @@ class KubernetesPodsWatcherTest {
     void testClosingWithNullException() {
         final KubernetesPodsWatcher podsWatcher =
                 new KubernetesPodsWatcher(
-                        new TestingCallbackHandler(e -> fail("Should not reach 
here.")));
+                        TestingWatchCallbackHandler.<KubernetesPod>builder()
+                                .setHandleErrorConsumer(e -> fail("Should not 
reach here."))
+                                .build());
         podsWatcher.onClose(null);
     }
 
@@ -56,7 +57,10 @@ class KubernetesPodsWatcherTest {
     void testClosingWithException() {
         final AtomicBoolean called = new AtomicBoolean(false);
         final KubernetesPodsWatcher podsWatcher =
-                new KubernetesPodsWatcher(new TestingCallbackHandler(e -> 
called.set(true)));
+                new KubernetesPodsWatcher(
+                        TestingWatchCallbackHandler.<KubernetesPod>builder()
+                                .setHandleErrorConsumer(e -> called.set(true))
+                                .build());
         podsWatcher.onClose(new WatcherException("exception"));
         assertThat(called.get()).isTrue();
     }
@@ -65,7 +69,13 @@ class KubernetesPodsWatcherTest {
     void testCallbackHandler() {
         FlinkPod pod = new FlinkPod.Builder().build();
         final KubernetesPodsWatcher podsWatcher =
-                new KubernetesPodsWatcher(new TestingCallbackHandler(e -> {}));
+                new KubernetesPodsWatcher(
+                        TestingWatchCallbackHandler.<KubernetesPod>builder()
+                                .setOnAddedConsumer(pods -> 
podAddedList.addAll(pods))
+                                .setOnModifiedConsumer(pods -> 
podModifiedList.addAll(pods))
+                                .setOnDeletedConsumer(pods -> 
podDeletedList.addAll(pods))
+                                .setOnErrorConsumer(pods -> 
podErrorList.addAll(pods))
+                                .build());
         podsWatcher.eventReceived(Watcher.Action.ADDED, 
pod.getPodWithoutMainContainer());
         podsWatcher.eventReceived(Watcher.Action.MODIFIED, 
pod.getPodWithoutMainContainer());
         podsWatcher.eventReceived(Watcher.Action.DELETED, 
pod.getPodWithoutMainContainer());
@@ -82,52 +92,20 @@ class KubernetesPodsWatcherTest {
         final String errMsg = "too old resource version";
         final KubernetesPodsWatcher podsWatcher =
                 new KubernetesPodsWatcher(
-                        new TestingCallbackHandler(
-                                e -> {
-                                    assertThat(e)
-                                            .isInstanceOf(
-                                                    
KubernetesTooOldResourceVersionException.class)
-                                            .hasMessageContaining(errMsg);
-                                }));
+                        TestingWatchCallbackHandler.<KubernetesPod>builder()
+                                .setHandleErrorConsumer(
+                                        e -> {
+                                            assertThat(e)
+                                                    .isInstanceOf(
+                                                            
KubernetesTooOldResourceVersionException
+                                                                    .class)
+                                                    
.hasMessageContaining(errMsg);
+                                        })
+                                .build());
         podsWatcher.onClose(
                 new WatcherException(
                         errMsg,
                         new KubernetesClientException(
                                 errMsg, HTTP_GONE, new 
StatusBuilder().build())));
     }
-
-    private class TestingCallbackHandler
-            implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
-
-        final Consumer<Throwable> consumer;
-
-        TestingCallbackHandler(Consumer<Throwable> consumer) {
-            this.consumer = consumer;
-        }
-
-        @Override
-        public void onAdded(List<KubernetesPod> pods) {
-            podAddedList.addAll(pods);
-        }
-
-        @Override
-        public void onModified(List<KubernetesPod> pods) {
-            podModifiedList.addAll(pods);
-        }
-
-        @Override
-        public void onDeleted(List<KubernetesPod> pods) {
-            podDeletedList.addAll(pods);
-        }
-
-        @Override
-        public void onError(List<KubernetesPod> pods) {
-            podErrorList.addAll(pods);
-        }
-
-        @Override
-        public void handleError(Throwable throwable) {
-            consumer.accept(throwable);
-        }
-    }
 }
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
index 3d3d5b3e02a..b48677da8d4 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.KubernetesExtension;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
 import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch;
+import org.apache.flink.kubernetes.kubeclient.TestingWatchCallbackHandler;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
@@ -59,6 +60,8 @@ class KubernetesSharedInformerITCase {
     private FlinkKubeClient client;
     private ExecutorService watchCallbackExecutorService;
 
+    private static volatile Long blockVal = 0L;
+
     @BeforeEach
     void setUp() throws Exception {
         client = kubernetesExtension.getFlinkKubeClient();
@@ -121,18 +124,38 @@ class KubernetesSharedInformerITCase {
             final String configMapName = 
getConfigMapName(System.currentTimeMillis());
             final long block = 500;
             final long maxUpdateVal = 30;
-            final TestingBlockCallbackHandler handler =
-                    new TestingBlockCallbackHandler(block, maxUpdateVal);
+
+            final CompletableFuture<Void> expectedFuture = new 
CompletableFuture<>();
+            final CompletableFuture<Void> assertFuture = new 
CompletableFuture<>();
+            final TestingWatchCallbackHandler handler =
+                    TestingWatchCallbackHandler.<KubernetesConfigMap>builder()
+                            .setOnAddedConsumer(
+                                    (resources) ->
+                                            onAddedOrModified(
+                                                    resources,
+                                                    maxUpdateVal,
+                                                    block,
+                                                    expectedFuture,
+                                                    assertFuture))
+                            .setOnModifiedConsumer(
+                                    (resources) ->
+                                            onAddedOrModified(
+                                                    resources,
+                                                    maxUpdateVal,
+                                                    block,
+                                                    expectedFuture,
+                                                    assertFuture))
+                            .build();
             final Watch watch = watcher.watch(configMapName, handler, 
watchCallbackExecutorService);
             createConfigMap(configMapName);
             for (int i = 1; i <= maxUpdateVal; i++) {
                 updateConfigMap(configMapName, ImmutableMap.of("val", 
String.valueOf(i)));
             }
-            assertThatCode(() -> handler.expectedFuture.get(120, 
TimeUnit.SECONDS))
-                    .as("expected value: " + maxUpdateVal + ", actual: " + 
handler.val)
+            assertThatCode(() -> expectedFuture.get(120, TimeUnit.SECONDS))
+                    .as("expected value: " + maxUpdateVal + ", actual: " + 
blockVal)
                     .doesNotThrowAnyException();
             try {
-                handler.assertFuture.get(2 * block, TimeUnit.MILLISECONDS);
+                assertFuture.get(2 * block, TimeUnit.MILLISECONDS);
             } catch (TimeoutException e) {
                 // expected
             }
@@ -187,7 +210,7 @@ class KubernetesSharedInformerITCase {
             final String name = getConfigMapName(i % 10);
             final TestingCallbackHandler handler = new 
TestingCallbackHandler(name);
             callbackHandlers.add(handler);
-            final Watch watch = watcher.watch(name, handler, 
watchCallbackExecutorService);
+            final Watch watch = watcher.watch(name, handler.build(), 
watchCallbackExecutorService);
             watchers.add(watch);
         }
     }
@@ -196,8 +219,7 @@ class KubernetesSharedInformerITCase {
         return "shared-informer-test-cluster-" + id;
     }
 
-    private static final class TestingCallbackHandler
-            extends NoOpWatchCallbackHandler<KubernetesConfigMap> {
+    private static final class TestingCallbackHandler {
 
         private final CompletableFuture<Void> addFuture = new 
CompletableFuture<>();
         private final CompletableFuture<Void> addOrUpdateFuture = new 
CompletableFuture<>();
@@ -210,96 +232,84 @@ class KubernetesSharedInformerITCase {
             this.resourceName = resourceName;
         }
 
-        @Override
-        public void onAdded(List<KubernetesConfigMap> resources) {
-            final KubernetesConfigMap kubernetesConfigMap = resources.get(0);
-            check(
-                    assertFuture,
-                    () -> {
-                        
assertThat(kubernetesConfigMap.getName()).isEqualTo(resourceName);
-                        assertThat(addFuture).isNotDone();
-                        assertThat(addOrUpdateFuture).isNotDone();
-                    });
-            addFuture.complete(null);
-            final String foo = kubernetesConfigMap.getData().get("foo");
-            if (foo != null) {
-                check(assertFuture, () -> assertThat(foo).isEqualTo("bar"));
-                addOrUpdateFuture.complete(null);
-            }
-        }
-
-        @Override
-        public void onModified(List<KubernetesConfigMap> resources) {
-            final KubernetesConfigMap kubernetesConfigMap = resources.get(0);
-            final String foo = kubernetesConfigMap.getData().get("foo");
-            check(
-                    assertFuture,
-                    () -> {
-                        
assertThat(kubernetesConfigMap.getName()).isEqualTo(resourceName);
-                        assertThat(foo).isEqualTo("bar");
-                        assertThat(addFuture).isDone();
-                    });
-            if (addOrUpdateFuture.isDone()) {
-                check(assertFuture, () -> 
assertThat(isDeleting(kubernetesConfigMap)).isTrue());
-            } else {
-                addOrUpdateFuture.complete(null);
-            }
-        }
-
-        @Override
-        public void onDeleted(List<KubernetesConfigMap> resources) {
-            check(assertFuture, () -> assertThat(deleteFuture).isNotDone());
-            deleteFuture.complete(null);
+        private TestingWatchCallbackHandler build() {
+            return TestingWatchCallbackHandler.<KubernetesConfigMap>builder()
+                    .setOnAddedConsumer(
+                            resources -> {
+                                final KubernetesConfigMap kubernetesConfigMap 
= resources.get(0);
+                                check(
+                                        assertFuture,
+                                        () -> {
+                                            
assertThat(kubernetesConfigMap.getName())
+                                                    .isEqualTo(resourceName);
+                                            assertThat(addFuture).isNotDone();
+                                            
assertThat(addOrUpdateFuture).isNotDone();
+                                        });
+                                addFuture.complete(null);
+                                final String foo = 
kubernetesConfigMap.getData().get("foo");
+                                if (foo != null) {
+                                    check(assertFuture, () -> 
assertThat(foo).isEqualTo("bar"));
+                                    addOrUpdateFuture.complete(null);
+                                }
+                            })
+                    .setOnModifiedConsumer(
+                            (resources) -> {
+                                final KubernetesConfigMap kubernetesConfigMap 
= resources.get(0);
+                                final String foo = 
kubernetesConfigMap.getData().get("foo");
+                                check(
+                                        assertFuture,
+                                        () -> {
+                                            
assertThat(kubernetesConfigMap.getName())
+                                                    .isEqualTo(resourceName);
+                                            assertThat(foo).isEqualTo("bar");
+                                            assertThat(addFuture).isDone();
+                                        });
+                                if (addOrUpdateFuture.isDone()) {
+                                    check(
+                                            assertFuture,
+                                            () ->
+                                                    
assertThat(isDeleting(kubernetesConfigMap))
+                                                            .isTrue());
+                                } else {
+                                    addOrUpdateFuture.complete(null);
+                                }
+                            })
+                    .setOnDeletedConsumer(
+                            resources -> {
+                                check(assertFuture, () -> 
assertThat(deleteFuture).isNotDone());
+                                deleteFuture.complete(null);
+                            })
+                    .build();
         }
     }
 
-    private static final class TestingBlockCallbackHandler
-            extends NoOpWatchCallbackHandler<KubernetesConfigMap> {
-        private final CompletableFuture<Void> expectedFuture = new 
CompletableFuture<>();
-        private final CompletableFuture<Void> assertFuture = new 
CompletableFuture<>();
-        private final long block;
-        private final long expected;
-
-        private volatile long val = 0;
-
-        public TestingBlockCallbackHandler(long block, long expected) {
-            this.block = block;
-            this.expected = expected;
+    private static void onAddedOrModified(
+            List<KubernetesConfigMap> resources,
+            Long expected,
+            Long block,
+            CompletableFuture expectedFuture,
+            CompletableFuture assertFuture) {
+        final KubernetesConfigMap kubernetesConfigMap = resources.get(0);
+        final String valData = kubernetesConfigMap.getData().get("val");
+        if (valData == null) {
+            return;
         }
-
-        @Override
-        public void onAdded(List<KubernetesConfigMap> resources) {
-            onAddedOrModified(resources);
-        }
-
-        @Override
-        public void onModified(List<KubernetesConfigMap> resources) {
-            onAddedOrModified(resources);
-        }
-
-        private void onAddedOrModified(List<KubernetesConfigMap> resources) {
-            final KubernetesConfigMap kubernetesConfigMap = resources.get(0);
-            final String valData = kubernetesConfigMap.getData().get("val");
-            if (valData == null) {
-                return;
-            }
-            final long newVal = Long.parseLong(valData);
-            check(
-                    assertFuture,
-                    () -> assertThat(newVal > val || 
isDeleting(kubernetesConfigMap)).isTrue());
-            val = newVal;
-            block();
-            if (expected == val) {
-                expectedFuture.complete(null);
-            }
+        final long newVal = Long.parseLong(valData);
+        check(
+                assertFuture,
+                () -> assertThat(newVal > blockVal || 
isDeleting(kubernetesConfigMap)).isTrue());
+        blockVal = newVal;
+        block(block);
+        if (expected == blockVal) {
+            expectedFuture.complete(null);
         }
+    }
 
-        private void block() {
-            try {
-                Thread.sleep(block);
-            } catch (InterruptedException e) {
-                // ignore
-            }
+    private static void block(Long block) {
+        try {
+            Thread.sleep(block);
+        } catch (InterruptedException e) {
+            // ignore
         }
     }
 
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
deleted file mode 100644
index 85988868b30..00000000000
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.kubeclient.resources;
-
-import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
-
-import java.util.List;
-
-/**
- * Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}.
- *
- * @param <T> Type of resource to be watched
- */
-public class NoOpWatchCallbackHandler<T> implements 
FlinkKubeClient.WatchCallbackHandler<T> {
-    @Override
-    public void onAdded(List<T> resources) {
-        // noop
-    }
-
-    @Override
-    public void onModified(List<T> resources) {
-        // noop
-    }
-
-    @Override
-    public void onDeleted(List<T> resources) {
-        // noop
-    }
-
-    @Override
-    public void onError(List<T> resources) {
-        // noop
-    }
-
-    @Override
-    public void handleError(Throwable throwable) {
-        // noop
-    }
-}

Reply via email to