This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d8f19bc  [FLINK-26260] Support watching specific namespaces only
d8f19bc is described below

commit d8f19bcea6d493d4cb5a30448c9f2865e6fef56b
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Feb 21 11:09:02 2022 +0100

    [FLINK-26260] Support watching specific namespaces only
---
 README.md                                          |  6 +++
 .../flink/kubernetes/operator/FlinkOperator.java   | 18 ++++----
 .../operator/controller/FlinkControllerConfig.java | 51 ++++++++++++++++++++++
 helm/flink-operator/templates/flink-operator.yaml  |  2 +
 helm/flink-operator/templates/webhook.yaml         | 10 ++++-
 helm/flink-operator/values.yaml                    |  3 ++
 6 files changed, 81 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 26dd8fc..aecad31 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,8 @@ The operator is managed helm chart. To install run:
  helm install flink-operator .
 ```
 
+### Validating webhook
+
 In order to use the webhook for FlinkDeployment validation, you must install 
the cert-manager on the Kubernetes cluster:
 ```
 kubectl apply -f 
https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
@@ -17,6 +19,10 @@ kubectl apply -f 
https://github.com/jetstack/cert-manager/releases/download/v1.7
 
 The webhook can be disabled during helm install by passing the `--set 
webhook.create=false` parameter or editing the `values.yaml` directly.
 
+### Watching only specific namespaces
+
+The operator supports watching a specific list of namespaces for 
FlinkDeployment resources. You can enable it by setting the `--set 
watchNamespaces={flink-test}` parameter.
+
 ## User Guide
 ### Create a new Flink deployment
 The flink-operator will watch the CRD resources and submit a new Flink 
deployment once the CR is applied.
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8cfdef1..9484bea 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator;
 
+import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
@@ -27,7 +28,6 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
-import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory;
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkOperator.class);
-    private static final String ENV_FLINK_OPERATOR_CONF_DIR = 
"FLINK_OPERATOR_CONF_DIR";
+
+    public static final String ENV_FLINK_OPERATOR_CONF_DIR = 
"FLINK_OPERATOR_CONF_DIR";
 
     public static void main(String... args) {
 
@@ -48,11 +49,9 @@ public class FlinkOperator {
         if (namespace == null) {
             namespace = "default";
         }
-        Operator operator =
-                new Operator(
-                        client,
-                        new 
ConfigurationServiceOverrider(DefaultConfigurationService.instance())
-                                .build());
+
+        DefaultConfigurationService configurationService = 
DefaultConfigurationService.instance();
+        Operator operator = new Operator(client, configurationService);
 
         FlinkService flinkService = new FlinkService(client);
 
@@ -64,7 +63,10 @@ public class FlinkOperator {
                 new FlinkDeploymentController(
                         client, namespace, observer, jobReconciler, 
sessionReconciler);
 
-        operator.register(controller);
+        FlinkControllerConfig controllerConfig = new 
FlinkControllerConfig(controller);
+        controllerConfig.setConfigurationService(configurationService);
+
+        operator.register(controller, controllerConfig);
         operator.installShutdownHook();
         operator.start();
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
new file mode 100644
index 0000000..1cfdab5
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.config.runtime.AnnotationConfiguration;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Custom config for {@link FlinkDeploymentController}. */
+public class FlinkControllerConfig extends 
AnnotationConfiguration<FlinkDeployment> {
+
+    public static final String ENV_WATCHED_NAMESPACES = 
"FLINK_OPERATOR_WATCH_NAMESPACES";
+
+    public FlinkControllerConfig(FlinkDeploymentController reconciler) {
+        super(reconciler);
+    }
+
+    public Set<String> getNamespaces() {
+        String watchedNamespaces = System.getenv(ENV_WATCHED_NAMESPACES);
+        Set<String> namespaces = new HashSet<>();
+
+        if (StringUtils.isEmpty(watchedNamespaces)) {
+            return namespaces;
+        }
+
+        for (String ns : watchedNamespaces.split(",")) {
+            namespaces.add(ns);
+        }
+
+        return namespaces;
+    }
+}
diff --git a/helm/flink-operator/templates/flink-operator.yaml 
b/helm/flink-operator/templates/flink-operator.yaml
index f269310..028b25e 100644
--- a/helm/flink-operator/templates/flink-operator.yaml
+++ b/helm/flink-operator/templates/flink-operator.yaml
@@ -59,6 +59,8 @@ spec:
               value: /opt/flink-operator/conf
             - name: LOG_CONFIG
               value: 
-Dlog4j.configurationFile=/opt/flink-operator/conf/log4j2.properties
+            - name: FLINK_OPERATOR_WATCH_NAMESPACES
+              value: {{ join "," .Values.watchNamespaces  }}
           volumeMounts:
             - name: flink-operator-config-volume
               mountPath: /opt/flink-operator/conf
diff --git a/helm/flink-operator/templates/webhook.yaml 
b/helm/flink-operator/templates/webhook.yaml
index 8fd8a94..66df08f 100644
--- a/helm/flink-operator/templates/webhook.yaml
+++ b/helm/flink-operator/templates/webhook.yaml
@@ -81,7 +81,7 @@ kind: ValidatingWebhookConfiguration
 metadata:
   annotations:
     cert-manager.io/inject-ca-from: {{ .Values.operatorNamespace.name 
}}/flink-operator-serving-cert
-  name: flink-operator-validating-webhook-configuration
+  name: flink-operator-{{ .Values.operatorNamespace.name 
}}-webhook-configuration
 webhooks:
 - name: vflinkdeployments.flink.apache.org
   admissionReviewVersions: ["v1"]
@@ -94,10 +94,18 @@ webhooks:
   rules:
   - apiGroups: ["*"]
     apiVersions: ["*"]
+    scope: "Namespaced"
     operations:
     - CREATE
     - UPDATE
     resources:
     - flinkdeployments
   sideEffects: None
+  {{- if .Values.watchNamespaces }}
+  namespaceSelector:
+    matchExpressions:
+      - key: kubernetes.io/metadata.name
+        operator: In
+        values: {{ .Values.watchNamespaces }}
+  {{- end }}
   {{- end }}
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/values.yaml
index f65fd18..8a289e4 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/values.yaml
@@ -21,6 +21,9 @@
 operatorNamespace:
   name: default
 
+# List of kubernetes namespaces to watch for FlinkDeployment changes, empty 
means all namespaces
+# watchNamespaces: ["flink"]
+
 image:
   repository: flink-operator
   pullPolicy: IfNotPresent

Reply via email to