This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2c49edc43ff5d3f4d2a5e90729dd0cb2232df6c Author: Mrart <[email protected]> AuthorDate: Wed Jun 7 15:29:59 2023 +0800 [FLINK-27925][kubernetes] refactor TestingWatchCallbackHandler for watch tm pod. --- .../kubeclient/TestingWatchCallbackHandler.java | 128 ++++++++++++++ .../resources/KubernetesPodsWatcherTest.java | 72 +++----- .../resources/KubernetesSharedInformerITCase.java | 194 +++++++++++---------- .../resources/NoOpWatchCallbackHandler.java | 55 ------ 4 files changed, 255 insertions(+), 194 deletions(-) diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingWatchCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingWatchCallbackHandler.java new file mode 100644 index 00000000000..c51a545f387 --- /dev/null +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingWatchCallbackHandler.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient; + +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler; + +import java.util.List; +import java.util.function.Consumer; + +/** Mock {@link WatchCallbackHandler} for testing. */ +public class TestingWatchCallbackHandler<T> implements WatchCallbackHandler<T> { + private final Consumer<List<T>> onAddedConsumer; + + private final Consumer<List<T>> onModifiedConsumer; + + private final Consumer<List<T>> onDeletedConsumer; + + private final Consumer<List<T>> onErrorConsumer; + + private final Consumer<Throwable> handleErrorConsumer; + + private TestingWatchCallbackHandler( + Consumer<List<T>> onAddedConsumer, + Consumer<List<T>> onModifiedConsumer, + Consumer<List<T>> onDeletedConsumer, + Consumer<List<T>> onErrorConsumer, + Consumer<Throwable> handleErrorConsumer) { + this.onAddedConsumer = onAddedConsumer; + this.onModifiedConsumer = onModifiedConsumer; + this.onDeletedConsumer = onDeletedConsumer; + this.onErrorConsumer = onErrorConsumer; + this.handleErrorConsumer = handleErrorConsumer; + } + + @Override + public void onAdded(List<T> resources) { + onAddedConsumer.accept(resources); + } + + @Override + public void onModified(List<T> resources) { + onModifiedConsumer.accept(resources); + } + + @Override + public void onDeleted(List<T> resources) { + onDeletedConsumer.accept(resources); + } + + @Override + public void onError(List<T> resources) { + onErrorConsumer.accept(resources); + } + + @Override + public void handleError(Throwable throwable) { + handleErrorConsumer.accept(throwable); + } + + public static <T> Builder<T> builder() { + return new Builder<>(); + } + + /** Builder for {@link TestingWatchCallbackHandler}. */ + public static class Builder<T> { + private Consumer<List<T>> onAddedConsumer = (ignore) -> {}; + + private Consumer<List<T>> onModifiedConsumer = (ignore) -> {}; + + private Consumer<List<T>> onDeletedConsumer = (ignore) -> {}; + + private Consumer<List<T>> onErrorConsumer = (ignore) -> {}; + + private Consumer<Throwable> handleErrorConsumer = (ignore) -> {}; + + private Builder() {} + + public Builder<T> setOnAddedConsumer(Consumer<List<T>> onAddedConsumer) { + this.onAddedConsumer = onAddedConsumer; + return this; + } + + public Builder<T> setOnModifiedConsumer(Consumer<List<T>> onModifiedConsumer) { + this.onModifiedConsumer = onModifiedConsumer; + return this; + } + + public Builder<T> setOnDeletedConsumer(Consumer<List<T>> onDeletedConsumer) { + this.onDeletedConsumer = onDeletedConsumer; + return this; + } + + public Builder<T> setOnErrorConsumer(Consumer<List<T>> onErrorConsumer) { + this.onErrorConsumer = onErrorConsumer; + return this; + } + + public Builder<T> setHandleErrorConsumer(Consumer<Throwable> handleErrorConsumer) { + this.handleErrorConsumer = handleErrorConsumer; + return this; + } + + public TestingWatchCallbackHandler<T> build() { + return new TestingWatchCallbackHandler<>( + onAddedConsumer, + onModifiedConsumer, + onDeletedConsumer, + onErrorConsumer, + handleErrorConsumer); + } + } +} diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java index f94caf85a04..905d236ebf2 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcherTest.java @@ -18,8 +18,8 @@ package org.apache.flink.kubernetes.kubeclient.resources; -import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.TestingWatchCallbackHandler; import io.fabric8.kubernetes.api.model.StatusBuilder; import io.fabric8.kubernetes.client.KubernetesClientException; @@ -30,7 +30,6 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import static java.net.HttpURLConnection.HTTP_GONE; import static org.assertj.core.api.Assertions.assertThat; @@ -48,7 +47,9 @@ class KubernetesPodsWatcherTest { void testClosingWithNullException() { final KubernetesPodsWatcher podsWatcher = new KubernetesPodsWatcher( - new TestingCallbackHandler(e -> fail("Should not reach here."))); + TestingWatchCallbackHandler.<KubernetesPod>builder() + .setHandleErrorConsumer(e -> fail("Should not reach here.")) + .build()); podsWatcher.onClose(null); } @@ -56,7 +57,10 @@ class KubernetesPodsWatcherTest { void testClosingWithException() { final AtomicBoolean called = new AtomicBoolean(false); final KubernetesPodsWatcher podsWatcher = - new KubernetesPodsWatcher(new TestingCallbackHandler(e -> called.set(true))); + new KubernetesPodsWatcher( + TestingWatchCallbackHandler.<KubernetesPod>builder() + .setHandleErrorConsumer(e -> called.set(true)) + .build()); podsWatcher.onClose(new WatcherException("exception")); assertThat(called.get()).isTrue(); } @@ -65,7 +69,13 @@ class KubernetesPodsWatcherTest { void testCallbackHandler() { FlinkPod pod = new FlinkPod.Builder().build(); final KubernetesPodsWatcher podsWatcher = - new KubernetesPodsWatcher(new TestingCallbackHandler(e -> {})); + new KubernetesPodsWatcher( + TestingWatchCallbackHandler.<KubernetesPod>builder() + .setOnAddedConsumer(pods -> podAddedList.addAll(pods)) + .setOnModifiedConsumer(pods -> podModifiedList.addAll(pods)) + .setOnDeletedConsumer(pods -> podDeletedList.addAll(pods)) + .setOnErrorConsumer(pods -> podErrorList.addAll(pods)) + .build()); podsWatcher.eventReceived(Watcher.Action.ADDED, pod.getPodWithoutMainContainer()); podsWatcher.eventReceived(Watcher.Action.MODIFIED, pod.getPodWithoutMainContainer()); podsWatcher.eventReceived(Watcher.Action.DELETED, pod.getPodWithoutMainContainer()); @@ -82,52 +92,20 @@ class KubernetesPodsWatcherTest { final String errMsg = "too old resource version"; final KubernetesPodsWatcher podsWatcher = new KubernetesPodsWatcher( - new TestingCallbackHandler( - e -> { - assertThat(e) - .isInstanceOf( - KubernetesTooOldResourceVersionException.class) - .hasMessageContaining(errMsg); - })); + TestingWatchCallbackHandler.<KubernetesPod>builder() + .setHandleErrorConsumer( + e -> { + assertThat(e) + .isInstanceOf( + KubernetesTooOldResourceVersionException + .class) + .hasMessageContaining(errMsg); + }) + .build()); podsWatcher.onClose( new WatcherException( errMsg, new KubernetesClientException( errMsg, HTTP_GONE, new StatusBuilder().build()))); } - - private class TestingCallbackHandler - implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> { - - final Consumer<Throwable> consumer; - - TestingCallbackHandler(Consumer<Throwable> consumer) { - this.consumer = consumer; - } - - @Override - public void onAdded(List<KubernetesPod> pods) { - podAddedList.addAll(pods); - } - - @Override - public void onModified(List<KubernetesPod> pods) { - podModifiedList.addAll(pods); - } - - @Override - public void onDeleted(List<KubernetesPod> pods) { - podDeletedList.addAll(pods); - } - - @Override - public void onError(List<KubernetesPod> pods) { - podErrorList.addAll(pods); - } - - @Override - public void handleError(Throwable throwable) { - consumer.accept(throwable); - } - } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java index 3d3d5b3e02a..b48677da8d4 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.KubernetesExtension; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher; import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch; +import org.apache.flink.kubernetes.kubeclient.TestingWatchCallbackHandler; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -59,6 +60,8 @@ class KubernetesSharedInformerITCase { private FlinkKubeClient client; private ExecutorService watchCallbackExecutorService; + private static volatile Long blockVal = 0L; + @BeforeEach void setUp() throws Exception { client = kubernetesExtension.getFlinkKubeClient(); @@ -121,18 +124,38 @@ class KubernetesSharedInformerITCase { final String configMapName = getConfigMapName(System.currentTimeMillis()); final long block = 500; final long maxUpdateVal = 30; - final TestingBlockCallbackHandler handler = - new TestingBlockCallbackHandler(block, maxUpdateVal); + + final CompletableFuture<Void> expectedFuture = new CompletableFuture<>(); + final CompletableFuture<Void> assertFuture = new CompletableFuture<>(); + final TestingWatchCallbackHandler handler = + TestingWatchCallbackHandler.<KubernetesConfigMap>builder() + .setOnAddedConsumer( + (resources) -> + onAddedOrModified( + resources, + maxUpdateVal, + block, + expectedFuture, + assertFuture)) + .setOnModifiedConsumer( + (resources) -> + onAddedOrModified( + resources, + maxUpdateVal, + block, + expectedFuture, + assertFuture)) + .build(); final Watch watch = watcher.watch(configMapName, handler, watchCallbackExecutorService); createConfigMap(configMapName); for (int i = 1; i <= maxUpdateVal; i++) { updateConfigMap(configMapName, ImmutableMap.of("val", String.valueOf(i))); } - assertThatCode(() -> handler.expectedFuture.get(120, TimeUnit.SECONDS)) - .as("expected value: " + maxUpdateVal + ", actual: " + handler.val) + assertThatCode(() -> expectedFuture.get(120, TimeUnit.SECONDS)) + .as("expected value: " + maxUpdateVal + ", actual: " + blockVal) .doesNotThrowAnyException(); try { - handler.assertFuture.get(2 * block, TimeUnit.MILLISECONDS); + assertFuture.get(2 * block, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // expected } @@ -187,7 +210,7 @@ class KubernetesSharedInformerITCase { final String name = getConfigMapName(i % 10); final TestingCallbackHandler handler = new TestingCallbackHandler(name); callbackHandlers.add(handler); - final Watch watch = watcher.watch(name, handler, watchCallbackExecutorService); + final Watch watch = watcher.watch(name, handler.build(), watchCallbackExecutorService); watchers.add(watch); } } @@ -196,8 +219,7 @@ class KubernetesSharedInformerITCase { return "shared-informer-test-cluster-" + id; } - private static final class TestingCallbackHandler - extends NoOpWatchCallbackHandler<KubernetesConfigMap> { + private static final class TestingCallbackHandler { private final CompletableFuture<Void> addFuture = new CompletableFuture<>(); private final CompletableFuture<Void> addOrUpdateFuture = new CompletableFuture<>(); @@ -210,96 +232,84 @@ class KubernetesSharedInformerITCase { this.resourceName = resourceName; } - @Override - public void onAdded(List<KubernetesConfigMap> resources) { - final KubernetesConfigMap kubernetesConfigMap = resources.get(0); - check( - assertFuture, - () -> { - assertThat(kubernetesConfigMap.getName()).isEqualTo(resourceName); - assertThat(addFuture).isNotDone(); - assertThat(addOrUpdateFuture).isNotDone(); - }); - addFuture.complete(null); - final String foo = kubernetesConfigMap.getData().get("foo"); - if (foo != null) { - check(assertFuture, () -> assertThat(foo).isEqualTo("bar")); - addOrUpdateFuture.complete(null); - } - } - - @Override - public void onModified(List<KubernetesConfigMap> resources) { - final KubernetesConfigMap kubernetesConfigMap = resources.get(0); - final String foo = kubernetesConfigMap.getData().get("foo"); - check( - assertFuture, - () -> { - assertThat(kubernetesConfigMap.getName()).isEqualTo(resourceName); - assertThat(foo).isEqualTo("bar"); - assertThat(addFuture).isDone(); - }); - if (addOrUpdateFuture.isDone()) { - check(assertFuture, () -> assertThat(isDeleting(kubernetesConfigMap)).isTrue()); - } else { - addOrUpdateFuture.complete(null); - } - } - - @Override - public void onDeleted(List<KubernetesConfigMap> resources) { - check(assertFuture, () -> assertThat(deleteFuture).isNotDone()); - deleteFuture.complete(null); + private TestingWatchCallbackHandler build() { + return TestingWatchCallbackHandler.<KubernetesConfigMap>builder() + .setOnAddedConsumer( + resources -> { + final KubernetesConfigMap kubernetesConfigMap = resources.get(0); + check( + assertFuture, + () -> { + assertThat(kubernetesConfigMap.getName()) + .isEqualTo(resourceName); + assertThat(addFuture).isNotDone(); + assertThat(addOrUpdateFuture).isNotDone(); + }); + addFuture.complete(null); + final String foo = kubernetesConfigMap.getData().get("foo"); + if (foo != null) { + check(assertFuture, () -> assertThat(foo).isEqualTo("bar")); + addOrUpdateFuture.complete(null); + } + }) + .setOnModifiedConsumer( + (resources) -> { + final KubernetesConfigMap kubernetesConfigMap = resources.get(0); + final String foo = kubernetesConfigMap.getData().get("foo"); + check( + assertFuture, + () -> { + assertThat(kubernetesConfigMap.getName()) + .isEqualTo(resourceName); + assertThat(foo).isEqualTo("bar"); + assertThat(addFuture).isDone(); + }); + if (addOrUpdateFuture.isDone()) { + check( + assertFuture, + () -> + assertThat(isDeleting(kubernetesConfigMap)) + .isTrue()); + } else { + addOrUpdateFuture.complete(null); + } + }) + .setOnDeletedConsumer( + resources -> { + check(assertFuture, () -> assertThat(deleteFuture).isNotDone()); + deleteFuture.complete(null); + }) + .build(); } } - private static final class TestingBlockCallbackHandler - extends NoOpWatchCallbackHandler<KubernetesConfigMap> { - private final CompletableFuture<Void> expectedFuture = new CompletableFuture<>(); - private final CompletableFuture<Void> assertFuture = new CompletableFuture<>(); - private final long block; - private final long expected; - - private volatile long val = 0; - - public TestingBlockCallbackHandler(long block, long expected) { - this.block = block; - this.expected = expected; + private static void onAddedOrModified( + List<KubernetesConfigMap> resources, + Long expected, + Long block, + CompletableFuture expectedFuture, + CompletableFuture assertFuture) { + final KubernetesConfigMap kubernetesConfigMap = resources.get(0); + final String valData = kubernetesConfigMap.getData().get("val"); + if (valData == null) { + return; } - - @Override - public void onAdded(List<KubernetesConfigMap> resources) { - onAddedOrModified(resources); - } - - @Override - public void onModified(List<KubernetesConfigMap> resources) { - onAddedOrModified(resources); - } - - private void onAddedOrModified(List<KubernetesConfigMap> resources) { - final KubernetesConfigMap kubernetesConfigMap = resources.get(0); - final String valData = kubernetesConfigMap.getData().get("val"); - if (valData == null) { - return; - } - final long newVal = Long.parseLong(valData); - check( - assertFuture, - () -> assertThat(newVal > val || isDeleting(kubernetesConfigMap)).isTrue()); - val = newVal; - block(); - if (expected == val) { - expectedFuture.complete(null); - } + final long newVal = Long.parseLong(valData); + check( + assertFuture, + () -> assertThat(newVal > blockVal || isDeleting(kubernetesConfigMap)).isTrue()); + blockVal = newVal; + block(block); + if (expected == blockVal) { + expectedFuture.complete(null); } + } - private void block() { - try { - Thread.sleep(block); - } catch (InterruptedException e) { - // ignore - } + private static void block(Long block) { + try { + Thread.sleep(block); + } catch (InterruptedException e) { + // ignore } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java deleted file mode 100644 index 85988868b30..00000000000 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.kubernetes.kubeclient.resources; - -import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; - -import java.util.List; - -/** - * Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}. - * - * @param <T> Type of resource to be watched - */ -public class NoOpWatchCallbackHandler<T> implements FlinkKubeClient.WatchCallbackHandler<T> { - @Override - public void onAdded(List<T> resources) { - // noop - } - - @Override - public void onModified(List<T> resources) { - // noop - } - - @Override - public void onDeleted(List<T> resources) { - // noop - } - - @Override - public void onError(List<T> resources) { - // noop - } - - @Override - public void handleError(Throwable throwable) { - // noop - } -}
