This is an automated email from the ASF dual-hosted git repository. wangyang0918 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new a7b8a76 [FLINK-27098] Use namespaced Kubernetes client when creating InformerEventSource in session job controller a7b8a76 is described below commit a7b8a7679ee44141d9cf5a97d49d8afe8308532c Author: Hao Xin <haoxi...@gmail.com> AuthorDate: Wed Apr 6 23:15:20 2022 +0800 [FLINK-27098] Use namespaced Kubernetes client when creating InformerEventSource in session job controller This closes #157. --- .../controller/FlinkSessionJobController.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index b905b13..12d19f3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -29,7 +29,9 @@ import org.apache.flink.kubernetes.operator.utils.OperatorUtils; import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator; import org.apache.flink.util.Preconditions; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; @@ -148,18 +150,29 @@ public class FlinkSessionJobController Preconditions.checkNotNull(controllerConfig, "Controller config cannot be null"); Set<String> effectiveNamespaces = controllerConfig.getEffectiveNamespaces(); if (effectiveNamespaces.isEmpty()) { - return List.of(createFlinkDepInformerEventSource(ALL_NAMESPACE)); + return List.of( + createFlinkDepInformerEventSource( + kubernetesClient.resources(FlinkDeployment.class).inAnyNamespace(), + ALL_NAMESPACE)); } else { return effectiveNamespaces.stream() - .map(this::createFlinkDepInformerEventSource) + .map( + name -> + createFlinkDepInformerEventSource( + kubernetesClient + .resources(FlinkDeployment.class) + .inNamespace(name), + name)) .collect(Collectors.toList()); } } private InformerEventSource<FlinkDeployment, FlinkSessionJob> createFlinkDepInformerEventSource( + FilterWatchListDeletable<FlinkDeployment, KubernetesResourceList<FlinkDeployment>> + filteredClient, String name) { return new InformerEventSource<>( - kubernetesClient.resources(FlinkDeployment.class).runnableInformer(0), + filteredClient.runnableInformer(0), primaryResourceRetriever(), sessionJob -> new ResourceID(