This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 30566ed7 [Flink 31966] Flink Kubernetes operator lacks TLS support 30566ed7 is described below commit 30566ed7390a4fce1dc3e239efe3c519c55aa9b1 Author: Tony Garrard <garr...@uk.ibm.com> AuthorDate: Thu Dec 7 12:16:39 2023 +0000 [Flink 31966] Flink Kubernetes operator lacks TLS support Signed-off-by: A. Garrard <garr...@uk.ibm.com> --- docs/content/docs/operations/helm.md | 4 + examples/README.md | 5 + .../basic-secure-deployment-only.yaml | 49 ++++ .../basic-secure-session-job-only.yaml | 42 +++ examples/flink-tls-example/basic-secure.yaml | 47 +++ examples/flink-tls-example/pre-install.yaml | 106 +++++++ .../operator/service/AbstractFlinkService.java | 68 ++++- .../operator/service/NativeFlinkService.java | 2 +- .../operator/service/StandaloneFlinkService.java | 2 +- .../flink/kubernetes/operator/utils/EnvUtils.java | 3 + .../kubernetes/operator/TestingFlinkService.java | 2 +- .../operator/service/AbstractFlinkServiceTest.java | 2 +- .../operator/service/SecureFlinkServiceTest.java | 319 +++++++++++++++++++++ .../src/test/resources/keystore.jks | Bin 0 -> 3018 bytes .../src/test/resources/truststore.jks | Bin 0 -> 861 bytes .../templates/flink-operator.yaml | 21 ++ helm/flink-kubernetes-operator/values.yaml | 8 + 17 files changed, 671 insertions(+), 9 deletions(-) diff --git a/docs/content/docs/operations/helm.md b/docs/content/docs/operations/helm.md index bb84a2b5..dee6efd0 100644 --- a/docs/content/docs/operations/helm.md +++ b/docs/content/docs/operations/helm.md @@ -111,6 +111,10 @@ The configurable parameters of the Helm chart and which default values as detail | operatorHealth.livenessProbe | Liveness probe configuration for the operator using the health endpoint. Only time settings should be configured, endpoint is set automatically based on port. | [...] | operatorHealth.startupProbe | Startup probe configuration for the operator using the health endpoint. 
Only time settings should be configured, endpoint is set automatically based on port. | [...] | postStart | The postStart hook configuration for the main container. | [...] +| tls.create | Whether to mount an optional secret containing a TLS truststore for the flink-kubernetes-operator. | false [...] +| tls.secretName | The name of the TLS secret | flink-operator-cert [...] +| tls.secretKeyRef.name | The name of the secret containing the password for the Java keystore/truststore | operator-certificate-password | +| tls.secretKeyRef.key | The key within that secret that holds the password | password | For more information check the [Helm documentation](https://helm.sh/docs/helm/helm_install/). diff --git a/examples/README.md b/examples/README.md index 7200c801..473450b4 100644 --- a/examples/README.md +++ b/examples/README.md @@ -169,3 +169,8 @@ how to override default values using [kustomize](https://kustomize.io/) For the detailed description of advanced configuration techniques follow this [link](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/#advanced-customization-techniques). +#### Enabling TLS on your deployments + +For the operator to communicate with the REST service of a deployment, you need to mount a JKS/PKCS12 secret onto the operator that uses the same CA certificate as the one used by your deployments. +N.B. Make sure you use the same mount location and keystore password for your deployments as you do for the operator. +[This](flink-tls-example) example provides a pre-install.yaml file that you need to apply to your cluster before installing the operator with Helm using the value tls.create=true.
It creates a CA-certificate-backed issuer that can be used by both the operator and the provided examples. \ No newline at end of file diff --git a/examples/flink-tls-example/basic-secure-deployment-only.yaml b/examples/flink-tls-example/basic-secure-deployment-only.yaml new file mode 100644 index 00000000..253dfe47 --- /dev/null +++ b/examples/flink-tls-example/basic-secure-deployment-only.yaml @@ -0,0 +1,49 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: basic-secure-deployment-only +spec: + image: flink:1.17 + flinkVersion: v1_17 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + security.ssl.internal.enabled: 'true' + security.ssl.internal.keystore: /opt/flink/tls-cert/keystore.jks + security.ssl.internal.keystore-password: password1234 + security.ssl.internal.truststore: /opt/flink/tls-cert/truststore.jks + security.ssl.internal.key-password: password1234 + security.ssl.internal.truststore-password: password1234 + security.ssl.rest.enabled: 'true' + security.ssl.rest.keystore: /opt/flink/tls-cert/keystore.jks + security.ssl.rest.keystore-password: password1234 + security.ssl.rest.truststore: /opt/flink/tls-cert/truststore.jks + security.ssl.rest.key-password: password1234 + security.ssl.rest.truststore-password: password1234 + kubernetes.secrets: 'basic-secure-cert:/opt/flink/tls-cert' + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 diff --git a/examples/flink-tls-example/basic-secure-session-job-only.yaml b/examples/flink-tls-example/basic-secure-session-job-only.yaml new file mode 100644 index 00000000..60b85113 --- /dev/null +++ b/examples/flink-tls-example/basic-secure-session-job-only.yaml @@ -0,0 +1,42 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkSessionJob +metadata: + name: basic-secure-session-job-only +spec: + flinkConfiguration: + security.ssl.internal.enabled: 'true' + security.ssl.internal.keystore: /opt/flink/tls-cert/keystore.jks + security.ssl.internal.keystore-password: password1234 + security.ssl.internal.truststore: /opt/flink/tls-cert/truststore.jks + security.ssl.internal.key-password: password1234 + security.ssl.internal.truststore-password: password1234 + security.ssl.rest.keystore: /opt/flink/tls-cert/keystore.jks + security.ssl.rest.truststore: /opt/flink/tls-cert/truststore.jks + security.ssl.rest.key-password: password1234 + security.ssl.rest.truststore-password: password1234 + security.ssl.rest.enabled: 'true' + security.ssl.rest.keystore-password: password1234 + kubernetes.secrets: 'basic-secure-cert:/opt/flink/tls-cert' + deploymentName: basic-secure-deployment-only + job: + jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar + parallelism: 4 + upgradeMode: stateless + diff --git a/examples/flink-tls-example/basic-secure.yaml b/examples/flink-tls-example/basic-secure.yaml new file mode 100644 index 00000000..a5245ca3 --- /dev/null +++ b/examples/flink-tls-example/basic-secure.yaml @@ -0,0 +1,47 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: basic-secure +spec: + image: flink:1.17 + flinkVersion: v1_17 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + security.ssl.enabled: 'true' + security.ssl.truststore: /opt/flink/tls-cert/truststore.jks + security.ssl.truststore-password: password1234 + security.ssl.keystore: /opt/flink/tls-cert/keystore.jks + security.ssl.keystore-password: password1234 + security.ssl.key-password: password1234 + kubernetes.secrets: 'basic-secure-cert:/opt/flink/tls-cert' + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 2 + upgradeMode: stateless \ No newline at end of file diff --git a/examples/flink-tls-example/pre-install.yaml b/examples/flink-tls-example/pre-install.yaml new file mode 100644 index 00000000..235a2bdb --- /dev/null +++ b/examples/flink-tls-example/pre-install.yaml @@ -0,0 +1,106 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) 
under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +--- +apiVersion: v1 +kind: Secret +metadata: + name: basic-secure-cert-secret-creds +type: Opaque +data: + password: cGFzc3dvcmQxMjM0 +--- +apiVersion: v1 +kind: Secret +metadata: + name: operator-certificate-password +type: Opaque +data: + password: cGFzc3dvcmQxMjM0 +--- +apiVersion: cert-manager.io/v1 +kind: Issuer +metadata: + name: flink-selfsigned-issuer +spec: + selfSigned: {} +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: flink-ca-cert +spec: + issuerRef: + kind: Issuer + name: flink-selfsigned-issuer + commonName: FlinkCA + isCA: true + secretName: flink-ca-cert + subject: + organizations: + - Apache Flink +--- +apiVersion: cert-manager.io/v1 +kind: Issuer +metadata: + name: flink-ca-issuer +spec: + ca: + secretName: flink-ca-cert +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: basic-secure-cert +spec: + dnsNames: + - '*.flink.svc' + - '*.svc.cluster.local' + - 'basic-secure-rest' + - 'basic-secure-deployment-only-rest' + keystores: + jks: + create: true + passwordSecretRef: + name: basic-secure-cert-secret-creds + key: password + issuerRef: + kind: Issuer + name: flink-ca-issuer + commonName: 
FlinkDeployment + secretName: basic-secure-cert +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: flink-operator-cert +spec: + dnsNames: + - '*.flink.svc' + - '*.svc.cluster.local' + keystores: + jks: + create: true + passwordSecretRef: + name: operator-certificate-password + key: password + issuerRef: + kind: Issuer + name: flink-ca-issuer + commonName: FlinkOperator + secretName: flink-operator-cert +--- \ No newline at end of file diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 3c4fe4aa..cd5a1137 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -25,6 +25,7 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; @@ -46,6 +47,7 @@ import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException; import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult; import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.utils.EnvUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.client.JobStatusMessage; 
import org.apache.flink.runtime.execution.ExecutionState; @@ -90,7 +92,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; -import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; @@ -116,6 +117,7 @@ import java.net.Socket; import java.net.SocketAddress; import java.net.URI; import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -133,7 +135,6 @@ import java.util.stream.Collectors; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX; -import static org.apache.flink.runtime.rest.messages.queue.QueueStatus.Id.IN_PROGRESS; /** * An abstract {@link FlinkService} containing some common implementations for the native and @@ -170,6 +171,8 @@ public abstract class AbstractFlinkService implements FlinkService { protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception; + protected abstract void deploySessionCluster(Configuration conf) throws Exception; + @Override public KubernetesClient getKubernetesClient() { return kubernetesClient; @@ -199,6 +202,11 @@ public abstract class AbstractFlinkService implements FlinkService { deployApplicationCluster(jobSpec, removeOperatorConfigs(conf)); } + @Override + public void submitSessionCluster(Configuration conf) throws Exception { + deploySessionCluster(conf); + } + @Override public boolean isHaMetadataAvailable(Configuration conf) { if (FlinkUtils.isKubernetesHAActivated(conf)) { @@ 
-753,6 +761,10 @@ public abstract class AbstractFlinkService implements FlinkService { final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID); final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE); final int port = conf.getInteger(RestOptions.PORT); + Configuration operatorRestConf = conf; + if (SecurityOptions.isRestSSLEnabled(conf)) { + operatorRestConf = getOperatorRestConfig(conf); + } final String host = ObjectUtils.firstNonNull( operatorConfig.getFlinkServiceHostOverride(), @@ -761,7 +773,9 @@ public abstract class AbstractFlinkService implements FlinkService { final String restServerAddress = String.format("http://%s:%s", host, port); LOG.debug("Creating RestClusterClient({})", restServerAddress); return new RestClusterClient<>( - conf, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress)); + operatorRestConf, + clusterId, + (c, e) -> new StandaloneClientHAServices(restServerAddress)); } @VisibleForTesting @@ -838,8 +852,12 @@ public abstract class AbstractFlinkService implements FlinkService { } @VisibleForTesting - protected RestClient getRestClient(Configuration conf) throws ConfigurationException { - return new RestClient(conf, executorService); + protected RestClient getRestClient(Configuration conf) throws Exception { + Configuration operatorRestConf = conf; + if (SecurityOptions.isRestSSLEnabled(conf)) { + operatorRestConf = getOperatorRestConfig(operatorRestConf); + } + return new RestClient(operatorRestConf, executorService); } private String findJarURI(JobSpec jobSpec) { @@ -1077,4 +1095,44 @@ public abstract class AbstractFlinkService implements FlinkService { status.getJobStatus().setState(JobStatus.FINISHED.name()); } } + + private Configuration getOperatorRestConfig(Configuration origConfig) throws IOException { + Configuration conf = new Configuration(origConfig); + EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH) + .ifPresent( + path -> { + if (Files.notExists(Paths.get(path))) { + return; + 
} + conf.set( + SecurityOptions.SSL_REST_TRUSTSTORE, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)); + conf.set( + SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + if (SecurityOptions.isRestSSLAuthenticationEnabled(conf) + && EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH) + .isPresent()) { + conf.set( + SecurityOptions.SSL_REST_KEYSTORE, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)); + conf.set( + SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, + EnvUtils.getRequired( + EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + conf.set( + SecurityOptions.SSL_REST_KEY_PASSWORD, + EnvUtils.getRequired( + EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + } else { + conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE); + conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD); + } + conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE); + conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); + conf.removeConfig(SecurityOptions.SSL_KEYSTORE); + conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD); + }); + return conf; + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index 27d5a265..9781a5b0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -106,7 +106,7 @@ public class NativeFlinkService extends AbstractFlinkService { } @Override - public void submitSessionCluster(Configuration conf) throws Exception { + public void deploySessionCluster(Configuration conf) throws Exception { submitClusterInternal(removeOperatorConfigs(conf)); } diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java index 55fa11d3..c0439d04 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java @@ -74,7 +74,7 @@ public class StandaloneFlinkService extends AbstractFlinkService { } @Override - public void submitSessionCluster(Configuration conf) throws Exception { + public void deploySessionCluster(Configuration conf) throws Exception { LOG.info("Deploying session cluster"); submitClusterInternal(removeOperatorConfigs(conf), Mode.SESSION); LOG.info("Session cluster successfully deployed"); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java index 39a2aaf8..e978b0fc 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java @@ -56,6 +56,9 @@ public class EnvUtils { public static final String ENV_OPERATOR_NAME = "OPERATOR_NAME"; public static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE"; public static final String ENV_WATCH_NAMESPACES = "WATCH_NAMESPACES"; + public static final String ENV_OPERATOR_KEYSTORE_PASSWORD = "OPERATOR_KEYSTORE_PASSWORD"; + public static final String ENV_OPERATOR_KEYSTORE_PATH = "OPERATOR_KEYSTORE_PATH"; + public static final String ENV_OPERATOR_TRUSTSTORE_PATH = "OPERATOR_TRUSTSTORE_PATH"; private static final String PROP_FILE = ".flink-kubernetes-operator.version.properties"; private static final String FAIL_MESSAGE = diff 
--git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 94453b06..fe61cc67 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -240,7 +240,7 @@ public class TestingFlinkService extends AbstractFlinkService { } @Override - public void submitSessionCluster(Configuration conf) throws Exception { + public void deploySessionCluster(Configuration conf) throws Exception { if (deployFailure) { throw new Exception("Deployment failure"); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index a108b8bb..86c1933e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -1073,7 +1073,7 @@ public class AbstractFlinkServiceTest { } @Override - public void submitSessionCluster(Configuration conf) { + public void deploySessionCluster(Configuration conf) { throw new UnsupportedOperationException(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest.java new file mode 100644 index 00000000..08b8e933 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.service; + +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; +import org.apache.flink.kubernetes.operator.api.spec.JobSpec; +import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; +import org.apache.flink.kubernetes.operator.utils.EnvUtils; +import org.apache.flink.kubernetes.operator.utils.EventCollector; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.Executors; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.URISyntaxException; +import 
java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; +import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * @link FlinkService unit tests + */ +@EnableKubernetesMockClient(crud = true) +public class SecureFlinkServiceTest { + KubernetesClient client; + private final Configuration configuration = new Configuration(); + + private final EventCollector eventCollector = new EventCollector(); + + private EventRecorder eventRecorder; + private FlinkOperatorConfiguration operatorConfig; + private ExecutorService executorService; + + @BeforeEach + public void setup() { + configuration.set(KubernetesConfigOptions.CLUSTER_ID, TestUtils.TEST_DEPLOYMENT_NAME); + configuration.set(KubernetesConfigOptions.NAMESPACE, TestUtils.TEST_NAMESPACE); + configuration.set(FLINK_VERSION, FlinkVersion.v1_18); + eventRecorder = new EventRecorder(eventCollector); + operatorConfig = FlinkOperatorConfiguration.fromConfiguration(configuration); + executorService = Executors.newDirectExecutorService(); + } + + @Test + public void testGetClusterClientWithoutCerts() throws Exception { + Configuration deployConfig = createOperatorConfig(); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + ConfigurationException thrown = + assertThrows( + ConfigurationException.class, + () -> { + flinkService.getClusterClient(deployConfig); + }); + assertEquals("Failed to initialize SSLContext for the REST client", thrown.getMessage()); + } + + @Test + public void testGetRestClientWithoutCerts() throws Exception { + Configuration deployConfig = createOperatorConfig(); 
+ var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + ConfigurationException thrown = + assertThrows( + ConfigurationException.class, + () -> { + flinkService.getRestClient(deployConfig); + }); + assertEquals("Failed to initialize SSLContext for the REST client", thrown.getMessage()); + } + + @Test + public void testSubmitApplicationClusterWithoutCerts() throws Exception { + Configuration deployConfig = createOperatorConfig(); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + var testService = new TestingFlinkService(flinkService); + RuntimeException thrown = + assertThrows( + RuntimeException.class, + () -> { + final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + testService.submitApplicationCluster( + deployment.getSpec().getJob(), deployConfig, false); + }); + assertInstanceOf(ClusterRetrieveException.class, thrown.getCause()); + } + + @Test + public void testSubmitSessionClusterWithoutCerts() throws Exception { + Configuration deployConfig = createOperatorConfig(); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + var testService = new TestingFlinkService(flinkService); + RuntimeException thrown = + assertThrows( + RuntimeException.class, + () -> { + testService.submitSessionCluster(deployConfig); + }); + assertInstanceOf(ClusterRetrieveException.class, thrown.getCause()); + } + + @Test + public void testGetClusterClientWithCerts() throws Exception { + Map<String, String> originalEnv = System.getenv(); + Configuration deployConfig = createOperatorConfig(); + Map<String, String> systemEnv = new HashMap<>(originalEnv); + // Set the env var to define the certficates + systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, getAbsolutePath("/keystore.jks")); + systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234"); + TestUtils.setEnv(systemEnv); + 
var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + + try { + flinkService.getClusterClient(deployConfig); + } finally { + TestUtils.setEnv(originalEnv); + } + } + + @Test + public void testAuthenticatedClusterClientWithNoKeystore() throws Exception { + Map<String, String> originalEnv = System.getenv(); + Configuration deployConfig = createOperatorConfig(); + deployConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true); + Map<String, String> systemEnv = new HashMap<>(originalEnv); + // Set the env var to define the certficates + systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, getAbsolutePath("/truststore.jks")); + systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234"); + TestUtils.setEnv(systemEnv); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + + try { + ConfigurationException thrown = + assertThrows( + ConfigurationException.class, + () -> { + flinkService.getClusterClient(deployConfig); + }); + assertEquals( + "Failed to initialize SSLContext for the REST client", thrown.getMessage()); + } finally { + TestUtils.setEnv(originalEnv); + } + } + + @Test + public void testAuthenticatedClusterClientWithKeystore() throws Exception { + Map<String, String> originalEnv = System.getenv(); + Configuration deployConfig = createOperatorConfig(); + deployConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true); + Map<String, String> systemEnv = new HashMap<>(originalEnv); + // Set the env var to define the certficates + systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, getAbsolutePath("/truststore.jks")); + systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH, getAbsolutePath("/keystore.jks")); + systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234"); + TestUtils.setEnv(systemEnv); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + + 
try { + flinkService.getClusterClient(deployConfig); + } finally { + TestUtils.setEnv(originalEnv); + } + } + + @Test + public void testGetSecureRestClientWithCerts() throws Exception { + Map<String, String> originalEnv = System.getenv(); + Configuration deployConfig = createOperatorConfig(); + Map<String, String> systemEnv = new HashMap<>(originalEnv); + // Set the env var to define the certficates + systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, getAbsolutePath("/truststore.jks")); + systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234"); + TestUtils.setEnv(systemEnv); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + + try { + flinkService.getRestClient(deployConfig); + } finally { + TestUtils.setEnv(originalEnv); + } + } + + @Test + public void testSubmitApplicationClusterWithCerts() throws Exception { + Map<String, String> originalEnv = System.getenv(); + Configuration deployConfig = createOperatorConfig(); + Map<String, String> systemEnv = new HashMap<>(originalEnv); + // Set the env var to define the certficates + systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, getAbsolutePath("/truststore.jks")); + systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234"); + TestUtils.setEnv(systemEnv); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + var testService = new TestingFlinkService(flinkService); + + try { + final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + testService.submitApplicationCluster( + deployment.getSpec().getJob(), deployConfig, false); + } finally { + TestUtils.setEnv(originalEnv); + } + } + + @Test + public void testSubmitSessionClusterWithCerts() throws Exception { + Map<String, String> originalEnv = System.getenv(); + Configuration deployConfig = createOperatorConfig(); + Map<String, String> systemEnv = new HashMap<>(originalEnv); + // Set the env var to 
define the certficates + systemEnv.put(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH, getAbsolutePath("/truststore.jks")); + systemEnv.put(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD, "password1234"); + TestUtils.setEnv(systemEnv); + var flinkService = + new NativeFlinkService( + client, null, executorService, operatorConfig, eventRecorder); + var testService = new TestingFlinkService(flinkService); + + try { + testService.submitSessionCluster(deployConfig); + } finally { + TestUtils.setEnv(originalEnv); + } + } + + private String getAbsolutePath(String path) throws URISyntaxException { + return SecureFlinkServiceTest.class.getResource(path).toURI().getPath(); + } + + private Configuration createOperatorConfig() { + Configuration deployConfig = new Configuration(configuration); + deployConfig.setString(OPERATOR_HEALTH_PROBE_PORT.key(), "80"); + deployConfig.setBoolean(SecurityOptions.SSL_REST_ENABLED, true); + deployConfig.setString(SecurityOptions.SSL_REST_KEYSTORE, "/etc/certs/keystore.jks"); + deployConfig.setString(SecurityOptions.SSL_REST_TRUSTSTORE, "/etc/certs/truststore.jks"); + return deployConfig; + } + + class TestingFlinkService extends NativeFlinkService { + private Configuration runtimeConfig; + + public TestingFlinkService(NativeFlinkService nativeFlinkService) { + super( + nativeFlinkService.kubernetesClient, + nativeFlinkService.artifactManager, + nativeFlinkService.executorService, + nativeFlinkService.operatorConfig, + eventRecorder); + } + + @Override + public void submitApplicationCluster( + JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception { + try { + getClusterClient(conf); + } catch (ConfigurationException e) { + throw new RuntimeException( + new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + } + + @Override + public void submitSessionCluster(Configuration conf) throws Exception { + try { + getClusterClient(conf); + } catch (ConfigurationException e) { + throw new RuntimeException( + new 
ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + } + } +} diff --git a/flink-kubernetes-operator/src/test/resources/keystore.jks b/flink-kubernetes-operator/src/test/resources/keystore.jks new file mode 100644 index 00000000..a69db8b7 Binary files /dev/null and b/flink-kubernetes-operator/src/test/resources/keystore.jks differ diff --git a/flink-kubernetes-operator/src/test/resources/truststore.jks b/flink-kubernetes-operator/src/test/resources/truststore.jks new file mode 100644 index 00000000..c43a5da0 Binary files /dev/null and b/flink-kubernetes-operator/src/test/resources/truststore.jks differ diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml index 0349612b..69e1f8f8 100644 --- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml +++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml @@ -110,6 +110,17 @@ spec: value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties - name: JVM_ARGS value: {{ .Values.jvmArgs.operator }} + {{- if .Values.tls.create }} + - name: OPERATOR_KEYSTORE_PATH + value: /opt/flink/tls-cert/keystore.jks + - name: OPERATOR_TRUSTSTORE_PATH + value: /opt/flink/tls-cert/truststore.jks + - name: OPERATOR_KEYSTORE_PASSWORD + valueFrom: + secretKeyRef: + {{- toYaml .Values.tls.secretKeyRef | nindent 18 }} + optional: true + {{- end }} {{- with .Values.operatorPod.env }} {{- toYaml . 
| nindent 12 }} {{- end }} @@ -130,6 +141,10 @@ spec: - name: flink-artifacts-volume mountPath: /opt/flink/artifacts {{- end }} + {{- if .Values.tls.create }} + - name: flink-operator-cert-secret + mountPath: /opt/flink/tls-cert + {{- end }} {{- if and (index .Values "operatorHealth") (index .Values.operatorHealth "livenessProbe") }} livenessProbe: {{- toYaml .Values.operatorHealth.livenessProbe | nindent 12 }} @@ -229,6 +244,12 @@ spec: - key: keystore.p12 path: keystore.p12 {{- end }} + {{- if .Values.tls.create }} + - name: flink-operator-cert-secret + secret: + secretName: {{ .Values.tls.secretName }} + optional: true + {{- end }} --- {{- if .Values.defaultConfiguration.create }} apiVersion: v1 diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml index 6b8c9042..12a97bf2 100644 --- a/helm/flink-kubernetes-operator/values.yaml +++ b/helm/flink-kubernetes-operator/values.yaml @@ -194,3 +194,11 @@ operatorHealth: # Set postStart hook of the main container postStart: {} + +# Configuration for tls +tls: + create: false + secretName: flink-operator-cert + secretKeyRef: + name: operator-certificate-password + key: password