This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 067e3c038ed [feat][fn] PIP-257: Support mounting k8s ServiceAccount 
for OIDC auth (#19888)
067e3c038ed is described below

commit 067e3c038edf406b27f8f91847f688f559a2ba25
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Apr 6 08:45:58 2023 -0500

    [feat][fn] PIP-257: Support mounting k8s ServiceAccount for OIDC auth 
(#19888)
    
    PIP: #19771
    
    ### Motivation
    
    In order to make OIDC work with functions, we must give them a way to 
authenticate with the broker using tokens that can be validated using an 
Authorization Server. This PR introduces the 
`KubernetesServiceAccountTokenAuthProvider`.
    
    ### Modifications
    
    * Create a `KubernetesServiceAccountTokenAuthProvider` implementation. It 
adds a service account token volume projection as defined in the k8s docs 
[here](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection).
 The implementation provides a way to specify the expiration time that the 
token will receive.
    
    * Instead of creating a secret with the broker's trusted `ca.crt` in it, 
this new `KubernetesServiceAccountTokenAuthProvider` expects a secret to 
already exist with the `ca.crt`. The major advantage of this implementation is 
that when the `ca.crt` is rotated, we can refresh it (assuming the client is 
configured to observe the updated file).
    
    * Add support for specifying the token's expiration and audience.
    
    * One point of divergence from the `KubernetesSecretsTokenAuthProvider` 
implementation is that I did not provide a way for functions to authenticate as 
the anonymous role. It seems like a stretch that functions would use such 
authentication because it will not be multi-tenant. However, if that is a 
concern, we can add the support.
    
    * The feature will be configured with the following yaml:
    
    ```yaml
    functionRuntimeFactoryConfigs:
      kubernetesFunctionAuthProviderConfig:
        brokerClientTrustCertsSecretName: "secret-name"
        serviceAccountTokenExpirationSeconds: 3600
        serviceAccountTokenAudience: "pulsar-cluster-audience"
    ```
    
    ### Verifying this change
    
    I verified the correctness of the code with unit tests. I'll verify the 
integration with k8s once we've determined this PR's design is correct.
    
    ### Does this pull request potentially affect one of the following parts:
    
    This adds new configuration options to the function worker.
    
    ### Documentation
    
    - [x] `doc-required`
    
    ### Matching PR in forked repository
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/36
---
 .../auth/KubernetesFunctionAuthProvider.java       |  12 ++
 .../KubernetesServiceAccountTokenAuthProvider.java | 208 +++++++++++++++++++++
 .../kubernetes/KubernetesRuntimeFactory.java       |   2 +-
 .../kubernetes/KubernetesRuntimeFactoryConfig.java |   6 +
 ...ernetesServiceAccountTokenAuthProviderTest.java | 128 +++++++++++++
 5 files changed, 355 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
index 29365424f54..3d05d519552 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.auth;
 
 import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.proto.Function;
@@ -31,6 +32,11 @@ public interface KubernetesFunctionAuthProvider extends 
FunctionAuthProvider {
 
     void initialize(CoreV1Api coreClient);
 
+    /**
+     * @deprecated use
+     * {@link #initialize(CoreV1Api, byte[], java.util.function.Function, Map)}
+     */
+    @Deprecated(since = "3.0.0")
     default void initialize(CoreV1Api coreClient, byte[] caBytes,
                             
java.util.function.Function<Function.FunctionDetails, String> 
namespaceCustomizerFunc) {
         setCaBytes(caBytes);
@@ -38,6 +44,12 @@ public interface KubernetesFunctionAuthProvider extends 
FunctionAuthProvider {
         initialize(coreClient);
     }
 
+    default void initialize(CoreV1Api coreClient, byte[] caBytes,
+                            
java.util.function.Function<Function.FunctionDetails, String> 
namespaceCustomizerFunc,
+                            Map<String, Object> config) {
+        initialize(coreClient, caBytes, namespaceCustomizerFunc);
+    }
+
     default void setCaBytes(byte[] caBytes) {
 
     }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProvider.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProvider.java
new file mode 100644
index 00000000000..06fadec42d4
--- /dev/null
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProvider.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1KeyToPath;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1ProjectedVolumeSource;
+import io.kubernetes.client.openapi.models.V1SecretVolumeSource;
+import io.kubernetes.client.openapi.models.V1ServiceAccountTokenProjection;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeProjection;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.eclipse.jetty.util.StringUtil;
+
+/**
+ * Kubernetes Function Authentication Provider that adds Service Account Token 
Projection to a function pod's container
+ * definition. This token can be used to authenticate the function instance 
with the broker and the function worker via
+ * OpenId Connect when each server is configured to trust the kubernetes 
issuer. See docs for additional details.
+ * Relevant settings:
+ * <p>
+ *     brokerClientTrustCertsSecretName: The Kubernetes secret containing the 
broker's trust certs. If it is not set,
+ *     the function will not use a custom trust store. The secret must already 
exist in each function's target
+ *     namespace. The secret must contain a key named `ca.crt` with the trust 
certs. Only the ca.crt will be mounted.
+ * </p>
+ * <p>
+ *     serviceAccountTokenExpirationSeconds: The expiration for the token 
created by the
+ *     {@link KubernetesServiceAccountTokenAuthProvider}. The default value is 
3600 seconds.
+ * </p>
+ * <p>
+ *     serviceAccountTokenAudience: The audience for the token created by the
+ *     {@link KubernetesServiceAccountTokenAuthProvider}.
+ * </p>
+ * Note: the pod inherits the namespace's default service account.
+ */
+public class KubernetesServiceAccountTokenAuthProvider implements 
KubernetesFunctionAuthProvider {
+
+    private static final String BROKER_CLIENT_TRUST_CERTS_SECRET_NAME = 
"brokerClientTrustCertsSecretName";
+    private static final String SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS = 
"serviceAccountTokenExpirationSeconds";
+    private static final String SERVICE_ACCOUNT_TOKEN_AUDIENCE = 
"serviceAccountTokenAudience";
+
+    private static final String SERVICE_ACCOUNT_VOLUME_NAME = 
"service-account-token";
+    private static final String TRUST_CERT_VOLUME_NAME = "ca-cert";
+    private static final String DEFAULT_MOUNT_DIR = "/etc/auth";
+    private static final String FUNCTION_AUTH_TOKEN = "token";
+    private static final String FUNCTION_CA_CERT = "ca.crt";
+    private static final String DEFAULT_CERT_PATH = DEFAULT_MOUNT_DIR + "/" + 
FUNCTION_CA_CERT;
+    private String brokerTrustCertsSecretName;
+    private long serviceAccountTokenExpirationSeconds;
+    private String serviceAccountTokenAudience;
+
+    @Override
+    public void initialize(CoreV1Api coreClient, byte[] caBytes,
+                           
java.util.function.Function<Function.FunctionDetails, String> 
namespaceCustomizerFunc,
+                           Map<String, Object> config) {
+        setNamespaceProviderFunc(namespaceCustomizerFunc);
+        Object certSecretName = 
config.get(BROKER_CLIENT_TRUST_CERTS_SECRET_NAME);
+        if (certSecretName instanceof String) {
+            brokerTrustCertsSecretName = (String) certSecretName;
+        } else if (certSecretName != null) {
+            // Throw exception because user set this configuration, but it 
isn't valid.
+            throw new IllegalArgumentException("Invalid value for " + 
BROKER_CLIENT_TRUST_CERTS_SECRET_NAME
+                    + ". Expected a string.");
+        }
+        Object tokenExpirationSeconds = 
config.get(SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS);
+        if (tokenExpirationSeconds instanceof Long) {
+            serviceAccountTokenExpirationSeconds = (Long) 
tokenExpirationSeconds;
+        } else if (tokenExpirationSeconds instanceof String) {
+            try {
+                serviceAccountTokenExpirationSeconds = Long.parseLong((String) 
tokenExpirationSeconds);
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException("Invalid value for " + 
SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
+                        + ". Expected a long.");
+            }
+        } else if (tokenExpirationSeconds != null) {
+            // Throw exception because user set this configuration, but it 
isn't valid.
+            throw new IllegalArgumentException("Invalid value for " + 
SERVICE_ACCOUNT_TOKEN_EXPIRATION_SECONDS
+                    + ". Expected a long.");
+        }
+        Object tokenAudience = config.get(SERVICE_ACCOUNT_TOKEN_AUDIENCE);
+        if (tokenAudience instanceof String) {
+            serviceAccountTokenAudience = (String) tokenAudience;
+        } else if (tokenAudience != null) {
+            throw new IllegalArgumentException("Invalid value for " + 
SERVICE_ACCOUNT_TOKEN_AUDIENCE
+                    + ". Expected a string.");
+        }
+    }
+
+    @Override
+    public void configureAuthenticationConfig(AuthenticationConfig authConfig,
+                                              Optional<FunctionAuthData> 
functionAuthData) {
+        
authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        
authConfig.setClientAuthenticationParameters(Paths.get(DEFAULT_MOUNT_DIR, 
FUNCTION_AUTH_TOKEN)
+                .toUri().toString());
+        if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
+            authConfig.setTlsTrustCertsFilePath(DEFAULT_CERT_PATH);
+        }
+    }
+
+    /**
+     * No need to cache anything. Kubernetes generates the token used for 
authentication.
+     */
+    @Override
+    public Optional<FunctionAuthData> cacheAuthData(Function.FunctionDetails 
funcDetails,
+                                                    AuthenticationDataSource 
authenticationDataSource)
+            throws Exception {
+        return Optional.empty();
+    }
+
+    /**
+     * No need to update anything. Kubernetes updates the token used for 
authentication.
+     */
+    @Override
+    public Optional<FunctionAuthData> updateAuthData(Function.FunctionDetails 
funcDetails,
+                                                     
Optional<FunctionAuthData> existingFunctionAuthData,
+                                                     AuthenticationDataSource 
authenticationDataSource)
+            throws Exception {
+        return Optional.empty();
+    }
+
+    /**
+     * No need to clean up anything. Kubernetes cleans up the secret when the 
pod is deleted.
+     */
+    @Override
+    public void cleanUpAuthData(Function.FunctionDetails funcDetails, 
Optional<FunctionAuthData> functionAuthData)
+            throws Exception {
+
+    }
+
+    @Override
+    public void initialize(CoreV1Api coreClient) {
+    }
+
+    @Override
+    public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, 
Optional<FunctionAuthData> functionAuthData) {
+        V1PodSpec podSpec = statefulSet.getSpec().getTemplate().getSpec();
+        // configure pod mount secret with auth token
+        if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
+            podSpec.addVolumesItem(createTrustCertVolume());
+        }
+        podSpec.addVolumesItem(createServiceAccountVolume());
+        podSpec.getContainers().forEach(this::addVolumeMountsToContainer);
+    }
+
+    private V1Volume createServiceAccountVolume() {
+        V1ProjectedVolumeSource projectedVolumeSource = new 
V1ProjectedVolumeSource();
+        V1VolumeProjection volumeProjection = new V1VolumeProjection();
+        volumeProjection.serviceAccountToken(
+                new V1ServiceAccountTokenProjection()
+                        .audience(serviceAccountTokenAudience)
+                        
.expirationSeconds(serviceAccountTokenExpirationSeconds)
+                        .path(FUNCTION_AUTH_TOKEN));
+        projectedVolumeSource.addSourcesItem(volumeProjection);
+        return new V1Volume()
+                .name(SERVICE_ACCOUNT_VOLUME_NAME)
+                .projected(projectedVolumeSource);
+    }
+
+    private V1Volume createTrustCertVolume() {
+        return new V1Volume()
+                .name(TRUST_CERT_VOLUME_NAME)
+                .secret(new V1SecretVolumeSource()
+                        .secretName(brokerTrustCertsSecretName)
+                        .addItemsItem(new V1KeyToPath()
+                                .key(FUNCTION_CA_CERT)
+                                .path(FUNCTION_CA_CERT)));
+    }
+
+    private void addVolumeMountsToContainer(V1Container container) {
+        container.addVolumeMountsItem(
+                new V1VolumeMount()
+                        .name(SERVICE_ACCOUNT_VOLUME_NAME)
+                        .mountPath(DEFAULT_MOUNT_DIR)
+                        .readOnly(true));
+        if (StringUtil.isNotBlank(brokerTrustCertsSecretName)) {
+            container.addVolumeMountsItem(
+                    new V1VolumeMount()
+                            .name(TRUST_CERT_VOLUME_NAME)
+                            .mountPath(DEFAULT_MOUNT_DIR)
+                            .readOnly(true));
+        }
+    }
+}
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 692884a303d..895304138a5 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -252,7 +252,7 @@ public class KubernetesRuntimeFactory implements 
RuntimeFactory {
                 kubernetesFunctionAuthProvider.initialize(coreClient, 
serverCaBytes,
                         (funcDetails) -> getRuntimeCustomizer()
                                 .map((customizer) -> 
customizer.customizeNamespace(funcDetails, jobNamespace))
-                                .orElse(jobNamespace));
+                                .orElse(jobNamespace), 
factoryConfig.getKubernetesFunctionAuthProviderConfig());
                 this.authProvider = 
Optional.of(kubernetesFunctionAuthProvider);
             }
         } else {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index 5cbca7b65bf..43cdc035076 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.runtime.kubernetes;
 
+import java.util.HashMap;
 import java.util.Map;
 import lombok.Data;
 import lombok.experimental.Accessors;
@@ -169,4 +170,9 @@ public class KubernetesRuntimeFactoryConfig {
     )
     protected int gracePeriodSeconds = 5;
 
+    @FieldContext(
+            doc = "A map of custom configurations passed to implementations of 
the KubernetesFunctionAuthProvider"
+                    + " interface."
+    )
+    private Map<String, Object> kubernetesFunctionAuthProviderConfig = new 
HashMap<>();
 }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProviderTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProviderTest.java
new file mode 100644
index 00000000000..2e8cf751510
--- /dev/null
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesServiceAccountTokenAuthProviderTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1ServiceAccountTokenProjection;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Volume;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class KubernetesServiceAccountTokenAuthProviderTest {
+    @Test
+    public void testConfigureAuthDataStatefulSet() {
+        HashMap<String, Object> config = new HashMap<>();
+        config.put("brokerClientTrustCertsSecretName", "my-secret");
+        config.put("serviceAccountTokenExpirationSeconds", "600");
+        config.put("serviceAccountTokenAudience", "my-audience");
+        KubernetesServiceAccountTokenAuthProvider provider = new 
KubernetesServiceAccountTokenAuthProvider();
+        provider.initialize(null, null, (fd) -> "default", config);
+
+        // Create a stateful set with a container
+        V1StatefulSet statefulSet = new V1StatefulSet();
+        statefulSet.setSpec(
+                new V1StatefulSetSpec().template(
+                        new V1PodTemplateSpec().spec(
+                                new V1PodSpec().containers(
+                                        Collections.singletonList(new 
V1Container())))));
+        provider.configureAuthDataStatefulSet(statefulSet, Optional.empty());
+
+        List<V1Volume> volumes = 
statefulSet.getSpec().getTemplate().getSpec().getVolumes();
+        Assert.assertEquals(volumes.size(), 2);
+
+        Assert.assertEquals(volumes.get(0).getName(), "ca-cert");
+        Assert.assertEquals(volumes.get(0).getSecret().getSecretName(), 
"my-secret");
+        Assert.assertEquals(volumes.get(0).getSecret().getItems().size(), 1);
+        
Assert.assertEquals(volumes.get(0).getSecret().getItems().get(0).getKey(), 
"ca.crt");
+        
Assert.assertEquals(volumes.get(0).getSecret().getItems().get(0).getPath(), 
"ca.crt");
+
+
+        Assert.assertEquals(volumes.get(1).getName(), "service-account-token");
+        Assert.assertEquals(volumes.get(1).getProjected().getSources().size(), 
1);
+        V1ServiceAccountTokenProjection tokenProjection =
+                
volumes.get(1).getProjected().getSources().get(0).getServiceAccountToken();
+        Assert.assertEquals(tokenProjection.getExpirationSeconds(), 600);
+        Assert.assertEquals(tokenProjection.getAudience(), "my-audience");
+
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().size(),
 1);
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().size(),
 2);
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(),
 "service-account-token");
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(),
 "/etc/auth");
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(1).getName(),
 "ca-cert");
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(1).getMountPath(),
 "/etc/auth");
+    }
+
+    @Test
+    public void testConfigureAuthDataStatefulSetNoCa() {
+
+        HashMap<String, Object> config = new HashMap<>();
+        config.put("serviceAccountTokenExpirationSeconds", "600");
+        config.put("serviceAccountTokenAudience", "pulsar-cluster");
+        KubernetesServiceAccountTokenAuthProvider provider = new 
KubernetesServiceAccountTokenAuthProvider();
+        provider.initialize(null, null, (fd) -> "default", config);
+
+        // Create a stateful set with a container
+        V1StatefulSet statefulSet = new V1StatefulSet();
+        statefulSet.setSpec(
+                new V1StatefulSetSpec().template(
+                        new V1PodTemplateSpec().spec(
+                                new V1PodSpec().containers(
+                                        Collections.singletonList(new 
V1Container())))));
+        provider.configureAuthDataStatefulSet(statefulSet, Optional.empty());
+
+        List<V1Volume> volumes = 
statefulSet.getSpec().getTemplate().getSpec().getVolumes();
+        Assert.assertEquals(volumes.size(), 1);
+
+        Assert.assertEquals(volumes.get(0).getName(), "service-account-token");
+        Assert.assertEquals(volumes.get(0).getProjected().getSources().size(), 
1);
+        V1ServiceAccountTokenProjection tokenProjection =
+                
volumes.get(0).getProjected().getSources().get(0).getServiceAccountToken();
+        Assert.assertEquals(tokenProjection.getExpirationSeconds(), 600);
+        Assert.assertEquals(tokenProjection.getAudience(), "pulsar-cluster");
+
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().size(),
 1);
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().size(),
 1);
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(),
 "service-account-token");
+        
Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(),
 "/etc/auth");
+    }
+
+    @Test
+    public void configureAuthenticationConfig() {
+        HashMap<String, Object> config = new HashMap<>();
+        config.put("brokerClientTrustCertsSecretName", "my-secret");
+        KubernetesServiceAccountTokenAuthProvider provider = new 
KubernetesServiceAccountTokenAuthProvider();
+        provider.initialize(null, null, (fd) -> "default", config);
+        AuthenticationConfig authenticationConfig = 
AuthenticationConfig.builder().build();
+        provider.configureAuthenticationConfig(authenticationConfig, 
Optional.empty());
+
+        
Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), 
AuthenticationToken.class.getName());
+        
Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), 
"file:///etc/auth/token");
+        Assert.assertEquals(authenticationConfig.getTlsTrustCertsFilePath(), 
"/etc/auth/ca.crt");
+    }
+}

Reply via email to