This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 30566ed7 [Flink 31966] Flink Kubernetes operator lacks TLS support
30566ed7 is described below

commit 30566ed7390a4fce1dc3e239efe3c519c55aa9b1
Author: Tony Garrard <garr...@uk.ibm.com>
AuthorDate: Thu Dec 7 12:16:39 2023 +0000

    [Flink 31966] Flink Kubernetes operator lacks TLS support
    
    Signed-off-by: A. Garrard <garr...@uk.ibm.com>
---
 docs/content/docs/operations/helm.md               |   4 +
 examples/README.md                                 |   5 +
 .../basic-secure-deployment-only.yaml              |  49 ++++
 .../basic-secure-session-job-only.yaml             |  42 +++
 examples/flink-tls-example/basic-secure.yaml       |  47 +++
 examples/flink-tls-example/pre-install.yaml        | 106 +++++++
 .../operator/service/AbstractFlinkService.java     |  68 ++++-
 .../operator/service/NativeFlinkService.java       |   2 +-
 .../operator/service/StandaloneFlinkService.java   |   2 +-
 .../flink/kubernetes/operator/utils/EnvUtils.java  |   3 +
 .../kubernetes/operator/TestingFlinkService.java   |   2 +-
 .../operator/service/AbstractFlinkServiceTest.java |   2 +-
 .../operator/service/SecureFlinkServiceTest.java   | 319 +++++++++++++++++++++
 .../src/test/resources/keystore.jks                | Bin 0 -> 3018 bytes
 .../src/test/resources/truststore.jks              | Bin 0 -> 861 bytes
 .../templates/flink-operator.yaml                  |  21 ++
 helm/flink-kubernetes-operator/values.yaml         |   8 +
 17 files changed, 671 insertions(+), 9 deletions(-)

diff --git a/docs/content/docs/operations/helm.md 
b/docs/content/docs/operations/helm.md
index bb84a2b5..dee6efd0 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -111,6 +111,10 @@ The configurable parameters of the Helm chart and which 
default values as detail
 | operatorHealth.livenessProbe                   | Liveness probe 
configuration for the operator using the health endpoint. Only time settings 
should be configured, endpoint is set automatically based on port. |            
                                                                                
                                                                                
                                                                                
                              [...]
 | operatorHealth.startupProbe                    | Startup probe configuration 
for the operator using the health endpoint. Only time settings should be 
configured, endpoint is set automatically based on port.  |                     
                                                                                
                                                                                
                                                                                
                     [...]
 | postStart                                      | The postStart hook 
configuration for the main container.                                           
                                                            |                   
                                                                                
                                                                                
                                                                                
                       [...]
+| tls.create                                     | Whether to mount an 
optional secret containing a tls truststore for the flink-kubernetes-operator.  
                                                           | false              
                                                                                
                                                                                
                                                                                
                      [...]
+| tls.secretName                                 | The name of the tls secret  
                                                                                
                                                   | flink-operator-cert        
                                                                                
                                                                                
                                                                                
              [...]
+| tls.secretKeyRef.name                 | The name of the secret containing 
the password for the java keystore/truststore                                   
                                             | operator-certificate-password    
                                                                                
                                                                                
                                                                                
              |
+| tls.secretKeyRef.key                  | The key that holds this password     
                                                                                
                                          | password                            
                                                                                
                                                                                
                                                                                
           |
 
 For more information check the [Helm 
documentation](https://helm.sh/docs/helm/helm_install/).
 
diff --git a/examples/README.md b/examples/README.md
index 7200c801..473450b4 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -169,3 +169,8 @@ how to override default values using 
[kustomize](https://kustomize.io/)
 For the detailed description of advanced configuration techniques follow this 
 
[link](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/#advanced-customization-techniques).
 
+#### Enabling TLS on your deployments
+
+In order for the operator to communicate with the REST service of a deployment 
you need to mount a JKS/PKCS12 secret onto the operator that uses the same CA 
certificate as the one used by your deployments.
+N.B. Make sure you use the same mount location and keystore password for your 
deployment as you have for the operator.
+[This](flink-tls-example) example provides a pre-install.yaml file that you 
need to apply to your cluster before installing the operator with Helm using 
the value tls.create=true. It creates a CA-certificate-backed issuer that can 
be used for both the operator and the examples provided.
\ No newline at end of file
diff --git a/examples/flink-tls-example/basic-secure-deployment-only.yaml 
b/examples/flink-tls-example/basic-secure-deployment-only.yaml
new file mode 100644
index 00000000..253dfe47
--- /dev/null
+++ b/examples/flink-tls-example/basic-secure-deployment-only.yaml
@@ -0,0 +1,49 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: basic-secure-deployment-only
+spec:
+  image: flink:1.17
+  flinkVersion: v1_17
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
+    security.ssl.internal.enabled: 'true'
+    security.ssl.internal.keystore: /opt/flink/tls-cert/keystore.jks
+    security.ssl.internal.keystore-password: password1234
+    security.ssl.internal.truststore: /opt/flink/tls-cert/truststore.jks
+    security.ssl.internal.key-password: password1234
+    security.ssl.internal.truststore-password: password1234
+    security.ssl.rest.enabled: 'true'
+    security.ssl.rest.keystore: /opt/flink/tls-cert/keystore.jks
+    security.ssl.rest.keystore-password: password1234
+    security.ssl.rest.truststore: /opt/flink/tls-cert/truststore.jks
+    security.ssl.rest.key-password: password1234
+    security.ssl.rest.truststore-password: password1234
+    kubernetes.secrets: 'basic-secure-cert:/opt/flink/tls-cert'
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
diff --git a/examples/flink-tls-example/basic-secure-session-job-only.yaml 
b/examples/flink-tls-example/basic-secure-session-job-only.yaml
new file mode 100644
index 00000000..60b85113
--- /dev/null
+++ b/examples/flink-tls-example/basic-secure-session-job-only.yaml
@@ -0,0 +1,42 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkSessionJob
+metadata:
+  name: basic-secure-session-job-only
+spec:
+  flinkConfiguration:
+    security.ssl.internal.enabled: 'true'
+    security.ssl.internal.keystore: /opt/flink/tls-cert/keystore.jks
+    security.ssl.internal.keystore-password: password1234
+    security.ssl.internal.truststore: /opt/flink/tls-cert/truststore.jks
+    security.ssl.internal.key-password: password1234
+    security.ssl.internal.truststore-password: password1234
+    security.ssl.rest.keystore: /opt/flink/tls-cert/keystore.jks
+    security.ssl.rest.truststore: /opt/flink/tls-cert/truststore.jks
+    security.ssl.rest.key-password: password1234
+    security.ssl.rest.truststore-password: password1234
+    security.ssl.rest.enabled: 'true'
+    security.ssl.rest.keystore-password: password1234
+    kubernetes.secrets: 'basic-secure-cert:/opt/flink/tls-cert'
+  deploymentName: basic-secure-deployment-only
+  job:
+    jarURI: 
https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
+    parallelism: 4
+    upgradeMode: stateless
+
diff --git a/examples/flink-tls-example/basic-secure.yaml 
b/examples/flink-tls-example/basic-secure.yaml
new file mode 100644
index 00000000..a5245ca3
--- /dev/null
+++ b/examples/flink-tls-example/basic-secure.yaml
@@ -0,0 +1,47 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: basic-secure
+spec:
+  image: flink:1.17
+  flinkVersion: v1_17
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
+    security.ssl.enabled: 'true'
+    security.ssl.truststore: /opt/flink/tls-cert/truststore.jks
+    security.ssl.truststore-password: password1234
+    security.ssl.keystore: /opt/flink/tls-cert/keystore.jks
+    security.ssl.keystore-password: password1234
+    security.ssl.key-password: password1234
+    kubernetes.secrets: 'basic-secure-cert:/opt/flink/tls-cert'
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    upgradeMode: stateless
\ No newline at end of file
diff --git a/examples/flink-tls-example/pre-install.yaml 
b/examples/flink-tls-example/pre-install.yaml
new file mode 100644
index 00000000..235a2bdb
--- /dev/null
+++ b/examples/flink-tls-example/pre-install.yaml
@@ -0,0 +1,106 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: basic-secure-cert-secret-creds
+type: Opaque
+data:
+  password: cGFzc3dvcmQxMjM0
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: operator-certificate-password
+type: Opaque
+data:
+  password: cGFzc3dvcmQxMjM0
+---
+apiVersion: cert-manager.io/v1
+kind: Issuer
+metadata:
+  name: flink-selfsigned-issuer
+spec:
+  selfSigned: {}
+---
+apiVersion: cert-manager.io/v1
+kind: Certificate
+metadata:
+  name: flink-ca-cert
+spec:
+  issuerRef:
+    kind: Issuer
+    name: flink-selfsigned-issuer
+  commonName: FlinkCA
+  isCA: true
+  secretName: flink-ca-cert
+  subject:
+    organizations:
+      - Apache Flink
+---
+apiVersion: cert-manager.io/v1
+kind: Issuer
+metadata:
+  name: flink-ca-issuer
+spec:
+  ca:
+    secretName: flink-ca-cert
+---
+apiVersion: cert-manager.io/v1
+kind: Certificate
+metadata:
+  name: basic-secure-cert
+spec:
+  dnsNames:
+    - '*.flink.svc'
+    - '*.svc.cluster.local'
+    - 'basic-secure-rest'
+    - 'basic-secure-deployment-only-rest'
+  keystores:
+    jks:
+      create: true
+      passwordSecretRef:
+        name: basic-secure-cert-secret-creds
+        key: password
+  issuerRef:
+    kind: Issuer
+    name: flink-ca-issuer
+  commonName: FlinkDeployment
+  secretName: basic-secure-cert
+---
+apiVersion: cert-manager.io/v1
+kind: Certificate
+metadata:
+  name: flink-operator-cert
+spec:
+  dnsNames:
+    - '*.flink.svc'
+    - '*.svc.cluster.local'
+  keystores:
+    jks:
+      create: true
+      passwordSecretRef:
+        name: operator-certificate-password
+        key: password
+  issuerRef:
+    kind: Issuer
+    name: flink-ca-issuer
+  commonName: FlinkOperator
+  secretName: flink-operator-cert
+---
\ No newline at end of file
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 3c4fe4aa..cd5a1137 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import 
org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -46,6 +47,7 @@ import 
org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
 import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -90,7 +92,6 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
@@ -116,6 +117,7 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -133,7 +135,6 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
-import static 
org.apache.flink.runtime.rest.messages.queue.QueueStatus.Id.IN_PROGRESS;
 
 /**
  * An abstract {@link FlinkService} containing some common implementations for 
the native and
@@ -170,6 +171,8 @@ public abstract class AbstractFlinkService implements 
FlinkService {
     protected abstract void deployApplicationCluster(JobSpec jobSpec, 
Configuration conf)
             throws Exception;
 
+    protected abstract void deploySessionCluster(Configuration conf) throws 
Exception;
+
     @Override
     public KubernetesClient getKubernetesClient() {
         return kubernetesClient;
@@ -199,6 +202,11 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         deployApplicationCluster(jobSpec, removeOperatorConfigs(conf));
     }
 
+    @Override
+    public void submitSessionCluster(Configuration conf) throws Exception {
+        deploySessionCluster(conf);
+    }
+
     @Override
     public boolean isHaMetadataAvailable(Configuration conf) {
         if (FlinkUtils.isKubernetesHAActivated(conf)) {
@@ -753,6 +761,10 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
         final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
         final int port = conf.getInteger(RestOptions.PORT);
+        Configuration operatorRestConf = conf;
+        if (SecurityOptions.isRestSSLEnabled(conf)) {
+            operatorRestConf = getOperatorRestConfig(conf);
+        }
         final String host =
                 ObjectUtils.firstNonNull(
                         operatorConfig.getFlinkServiceHostOverride(),
@@ -761,7 +773,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         final String restServerAddress = String.format("http://%s:%s", host, 
port);
         LOG.debug("Creating RestClusterClient({})", restServerAddress);
         return new RestClusterClient<>(
-                conf, clusterId, (c, e) -> new 
StandaloneClientHAServices(restServerAddress));
+                operatorRestConf,
+                clusterId,
+                (c, e) -> new StandaloneClientHAServices(restServerAddress));
     }
 
     @VisibleForTesting
@@ -838,8 +852,12 @@ public abstract class AbstractFlinkService implements 
FlinkService {
     }
 
     @VisibleForTesting
-    protected RestClient getRestClient(Configuration conf) throws 
ConfigurationException {
-        return new RestClient(conf, executorService);
+    protected RestClient getRestClient(Configuration conf) throws Exception {
+        Configuration operatorRestConf = conf;
+        if (SecurityOptions.isRestSSLEnabled(conf)) {
+            operatorRestConf = getOperatorRestConfig(operatorRestConf);
+        }
+        return new RestClient(operatorRestConf, executorService);
     }
 
     private String findJarURI(JobSpec jobSpec) {
@@ -1077,4 +1095,44 @@ public abstract class AbstractFlinkService implements 
FlinkService {
             status.getJobStatus().setState(JobStatus.FINISHED.name());
         }
     }
+
+    private Configuration getOperatorRestConfig(Configuration origConfig) 
throws IOException {
+        Configuration conf = new Configuration(origConfig);
+        EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)
+                .ifPresent(
+                        path -> {
+                            if (Files.notExists(Paths.get(path))) {
+                                return;
+                            }
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE,
+                                    
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH));
+                            conf.set(
+                                    
SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD,
+                                    
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            if 
(SecurityOptions.isRestSSLAuthenticationEnabled(conf)
+                                    && 
EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)
+                                            .isPresent()) {
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE,
+                                        
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH));
+                                conf.set(
+                                        
SecurityOptions.SSL_REST_KEYSTORE_PASSWORD,
+                                        EnvUtils.getRequired(
+                                                
EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEY_PASSWORD,
+                                        EnvUtils.getRequired(
+                                                
EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            } else {
+                                
conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE);
+                                
conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD);
+                            }
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE);
+                            
conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE);
+                            
conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+                        });
+        return conf;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 27d5a265..9781a5b0 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -106,7 +106,7 @@ public class NativeFlinkService extends 
AbstractFlinkService {
     }
 
     @Override
-    public void submitSessionCluster(Configuration conf) throws Exception {
+    public void deploySessionCluster(Configuration conf) throws Exception {
         submitClusterInternal(removeOperatorConfigs(conf));
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 55fa11d3..c0439d04 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -74,7 +74,7 @@ public class StandaloneFlinkService extends 
AbstractFlinkService {
     }
 
     @Override
-    public void submitSessionCluster(Configuration conf) throws Exception {
+    public void deploySessionCluster(Configuration conf) throws Exception {
         LOG.info("Deploying session cluster");
         submitClusterInternal(removeOperatorConfigs(conf), Mode.SESSION);
         LOG.info("Session cluster successfully deployed");
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
index 39a2aaf8..e978b0fc 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
@@ -56,6 +56,9 @@ public class EnvUtils {
     public static final String ENV_OPERATOR_NAME = "OPERATOR_NAME";
     public static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE";
     public static final String ENV_WATCH_NAMESPACES = "WATCH_NAMESPACES";
+    public static final String ENV_OPERATOR_KEYSTORE_PASSWORD = 
"OPERATOR_KEYSTORE_PASSWORD";
+    public static final String ENV_OPERATOR_KEYSTORE_PATH = 
"OPERATOR_KEYSTORE_PATH";
+    public static final String ENV_OPERATOR_TRUSTSTORE_PATH = 
"OPERATOR_TRUSTSTORE_PATH";
 
     private static final String PROP_FILE = 
".flink-kubernetes-operator.version.properties";
     private static final String FAIL_MESSAGE =
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 94453b06..fe61cc67 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -240,7 +240,7 @@ public class TestingFlinkService extends 
AbstractFlinkService {
     }
 
     @Override
-    public void submitSessionCluster(Configuration conf) throws Exception {
+    public void deploySessionCluster(Configuration conf) throws Exception {
         if (deployFailure) {
             throw new Exception("Deployment failure");
         }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index a108b8bb..86c1933e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -1073,7 +1073,7 @@ public class AbstractFlinkServiceTest {
         }
 
         @Override
-        public void submitSessionCluster(Configuration conf) {
+        public void deploySessionCluster(Configuration conf) {
             throw new UnsupportedOperationException();
         }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest.java
new file mode 100644
index 00000000..08b8e933
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
+import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link FlinkService} with SSL enabled on the REST endpoint.
+ */
+@EnableKubernetesMockClient(crud = true)
+public class SecureFlinkServiceTest {
+    KubernetesClient client;
+    private final Configuration configuration = new Configuration();
+
+    private final EventCollector eventCollector = new EventCollector();
+
+    private EventRecorder eventRecorder;
+    private FlinkOperatorConfiguration operatorConfig;
+    private ExecutorService executorService;
+
+    @BeforeEach
+    public void setup() {
+        configuration.set(KubernetesConfigOptions.CLUSTER_ID, 
TestUtils.TEST_DEPLOYMENT_NAME);
+        configuration.set(KubernetesConfigOptions.NAMESPACE, 
TestUtils.TEST_NAMESPACE);
+        configuration.set(FLINK_VERSION, FlinkVersion.v1_18);
+        eventRecorder = new EventRecorder(eventCollector);
+        operatorConfig = 
FlinkOperatorConfiguration.fromConfiguration(configuration);
+        executorService = Executors.newDirectExecutorService();
+    }
+
+    @Test
+    public void testGetClusterClientWithoutCerts() throws Exception {
+        Configuration deployConfig = createOperatorConfig();
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+        ConfigurationException thrown =
+                assertThrows(
+                        ConfigurationException.class,
+                        () -> {
+                            flinkService.getClusterClient(deployConfig);
+                        });
+        assertEquals("Failed to initialize SSLContext for the REST client", 
thrown.getMessage());
+    }
+
+    @Test
+    public void testGetRestClientWithoutCerts() throws Exception {
+        Configuration deployConfig = createOperatorConfig();
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+        ConfigurationException thrown =
+                assertThrows(
+                        ConfigurationException.class,
+                        () -> {
+                            flinkService.getRestClient(deployConfig);
+                        });
+        assertEquals("Failed to initialize SSLContext for the REST client", 
thrown.getMessage());
+    }
+
+    @Test
+    public void testSubmitApplicationClusterWithoutCerts() throws Exception {
+        Configuration deployConfig = createOperatorConfig();
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+        var testService = new TestingFlinkService(flinkService);
+        RuntimeException thrown =
+                assertThrows(
+                        RuntimeException.class,
+                        () -> {
+                            final FlinkDeployment deployment = 
TestUtils.buildApplicationCluster();
+                            testService.submitApplicationCluster(
+                                    deployment.getSpec().getJob(), 
deployConfig, false);
+                        });
+        assertInstanceOf(ClusterRetrieveException.class, thrown.getCause());
+    }
+
+    @Test
+    public void testSubmitSessionClusterWithoutCerts() throws Exception {
+        Configuration deployConfig = createOperatorConfig();
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+        var testService = new TestingFlinkService(flinkService);
+        RuntimeException thrown =
+                assertThrows(
+                        RuntimeException.class,
+                        () -> {
+                            testService.submitSessionCluster(deployConfig);
+                        });
+        assertInstanceOf(ClusterRetrieveException.class, thrown.getCause());
+    }
+
+    @Test
+    public void testGetClusterClientWithCerts() throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        Configuration deployConfig = createOperatorConfig();
+        Map<String, String> systemEnv = new HashMap<>(originalEnv);
+        // Set the env var to define the certificates
+        systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, 
getAbsolutePath("/keystore.jks"));
+        systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234");
+        TestUtils.setEnv(systemEnv);
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+
+        try {
+            flinkService.getClusterClient(deployConfig);
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    public void testAuthenticatedClusterClientWithNoKeystore() throws 
Exception {
+        Map<String, String> originalEnv = System.getenv();
+        Configuration deployConfig = createOperatorConfig();
+        deployConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, 
true);
+        Map<String, String> systemEnv = new HashMap<>(originalEnv);
+        // Set the env var to define the certificates
+        systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, 
getAbsolutePath("/truststore.jks"));
+        systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234");
+        TestUtils.setEnv(systemEnv);
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+
+        try {
+            ConfigurationException thrown =
+                    assertThrows(
+                            ConfigurationException.class,
+                            () -> {
+                                flinkService.getClusterClient(deployConfig);
+                            });
+            assertEquals(
+                    "Failed to initialize SSLContext for the REST client", 
thrown.getMessage());
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    public void testAuthenticatedClusterClientWithKeystore() throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        Configuration deployConfig = createOperatorConfig();
+        deployConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, 
true);
+        Map<String, String> systemEnv = new HashMap<>(originalEnv);
+        // Set the env var to define the certificates
+        systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, 
getAbsolutePath("/truststore.jks"));
+        systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH, 
getAbsolutePath("/keystore.jks"));
+        systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234");
+        TestUtils.setEnv(systemEnv);
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+
+        try {
+            flinkService.getClusterClient(deployConfig);
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    public void testGetSecureRestClientWithCerts() throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        Configuration deployConfig = createOperatorConfig();
+        Map<String, String> systemEnv = new HashMap<>(originalEnv);
+        // Set the env var to define the certificates
+        systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, 
getAbsolutePath("/truststore.jks"));
+        systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234");
+        TestUtils.setEnv(systemEnv);
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+
+        try {
+            flinkService.getRestClient(deployConfig);
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    public void testSubmitApplicationClusterWithCerts() throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        Configuration deployConfig = createOperatorConfig();
+        Map<String, String> systemEnv = new HashMap<>(originalEnv);
+        // Set the env var to define the certificates
+        systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, 
getAbsolutePath("/truststore.jks"));
+        systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234");
+        TestUtils.setEnv(systemEnv);
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+        var testService = new TestingFlinkService(flinkService);
+
+        try {
+            final FlinkDeployment deployment = 
TestUtils.buildApplicationCluster();
+            testService.submitApplicationCluster(
+                    deployment.getSpec().getJob(), deployConfig, false);
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+
+    @Test
+    public void testSubmitSessionClusterWithCerts() throws Exception {
+        Map<String, String> originalEnv = System.getenv();
+        Configuration deployConfig = createOperatorConfig();
+        Map<String, String> systemEnv = new HashMap<>(originalEnv);
+        // Set the env var to define the certificates
+        systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, 
getAbsolutePath("/truststore.jks"));
+        systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234");
+        TestUtils.setEnv(systemEnv);
+        var flinkService =
+                new NativeFlinkService(
+                        client, null, executorService, operatorConfig, 
eventRecorder);
+        var testService = new TestingFlinkService(flinkService);
+
+        try {
+            testService.submitSessionCluster(deployConfig);
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+
+    private String getAbsolutePath(String path) throws URISyntaxException {
+        return 
SecureFlinkServiceTest.class.getResource(path).toURI().getPath();
+    }
+
+    private Configuration createOperatorConfig() {
+        Configuration deployConfig = new Configuration(configuration);
+        deployConfig.setString(OPERATOR_HEALTH_PROBE_PORT.key(), "80");
+        deployConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+        deployConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, 
"/etc/certs/keystore.jks");
+        deployConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, 
"/etc/certs/truststore.jks");
+        return deployConfig;
+    }
+
+    class TestingFlinkService extends NativeFlinkService {
+        private Configuration runtimeConfig;
+
+        public TestingFlinkService(NativeFlinkService nativeFlinkService) {
+            super(
+                    nativeFlinkService.kubernetesClient,
+                    nativeFlinkService.artifactManager,
+                    nativeFlinkService.executorService,
+                    nativeFlinkService.operatorConfig,
+                    eventRecorder);
+        }
+
+        @Override
+        public void submitApplicationCluster(
+                JobSpec jobSpec, Configuration conf, boolean 
requireHaMetadata) throws Exception {
+            try {
+                getClusterClient(conf);
+            } catch (ConfigurationException e) {
+                throw new RuntimeException(
+                        new ClusterRetrieveException("Could not create the RestClusterClient.", e));
+            }
+        }
+
+        @Override
+        public void submitSessionCluster(Configuration conf) throws Exception {
+            try {
+                getClusterClient(conf);
+            } catch (ConfigurationException e) {
+                throw new RuntimeException(
+                        new ClusterRetrieveException("Could not create the RestClusterClient.", e));
+            }
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/resources/keystore.jks 
b/flink-kubernetes-operator/src/test/resources/keystore.jks
new file mode 100644
index 00000000..a69db8b7
Binary files /dev/null and 
b/flink-kubernetes-operator/src/test/resources/keystore.jks differ
diff --git a/flink-kubernetes-operator/src/test/resources/truststore.jks 
b/flink-kubernetes-operator/src/test/resources/truststore.jks
new file mode 100644
index 00000000..c43a5da0
Binary files /dev/null and 
b/flink-kubernetes-operator/src/test/resources/truststore.jks differ
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml 
b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index 0349612b..69e1f8f8 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -110,6 +110,17 @@ spec:
               value: 
-Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
             - name: JVM_ARGS
               value: {{ .Values.jvmArgs.operator }}
+            {{- if .Values.tls.create }}
+            - name: OPERATOR_KEYSTORE_PATH
+              value: /opt/flink/tls-cert/keystore.jks
+            - name: OPERATOR_TRUSTSTORE_PATH
+              value: /opt/flink/tls-cert/truststore.jks
+            - name: OPERATOR_KEYSTORE_PASSWORD
+              valueFrom:
+                secretKeyRef:
+                  {{- toYaml .Values.tls.secretKeyRef | nindent 18 }}
+                  optional: true
+            {{- end }}
             {{- with .Values.operatorPod.env }}
             {{- toYaml . | nindent 12 }}
             {{- end }}
@@ -130,6 +141,10 @@ spec:
             - name: flink-artifacts-volume
               mountPath: /opt/flink/artifacts
             {{- end }}
+          {{- if .Values.tls.create }}
+            - name: flink-operator-cert-secret
+              mountPath: /opt/flink/tls-cert
+          {{- end }}
           {{- if and (index .Values "operatorHealth") (index 
.Values.operatorHealth "livenessProbe") }}
           livenessProbe:
             {{- toYaml .Values.operatorHealth.livenessProbe | nindent 12 }}
@@ -229,6 +244,12 @@ spec:
             - key: keystore.p12
               path: keystore.p12
         {{- end }}
+        {{- if .Values.tls.create }}
+        - name: flink-operator-cert-secret
+          secret:
+            secretName: {{ .Values.tls.secretName }}
+            optional: true
+        {{- end }}
 ---
 {{- if .Values.defaultConfiguration.create }}
 apiVersion: v1
diff --git a/helm/flink-kubernetes-operator/values.yaml 
b/helm/flink-kubernetes-operator/values.yaml
index 6b8c9042..12a97bf2 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -194,3 +194,11 @@ operatorHealth:
 
 # Set postStart hook of the main container
 postStart: {}
+
+# Configuration for TLS
+tls:
+  create: false
+  secretName: flink-operator-cert
+  secretKeyRef:
+    name: operator-certificate-password
+    key: password


Reply via email to