wangyang0918 commented on a change in pull request #15501:
URL: https://github.com/apache/flink/pull/15501#discussion_r618128773
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -118,10 +115,8 @@ public void close() {
LOG.info("Closing {}.", this);
leaderElector.stop();
- synchronized (watchLock) {
- if (kubernetesWatch != null) {
- kubernetesWatch.close();
- }
+ if (kubernetesWatch != null) {
Review comment:
nit: `kubernetesWatch` is `final` and cannot be null, right? Then we
can drop the null check here.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
##########
@@ -89,6 +92,11 @@
this.kubeClient = checkNotNull(kubeClient);
this.clusterId =
checkNotNull(config.get(KubernetesConfigOptions.CLUSTER_ID));
+ this.configMapSharedWatcher =
+ this.kubeClient.createConfigMapSharedWatcher(
+ KubernetesUtils.getCommonLabels(this.clusterId));
Review comment:
I believe we only need to watch the HA-related ConfigMaps:
```
KubernetesUtils.getConfigMapLabels(clusterId,
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
Review comment:
Why do we need to get the processor twice, once under the guard of
`processorsLock` and once outside of it?
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
+ }
+
+ private void removeStaleProcessor(String name) {
+ synchronized (processorsLock) {
+ Processor processor = watchingProcessors.get(name);
Review comment:
nit: could be `final`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
+ }
+
+ private void removeStaleProcessor(String name) {
+ synchronized (processorsLock) {
+ Processor processor = watchingProcessors.get(name);
+ if (!processor.running.get()) {
+ watchingProcessors.remove(name);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ sharedIndexInformer.run();
+ }
+
+ @Override
+ public void close() {
+ sharedIndexInformer.stop();
+ ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS,
this.watchingExecutor);
+ }
+
+ private class Processor implements Runnable {
+
+ private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>();
+
+ private final Object listenerQueueLock = new Object();
+ private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>>
listenerQueue =
Review comment:
`@GuardedBy("listenerQueueLock")`
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.KubernetesResource;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/** IT Tests for the {@link KubernetesSharedInformer}. */
+public class KubernetesSharedInformerITCase extends TestLogger {
+
+ @ClassRule public static KubernetesResource kubernetesResource = new
KubernetesResource();
+
+ @Test
+ public void testWatch() throws Exception {
+ final ImmutableMap<String, String> labels =
+ ImmutableMap.of("app", "shared-informer-test-cluster");
+ final String configMapPrefix = "shared-informer-test-cluster";
+ try (FlinkKubeClient client = kubernetesResource.getFlinkKubeClient();
+ KubernetesConfigMapSharedWatcher configMapSharedWatcher =
+ client.createConfigMapSharedWatcher(labels)) {
+
+ configMapSharedWatcher.run();
+ for (int i = 0; i < 5; i++) {
+ client.createConfigMap(
+ new KubernetesConfigMap(
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(configMapPrefix + i)
+ .withLabels(labels)
+ .endMetadata()
+ .build()))
+ .get();
+ }
+
+ List<TestingCallbackHandler> callbackHandlers = new ArrayList<>();
+ List<Watch> watchers = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ final String name = configMapPrefix + i % 10;
+ final TestingCallbackHandler handler = new
TestingCallbackHandler(name);
+ callbackHandlers.add(handler);
+ final Watch watch = configMapSharedWatcher.watch(name,
handler);
+ watchers.add(watch);
+ }
+ for (int i = 5; i < 10; i++) {
+ client.createConfigMap(
+ new KubernetesConfigMap(
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(configMapPrefix + i)
+ .withLabels(labels)
+ .endMetadata()
+ .build()))
+ .get();
+ }
+ for (int i = 0; i < 10; i++) {
+ final String configMapName = configMapPrefix + i;
+ client.checkAndUpdateConfigMap(
+ configMapName,
+ configMap -> {
+ configMap.getData().put("foo", "bar");
+ return Optional.of(configMap);
+ })
+ .get();
+ }
+ for (TestingCallbackHandler handler : callbackHandlers) {
+ handler.addOrUpdateFuture.get(2000, TimeUnit.MILLISECONDS);
+ }
+ client.deleteConfigMapsByLabels(labels).get();
+ for (TestingCallbackHandler handler : callbackHandlers) {
+ handler.deleteFuture.get(2000, TimeUnit.MILLISECONDS);
+ }
+ watchers.forEach(Watch::close);
+ }
+ }
+
+ private static final class TestingCallbackHandler
+ extends NoOpWatchCallbackHandler<KubernetesConfigMap> {
+
+ private final CompletableFuture<Void> addOrUpdateFuture = new
CompletableFuture<>();
+ private final CompletableFuture<Void> deleteFuture = new
CompletableFuture<>();
+
+ private final String name;
+
+ private TestingCallbackHandler(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void onAdded(List<KubernetesConfigMap> resources) {
Review comment:
I think we need to verify that every watch does not miss the `onAdded`
event.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
Review comment:
`watchingProcessors` is accessed outside of its guarding lock, `processorsLock`.
I think we could also add a log message here for the case where we receive an
event for a ConfigMap that has no registered processor.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
Review comment:
`watchingProcessors` is accessed outside of the lock. Also, we could directly
return `processor` to save the `get` operation.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
Review comment:
nit: `uuid` could be final.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
##########
@@ -90,10 +87,8 @@ public void close() {
LOG.info("Stopping {}.", this);
- synchronized (watchLock) {
- if (kubernetesWatch != null) {
- kubernetesWatch.close();
- }
+ if (kubernetesWatch != null) {
Review comment:
Same as above.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
Review comment:
Actually, I am a little afraid of using the `newCachedThreadPool`,
because it may end up with too many running threads. But I do not have a better
solution here since each processor needs to have a dedicated thread.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
Review comment:
I think `uuid` could not be empty here, since an empty `uuid` would mean that
`getOrStartProcessor` returned a stopped processor. This also means that maybe
we do not need the busy loop here.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -184,6 +174,15 @@ KubernetesWatch watchConfigMaps(
*/
CompletableFuture<Void> deleteConfigMap(String configMapName);
+ /**
+ * Create a shared watcher for ConfigMaps with specified labels.
+ *
+ * @param labels labels to filter ConfigMaps, if labels is empty or null,
all ConfigMaps will be
+ * take in account.
Review comment:
typo: take -> taken
I am curious whether we should allow `null` labels, because it may take in
too many ConfigMap events and consume too much memory if not used properly.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
Review comment:
I am a little confused about the `resyncPeriodInMillis`. Could duplicated
events happen after the periodic resync? I will also dig more into the
implementation of the shared informer.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
+ }
+
+ private void removeStaleProcessor(String name) {
+ synchronized (processorsLock) {
+ Processor processor = watchingProcessors.get(name);
+ if (!processor.running.get()) {
+ watchingProcessors.remove(name);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ sharedIndexInformer.run();
+ }
+
+ @Override
+ public void close() {
+ sharedIndexInformer.stop();
+ ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS,
this.watchingExecutor);
+ }
+
+ private class Processor implements Runnable {
+
+ private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>();
+
+ private final Object listenerQueueLock = new Object();
+ private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>>
listenerQueue =
+ new LinkedBlockingQueue<>();
+ private final Map<UUID, WatchCallbackHandler<R>> handlers = new
HashMap<>();
+
+ private final String namespace;
+ private final String name;
+ private final String objKey;
+
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ private T current;
+
+ private Processor(String namespace, String name) {
Review comment:
Do we really need the `namespace` here? I ask this because we are using the
`NamespacedKubernetesClient` and all the resources that need to be watched are in
the same namespace.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
+ }
+
+ private void removeStaleProcessor(String name) {
+ synchronized (processorsLock) {
+ Processor processor = watchingProcessors.get(name);
+ if (!processor.running.get()) {
+ watchingProcessors.remove(name);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ sharedIndexInformer.run();
+ }
+
+ @Override
+ public void close() {
+ sharedIndexInformer.stop();
+ ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS,
this.watchingExecutor);
+ }
+
+ private class Processor implements Runnable {
+
+ private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>();
+
+ private final Object listenerQueueLock = new Object();
+ private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>>
listenerQueue =
+ new LinkedBlockingQueue<>();
+ private final Map<UUID, WatchCallbackHandler<R>> handlers = new
HashMap<>();
+
+ private final String namespace;
+ private final String name;
+ private final String objKey;
+
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ private T current;
+
+ private Processor(String namespace, String name) {
+ this.namespace = namespace;
+ this.name = name;
+ this.objKey = namespace + "/" + name;
+ }
+
+ @Override
+ public void run() {
+ log.info("Starting watching processor for {}/{}", this.namespace,
this.name);
+ while (running.get()) {
+ try {
+ Pair<UUID, WatchCallbackHandler<R>> listener;
Review comment:
I think we could have an internal class `ListenerEvent` instead of the
inexpressive `Pair<>`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
+ }
+
+ private void removeStaleProcessor(String name) {
+ synchronized (processorsLock) {
+ Processor processor = watchingProcessors.get(name);
+ if (!processor.running.get()) {
+ watchingProcessors.remove(name);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ sharedIndexInformer.run();
+ }
+
+ @Override
+ public void close() {
+ sharedIndexInformer.stop();
+ ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS,
this.watchingExecutor);
+ }
+
+ private class Processor implements Runnable {
+
+ private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>();
+
+ private final Object listenerQueueLock = new Object();
+ private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>>
listenerQueue =
+ new LinkedBlockingQueue<>();
+ private final Map<UUID, WatchCallbackHandler<R>> handlers = new
HashMap<>();
+
+ private final String namespace;
+ private final String name;
+ private final String objKey;
+
+ private final AtomicBoolean running = new AtomicBoolean(true);
Review comment:
Maybe the name `stopped` is more accurate since it is strange that the
default value of `running` is true.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
+ }
+
+ private void removeStaleProcessor(String name) {
+ synchronized (processorsLock) {
+ Processor processor = watchingProcessors.get(name);
+ if (!processor.running.get()) {
+ watchingProcessors.remove(name);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ sharedIndexInformer.run();
+ }
+
+ @Override
+ public void close() {
+ sharedIndexInformer.stop();
+ ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS,
this.watchingExecutor);
+ }
+
+ private class Processor implements Runnable {
+
+ private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>();
+
+ private final Object listenerQueueLock = new Object();
+ private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>>
listenerQueue =
+ new LinkedBlockingQueue<>();
+ private final Map<UUID, WatchCallbackHandler<R>> handlers = new
HashMap<>();
+
+ private final String namespace;
+ private final String name;
+ private final String objKey;
+
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ private T current;
+
+ private Processor(String namespace, String name) {
+ this.namespace = namespace;
+ this.name = name;
+ this.objKey = namespace + "/" + name;
+ }
+
+ @Override
+ public void run() {
+ log.info("Starting watching processor for {}/{}", this.namespace,
this.name);
+ while (running.get()) {
+ try {
+ Pair<UUID, WatchCallbackHandler<R>> listener;
+ while ((listener = listenerQueue.poll(0,
TimeUnit.MILLISECONDS)) != null) {
+ addOrRemoveListener(listener);
+ }
+
+ String next = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (next != null) {
+ handle();
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception ex) {
+ log.error("Failed to invoke event handler: {}",
ex.getMessage());
+ }
+ }
+ }
+
+ private void addOrRemoveListener(Pair<UUID, WatchCallbackHandler<R>>
listenerEvent) {
+ UUID uuid = listenerEvent.getLeft();
+ WatchCallbackHandler<R> handler = listenerEvent.getRight();
+ if (handler != null) {
+ log.info("Start to watch for {}, watching id:{}", objKey,
uuid);
+ handlers.put(uuid, handler);
+ if (current != null) {
+
handler.onAdded(Collections.singletonList(eventWrapper.apply(current)));
+ }
+ } else {
+ handlers.remove(uuid);
+ log.info("Stop to watch for {}, watching id:{}", objKey, uuid);
+ }
+ log.debug("Current total {} watching for {}", handlers.size(),
objKey);
+ if (handlers.size() == 0) {
+ tryToStop();
+ }
+ }
+
+ private void handle() {
+ T newResource = sharedIndexInformer.getIndexer().getByKey(objKey);
+ T oldResource = this.current;
+ if (newResource == null) {
+ if (oldResource != null) {
+ List<R> event =
Collections.singletonList(eventWrapper.apply(oldResource));
+ this.handlers.forEach((id, handler) ->
handler.onDeleted(event));
+ }
+ } else {
+ List<R> event =
Collections.singletonList(eventWrapper.apply(newResource));
+ if (oldResource == null) {
+ this.handlers.forEach((id, handler) ->
handler.onAdded(event));
+ } else if (!oldResource
+ .getMetadata()
+ .getResourceVersion()
+
.equals(newResource.getMetadata().getResourceVersion())) {
+ this.handlers.forEach((id, handler) ->
handler.onModified(event));
+ }
+ }
+ this.current = newResource;
+ }
+
+ private void tryToStop() {
+ synchronized (listenerQueueLock) {
+ if (listenerQueue.size() == 0) {
+ stop();
+ }
+ }
+ if (!running.get()) {
+ removeStaleProcessor(name);
+ }
+ }
+
+ private void stop() {
+ if (running.compareAndSet(true, false)) {
+ log.info("Stopping watching processor for {}/{}",
this.namespace, this.name);
+ }
+ }
+
+ private void triggerEvent(String name) {
+ Optional.ofNullable(name).ifPresent(this.queue::add);
Review comment:
Could it happen that the `name` is null?
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.KubernetesResource;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/** IT Tests for the {@link KubernetesSharedInformer}. */
+public class KubernetesSharedInformerITCase extends TestLogger {
+
+ @ClassRule public static KubernetesResource kubernetesResource = new
KubernetesResource();
+
+ @Test
+ public void testWatch() throws Exception {
+ final ImmutableMap<String, String> labels =
+ ImmutableMap.of("app", "shared-informer-test-cluster");
+ final String configMapPrefix = "shared-informer-test-cluster";
+ try (FlinkKubeClient client = kubernetesResource.getFlinkKubeClient();
+ KubernetesConfigMapSharedWatcher configMapSharedWatcher =
+ client.createConfigMapSharedWatcher(labels)) {
+
+ configMapSharedWatcher.run();
+ for (int i = 0; i < 5; i++) {
Review comment:
The ConfigMap creation logic could be deduplicated by introducing an
internal method.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -110,6 +115,9 @@ public void teardown() throws Exception {
Context() {
flinkKubeClient = createFlinkKubeClient();
+ configMapSharedWatcher =
+ flinkKubeClient.createConfigMapSharedWatcher(
+ KubernetesUtils.getCommonLabels(CLUSTER_ID));
Review comment:
KubernetesUtils.getConfigMapLabels(CLUSTER_ID,
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+ T extends HasMetadata, TList extends
KubernetesResourceList<T>, R>
+ implements KubernetesSharedWatcher<R> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final NamespacedKubernetesClient client;
+ private final SharedIndexInformer<T> sharedIndexInformer;
+ private final Function<T, R> eventWrapper;
+
+ private final ExecutorService watchingExecutor;
+
+ private final Object processorsLock = new Object();
+
+ @GuardedBy("processorsLock")
+ private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+ public KubernetesSharedInformer(
+ NamespacedKubernetesClient client,
+ Class<T> apiTypeClass,
+ Class<TList> apiListTypeClass,
+ Map<String, String> labels,
+ Function<T, R> eventWrapper) {
+ this.client = client;
+ SharedInformerFactory informerFactory =
+ client.informers(
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("KubernetesClient-Informer")));
+ if (!CollectionUtil.isNullOrEmpty(labels)) {
+ informerFactory.withLabels(labels);
+ }
+ this.sharedIndexInformer =
+ informerFactory.sharedIndexInformerFor(apiTypeClass,
apiListTypeClass, 30000);
+ this.sharedIndexInformer.addEventHandler(
+ new ResourceEventHandler<T>() {
+ @Override
+ public void onAdd(T obj) {
+ notify(obj.getMetadata().getName());
+ }
+
+ @Override
+ public void onUpdate(T oldObj, T newObj) {
+ notify(newObj.getMetadata().getName());
+ }
+
+ @Override
+ public void onDelete(T obj, boolean
deletedFinalStateUnknown) {
+ notify(obj.getMetadata().getName());
+ }
+
+ private void notify(String name) {
+ Optional.ofNullable(watchingProcessors.get(name))
+ .ifPresent(processor ->
processor.triggerEvent(name));
+ }
+ });
+
+ this.watchingExecutor =
+ Executors.newCachedThreadPool(
+ new
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+ this.eventWrapper = eventWrapper;
+ }
+
+ @Override
+ public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+ while (true) {
+ final Processor processor = getOrStartProcessor(name);
+ Optional<UUID> uuid = processor.registerListener(callbackHandler);
+ if (uuid.isPresent()) {
+ return () -> processor.unregisterListener(uuid.get());
+ }
+ log.debug(
+ "Watching processor for {}/{} is stopping, try to start
another",
+ this.client.getNamespace(),
+ name);
+ }
+ }
+
+ private Processor getOrStartProcessor(String name) {
+ Processor processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ synchronized (processorsLock) {
+ processor = watchingProcessors.get(name);
+ if (processor != null && processor.running.get()) {
+ return processor;
+ }
+ processor = new Processor(client.getNamespace(), name);
+ watchingProcessors.put(name, processor);
+ }
+ processor.triggerEvent(name);
+ watchingExecutor.execute(processor);
+ return watchingProcessors.get(name);
+ }
+
+ private void removeStaleProcessor(String name) {
+ synchronized (processorsLock) {
+ Processor processor = watchingProcessors.get(name);
+ if (!processor.running.get()) {
+ watchingProcessors.remove(name);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ sharedIndexInformer.run();
+ }
+
+ @Override
+ public void close() {
+ sharedIndexInformer.stop();
+ ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS,
this.watchingExecutor);
+ }
+
+ private class Processor implements Runnable {
+
+ private final BlockingQueue<String> queue = new
LinkedBlockingQueue<>();
+
+ private final Object listenerQueueLock = new Object();
+ private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>>
listenerQueue =
+ new LinkedBlockingQueue<>();
+ private final Map<UUID, WatchCallbackHandler<R>> handlers = new
HashMap<>();
+
+ private final String namespace;
+ private final String name;
+ private final String objKey;
+
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ private T current;
+
+ private Processor(String namespace, String name) {
+ this.namespace = namespace;
+ this.name = name;
+ this.objKey = namespace + "/" + name;
+ }
+
+ @Override
+ public void run() {
+ log.info("Starting watching processor for {}/{}", this.namespace,
this.name);
+ while (running.get()) {
+ try {
+ Pair<UUID, WatchCallbackHandler<R>> listener;
+ while ((listener = listenerQueue.poll(0,
TimeUnit.MILLISECONDS)) != null) {
+ addOrRemoveListener(listener);
+ }
+
+ String next = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (next != null) {
+ handle();
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception ex) {
+ log.error("Failed to invoke event handler: {}",
ex.getMessage());
+ }
+ }
+ }
+
+ private void addOrRemoveListener(Pair<UUID, WatchCallbackHandler<R>>
listenerEvent) {
+ UUID uuid = listenerEvent.getLeft();
+ WatchCallbackHandler<R> handler = listenerEvent.getRight();
+ if (handler != null) {
+ log.info("Start to watch for {}, watching id:{}", objKey,
uuid);
+ handlers.put(uuid, handler);
+ if (current != null) {
+
handler.onAdded(Collections.singletonList(eventWrapper.apply(current)));
+ }
+ } else {
+ handlers.remove(uuid);
+ log.info("Stop to watch for {}, watching id:{}", objKey, uuid);
+ }
+ log.debug("Current total {} watching for {}", handlers.size(),
objKey);
+ if (handlers.size() == 0) {
+ tryToStop();
+ }
+ }
+
+ private void handle() {
Review comment:
Compared with per-job watching, I have a concern here.
Currently, each ConfigMap will have a dedicated processor, which has
multiple handlers for different watches. I think it could happen that a
later-registered handler misses the `onAdded` event. It might not be a problem
for the Kubernetes HA services now. But `KubernetesSharedInformer` is a common
interface and we should document this limitation or fix it.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -105,7 +101,8 @@ public KubernetesLeaderElectionDriver(
running = true;
leaderElector.run();
kubernetesWatch =
- kubeClient.watchConfigMaps(configMapName, new
ConfigMapCallbackHandlerImpl());
+ checkNotNull(configMapSharedWatcher, "ConfigMap Shared
Informer")
Review comment:
Maybe we should register the watch first and then run the
`leaderElector`. Otherwise, it could happen that the `onAdded` event appears
before we register the watch in `KubernetesSharedInformer`.
##########
File path:
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -467,25 +463,6 @@ public void
testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Excep
}
}
- @Test
- public void testWatchConfigMaps() throws Exception {
- final String kubeConfigFile = writeKubeConfigForMockKubernetesServer();
- flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE,
kubeConfigFile);
-
- final FlinkKubeClient realFlinkKubeClient =
-
FlinkKubeClientFactory.getInstance().fromConfiguration(flinkConfig, "testing");
- realFlinkKubeClient.watchConfigMaps(CLUSTER_ID, new
NoOpWatchCallbackHandler<>());
- final String path =
- "/api/v1/namespaces/"
- + NAMESPACE
- + "/configmaps?fieldSelector=metadata.name%3D"
- + CLUSTER_ID
- + "&watch=true";
- final RecordedRequest watchRequest = server.takeRequest(TIMEOUT,
TimeUnit.MILLISECONDS);
Review comment:
`TIMEOUT` could be removed now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]