This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new d8f19bc [FLINK-26260] Support watching specific namespaces only
d8f19bc is described below
commit d8f19bcea6d493d4cb5a30448c9f2865e6fef56b
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Feb 21 11:09:02 2022 +0100
[FLINK-26260] Support watching specific namespaces only
---
README.md | 6 +++
.../flink/kubernetes/operator/FlinkOperator.java | 18 ++++----
.../operator/controller/FlinkControllerConfig.java | 51 ++++++++++++++++++++++
helm/flink-operator/templates/flink-operator.yaml | 2 +
helm/flink-operator/templates/webhook.yaml | 10 ++++-
helm/flink-operator/values.yaml | 3 ++
6 files changed, 81 insertions(+), 9 deletions(-)
diff --git a/README.md b/README.md
index 26dd8fc..aecad31 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,8 @@ The operator is managed helm chart. To install run:
helm install flink-operator .
```
+### Validating webhook
+
In order to use the webhook for FlinkDeployment validation, you must install
the cert-manager on the Kubernetes cluster:
```
kubectl apply -f
https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
@@ -17,6 +19,10 @@ kubectl apply -f
https://github.com/jetstack/cert-manager/releases/download/v1.7
The webhook can be disabled during helm install by passing the `--set
webhook.create=false` parameter or editing the `values.yaml` directly.
+### Watching only specific namespaces
+
+The operator supports watching a specific list of namespaces for
FlinkDeployment resources. You can enable it by setting the `--set
watchNamespaces={flink-test}` parameter.
+
## User Guide
### Create a new Flink deployment
The flink-operator will watch the CRD resources and submit a new Flink
deployment once the CR is applied.
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8cfdef1..9484bea 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator;
+import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
@@ -27,7 +28,6 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.javaoperatorsdk.operator.Operator;
-import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory;
/** Main Class for Flink native k8s operator. */
public class FlinkOperator {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkOperator.class);
- private static final String ENV_FLINK_OPERATOR_CONF_DIR =
"FLINK_OPERATOR_CONF_DIR";
+
+ public static final String ENV_FLINK_OPERATOR_CONF_DIR =
"FLINK_OPERATOR_CONF_DIR";
public static void main(String... args) {
@@ -48,11 +49,9 @@ public class FlinkOperator {
if (namespace == null) {
namespace = "default";
}
- Operator operator =
- new Operator(
- client,
- new
ConfigurationServiceOverrider(DefaultConfigurationService.instance())
- .build());
+
+ DefaultConfigurationService configurationService =
DefaultConfigurationService.instance();
+ Operator operator = new Operator(client, configurationService);
FlinkService flinkService = new FlinkService(client);
@@ -64,7 +63,10 @@ public class FlinkOperator {
new FlinkDeploymentController(
client, namespace, observer, jobReconciler,
sessionReconciler);
- operator.register(controller);
+ FlinkControllerConfig controllerConfig = new
FlinkControllerConfig(controller);
+ controllerConfig.setConfigurationService(configurationService);
+
+ operator.register(controller, controllerConfig);
operator.installShutdownHook();
operator.start();
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
new file mode 100644
index 0000000..1cfdab5
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkControllerConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.config.runtime.AnnotationConfiguration;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Custom config for {@link FlinkDeploymentController}. */
+public class FlinkControllerConfig extends
AnnotationConfiguration<FlinkDeployment> {
+
+ public static final String ENV_WATCHED_NAMESPACES =
"FLINK_OPERATOR_WATCH_NAMESPACES";
+
+ public FlinkControllerConfig(FlinkDeploymentController reconciler) {
+ super(reconciler);
+ }
+
+ public Set<String> getNamespaces() {
+ String watchedNamespaces = System.getenv(ENV_WATCHED_NAMESPACES);
+ Set<String> namespaces = new HashSet<>();
+
+ if (StringUtils.isEmpty(watchedNamespaces)) {
+ return namespaces;
+ }
+
+ for (String ns : watchedNamespaces.split(",")) {
+ namespaces.add(ns);
+ }
+
+ return namespaces;
+ }
+}
diff --git a/helm/flink-operator/templates/flink-operator.yaml
b/helm/flink-operator/templates/flink-operator.yaml
index f269310..028b25e 100644
--- a/helm/flink-operator/templates/flink-operator.yaml
+++ b/helm/flink-operator/templates/flink-operator.yaml
@@ -59,6 +59,8 @@ spec:
value: /opt/flink-operator/conf
- name: LOG_CONFIG
value:
-Dlog4j.configurationFile=/opt/flink-operator/conf/log4j2.properties
+ - name: FLINK_OPERATOR_WATCH_NAMESPACES
+ value: {{ join "," .Values.watchNamespaces }}
volumeMounts:
- name: flink-operator-config-volume
mountPath: /opt/flink-operator/conf
diff --git a/helm/flink-operator/templates/webhook.yaml
b/helm/flink-operator/templates/webhook.yaml
index 8fd8a94..66df08f 100644
--- a/helm/flink-operator/templates/webhook.yaml
+++ b/helm/flink-operator/templates/webhook.yaml
@@ -81,7 +81,7 @@ kind: ValidatingWebhookConfiguration
metadata:
annotations:
cert-manager.io/inject-ca-from: {{ .Values.operatorNamespace.name
}}/flink-operator-serving-cert
- name: flink-operator-validating-webhook-configuration
+ name: flink-operator-{{ .Values.operatorNamespace.name
}}-webhook-configuration
webhooks:
- name: vflinkdeployments.flink.apache.org
admissionReviewVersions: ["v1"]
@@ -94,10 +94,18 @@ webhooks:
rules:
- apiGroups: ["*"]
apiVersions: ["*"]
+ scope: "Namespaced"
operations:
- CREATE
- UPDATE
resources:
- flinkdeployments
sideEffects: None
+ {{- if .Values.watchNamespaces }}
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: In
+ values: {{ .Values.watchNamespaces }}
+ {{- end }}
{{- end }}
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/values.yaml
index f65fd18..8a289e4 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/values.yaml
@@ -21,6 +21,9 @@
operatorNamespace:
name: default
+# List of kubernetes namespaces to watch for FlinkDeployment changes, empty
means all namespaces
+# watchNamespaces: ["flink"]
+
image:
repository: flink-operator
pullPolicy: IfNotPresent