This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 460e179  [FLINK-20798][k8s] Use namespaced kubernetes client when 
creating FlinkKubeClient
460e179 is described below

commit 460e1799d2b9069835ecae295c75f571c7d6b65a
Author: wangyang0918 <[email protected]>
AuthorDate: Wed Jan 6 20:24:11 2021 +0800

    [FLINK-20798][k8s] Use namespaced kubernetes client when creating 
FlinkKubeClient
    
    After switching to a namespaced Kubernetes client, we no longer need to set 
the namespace explicitly when creating Kubernetes resources (e.g. deployments, 
pods, configmaps, watches).
    
    Address comments: Update the unit test to verify that the configmap watch is 
created in the appropriate namespace.
    
    This closes #14570.
---
 .../kubeclient/DefaultKubeClientFactory.java       |  8 +++-
 .../kubeclient/Fabric8FlinkKubeClient.java         | 55 ++++------------------
 .../resources/KubernetesLeaderElector.java         |  3 +-
 .../flink/kubernetes/KubernetesTestBase.java       | 37 ++++++++++++++-
 .../flink/kubernetes/MixedKubernetesServer.java    | 17 +++++--
 .../kubeclient/Fabric8FlinkKubeClientTest.java     | 24 ++++++++++
 .../kubeclient/TestingFlinkKubeClient.java         |  5 +-
 .../resources/NoOpWatchCallbackHandler.java        | 55 ++++++++++++++++++++++
 8 files changed, 147 insertions(+), 57 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
index 59f157a..e7b5759 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
@@ -25,8 +25,8 @@ import org.apache.flink.util.FileUtils;
 
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +83,11 @@ public class DefaultKubeClientFactory implements 
KubeClientFactory {
             config = Config.autoConfigure(kubeContext);
         }
 
-        final KubernetesClient client = new DefaultKubernetesClient(config);
+        final String namespace = 
flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
+        LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
+        config.setNamespace(namespace);
+
+        final NamespacedKubernetesClient client = new 
DefaultKubernetesClient(config);
 
         return new Fabric8FlinkKubeClient(flinkConfig, client, () -> 
ioExecutor);
     }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
index bd2943c..779b369 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
@@ -44,7 +44,6 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.ServicePort;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.slf4j.Logger;
@@ -69,7 +68,7 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
 
-    private final KubernetesClient internalClient;
+    private final NamespacedKubernetesClient internalClient;
     private final String clusterId;
     private final String namespace;
     private final int maxRetryAttempts;
@@ -78,7 +77,7 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
 
     public Fabric8FlinkKubeClient(
             Configuration flinkConfig,
-            KubernetesClient client,
+            NamespacedKubernetesClient client,
             Supplier<Executor> asyncExecutorFactory) {
         this.internalClient = checkNotNull(client);
         this.clusterId = 
checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
@@ -100,19 +99,12 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
         // create Deployment
         LOG.debug("Start to create deployment with spec {}", 
deployment.getSpec().toString());
         final Deployment createdDeployment =
-                this.internalClient
-                        .apps()
-                        .deployments()
-                        .inNamespace(this.namespace)
-                        .create(deployment);
+                this.internalClient.apps().deployments().create(deployment);
 
         // Note that we should use the uid of the created Deployment for the 
OwnerReference.
         setOwnerReference(createdDeployment, accompanyingResources);
 
-        this.internalClient
-                .resourceList(accompanyingResources)
-                .inNamespace(this.namespace)
-                .createOrReplace();
+        
this.internalClient.resourceList(accompanyingResources).createOrReplace();
     }
 
     @Override
@@ -123,7 +115,6 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
                             this.internalClient
                                     .apps()
                                     .deployments()
-                                    .inNamespace(this.namespace)
                                     
.withName(KubernetesUtils.getDeploymentName(clusterId))
                                     .get();
 
@@ -146,10 +137,7 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
                             kubernetesPod.getInternalResource().getMetadata(),
                             kubernetesPod.getInternalResource().getSpec());
 
-                    this.internalClient
-                            .pods()
-                            .inNamespace(this.namespace)
-                            .create(kubernetesPod.getInternalResource());
+                    
this.internalClient.pods().create(kubernetesPod.getInternalResource());
                 },
                 kubeClientExecutorService);
     }
@@ -201,7 +189,6 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
         this.internalClient
                 .apps()
                 .deployments()
-                .inNamespace(this.namespace)
                 .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
@@ -217,12 +204,7 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
         final String serviceName = 
ExternalServiceDecorator.getExternalServiceName(clusterId);
 
         final Service service =
-                this.internalClient
-                        .services()
-                        .inNamespace(namespace)
-                        .withName(serviceName)
-                        .fromServer()
-                        .get();
+                
this.internalClient.services().withName(serviceName).fromServer().get();
 
         if (service == null) {
             LOG.debug("Service {} does not exist", serviceName);
@@ -247,10 +229,7 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
             KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
             KubernetesLeaderElector.LeaderCallbackHandler 
leaderCallbackHandler) {
         return new KubernetesLeaderElector(
-                (NamespacedKubernetesClient) this.internalClient,
-                namespace,
-                leaderElectionConfiguration,
-                leaderCallbackHandler);
+                this.internalClient, leaderElectionConfiguration, 
leaderCallbackHandler);
     }
 
     @Override
@@ -260,7 +239,6 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
                         () ->
                                 this.internalClient
                                         .configMaps()
-                                        .inNamespace(namespace)
                                         
.create(configMap.getInternalResource()),
                         kubeClientExecutorService)
                 .exceptionally(
@@ -274,8 +252,7 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
 
     @Override
     public Optional<KubernetesConfigMap> getConfigMap(String name) {
-        final ConfigMap configMap =
-                
this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+        final ConfigMap configMap = 
this.internalClient.configMaps().withName(name).get();
         return configMap == null
                 ? Optional.empty()
                 : Optional.of(new KubernetesConfigMap(configMap));
@@ -299,8 +276,6 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
                                                                                
         this
                                                                                
                 .internalClient
                                                                                
                 .configMaps()
-                                                                               
                 .inNamespace(
-                                                                               
                         namespace)
                                                                                
                 .withName(
                                                                                
                         configMapName)
                                                                                
                 .lockResourceVersion(
@@ -353,24 +328,14 @@ public class Fabric8FlinkKubeClient implements 
FlinkKubeClient {
     @Override
     public CompletableFuture<Void> deleteConfigMapsByLabels(Map<String, 
String> labels) {
         return CompletableFuture.runAsync(
-                () ->
-                        this.internalClient
-                                .configMaps()
-                                .inNamespace(namespace)
-                                .withLabels(labels)
-                                .delete(),
+                () -> 
this.internalClient.configMaps().withLabels(labels).delete(),
                 kubeClientExecutorService);
     }
 
     @Override
     public CompletableFuture<Void> deleteConfigMap(String configMapName) {
         return CompletableFuture.runAsync(
-                () ->
-                        this.internalClient
-                                .configMaps()
-                                .inNamespace(namespace)
-                                .withName(configMapName)
-                                .delete(),
+                () -> 
this.internalClient.configMaps().withName(configMapName).delete(),
                 kubeClientExecutorService);
     }
 
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
index abdf0e2..82a0bf9 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
@@ -61,7 +61,6 @@ public class KubernetesLeaderElector {
 
     public KubernetesLeaderElector(
             NamespacedKubernetesClient kubernetesClient,
-            String namespace,
             KubernetesLeaderElectionConfiguration leaderConfig,
             LeaderCallbackHandler leaderCallbackHandler) {
         final LeaderElectionConfig leaderElectionConfig =
@@ -70,7 +69,7 @@ public class KubernetesLeaderElector {
                         .withLeaseDuration(leaderConfig.getLeaseDuration())
                         .withLock(
                                 new ConfigMapLock(
-                                        namespace,
+                                        kubernetesClient.getNamespace(),
                                         leaderConfig.getConfigMapName(),
                                         leaderConfig.getLockIdentity()))
                         .withRenewDeadline(leaderConfig.getRenewDeadline())
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
index 7f452ee..4b9b26c 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
@@ -31,7 +31,12 @@ import 
org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.TestLogger;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.api.model.Config;
+import io.fabric8.kubernetes.api.model.ConfigBuilder;
+import io.fabric8.kubernetes.api.model.NamedClusterBuilder;
+import io.fabric8.kubernetes.api.model.NamedContextBuilder;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.utils.Serialization;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -66,7 +71,7 @@ public class KubernetesTestBase extends TestLogger {
 
     protected final Configuration flinkConfig = new Configuration();
 
-    protected KubernetesClient kubeClient;
+    protected NamespacedKubernetesClient kubeClient;
 
     protected FlinkKubeClient flinkKubeClient;
 
@@ -132,4 +137,32 @@ public class KubernetesTestBase extends TestLogger {
         KubernetesTestUtils.createTemporyFile("some keytab", kerberosDir, 
KEYTAB_FILE);
         KubernetesTestUtils.createTemporyFile("some conf", kerberosDir, 
KRB5_CONF_FILE);
     }
+
+    protected String writeKubeConfigForMockKubernetesServer() throws Exception 
{
+        final Config kubeConfig =
+                new ConfigBuilder()
+                        .withApiVersion(server.getClient().getApiVersion())
+                        .withClusters(
+                                new NamedClusterBuilder()
+                                        .withName(CLUSTER_ID)
+                                        .withNewCluster()
+                                        
.withNewServer(server.getClient().getMasterUrl().toString())
+                                        .withInsecureSkipTlsVerify(true)
+                                        .endCluster()
+                                        .build())
+                        .withContexts(
+                                new NamedContextBuilder()
+                                        .withName(CLUSTER_ID)
+                                        .withNewContext()
+                                        .withCluster(CLUSTER_ID)
+                                        .withUser(
+                                                
server.getClient().getConfiguration().getUsername())
+                                        .endContext()
+                                        .build())
+                        .withNewCurrentContext(CLUSTER_ID)
+                        .build();
+        final File kubeConfigFile = new 
File(temporaryFolder.newFolder(".kube"), "config");
+        Serialization.yamlMapper().writeValue(kubeConfigFile, kubeConfig);
+        return kubeConfigFile.getAbsolutePath();
+    }
 }
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
index 57b9a9d..acdf697 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
@@ -25,23 +25,28 @@ import io.fabric8.mockwebserver.ServerRequest;
 import io.fabric8.mockwebserver.ServerResponse;
 import io.fabric8.mockwebserver.dsl.MockServerExpectation;
 import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
 import org.junit.rules.ExternalResource;
 
 import java.util.HashMap;
 import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 
 /** The mock server that host MixedDispatcher. */
 public class MixedKubernetesServer extends ExternalResource {
 
     private KubernetesMockServer mock;
     private NamespacedKubernetesClient client;
-    private boolean https;
+    private final boolean https;
 
-    private boolean crudMode;
+    private final boolean crudMode;
+
+    private final MockWebServer mockWebServer;
 
     public MixedKubernetesServer(boolean https, boolean crudMode) {
         this.https = https;
         this.crudMode = crudMode;
+        mockWebServer = new MockWebServer();
     }
 
     public void before() {
@@ -50,11 +55,11 @@ public class MixedKubernetesServer extends ExternalResource 
{
                 crudMode
                         ? new KubernetesMockServer(
                                 new Context(),
-                                new MockWebServer(),
+                                mockWebServer,
                                 response,
                                 new MixedDispatcher(response),
                                 true)
-                        : new KubernetesMockServer(https);
+                        : new KubernetesMockServer(mockWebServer, response, 
https);
         mock.init();
         client = mock.createClient();
     }
@@ -68,6 +73,10 @@ public class MixedKubernetesServer extends ExternalResource {
         return client;
     }
 
+    public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws 
Exception {
+        return mockWebServer.takeRequest(timeout, unit);
+    }
+
     public MockServerExpectation expect() {
         return mock.expect();
     }
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
index 3c49e03..b0195ef 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
@@ -33,6 +33,8 @@ import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactor
 import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import 
org.apache.flink.kubernetes.kubeclient.resources.NoOpWatchCallbackHandler;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
@@ -42,6 +44,7 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
+import okhttp3.mockwebserver.RecordedRequest;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -50,6 +53,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
@@ -64,6 +68,7 @@ import static org.junit.Assert.fail;
 
 /** Tests for Fabric implementation of {@link FlinkKubeClient}. */
 public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
+    private static final long TIMEOUT = 10 * 1000;
     private static final int RPC_PORT = 7123;
     private static final int BLOB_SERVER_PORT = 8346;
 
@@ -456,6 +461,25 @@ public class Fabric8FlinkKubeClientTest extends 
KubernetesClientTestBase {
         }
     }
 
+    @Test
+    public void testWatchConfigMaps() throws Exception {
+        final String kubeConfigFile = writeKubeConfigForMockKubernetesServer();
+        flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, 
kubeConfigFile);
+
+        final FlinkKubeClient realFlinkKubeClient =
+                
DefaultKubeClientFactory.getInstance().fromConfiguration(flinkConfig);
+        realFlinkKubeClient.watchConfigMaps(CLUSTER_ID, new 
NoOpWatchCallbackHandler<>());
+        final String path =
+                "/api/v1/namespaces/"
+                        + NAMESPACE
+                        + "/configmaps?fieldSelector=metadata.name%3D"
+                        + CLUSTER_ID
+                        + "&watch=true";
+        final RecordedRequest watchRequest = server.takeRequest(TIMEOUT, 
TimeUnit.MILLISECONDS);
+        assertThat(watchRequest.getPath(), is(path));
+        assertThat(watchRequest.getMethod(), 
is(HttpMethodWrapper.GET.toString()));
+    }
+
     private KubernetesConfigMap buildTestingConfigMap() {
         final Map<String, String> data = new HashMap<>();
         data.put(TESTING_CONFIG_MAP_KEY, TESTING_CONFIG_MAP_VALUE);
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
index 5879dcd..c046613 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.Preconditions;
 
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -413,12 +415,11 @@ public class TestingFlinkKubeClient implements 
FlinkKubeClient {
 
     /** Testing implementation of {@link KubernetesLeaderElector}. */
     public static class TestingKubernetesLeaderElector extends 
KubernetesLeaderElector {
-        private static final String NAMESPACE = "test";
 
         public TestingKubernetesLeaderElector(
                 KubernetesLeaderElectionConfiguration leaderConfig,
                 LeaderCallbackHandler leaderCallbackHandler) {
-            super(null, NAMESPACE, leaderConfig, leaderCallbackHandler);
+            super(new KubernetesMockServer().createClient(), leaderConfig, 
leaderCallbackHandler);
         }
 
         @Override
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
new file mode 100644
index 0000000..6e64a4d
--- /dev/null
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import java.util.List;
+
+/**
+ * Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}.
+ *
+ * @param <T> Type of resource to be watched
+ */
+public class NoOpWatchCallbackHandler<T> implements 
FlinkKubeClient.WatchCallbackHandler<T> {
+    @Override
+    public void onAdded(List<T> resources) {
+        // noop
+    }
+
+    @Override
+    public void onModified(List<T> resources) {
+        // noop
+    }
+
+    @Override
+    public void onDeleted(List<T> resources) {
+        // noop
+    }
+
+    @Override
+    public void onError(List<T> resources) {
+        // noop
+    }
+
+    @Override
+    public void handleFatalError(Throwable throwable) {
+        // noop
+    }
+}

Reply via email to