This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a7b8a76  [FLINK-27098] Use namespaced Kubernetes client when creating InformerEventSource in session job controller
a7b8a76 is described below

commit a7b8a7679ee44141d9cf5a97d49d8afe8308532c
Author: Hao Xin <haoxi...@gmail.com>
AuthorDate: Wed Apr 6 23:15:20 2022 +0800

    [FLINK-27098] Use namespaced Kubernetes client when creating InformerEventSource in session job controller
    
    This closes #157.
---
 .../controller/FlinkSessionJobController.java         | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index b905b13..12d19f3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -29,7 +29,9 @@ import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 import org.apache.flink.util.Preconditions;
 
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
@@ -148,18 +150,29 @@ public class FlinkSessionJobController
         Preconditions.checkNotNull(controllerConfig, "Controller config cannot be null");
         Set<String> effectiveNamespaces = controllerConfig.getEffectiveNamespaces();
         if (effectiveNamespaces.isEmpty()) {
-            return List.of(createFlinkDepInformerEventSource(ALL_NAMESPACE));
+            return List.of(
+                    createFlinkDepInformerEventSource(
+                            kubernetesClient.resources(FlinkDeployment.class).inAnyNamespace(),
+                            ALL_NAMESPACE));
         } else {
             return effectiveNamespaces.stream()
-                    .map(this::createFlinkDepInformerEventSource)
+                    .map(
+                            name ->
+                                    createFlinkDepInformerEventSource(
+                                            kubernetesClient
+                                                    .resources(FlinkDeployment.class)
+                                                    .inNamespace(name),
+                                            name))
                     .collect(Collectors.toList());
         }
     }
 
     private InformerEventSource<FlinkDeployment, FlinkSessionJob> createFlinkDepInformerEventSource(
+            FilterWatchListDeletable<FlinkDeployment, KubernetesResourceList<FlinkDeployment>>
+                    filteredClient,
             String name) {
         return new InformerEventSource<>(
-                kubernetesClient.resources(FlinkDeployment.class).runnableInformer(0),
+                filteredClient.runnableInformer(0),
                 primaryResourceRetriever(),
                 sessionJob ->
                         new ResourceID(

Reply via email to