wangyang0918 commented on a change in pull request #15501:
URL: https://github.com/apache/flink/pull/15501#discussion_r618128773



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -118,10 +115,8 @@ public void close() {
         LOG.info("Closing {}.", this);
         leaderElector.stop();
 
-        synchronized (watchLock) {
-            if (kubernetesWatch != null) {
-                kubernetesWatch.close();
-            }
+        if (kubernetesWatch != null) {

Review comment:
       nit: `kubernetesWatch` is `final` and cannot be null, right? Then we 
could drop the null check here.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
##########
@@ -89,6 +92,11 @@
         this.kubeClient = checkNotNull(kubeClient);
         this.clusterId = 
checkNotNull(config.get(KubernetesConfigOptions.CLUSTER_ID));
 
+        this.configMapSharedWatcher =
+                this.kubeClient.createConfigMapSharedWatcher(
+                        KubernetesUtils.getCommonLabels(this.clusterId));

Review comment:
       I believe we only need to watch the HA-related ConfigMaps, so the labels could be narrowed to:
   
   ```
   KubernetesUtils.getConfigMapLabels(clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);

Review comment:
       Why do we need to get the processor twice, once under the guard of 
`processorsLock` and once without it?

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);
+    }
+
+    private void removeStaleProcessor(String name) {
+        synchronized (processorsLock) {
+            Processor processor = watchingProcessors.get(name);

Review comment:
       nit: `processor` could be `final`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);
+    }
+
+    private void removeStaleProcessor(String name) {
+        synchronized (processorsLock) {
+            Processor processor = watchingProcessors.get(name);
+            if (!processor.running.get()) {
+                watchingProcessors.remove(name);
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        sharedIndexInformer.run();
+    }
+
+    @Override
+    public void close() {
+        sharedIndexInformer.stop();
+        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.watchingExecutor);
+    }
+
+    private class Processor implements Runnable {
+
+        private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>();
+
+        private final Object listenerQueueLock = new Object();
+        private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>> 
listenerQueue =

Review comment:
       `@GuardedBy("listenerQueueLock")`

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.KubernetesResource;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/** IT Tests for the {@link KubernetesSharedInformer}. */
+public class KubernetesSharedInformerITCase extends TestLogger {
+
+    @ClassRule public static KubernetesResource kubernetesResource = new 
KubernetesResource();
+
+    @Test
+    public void testWatch() throws Exception {
+        final ImmutableMap<String, String> labels =
+                ImmutableMap.of("app", "shared-informer-test-cluster");
+        final String configMapPrefix = "shared-informer-test-cluster";
+        try (FlinkKubeClient client = kubernetesResource.getFlinkKubeClient();
+                KubernetesConfigMapSharedWatcher configMapSharedWatcher =
+                        client.createConfigMapSharedWatcher(labels)) {
+
+            configMapSharedWatcher.run();
+            for (int i = 0; i < 5; i++) {
+                client.createConfigMap(
+                                new KubernetesConfigMap(
+                                        new ConfigMapBuilder()
+                                                .withNewMetadata()
+                                                .withName(configMapPrefix + i)
+                                                .withLabels(labels)
+                                                .endMetadata()
+                                                .build()))
+                        .get();
+            }
+
+            List<TestingCallbackHandler> callbackHandlers = new ArrayList<>();
+            List<Watch> watchers = new ArrayList<>();
+            for (int i = 0; i < 100; i++) {
+                final String name = configMapPrefix + i % 10;
+                final TestingCallbackHandler handler = new 
TestingCallbackHandler(name);
+                callbackHandlers.add(handler);
+                final Watch watch = configMapSharedWatcher.watch(name, 
handler);
+                watchers.add(watch);
+            }
+            for (int i = 5; i < 10; i++) {
+                client.createConfigMap(
+                                new KubernetesConfigMap(
+                                        new ConfigMapBuilder()
+                                                .withNewMetadata()
+                                                .withName(configMapPrefix + i)
+                                                .withLabels(labels)
+                                                .endMetadata()
+                                                .build()))
+                        .get();
+            }
+            for (int i = 0; i < 10; i++) {
+                final String configMapName = configMapPrefix + i;
+                client.checkAndUpdateConfigMap(
+                                configMapName,
+                                configMap -> {
+                                    configMap.getData().put("foo", "bar");
+                                    return Optional.of(configMap);
+                                })
+                        .get();
+            }
+            for (TestingCallbackHandler handler : callbackHandlers) {
+                handler.addOrUpdateFuture.get(2000, TimeUnit.MILLISECONDS);
+            }
+            client.deleteConfigMapsByLabels(labels).get();
+            for (TestingCallbackHandler handler : callbackHandlers) {
+                handler.deleteFuture.get(2000, TimeUnit.MILLISECONDS);
+            }
+            watchers.forEach(Watch::close);
+        }
+    }
+
+    private static final class TestingCallbackHandler
+            extends NoOpWatchCallbackHandler<KubernetesConfigMap> {
+
+        private final CompletableFuture<Void> addOrUpdateFuture = new 
CompletableFuture<>();
+        private final CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
+
+        private final String name;
+
+        private TestingCallbackHandler(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public void onAdded(List<KubernetesConfigMap> resources) {

Review comment:
       I think we need to verify that every watch does not miss the `onAdded` 
event.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {

Review comment:
       `watchingProcessors` is accessed outside the guard of `processorsLock`.
   I think we could also add a log here for the case where we receive events for a 
ConfigMap but there is no registered processor.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);

Review comment:
       `watchingProcessors` is accessed outside of the lock. And we could directly 
return `processor` to save the get operation.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);

Review comment:
       nit: `uuid` could be final.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
##########
@@ -90,10 +87,8 @@ public void close() {
 
         LOG.info("Stopping {}.", this);
 
-        synchronized (watchLock) {
-            if (kubernetesWatch != null) {
-                kubernetesWatch.close();
-            }
+        if (kubernetesWatch != null) {

Review comment:
       Same as above.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =

Review comment:
       Actually, I am a little afraid about using the `newCachedThreadPool`, 
because it may have too many running threads. But I do not have a better 
solution here since each processor needs to have a dedicated thread.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {

Review comment:
       I think `uuid` could not be empty here, since that would mean that 
`getOrStartProcessor` returned a stopped processor. This also means that maybe 
we do not need the busy loop here.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -184,6 +174,15 @@ KubernetesWatch watchConfigMaps(
      */
     CompletableFuture<Void> deleteConfigMap(String configMapName);
 
+    /**
+     * Create a shared watcher for ConfigMaps with specified labels.
+     *
+     * @param labels labels to filter ConfigMaps, if labels is empty or null, 
all ConfigMaps will be
+     *     take in account.

Review comment:
       typo: take -> taken
   
   I am curious whether we should allow `null` labels, because the watcher may 
then take too many ConfigMap events and consume too much memory if not used properly.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =

Review comment:
       I am a little confused about the `resyncPeriodInMillis`. Could duplicated 
events happen after the periodic resync? I will also dig more into the 
implementation of the shared informer.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);
+    }
+
+    private void removeStaleProcessor(String name) {
+        synchronized (processorsLock) {
+            Processor processor = watchingProcessors.get(name);
+            if (!processor.running.get()) {
+                watchingProcessors.remove(name);
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        sharedIndexInformer.run();
+    }
+
+    @Override
+    public void close() {
+        sharedIndexInformer.stop();
+        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.watchingExecutor);
+    }
+
+    private class Processor implements Runnable {
+
+        private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>();
+
+        private final Object listenerQueueLock = new Object();
+        private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>> 
listenerQueue =
+                new LinkedBlockingQueue<>();
+        private final Map<UUID, WatchCallbackHandler<R>> handlers = new 
HashMap<>();
+
+        private final String namespace;
+        private final String name;
+        private final String objKey;
+
+        private final AtomicBoolean running = new AtomicBoolean(true);
+
+        private T current;
+
+        private Processor(String namespace, String name) {

Review comment:
       Do we really need the `namespace` here? I ask this because we are using the 
`NamespacedKubernetesClient` and all the resources that need to be watched are in 
the same namespace.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);
+    }
+
+    private void removeStaleProcessor(String name) {
+        synchronized (processorsLock) {
+            Processor processor = watchingProcessors.get(name);
+            if (!processor.running.get()) {
+                watchingProcessors.remove(name);
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        sharedIndexInformer.run();
+    }
+
+    @Override
+    public void close() {
+        sharedIndexInformer.stop();
+        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.watchingExecutor);
+    }
+
+    private class Processor implements Runnable {
+
+        private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>();
+
+        private final Object listenerQueueLock = new Object();
+        private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>> 
listenerQueue =
+                new LinkedBlockingQueue<>();
+        private final Map<UUID, WatchCallbackHandler<R>> handlers = new 
HashMap<>();
+
+        private final String namespace;
+        private final String name;
+        private final String objKey;
+
+        private final AtomicBoolean running = new AtomicBoolean(true);
+
+        private T current;
+
+        private Processor(String namespace, String name) {
+            this.namespace = namespace;
+            this.name = name;
+            this.objKey = namespace + "/" + name;
+        }
+
+        @Override
+        public void run() {
+            log.info("Starting watching processor for {}/{}", this.namespace, 
this.name);
+            while (running.get()) {
+                try {
+                    Pair<UUID, WatchCallbackHandler<R>> listener;

Review comment:
       I think we could have an internal class `ListenerEvent` instead of the 
inexpressive `Pair<>`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);
+    }
+
+    private void removeStaleProcessor(String name) {
+        synchronized (processorsLock) {
+            Processor processor = watchingProcessors.get(name);
+            if (!processor.running.get()) {
+                watchingProcessors.remove(name);
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        sharedIndexInformer.run();
+    }
+
+    @Override
+    public void close() {
+        sharedIndexInformer.stop();
+        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.watchingExecutor);
+    }
+
+    private class Processor implements Runnable {
+
+        private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>();
+
+        private final Object listenerQueueLock = new Object();
+        private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>> 
listenerQueue =
+                new LinkedBlockingQueue<>();
+        private final Map<UUID, WatchCallbackHandler<R>> handlers = new 
HashMap<>();
+
+        private final String namespace;
+        private final String name;
+        private final String objKey;
+
+        private final AtomicBoolean running = new AtomicBoolean(true);

Review comment:
       Maybe the name `stopped` is more accurate since it is strange that the 
default value of `running` is true.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);
+    }
+
+    private void removeStaleProcessor(String name) {
+        synchronized (processorsLock) {
+            Processor processor = watchingProcessors.get(name);
+            if (!processor.running.get()) {
+                watchingProcessors.remove(name);
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        sharedIndexInformer.run();
+    }
+
+    @Override
+    public void close() {
+        sharedIndexInformer.stop();
+        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.watchingExecutor);
+    }
+
+    private class Processor implements Runnable {
+
+        private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>();
+
+        private final Object listenerQueueLock = new Object();
+        private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>> 
listenerQueue =
+                new LinkedBlockingQueue<>();
+        private final Map<UUID, WatchCallbackHandler<R>> handlers = new 
HashMap<>();
+
+        private final String namespace;
+        private final String name;
+        private final String objKey;
+
+        private final AtomicBoolean running = new AtomicBoolean(true);
+
+        private T current;
+
+        private Processor(String namespace, String name) {
+            this.namespace = namespace;
+            this.name = name;
+            this.objKey = namespace + "/" + name;
+        }
+
+        @Override
+        public void run() {
+            log.info("Starting watching processor for {}/{}", this.namespace, 
this.name);
+            while (running.get()) {
+                try {
+                    Pair<UUID, WatchCallbackHandler<R>> listener;
+                    while ((listener = listenerQueue.poll(0, 
TimeUnit.MILLISECONDS)) != null) {
+                        addOrRemoveListener(listener);
+                    }
+
+                    String next = queue.poll(100, TimeUnit.MILLISECONDS);
+                    if (next != null) {
+                        handle();
+                    }
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch (Exception ex) {
+                    log.error("Failed to invoke event handler: {}", 
ex.getMessage());
+                }
+            }
+        }
+
+        private void addOrRemoveListener(Pair<UUID, WatchCallbackHandler<R>> 
listenerEvent) {
+            UUID uuid = listenerEvent.getLeft();
+            WatchCallbackHandler<R> handler = listenerEvent.getRight();
+            if (handler != null) {
+                log.info("Start to watch for {}, watching id:{}", objKey, 
uuid);
+                handlers.put(uuid, handler);
+                if (current != null) {
+                    
handler.onAdded(Collections.singletonList(eventWrapper.apply(current)));
+                }
+            } else {
+                handlers.remove(uuid);
+                log.info("Stop to watch for {}, watching id:{}", objKey, uuid);
+            }
+            log.debug("Current total {} watching for {}", handlers.size(), 
objKey);
+            if (handlers.size() == 0) {
+                tryToStop();
+            }
+        }
+
+        private void handle() {
+            T newResource = sharedIndexInformer.getIndexer().getByKey(objKey);
+            T oldResource = this.current;
+            if (newResource == null) {
+                if (oldResource != null) {
+                    List<R> event = 
Collections.singletonList(eventWrapper.apply(oldResource));
+                    this.handlers.forEach((id, handler) -> 
handler.onDeleted(event));
+                }
+            } else {
+                List<R> event = 
Collections.singletonList(eventWrapper.apply(newResource));
+                if (oldResource == null) {
+                    this.handlers.forEach((id, handler) -> 
handler.onAdded(event));
+                } else if (!oldResource
+                        .getMetadata()
+                        .getResourceVersion()
+                        
.equals(newResource.getMetadata().getResourceVersion())) {
+                    this.handlers.forEach((id, handler) -> 
handler.onModified(event));
+                }
+            }
+            this.current = newResource;
+        }
+
+        private void tryToStop() {
+            synchronized (listenerQueueLock) {
+                if (listenerQueue.size() == 0) {
+                    stop();
+                }
+            }
+            if (!running.get()) {
+                removeStaleProcessor(name);
+            }
+        }
+
+        private void stop() {
+            if (running.compareAndSet(true, false)) {
+                log.info("Stopping watching processor for {}/{}", 
this.namespace, this.name);
+            }
+        }
+
+        private void triggerEvent(String name) {
+            Optional.ofNullable(name).ifPresent(this.queue::add);

Review comment:
       Could it happen that the `name` is null?

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformerITCase.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.KubernetesResource;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher.Watch;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/** IT Tests for the {@link KubernetesSharedInformer}. */
+public class KubernetesSharedInformerITCase extends TestLogger {
+
+    @ClassRule public static KubernetesResource kubernetesResource = new 
KubernetesResource();
+
+    @Test
+    public void testWatch() throws Exception {
+        final ImmutableMap<String, String> labels =
+                ImmutableMap.of("app", "shared-informer-test-cluster");
+        final String configMapPrefix = "shared-informer-test-cluster";
+        try (FlinkKubeClient client = kubernetesResource.getFlinkKubeClient();
+                KubernetesConfigMapSharedWatcher configMapSharedWatcher =
+                        client.createConfigMapSharedWatcher(labels)) {
+
+            configMapSharedWatcher.run();
+            for (int i = 0; i < 5; i++) {

Review comment:
       The ConfigMap creation logic could be deduplicated by introducing an 
internal method.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -110,6 +115,9 @@ public void teardown() throws Exception {
 
         Context() {
             flinkKubeClient = createFlinkKubeClient();
+            configMapSharedWatcher =
+                    flinkKubeClient.createConfigMapSharedWatcher(
+                            KubernetesUtils.getCommonLabels(CLUSTER_ID));

Review comment:
       KubernetesUtils.getConfigMapLabels(CLUSTER_ID, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesSharedInformer.java
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExecutorUtils;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final NamespacedKubernetesClient client;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+
+    private final ExecutorService watchingExecutor;
+
+    private final Object processorsLock = new Object();
+
+    @GuardedBy("processorsLock")
+    private final Map<String, Processor> watchingProcessors = new HashMap<>();
+
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        this.client = client;
+        SharedInformerFactory informerFactory =
+                client.informers(
+                        Executors.newSingleThreadExecutor(
+                                new 
ExecutorThreadFactory("KubernetesClient-Informer")));
+        if (!CollectionUtil.isNullOrEmpty(labels)) {
+            informerFactory.withLabels(labels);
+        }
+        this.sharedIndexInformer =
+                informerFactory.sharedIndexInformerFor(apiTypeClass, 
apiListTypeClass, 30000);
+        this.sharedIndexInformer.addEventHandler(
+                new ResourceEventHandler<T>() {
+                    @Override
+                    public void onAdd(T obj) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onUpdate(T oldObj, T newObj) {
+                        notify(newObj.getMetadata().getName());
+                    }
+
+                    @Override
+                    public void onDelete(T obj, boolean 
deletedFinalStateUnknown) {
+                        notify(obj.getMetadata().getName());
+                    }
+
+                    private void notify(String name) {
+                        Optional.ofNullable(watchingProcessors.get(name))
+                                .ifPresent(processor -> 
processor.triggerEvent(name));
+                    }
+                });
+
+        this.watchingExecutor =
+                Executors.newCachedThreadPool(
+                        new 
ExecutorThreadFactory("KubernetesSharedInformer-Watching-Processor"));
+        this.eventWrapper = eventWrapper;
+    }
+
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> callbackHandler) {
+        while (true) {
+            final Processor processor = getOrStartProcessor(name);
+            Optional<UUID> uuid = processor.registerListener(callbackHandler);
+            if (uuid.isPresent()) {
+                return () -> processor.unregisterListener(uuid.get());
+            }
+            log.debug(
+                    "Watching processor for {}/{} is stopping, try to start 
another",
+                    this.client.getNamespace(),
+                    name);
+        }
+    }
+
+    private Processor getOrStartProcessor(String name) {
+        Processor processor = watchingProcessors.get(name);
+        if (processor != null && processor.running.get()) {
+            return processor;
+        }
+        synchronized (processorsLock) {
+            processor = watchingProcessors.get(name);
+            if (processor != null && processor.running.get()) {
+                return processor;
+            }
+            processor = new Processor(client.getNamespace(), name);
+            watchingProcessors.put(name, processor);
+        }
+        processor.triggerEvent(name);
+        watchingExecutor.execute(processor);
+        return watchingProcessors.get(name);
+    }
+
+    private void removeStaleProcessor(String name) {
+        synchronized (processorsLock) {
+            Processor processor = watchingProcessors.get(name);
+            if (!processor.running.get()) {
+                watchingProcessors.remove(name);
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        sharedIndexInformer.run();
+    }
+
+    @Override
+    public void close() {
+        sharedIndexInformer.stop();
+        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
this.watchingExecutor);
+    }
+
+    private class Processor implements Runnable {
+
+        private final BlockingQueue<String> queue = new 
LinkedBlockingQueue<>();
+
+        private final Object listenerQueueLock = new Object();
+        private final BlockingQueue<Pair<UUID, WatchCallbackHandler<R>>> 
listenerQueue =
+                new LinkedBlockingQueue<>();
+        private final Map<UUID, WatchCallbackHandler<R>> handlers = new 
HashMap<>();
+
+        private final String namespace;
+        private final String name;
+        private final String objKey;
+
+        private final AtomicBoolean running = new AtomicBoolean(true);
+
+        private T current;
+
+        private Processor(String namespace, String name) {
+            this.namespace = namespace;
+            this.name = name;
+            this.objKey = namespace + "/" + name;
+        }
+
+        @Override
+        public void run() {
+            log.info("Starting watching processor for {}/{}", this.namespace, 
this.name);
+            while (running.get()) {
+                try {
+                    Pair<UUID, WatchCallbackHandler<R>> listener;
+                    while ((listener = listenerQueue.poll(0, 
TimeUnit.MILLISECONDS)) != null) {
+                        addOrRemoveListener(listener);
+                    }
+
+                    String next = queue.poll(100, TimeUnit.MILLISECONDS);
+                    if (next != null) {
+                        handle();
+                    }
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch (Exception ex) {
+                    log.error("Failed to invoke event handler: {}", 
ex.getMessage());
+                }
+            }
+        }
+
+        private void addOrRemoveListener(Pair<UUID, WatchCallbackHandler<R>> 
listenerEvent) {
+            UUID uuid = listenerEvent.getLeft();
+            WatchCallbackHandler<R> handler = listenerEvent.getRight();
+            if (handler != null) {
+                log.info("Start to watch for {}, watching id:{}", objKey, 
uuid);
+                handlers.put(uuid, handler);
+                if (current != null) {
+                    
handler.onAdded(Collections.singletonList(eventWrapper.apply(current)));
+                }
+            } else {
+                handlers.remove(uuid);
+                log.info("Stop to watch for {}, watching id:{}", objKey, uuid);
+            }
+            log.debug("Current total {} watching for {}", handlers.size(), 
objKey);
+            if (handlers.size() == 0) {
+                tryToStop();
+            }
+        }
+
+        private void handle() {

Review comment:
       Compared with per-job watching, I have a concern here.
   
   Currently, each ConfigMap has a dedicated processor, which holds multiple 
handlers for different watches. I think it could happen that a handler 
registered later misses the `onAdded` event. This might not be a problem for 
the Kubernetes HA services now, but `KubernetesSharedInformer` is a common 
interface, so we should either document this limitation or fix it.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -105,7 +101,8 @@ public KubernetesLeaderElectionDriver(
         running = true;
         leaderElector.run();
         kubernetesWatch =
-                kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+                checkNotNull(configMapSharedWatcher, "ConfigMap Shared 
Informer")

Review comment:
       Maybe we should register the watch first and only then run the 
`leaderElector`. Otherwise, the `onAdded` event could arrive before we 
register the watch in `KubernetesSharedInformer`.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -467,25 +463,6 @@ public void 
testCheckAndUpdateConfigMapWhenReplaceConfigMapFailed() throws Excep
         }
     }
 
-    @Test
-    public void testWatchConfigMaps() throws Exception {
-        final String kubeConfigFile = writeKubeConfigForMockKubernetesServer();
-        flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, 
kubeConfigFile);
-
-        final FlinkKubeClient realFlinkKubeClient =
-                
FlinkKubeClientFactory.getInstance().fromConfiguration(flinkConfig, "testing");
-        realFlinkKubeClient.watchConfigMaps(CLUSTER_ID, new 
NoOpWatchCallbackHandler<>());
-        final String path =
-                "/api/v1/namespaces/"
-                        + NAMESPACE
-                        + "/configmaps?fieldSelector=metadata.name%3D"
-                        + CLUSTER_ID
-                        + "&watch=true";
-        final RecordedRequest watchRequest = server.takeRequest(TIMEOUT, 
TimeUnit.MILLISECONDS);

Review comment:
       `TIMEOUT` could be removed now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to