This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f9b795  [pulsar-functions] enhance kubernetes manifest customizer 
with default options (#9445)
6f9b795 is described below

commit 6f9b795976b7b1a0f4bcdc6db76eb6c3d0c584e3
Author: Rui Fu <[email protected]>
AuthorDate: Fri Feb 5 18:56:53 2021 +0800

    [pulsar-functions] enhance kubernetes manifest customizer with default 
options (#9445)
    
    ### Motivation
    
    The KubernetesManifestCustomizer was introduced by customizing the stateful 
set of running Pulsar Functions. but no default value was loaded from 
`functions_worker.yaml`.
    
    ### Modifications
    
    Add load default runtime options in `BasicKubernetesManifestCustomizer`
    Add unit tests
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
---
 conf/functions_worker.yml                          |  14 ++
 .../BasicKubernetesManifestCustomizer.java         | 117 ++++++++++-
 .../BasicKubernetesManifestCustomizerTest.java     |  96 +++++++++
 .../kubernetes/KubernetesRuntimeFactoryTest.java   |   3 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 217 +++++++++++++++++----
 5 files changed, 400 insertions(+), 47 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index c2fd3b8..33cc9d3 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -229,6 +229,20 @@ functionRuntimeFactoryConfigs:
 ## This class receives the customRuntimeOptions string and can customize 
details of how the runtime operates.
 #runtimeCustomizerClassName: 
"org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer"
 
+## This config will pass to RuntimeCustomizer's initialize function to do 
initializing.
+#runtimeCustomizerConfig:
+#   extractLabels:
+#       extraLabel: value
+#   extraAnnotations:
+#       extraAnnotation: value
+#   nodeSelectorLabels:
+#       customLabel: value
+#   jobNamespace: namespace
+#   tolerations:
+#   - key: custom-key
+#     value: value
+#     effect: NoSchedule
+
 ## Config admin CLI
 #configAdminCLI:
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
index a9a6929..c69d315 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
@@ -21,12 +21,18 @@ package org.apache.pulsar.functions.runtime.kubernetes;
 import com.google.gson.Gson;
 import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.openapi.models.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -39,6 +45,7 @@ import java.util.Map;
  * modify (for example, a service account must have permissions in the 
specified jobNamespace)
  *
  */
+@Slf4j
 public class BasicKubernetesManifestCustomizer implements 
KubernetesManifestCustomizer {
 
     private static final String RESOURCE_CPU = "cpu";
@@ -48,7 +55,9 @@ public class BasicKubernetesManifestCustomizer implements 
KubernetesManifestCust
     @Getter
     @Setter
     @NoArgsConstructor
-    static private class RuntimeOpts {
+    @AllArgsConstructor
+    @Builder(toBuilder = true)
+    static public class RuntimeOpts {
         private String jobNamespace;
         private String jobName;
         private Map<String, String> extraLabels;
@@ -58,13 +67,25 @@ public class BasicKubernetesManifestCustomizer implements 
KubernetesManifestCust
         private List<V1Toleration> tolerations;
     }
 
+    @Getter
+    private RuntimeOpts runtimeOpts = new RuntimeOpts();
+
     @Override
     public void initialize(Map<String, Object> config) {
+        if (config != null) {
+            RuntimeOpts opts = 
ObjectMapperFactory.getThreadLocal().convertValue(config, RuntimeOpts.class);
+            if (opts != null) {
+                runtimeOpts = opts.toBuilder().build();
+            }
+        } else {
+            log.warn("initialize with null config");
+        }
     }
 
     @Override
     public String customizeNamespace(Function.FunctionDetails funcDetails, 
String currentNamespace) {
         RuntimeOpts opts = getOptsFromDetails(funcDetails);
+        opts = mergeRuntimeOpts(runtimeOpts, opts);
         if (!StringUtils.isEmpty(opts.getJobNamespace())) {
             return opts.getJobNamespace();
         } else {
@@ -75,6 +96,7 @@ public class BasicKubernetesManifestCustomizer implements 
KubernetesManifestCust
     @Override
     public String customizeName(Function.FunctionDetails funcDetails, String 
currentName) {
         RuntimeOpts opts = getOptsFromDetails(funcDetails);
+        opts = mergeRuntimeOpts(runtimeOpts, opts);
         if (!StringUtils.isEmpty(opts.getJobName())) {
             return opts.getJobName();
         } else {
@@ -85,24 +107,27 @@ public class BasicKubernetesManifestCustomizer implements 
KubernetesManifestCust
     @Override
     public V1Service customizeService(Function.FunctionDetails funcDetails, 
V1Service service) {
         RuntimeOpts opts = getOptsFromDetails(funcDetails);
+        opts = mergeRuntimeOpts(runtimeOpts, opts);
         service.setMetadata(updateMeta(opts, service.getMetadata()));
         return service;
     }
 
     @Override
     public V1StatefulSet customizeStatefulSet(Function.FunctionDetails 
funcDetails, V1StatefulSet statefulSet) {
-        RuntimeOpts opts = getOptsFromDetails(funcDetails);
+        RuntimeOpts opts = mergeRuntimeOpts(runtimeOpts, 
getOptsFromDetails(funcDetails));
         statefulSet.setMetadata(updateMeta(opts, statefulSet.getMetadata()));
         V1PodTemplateSpec pt = statefulSet.getSpec().getTemplate();
         pt.setMetadata(updateMeta(opts, pt.getMetadata()));
         V1PodSpec ps = pt.getSpec();
-        if (opts.getNodeSelectorLabels() != null && 
opts.getNodeSelectorLabels().size() > 0) {
-            opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
-        }
-        if (opts.getTolerations() != null && opts.getTolerations().size() > 0) 
{
-            opts.getTolerations().forEach(ps::addTolerationsItem);
+        if (ps != null) {
+            if (opts.getNodeSelectorLabels() != null && 
opts.getNodeSelectorLabels().size() > 0) {
+                opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
+            }
+            if (opts.getTolerations() != null && opts.getTolerations().size() 
> 0) {
+                opts.getTolerations().forEach(ps::addTolerationsItem);
+            }
+            ps.getContainers().forEach(container -> 
updateContainerResources(container, opts));
         }
-        ps.getContainers().forEach(container -> 
updateContainerResources(container, opts));
         return statefulSet;
     }
 
@@ -113,10 +138,10 @@ public class BasicKubernetesManifestCustomizer implements 
KubernetesManifestCust
             Map<String, Quantity> limits = resourceRequirements.getLimits();
             Map<String, Quantity> requests = 
resourceRequirements.getRequests();
             for (String resource : RESOURCES) {
-                if (limits.containsKey(resource)) {
+                if (limits != null && limits.containsKey(resource)) {
                     containerResources.putLimitsItem(resource, 
limits.get(resource));
                 }
-                if (requests.containsKey(resource)) {
+                if (requests != null && requests.containsKey(resource)) {
                     containerResources.putRequestsItem(resource, 
requests.get(resource));
                 }
             }
@@ -143,4 +168,76 @@ public class BasicKubernetesManifestCustomizer implements 
KubernetesManifestCust
         return meta;
     }
 
+    public static RuntimeOpts mergeRuntimeOpts(RuntimeOpts oriOpts, 
RuntimeOpts newOpts) {
+        RuntimeOpts mergedOpts = oriOpts.toBuilder().build();
+        if (mergedOpts.getExtraLabels() == null) {
+            mergedOpts.setExtraLabels(new HashMap<>());
+        }
+        if (mergedOpts.getExtraAnnotations() == null) {
+            mergedOpts.setExtraAnnotations(new HashMap<>());
+        }
+        if (mergedOpts.getNodeSelectorLabels() == null) {
+            mergedOpts.setNodeSelectorLabels(new HashMap<>());
+        }
+        if (mergedOpts.getTolerations() == null) {
+            mergedOpts.setTolerations(new ArrayList<>());
+        }
+        if (mergedOpts.getResourceRequirements() == null) {
+            mergedOpts.setResourceRequirements(new V1ResourceRequirements());
+        }
+
+        if (!StringUtils.isEmpty(newOpts.getJobName())) {
+            mergedOpts.setJobName(newOpts.getJobName());
+        }
+        if (!StringUtils.isEmpty(newOpts.getJobNamespace())) {
+            mergedOpts.setJobNamespace(newOpts.getJobNamespace());
+        }
+        if (newOpts.getExtraLabels() != null && 
!newOpts.getExtraLabels().isEmpty()) {
+            newOpts.getExtraLabels().forEach((key, labelsItem) -> {
+                if (!mergedOpts.getExtraLabels().containsKey(key)) {
+                    log.debug("extra label {} has been changed to {}", key, 
labelsItem);
+                }
+                mergedOpts.getExtraLabels().put(key, labelsItem);
+            });
+        }
+        if (newOpts.getExtraAnnotations() != null && 
!newOpts.getExtraAnnotations().isEmpty()) {
+            newOpts.getExtraAnnotations().forEach((key, annotationsItem) -> {
+                if (!mergedOpts.getExtraAnnotations().containsKey(key)) {
+                    log.debug("extra annotation {} has been changed to {}", 
key, annotationsItem);
+                }
+                mergedOpts.getExtraAnnotations().put(key, annotationsItem);
+            });
+        }
+        if (newOpts.getNodeSelectorLabels() != null && 
!newOpts.getNodeSelectorLabels().isEmpty()) {
+            newOpts.getNodeSelectorLabels().forEach((key, nodeSelectorItem) -> 
{
+                if (!mergedOpts.getNodeSelectorLabels().containsKey(key)) {
+                    log.debug("node selector label {} has been changed to {}", 
key, nodeSelectorItem);
+                }
+                mergedOpts.getNodeSelectorLabels().put(key, nodeSelectorItem);
+            });
+        }
+
+        if (newOpts.getResourceRequirements() != null) {
+            V1ResourceRequirements mergedResourcesRequirements = 
mergedOpts.getResourceRequirements();
+            V1ResourceRequirements newResourcesRequirements = 
newOpts.getResourceRequirements();
+
+            Map<String, Quantity> limits = 
newResourcesRequirements.getLimits();
+            Map<String, Quantity> requests = 
newResourcesRequirements.getRequests();
+            for (String resource : RESOURCES) {
+                if (limits != null && limits.containsKey(resource)) {
+                    mergedResourcesRequirements.putLimitsItem(resource, 
limits.get(resource));
+                }
+                if (requests != null && requests.containsKey(resource)) {
+                    mergedResourcesRequirements.putRequestsItem(resource, 
requests.get(resource));
+                }
+            }
+            mergedOpts.setResourceRequirements(mergedResourcesRequirements);
+        }
+
+        if (newOpts.getTolerations() != null && 
!newOpts.getTolerations().isEmpty()) {
+            mergedOpts.getTolerations().addAll(newOpts.getTolerations());
+        }
+        return mergedOpts;
+    }
+
 }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizerTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizerTest.java
new file mode 100644
index 0000000..1b38306
--- /dev/null
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizerTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.kubernetes;
+
+import com.google.gson.Gson;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Unit test of {@link BasicKubernetesManifestCustomizerTest}.
+ */
+public class BasicKubernetesManifestCustomizerTest {
+
+    @Test
+    public void TestInitializeWithNullData() {
+        BasicKubernetesManifestCustomizer customizer = new 
BasicKubernetesManifestCustomizer();
+        customizer.initialize(null);
+        assertNotEquals(customizer.getRuntimeOpts(), null);
+        assertNull(customizer.getRuntimeOpts().getExtraLabels());
+        assertNull(customizer.getRuntimeOpts().getExtraAnnotations());
+        assertNull(customizer.getRuntimeOpts().getNodeSelectorLabels());
+        assertNull(customizer.getRuntimeOpts().getTolerations());
+        assertNull(customizer.getRuntimeOpts().getResourceRequirements());
+    }
+
+    @Test
+    public void TestInitializeWithData() {
+        BasicKubernetesManifestCustomizer customizer = new 
BasicKubernetesManifestCustomizer();
+        Map<String, Object> confs = new HashMap<>();
+        confs.put("jobNamespace", "custom-ns");
+        confs.put("jobName", "custom-name");
+        customizer.initialize(confs);
+        assertNotEquals(customizer.getRuntimeOpts(), null);
+        assertEquals(customizer.getRuntimeOpts().getJobName(), "custom-name");
+        assertEquals(customizer.getRuntimeOpts().getJobNamespace(), 
"custom-ns");
+    }
+
+    @Test
+    public void TestMergeRuntimeOpts() {
+        Map<String, Object> configs = new 
Gson().fromJson(KubernetesRuntimeTest.createRuntimeCustomizerConfig(), 
HashMap.class);
+        BasicKubernetesManifestCustomizer customizer = new 
BasicKubernetesManifestCustomizer();
+        customizer.initialize(configs);
+        BasicKubernetesManifestCustomizer.RuntimeOpts newOpts = new 
BasicKubernetesManifestCustomizer.RuntimeOpts();
+        newOpts.setJobName("merged-name");
+        newOpts.setTolerations(Collections.emptyList());
+        V1Toleration toleration = new V1Toleration();
+        toleration.setKey("merge-key");
+        toleration.setEffect("NoSchedule");
+        toleration.setOperator("Equal");
+        toleration.setTolerationSeconds(6000L);
+        newOpts.setTolerations(Collections.singletonList(toleration));
+        V1ResourceRequirements resourceRequirements = new 
V1ResourceRequirements();
+        resourceRequirements.putLimitsItem("cpu", new Quantity("20"));
+        resourceRequirements.putLimitsItem("memory", new Quantity("10240"));
+        newOpts.setResourceRequirements(resourceRequirements);
+        newOpts.setNodeSelectorLabels(Collections.singletonMap("disktype", 
"ssd"));
+        newOpts.setExtraAnnotations(Collections.singletonMap("functiontype", 
"sink"));
+        newOpts.setExtraLabels(Collections.singletonMap("functiontype", 
"sink"));
+        BasicKubernetesManifestCustomizer.RuntimeOpts mergedOpts = 
BasicKubernetesManifestCustomizer.mergeRuntimeOpts(
+                customizer.getRuntimeOpts(), newOpts);
+
+        assertEquals(mergedOpts.getJobName(), "merged-name");
+        assertEquals(mergedOpts.getTolerations().size(), 2);
+        assertEquals(mergedOpts.getExtraAnnotations().size(), 2);
+        assertEquals(mergedOpts.getExtraLabels().size(), 2);
+        assertEquals(mergedOpts.getNodeSelectorLabels().size(), 2);
+        
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("cpu").getNumber().intValue(),
 20);
+        
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("memory").getNumber().intValue(),
 10240);
+    }
+}
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index a281187..dec1014 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -36,7 +36,6 @@ import 
org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
-import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
@@ -58,7 +57,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
 /**
- * Unit test of {@link ThreadRuntime}.
+ * Unit test of {@link KubernetesRuntimeFactoryTest}.
  */
 public class KubernetesRuntimeFactoryTest {
 
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index ebc4878..5c1c901 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.runtime.kubernetes;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.protobuf.util.JsonFormat;
@@ -216,6 +217,8 @@ public class KubernetesRuntimeTest {
         workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
         workerConfig.setAuthenticationEnabled(false);
 
+        manifestCustomizer.ifPresent(runtimeCustomizer -> 
runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));
+
         factory.initialize(workerConfig, null, new 
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
         return factory;
     }
@@ -689,41 +692,7 @@ public class KubernetesRuntimeTest {
     public void testBasicKubernetesManifestCustomizer() throws Exception {
         InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
         
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, 
false, (fb) -> {
-            JsonObject configObj = new JsonObject();
-            configObj.addProperty("jobNamespace", "custom-ns");
-            configObj.addProperty("jobName", "custom-name");
-
-            JsonObject extraAnn = new JsonObject();
-            extraAnn.addProperty("annotation", "test");
-            configObj.add("extraAnnotations", extraAnn);
-
-            JsonObject extraLabel = new JsonObject();
-            extraLabel.addProperty("label", "test");
-            configObj.add("extraLabels", extraLabel);
-
-            JsonObject nodeLabels = new JsonObject();
-            nodeLabels.addProperty("selector", "test");
-            configObj.add("nodeSelectorLabels", nodeLabels);
-
-            JsonArray tolerations = new JsonArray();
-            JsonObject toleration = new JsonObject();
-            toleration.addProperty("key", "test");
-            toleration.addProperty("value", "test");
-            toleration.addProperty("effect", "test");
-            tolerations.add(toleration);
-            configObj.add("tolerations", tolerations);
-
-            JsonObject resourceRequirements = new JsonObject();
-            JsonObject requests = new JsonObject();
-            JsonObject limits = new JsonObject();
-            requests.addProperty("cpu", 1);
-            requests.addProperty("memory", "4G");
-            limits.addProperty("cpu", 2);
-            limits.addProperty("memory", "8G");
-            resourceRequirements.add("requests", requests);
-            resourceRequirements.add("limits", limits);
-            configObj.add("resourceRequirements", resourceRequirements);
-
+            JsonObject configObj = createRuntimeCustomizerConfig();
             return fb.setCustomRuntimeOptions(configObj.toString());
         }));
 
@@ -893,4 +862,182 @@ public class KubernetesRuntimeTest {
 
         verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
     }
+
+    KubernetesRuntimeFactory createKubernetesRuntimeFactory(String 
extraDepsDir, int percentMemoryPadding,
+                                                            double 
cpuOverCommitRatio, double memoryOverCommitRatio,
+                                                            String 
manifestCustomizerClassName,
+                                                            Map<String, 
Object> runtimeCustomizerConfig) throws Exception {
+        KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+        doNothing().when(factory).setupClient();
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new 
KubernetesRuntimeFactoryConfig();
+        kubernetesRuntimeFactoryConfig.setK8Uri(null);
+        kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+        kubernetesRuntimeFactoryConfig.setJobName(null);
+        kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+        kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
+        kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+        kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+        kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+        kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+        kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+        
kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+        
kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+        kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+        
kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(percentMemoryPadding);
+        
kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(cpuOverCommitRatio);
+        
kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(memoryOverCommitRatio);
+        kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+        kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+        kubernetesRuntimeFactoryConfig.setGrpcPort(4332);
+        kubernetesRuntimeFactoryConfig.setMetricsPort(4331);
+        
kubernetesRuntimeFactoryConfig.setNarExtractionDirectory(narExtractionDirectory);
+        
workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                
ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig,
 Map.class));
+        workerConfig.setFunctionInstanceMinResources(null);
+        workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+        workerConfig.setAuthenticationEnabled(false);
+        workerConfig.setRuntimeCustomizerConfig(runtimeCustomizerConfig);
+        
workerConfig.setRuntimeCustomizerClassName(manifestCustomizerClassName);
+
+        Optional<RuntimeCustomizer> manifestCustomizer = Optional.empty();
+        if 
(!org.apache.commons.lang3.StringUtils.isEmpty(workerConfig.getRuntimeCustomizerClassName()))
 {
+            manifestCustomizer = 
Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
+            
manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
+        }
+
+        factory.initialize(workerConfig, null, new 
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+        return factory;
+    }
+
+    public static JsonObject createRuntimeCustomizerConfig() {
+        JsonObject configObj = new JsonObject();
+        configObj.addProperty("jobNamespace", "custom-ns");
+        configObj.addProperty("jobName", "custom-name");
+
+        JsonObject extraAnn = new JsonObject();
+        extraAnn.addProperty("annotation", "test");
+        configObj.add("extraAnnotations", extraAnn);
+
+        JsonObject extraLabel = new JsonObject();
+        extraLabel.addProperty("label", "test");
+        configObj.add("extraLabels", extraLabel);
+
+        JsonObject nodeLabels = new JsonObject();
+        nodeLabels.addProperty("selector", "test");
+        configObj.add("nodeSelectorLabels", nodeLabels);
+
+        JsonArray tolerations = new JsonArray();
+        JsonObject toleration = new JsonObject();
+        toleration.addProperty("key", "test");
+        toleration.addProperty("value", "test");
+        toleration.addProperty("effect", "test");
+        tolerations.add(toleration);
+        configObj.add("tolerations", tolerations);
+
+        JsonObject resourceRequirements = new JsonObject();
+        JsonObject requests = new JsonObject();
+        JsonObject limits = new JsonObject();
+        requests.addProperty("cpu", "1");
+        requests.addProperty("memory", "4G");
+        limits.addProperty("cpu", "2");
+        limits.addProperty("memory", "8G");
+        resourceRequirements.add("requests", requests);
+        resourceRequirements.add("limits", limits);
+        configObj.add("resourceRequirements", resourceRequirements);
+        return configObj;
+    }
+
+    @Test
+    public void 
testBasicKubernetesManifestCustomizerWithRuntimeCustomizerConfig() throws 
Exception {
+        InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+        Map<String, Object> configs = new 
Gson().fromJson(createRuntimeCustomizerConfig(), HashMap.class);
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
+                
"org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer",
 configs);
+
+        verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
+        KubernetesRuntime container = factory.createContainer(config, 
userJarFile, userJarFile, 30l);
+        V1StatefulSet spec = container.createStatefulSet();
+        assertEquals(spec.getMetadata().getAnnotations().get("annotation"), 
"test");
+        assertEquals(spec.getMetadata().getLabels().get("label"), "test");
+        
assertEquals(spec.getSpec().getTemplate().getSpec().getNodeSelector().get("selector"),
 "test");
+        List<V1Toleration> tols = 
spec.getSpec().getTemplate().getSpec().getTolerations();
+        // we add three by default, plus our custom
+        assertEquals(tols.size(), 4);
+        assertEquals(tols.get(3).getKey(), "test");
+        assertEquals(tols.get(3).getValue(), "test");
+        assertEquals(tols.get(3).getEffect(), "test");
+
+        V1Service serviceSpec = container.createService();
+        assertEquals(serviceSpec.getMetadata().getNamespace(), "custom-ns");
+        assertEquals(serviceSpec.getMetadata().getName(), 
"custom-name-2deb2c2b");
+        
assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"), 
"test");
+        assertEquals(serviceSpec.getMetadata().getLabels().get("label"), 
"test");
+
+        List<V1Container> containers = 
spec.getSpec().getTemplate().getSpec().getContainers();
+        containers.forEach(c -> {
+            V1ResourceRequirements resources = c.getResources();
+            Map<String, Quantity> limits = resources.getLimits();
+            Map<String, Quantity> requests = resources.getRequests();
+            assertEquals(requests.get("cpu").getNumber(), new BigDecimal(1) );
+            assertEquals(limits.get("cpu").getNumber(), new BigDecimal(2) );
+            assertEquals(requests.get("memory").getNumber(), new 
BigDecimal(4000000000L) );
+            assertEquals(limits.get("memory").getNumber(), new 
BigDecimal(8000000000L) );
+        });
+
+    }
+
+
+    @Test
+    public void 
testBasicKubernetesManifestCustomizerWithRuntimeCustomizerConfigOverwrite() 
throws Exception {
+        InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+        
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, 
false, (fb) -> {
+            JsonObject configObj = new JsonObject();
+            configObj.addProperty("jobNamespace", "custom-ns-overwrite");
+            configObj.addProperty("jobName", "custom-name-overwrite");
+            return fb.setCustomRuntimeOptions(configObj.toString());
+        }));
+
+        Map<String, Object> configs = new 
Gson().fromJson(createRuntimeCustomizerConfig(), HashMap.class);
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
+                
"org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer",
 configs);
+
+        verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
+        KubernetesRuntime container = factory.createContainer(config, 
userJarFile, userJarFile, 30l);
+        V1StatefulSet spec = container.createStatefulSet();
+        assertEquals(spec.getMetadata().getAnnotations().get("annotation"), 
"test");
+        assertEquals(spec.getMetadata().getLabels().get("label"), "test");
+        
assertEquals(spec.getSpec().getTemplate().getSpec().getNodeSelector().get("selector"),
 "test");
+        List<V1Toleration> tols = 
spec.getSpec().getTemplate().getSpec().getTolerations();
+        // we add three by default, plus our custom
+        assertEquals(tols.size(), 4);
+        assertEquals(tols.get(3).getKey(), "test");
+        assertEquals(tols.get(3).getValue(), "test");
+        assertEquals(tols.get(3).getEffect(), "test");
+
+        V1Service serviceSpec = container.createService();
+        assertEquals(serviceSpec.getMetadata().getNamespace(), 
"custom-ns-overwrite");
+        assertEquals(serviceSpec.getMetadata().getName(), 
"custom-name-overwrite-7757f1ff");
+        
assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"), 
"test");
+        assertEquals(serviceSpec.getMetadata().getLabels().get("label"), 
"test");
+
+        List<V1Container> containers = 
spec.getSpec().getTemplate().getSpec().getContainers();
+        containers.forEach(c -> {
+            V1ResourceRequirements resources = c.getResources();
+            Map<String, Quantity> limits = resources.getLimits();
+            Map<String, Quantity> requests = resources.getRequests();
+            assertEquals(requests.get("cpu").getNumber(), new BigDecimal(1) );
+            assertEquals(limits.get("cpu").getNumber(), new BigDecimal(2) );
+            assertEquals(requests.get("memory").getNumber(), new 
BigDecimal(4000000000L) );
+            assertEquals(limits.get("memory").getNumber(), new 
BigDecimal(8000000000L) );
+        });
+
+    }
 }

Reply via email to