This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 067e3c038ed [feat][fn] PIP-257: Support mounting k8s ServiceAccount
for OIDC auth (#19888)
067e3c038ed is described below
commit 067e3c038edf406b27f8f91847f688f559a2ba25
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Apr 6 08:45:58 2023 -0500
[feat][fn] PIP-257: Support mounting k8s ServiceAccount for OIDC auth
(#19888)
PIP: #19771
### Motivation
In order to make OIDC work with functions, we must give them a way to
authenticate with the broker using tokens that can be validated by an
Authorization Server. This PR introduces the
`KubernetesServiceAccountTokenAuthProvider`.
### Modifications
* Create a `KubernetesServiceAccountTokenAuthProvider` implementation. It adds
a service account token volume projection as defined in the k8s docs
[here](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection).
The implementation provides a way to specify the expiration time that the
token will receive.
* Instead of creating a secret with the broker's trusted `ca.crt` in it,
this new `KubernetesServiceAccountTokenAuthProvider` expects a secret to already
exist with the `ca.crt`. The major advantage of this implementation is that
when the `ca.crt` is rotated, we can refresh it (assuming the client is
configured to observe the updated file).
* Add support for specifying the token's expiration and audience.
* One point of divergence from the `KubernetesSecretsTokenAuthProvider`
implementation is that I did not provide a way for functions to authenticate as
the anonymous role. It seems like a stretch that functions would use such
authentication because it will not be multi-tenant. However, if that is a
concern, we can add the support.
* The feature will be configured with the following yaml:
```yaml
functionRuntimeFactoryConfigs:
  kubernetesFunctionAuthProviderConfig:
    brokerClientTrustCertsSecretName: "secret-name"
    serviceAccountTokenExpirationSeconds: 3600
    serviceAccountTokenAudience: "pulsar-cluster-audience"
```
### Verifying this change
I verified the correctness of the code with unit tests. I'll verify the
integration with k8s once we've determined this PR's design is correct.
### Does this pull request potentially affect one of the following parts:
This adds new configuration options to the function worker.
### Documentation
- [x] `doc-required`
### Matching PR in forked repository
PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/36
---
.../auth/KubernetesFunctionAuthProvider.java | 12 ++
.../KubernetesServiceAccountTokenAuthProvider.java | 208 +++++++++++++++++++++
.../kubernetes/KubernetesRuntimeFactory.java | 2 +-
.../kubernetes/KubernetesRuntimeFactoryConfig.java | 6 +
...ernetesServiceAccountTokenAuthProviderTest.java | 128 +++++++++++++
5 files changed, 355 insertions(+), 1 deletion(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
index 29365424f54..3d05d519552 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.auth;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1StatefulSet;
+import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.proto.Function;
@@ -31,6 +32,11 @@ public interface KubernetesFunctionAuthProvider extends
FunctionAuthProvider {
void initialize(CoreV1Api coreClient);
+ /**
+ * @deprecated use
+ * {@link #initialize(CoreV1Api, byte[], java.util.function.Function, Map)}
+ */
+ @Deprecated(since = "3.0.0")
default void initialize(CoreV1Api coreClient, byte[] caBytes,
java.util.function.Function<Function.FunctionDetails, String>
namespaceCustomizerFunc) {
setCaBytes(caBytes);
@@ -38,6 +44,12 @@ public interface KubernetesFunctionAuthProvider extends
FunctionAuthProvider {
initialize(coreClient);
}
+    /**
+     * Initializes the provider, additionally receiving a map of provider-specific settings.
+     *
+     * <p>This default implementation does not read {@code config}; it forwards the other
+     * arguments to the three-argument {@code initialize} overload.
+     *
+     * @param coreClient Kubernetes core API client, forwarded unchanged
+     * @param caBytes CA bytes, forwarded unchanged
+     * @param namespaceCustomizerFunc maps function details to a namespace string, forwarded unchanged
+     * @param config provider-specific settings; ignored by this default implementation
+     */
+    default void initialize(CoreV1Api coreClient, byte[] caBytes,
+                            java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc,
+                            Map<String, Object> config) {
+        initialize(coreClient, caBytes, namespaceCustomizerFunc);
+    }
+
default void setCaBytes(byte[] caBytes) {
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProvider.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProvider.java
new file mode 100644
index 00000000000..06fadec42d4
--- /dev/null
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProvider.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1KeyToPath;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1ProjectedVolumeSource;
+import io.kubernetes.client.openapi.models.V1SecretVolumeSource;
+import io.kubernetes.client.openapi.models.V1ServiceAccountTokenProjection;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeProjection;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.eclipse.jetty.util.StringUtil;
+
+/**
+ * Kubernetes Function Authentication Provider that adds Service Account Token
Projection to a function pod's container
+ * definition. This token can be used to authenticate the function instance
with the broker and the function worker via
+ * OpenId Connect when each server is configured to trust the kubernetes
issuer. See docs for additional details.
+ * Relevant settings:
+ * <p>
+ * brokerClientTrustCertsSecretName: The Kubernetes secret containing the
broker's trust certs. If it is not set,
+ * the function will not use a custom trust store. The secret must already
exist in each function's target
+ * namespace. The secret must contain a key named `ca.crt` with the trust
certs. Only the ca.crt will be mounted.
+ * </p>
+ * <p>
+ * serviceAccountTokenExpirationSeconds: The expiration for the token
created by the
+ * {@link KubernetesServiceAccountTokenAuthProvider}. The default value is
3600 seconds.
+ * </p>
+ * <p>
+ * serviceAccountTokenAudience: The audience for the token created by the
+ * {@link KubernetesServiceAccountTokenAuthProvider}.
+ * </p>
+ * Note: the pod inherits the namespace's default service account.
+ */
+public class KubernetesServiceAccountTokenAuthProvider implements
KubernetesFunctionAuthProvider {
+
+ private static final String BROKER_CLIENT_TRUST_CERTS_SECRET_NAME =
"brokerClientTrustCertsSecretName";
+ private static final String SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS =
"serviceAccountTokenExpirationSeconds";
+ private static final String SERVICE_ACCOUNT_TOKEN_AUDIENCE =
"serviceAccountTokenAudience";
+
+ private static final String SERVICE_ACCOUNT_VOLUME_NAME =
"service-account-token";
+ private static final String TRUST_CERT_VOLUME_NAME = "ca-cert";
+ private static final String DEFAULT_MOUNT_DIR = "/etc/auth";
+ private static final String FUNCTION_AUTH_TOKEN = "token";
+ private static final String FUNCTION_CA_CERT = "ca.crt";
+ private static final String DEFAULT_CERT_PATH = DEFAULT_MOUNT_DIR + "/" +
FUNCTION_CA_CERT;
+ private String brokerTrustCertsSecretName;
+ private long serviceAccountTokenExpirationSeconds;
+ private String serviceAccountTokenAudience;
+
+    /**
+     * Stores the namespace customizer and reads this provider's optional settings from {@code config}.
+     *
+     * <p>Recognized keys are {@code brokerClientTrustCertsSecretName} (string),
+     * {@code serviceAccountTokenExpirationSeconds} (integral number or numeric string) and
+     * {@code serviceAccountTokenAudience} (string). Keys that are absent leave the matching field untouched.
+     * {@code coreClient} and {@code caBytes} are not used by this implementation.
+     *
+     * @param coreClient unused
+     * @param caBytes unused
+     * @param namespaceCustomizerFunc passed to {@code setNamespaceProviderFunc}
+     * @param config provider settings; may be {@code null}, in which case no setting is read
+     * @throws IllegalArgumentException if a key is present but its value has an unsupported type or,
+     *                                  for the expiration, is a string that is not a valid long
+     */
+    @Override
+    public void initialize(CoreV1Api coreClient, byte[] caBytes,
+                           java.util.function.Function<Function.FunctionDetails, String> namespaceCustomizerFunc,
+                           Map<String, Object> config) {
+        setNamespaceProviderFunc(namespaceCustomizerFunc);
+        if (config == null) {
+            // Every setting below is optional, so a missing map simply means "nothing configured".
+            return;
+        }
+        Object certSecretName = config.get(BROKER_CLIENT_TRUST_CERTS_SECRET_NAME);
+        if (certSecretName instanceof String) {
+            brokerTrustCertsSecretName = (String) certSecretName;
+        } else if (certSecretName != null) {
+            // Throw exception because user set this configuration, but it isn't valid.
+            throw new IllegalArgumentException("Invalid value for " + BROKER_CLIENT_TRUST_CERTS_SECRET_NAME
+                    + ". Expected a string.");
+        }
+        Object tokenExpirationSeconds = config.get(SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS);
+        if (tokenExpirationSeconds instanceof Long || tokenExpirationSeconds instanceof Integer) {
+            // An untyped config map typically carries small integral values as Integer rather than Long,
+            // so both boxed types must be accepted for a plain numeric setting such as 3600.
+            serviceAccountTokenExpirationSeconds = ((Number) tokenExpirationSeconds).longValue();
+        } else if (tokenExpirationSeconds instanceof String) {
+            try {
+                serviceAccountTokenExpirationSeconds = Long.parseLong((String) tokenExpirationSeconds);
+            } catch (NumberFormatException e) {
+                // Keep the parse failure as the cause so the offending text is visible in the stack trace.
+                throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
+                        + ". Expected a long.", e);
+            }
+        } else if (tokenExpirationSeconds != null) {
+            // Throw exception because user set this configuration, but it isn't valid.
+            throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
+                    + ". Expected a long.");
+        }
+        Object tokenAudience = config.get(SERVICE_ACCOUNT_TOKEN_AUDIENCE);
+        if (tokenAudience instanceof String) {
+            serviceAccountTokenAudience = (String) tokenAudience;
+        } else if (tokenAudience != null) {
+            throw new IllegalArgumentException("Invalid value for " + SERVICE_ACCOUNT_TOKEN_AUDIENCE
+                    + ". Expected a string.");
+        }
+    }
+
+    /**
+     * Configures the function client to authenticate with the token file under the auth mount directory.
+     *
+     * <p>The authentication plugin is set to {@link AuthenticationToken} and its parameters to the
+     * {@code file:} URI of the token path. The TLS trust certs path is only set when a trust certs
+     * secret name has been configured.
+     *
+     * @param authConfig the configuration object to populate
+     * @param functionAuthData not read by this implementation
+     */
+    @Override
+    public void configureAuthenticationConfig(AuthenticationConfig authConfig,
+                                              Optional<FunctionAuthData> functionAuthData) {
+        final String pluginClassName = AuthenticationToken.class.getName();
+        final String tokenFileUri = Paths.get(DEFAULT_MOUNT_DIR, FUNCTION_AUTH_TOKEN).toUri().toString();
+        authConfig.setClientAuthenticationPlugin(pluginClassName);
+        authConfig.setClientAuthenticationParameters(tokenFileUri);
+        final boolean trustCertsConfigured = StringUtil.isNotBlank(brokerTrustCertsSecretName);
+        if (trustCertsConfigured) {
+            authConfig.setTlsTrustCertsFilePath(DEFAULT_CERT_PATH);
+        }
+    }
+
+    /**
+     * Returns no auth data: this provider caches nothing per function, since the token used for
+     * authentication is generated by Kubernetes.
+     *
+     * @param funcDetails not read by this implementation
+     * @param authenticationDataSource not read by this implementation
+     * @return always {@link Optional#empty()}
+     */
+    @Override
+    public Optional<FunctionAuthData> cacheAuthData(Function.FunctionDetails funcDetails,
+                                                    AuthenticationDataSource authenticationDataSource)
+            throws Exception {
+        return Optional.empty();
+    }
+
+    /**
+     * Returns no auth data: this provider has nothing to update per function, since the token used for
+     * authentication is updated by Kubernetes.
+     *
+     * @param funcDetails not read by this implementation
+     * @param existingFunctionAuthData not read by this implementation
+     * @param authenticationDataSource not read by this implementation
+     * @return always {@link Optional#empty()}
+     */
+    @Override
+    public Optional<FunctionAuthData> updateAuthData(Function.FunctionDetails funcDetails,
+                                                     Optional<FunctionAuthData> existingFunctionAuthData,
+                                                     AuthenticationDataSource authenticationDataSource)
+            throws Exception {
+        return Optional.empty();
+    }
+
+    /**
+     * Does nothing: this provider stores no per-function auth data, so there is nothing to remove here.
+     *
+     * @param funcDetails not read by this implementation
+     * @param functionAuthData not read by this implementation
+     */
+    @Override
+    public void cleanUpAuthData(Function.FunctionDetails funcDetails, Optional<FunctionAuthData> functionAuthData)
+            throws Exception {
+        // Intentionally empty.
+    }
+
+    /**
+     * No-op overload: this implementation does not keep or use the Kubernetes API client.
+     *
+     * @param coreClient ignored
+     */
+    @Override
+    public void initialize(CoreV1Api coreClient) {
+        // Intentionally empty.
+    }
+
+    /**
+     * Adds this provider's volumes to the stateful set's pod spec and mounts them into every container.
+     *
+     * <p>The trust cert volume is added first, and only when a trust certs secret name is configured;
+     * the service account token volume is always added.
+     *
+     * @param statefulSet the stateful set whose pod template spec is modified in place
+     * @param functionAuthData not read by this implementation
+     */
+    @Override
+    public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, Optional<FunctionAuthData> functionAuthData) {
+        final V1PodSpec spec = statefulSet.getSpec().getTemplate().getSpec();
+        final boolean trustCertsConfigured = StringUtil.isNotBlank(brokerTrustCertsSecretName);
+        if (trustCertsConfigured) {
+            spec.addVolumesItem(createTrustCertVolume());
+        }
+        spec.addVolumesItem(createServiceAccountVolume());
+        for (V1Container container : spec.getContainers()) {
+            addVolumeMountsToContainer(container);
+        }
+    }
+
+    /**
+     * Builds the projected volume that carries the service account token.
+     *
+     * <p>The projection uses the configured audience and expiration and writes the token to the
+     * {@code token} path inside the volume. When no expiration was configured the field is left unset
+     * in the projection instead of being sent as 0, so that the Kubernetes-side default applies.
+     *
+     * @return a volume named {@code service-account-token} with a single service account token source
+     */
+    private V1Volume createServiceAccountVolume() {
+        // The backing field is a primitive long, so 0 means "never configured". Sending 0 would ask for
+        // a zero-second token; leaving the value null omits it from the projection.
+        Long expirationSeconds = null;
+        if (serviceAccountTokenExpirationSeconds != 0) {
+            expirationSeconds = serviceAccountTokenExpirationSeconds;
+        }
+        V1ServiceAccountTokenProjection tokenProjection = new V1ServiceAccountTokenProjection()
+                .audience(serviceAccountTokenAudience)
+                .expirationSeconds(expirationSeconds)
+                .path(FUNCTION_AUTH_TOKEN);
+        V1VolumeProjection volumeProjection = new V1VolumeProjection().serviceAccountToken(tokenProjection);
+        V1ProjectedVolumeSource projectedVolumeSource = new V1ProjectedVolumeSource()
+                .addSourcesItem(volumeProjection);
+        return new V1Volume()
+                .name(SERVICE_ACCOUNT_VOLUME_NAME)
+                .projected(projectedVolumeSource);
+    }
+
+    /**
+     * Builds the secret-backed volume exposing the {@code ca.crt} key of the configured trust certs secret.
+     *
+     * @return a volume named {@code ca-cert} whose secret source selects only the {@code ca.crt} item
+     */
+    private V1Volume createTrustCertVolume() {
+        // Key and path are the same, so the file keeps the name ca.crt inside the volume.
+        V1KeyToPath caCertItem = new V1KeyToPath()
+                .key(FUNCTION_CA_CERT)
+                .path(FUNCTION_CA_CERT);
+        V1SecretVolumeSource secretSource = new V1SecretVolumeSource()
+                .secretName(brokerTrustCertsSecretName)
+                .addItemsItem(caCertItem);
+        return new V1Volume()
+                .name(TRUST_CERT_VOLUME_NAME)
+                .secret(secretSource);
+    }
+
+    /**
+     * Mounts the service account token volume into the container, read-only, and also the trust cert
+     * volume when a trust certs secret name is configured.
+     *
+     * <p>NOTE(review): both mounts use the same mount path ({@code /etc/auth}). Confirm that the target
+     * Kubernetes version accepts two volume mounts sharing one {@code mountPath} in a container.
+     *
+     * @param container the container that receives the volume mounts
+     */
+    private void addVolumeMountsToContainer(V1Container container) {
+        V1VolumeMount tokenMount = new V1VolumeMount()
+                .name(SERVICE_ACCOUNT_VOLUME_NAME)
+                .mountPath(DEFAULT_MOUNT_DIR)
+                .readOnly(true);
+        container.addVolumeMountsItem(tokenMount);
+        if (!StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
+            // No trust certs secret configured: the token mount is the only one needed.
+            return;
+        }
+        V1VolumeMount trustCertMount = new V1VolumeMount()
+                .name(TRUST_CERT_VOLUME_NAME)
+                .mountPath(DEFAULT_MOUNT_DIR)
+                .readOnly(true);
+        container.addVolumeMountsItem(trustCertMount);
+    }
+}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 692884a303d..895304138a5 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -252,7 +252,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
kubernetesFunctionAuthProvider.initialize(coreClient,
serverCaBytes,
(funcDetails) -> getRuntimeCustomizer()
.map((customizer) ->
customizer.customizeNamespace(funcDetails, jobNamespace))
- .orElse(jobNamespace));
+ .orElse(jobNamespace),
factoryConfig.getKubernetesFunctionAuthProviderConfig());
this.authProvider =
Optional.of(kubernetesFunctionAuthProvider);
}
} else {
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index 5cbca7b65bf..43cdc035076 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.runtime.kubernetes;
+import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -169,4 +170,9 @@ public class KubernetesRuntimeFactoryConfig {
)
protected int gracePeriodSeconds = 5;
+ @FieldContext(
+ doc = "A map of custom configurations passed to implementations of
the KubernetesFunctionAuthProvider"
+ + " interface."
+ )
+ private Map<String, Object> kubernetesFunctionAuthProviderConfig = new
HashMap<>();
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProviderTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProviderTest.java
new file mode 100644
index 00000000000..2e8cf751510
--- /dev/null
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProviderTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1ServiceAccountTokenProjection;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Volume;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class KubernetesServiceAccountTokenAuthProviderTest {
+    /**
+     * With a trust certs secret configured, the stateful set gets the cert volume followed by the token
+     * volume, and the single container gets a mount for each of them.
+     */
+    @Test
+    public void testConfigureAuthDataStatefulSet() {
+        HashMap<String, Object> providerSettings = new HashMap<>();
+        providerSettings.put("brokerClientTrustCertsSecretName", "my-secret");
+        providerSettings.put("serviceAccountTokenExpirationSeconds", "600");
+        providerSettings.put("serviceAccountTokenAudience", "my-audience");
+        KubernetesServiceAccountTokenAuthProvider authProvider = new KubernetesServiceAccountTokenAuthProvider();
+        authProvider.initialize(null, null, (fd) -> "default", providerSettings);
+
+        // Minimal stateful set: one pod template holding a single empty container.
+        V1PodSpec initialSpec = new V1PodSpec().containers(Collections.singletonList(new V1Container()));
+        V1StatefulSet statefulSet = new V1StatefulSet();
+        statefulSet.setSpec(new V1StatefulSetSpec().template(new V1PodTemplateSpec().spec(initialSpec)));
+        authProvider.configureAuthDataStatefulSet(statefulSet, Optional.empty());
+
+        V1PodSpec configuredSpec = statefulSet.getSpec().getTemplate().getSpec();
+        List<V1Volume> volumes = configuredSpec.getVolumes();
+        Assert.assertEquals(volumes.size(), 2);
+
+        // First volume: the secret holding the broker trust cert.
+        V1Volume certVolume = volumes.get(0);
+        Assert.assertEquals(certVolume.getName(), "ca-cert");
+        Assert.assertEquals(certVolume.getSecret().getSecretName(), "my-secret");
+        Assert.assertEquals(certVolume.getSecret().getItems().size(), 1);
+        Assert.assertEquals(certVolume.getSecret().getItems().get(0).getKey(), "ca.crt");
+        Assert.assertEquals(certVolume.getSecret().getItems().get(0).getPath(), "ca.crt");
+
+        // Second volume: the projected service account token.
+        V1Volume tokenVolume = volumes.get(1);
+        Assert.assertEquals(tokenVolume.getName(), "service-account-token");
+        Assert.assertEquals(tokenVolume.getProjected().getSources().size(), 1);
+        V1ServiceAccountTokenProjection tokenProjection =
+                tokenVolume.getProjected().getSources().get(0).getServiceAccountToken();
+        Assert.assertEquals(tokenProjection.getExpirationSeconds(), 600);
+        Assert.assertEquals(tokenProjection.getAudience(), "my-audience");
+
+        // The container mounts the token volume first, then the cert volume.
+        Assert.assertEquals(configuredSpec.getContainers().size(), 1);
+        V1Container container = configuredSpec.getContainers().get(0);
+        Assert.assertEquals(container.getVolumeMounts().size(), 2);
+        Assert.assertEquals(container.getVolumeMounts().get(0).getName(), "service-account-token");
+        Assert.assertEquals(container.getVolumeMounts().get(0).getMountPath(), "/etc/auth");
+        Assert.assertEquals(container.getVolumeMounts().get(1).getName(), "ca-cert");
+        Assert.assertEquals(container.getVolumeMounts().get(1).getMountPath(), "/etc/auth");
+    }
+
+    /**
+     * Without a trust certs secret, only the token volume is added and the single container gets
+     * exactly one mount.
+     */
+    @Test
+    public void testConfigureAuthDataStatefulSetNoCa() {
+        HashMap<String, Object> providerSettings = new HashMap<>();
+        providerSettings.put("serviceAccountTokenExpirationSeconds", "600");
+        providerSettings.put("serviceAccountTokenAudience", "pulsar-cluster");
+        KubernetesServiceAccountTokenAuthProvider authProvider = new KubernetesServiceAccountTokenAuthProvider();
+        authProvider.initialize(null, null, (fd) -> "default", providerSettings);
+
+        // Minimal stateful set: one pod template holding a single empty container.
+        V1PodSpec initialSpec = new V1PodSpec().containers(Collections.singletonList(new V1Container()));
+        V1StatefulSet statefulSet = new V1StatefulSet();
+        statefulSet.setSpec(new V1StatefulSetSpec().template(new V1PodTemplateSpec().spec(initialSpec)));
+        authProvider.configureAuthDataStatefulSet(statefulSet, Optional.empty());
+
+        V1PodSpec configuredSpec = statefulSet.getSpec().getTemplate().getSpec();
+        List<V1Volume> volumes = configuredSpec.getVolumes();
+        Assert.assertEquals(volumes.size(), 1);
+
+        // The only volume is the projected service account token.
+        V1Volume tokenVolume = volumes.get(0);
+        Assert.assertEquals(tokenVolume.getName(), "service-account-token");
+        Assert.assertEquals(tokenVolume.getProjected().getSources().size(), 1);
+        V1ServiceAccountTokenProjection tokenProjection =
+                tokenVolume.getProjected().getSources().get(0).getServiceAccountToken();
+        Assert.assertEquals(tokenProjection.getExpirationSeconds(), 600);
+        Assert.assertEquals(tokenProjection.getAudience(), "pulsar-cluster");
+
+        Assert.assertEquals(configuredSpec.getContainers().size(), 1);
+        V1Container container = configuredSpec.getContainers().get(0);
+        Assert.assertEquals(container.getVolumeMounts().size(), 1);
+        Assert.assertEquals(container.getVolumeMounts().get(0).getName(), "service-account-token");
+        Assert.assertEquals(container.getVolumeMounts().get(0).getMountPath(), "/etc/auth");
+    }
+
+    /**
+     * With a trust certs secret configured, the authentication config gets the token plugin, the token
+     * file URI as parameters, and the mounted CA cert path.
+     */
+    @Test
+    public void configureAuthenticationConfig() {
+        HashMap<String, Object> providerSettings = new HashMap<>();
+        providerSettings.put("brokerClientTrustCertsSecretName", "my-secret");
+        KubernetesServiceAccountTokenAuthProvider authProvider = new KubernetesServiceAccountTokenAuthProvider();
+        authProvider.initialize(null, null, (fd) -> "default", providerSettings);
+
+        AuthenticationConfig authConfig = AuthenticationConfig.builder().build();
+        authProvider.configureAuthenticationConfig(authConfig, Optional.empty());
+
+        String expectedPlugin = AuthenticationToken.class.getName();
+        Assert.assertEquals(authConfig.getClientAuthenticationPlugin(), expectedPlugin);
+        Assert.assertEquals(authConfig.getClientAuthenticationParameters(), "file:///etc/auth/token");
+        Assert.assertEquals(authConfig.getTlsTrustCertsFilePath(), "/etc/auth/ca.crt");
+    }
+}