This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6f9b795 [pulsar-functions] enhance kubernetes manifest customizer
with default options (#9445)
6f9b795 is described below
commit 6f9b795976b7b1a0f4bcdc6db76eb6c3d0c584e3
Author: Rui Fu <[email protected]>
AuthorDate: Fri Feb 5 18:56:53 2021 +0800
[pulsar-functions] enhance kubernetes manifest customizer with default
options (#9445)
### Motivation
The KubernetesManifestCustomizer was introduced to customize the StatefulSet
of running Pulsar Functions, but no default values were loaded from
`functions_worker.yml`.
### Modifications
Load default runtime options in `BasicKubernetesManifestCustomizer`
Add unit tests
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
---
conf/functions_worker.yml | 14 ++
.../BasicKubernetesManifestCustomizer.java | 117 ++++++++++-
.../BasicKubernetesManifestCustomizerTest.java | 96 +++++++++
.../kubernetes/KubernetesRuntimeFactoryTest.java | 3 +-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 217 +++++++++++++++++----
5 files changed, 400 insertions(+), 47 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index c2fd3b8..33cc9d3 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -229,6 +229,20 @@ functionRuntimeFactoryConfigs:
## This class receives the customRuntimeOptions string and can customize
details of how the runtime operates.
#runtimeCustomizerClassName:
"org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer"
+## This config is passed to the RuntimeCustomizer's initialize function
when the customizer is initialized.
+#runtimeCustomizerConfig:
+# extraLabels:
+# extraLabel: value
+# extraAnnotations:
+# extraAnnotation: value
+# nodeSelectorLabels:
+# customLabel: value
+# jobNamespace: namespace
+# tolerations:
+# - key: custom-key
+# value: value
+# effect: NoSchedule
+
## Config admin CLI
#configAdminCLI:
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
index a9a6929..c69d315 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
@@ -21,12 +21,18 @@ package org.apache.pulsar.functions.runtime.kubernetes;
import com.google.gson.Gson;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.openapi.models.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,6 +45,7 @@ import java.util.Map;
* modify (for example, a service account must have permissions in the
specified jobNamespace)
*
*/
+@Slf4j
public class BasicKubernetesManifestCustomizer implements
KubernetesManifestCustomizer {
private static final String RESOURCE_CPU = "cpu";
@@ -48,7 +55,9 @@ public class BasicKubernetesManifestCustomizer implements
KubernetesManifestCust
@Getter
@Setter
@NoArgsConstructor
- static private class RuntimeOpts {
+ @AllArgsConstructor
+ @Builder(toBuilder = true)
+ static public class RuntimeOpts {
private String jobNamespace;
private String jobName;
private Map<String, String> extraLabels;
@@ -58,13 +67,25 @@ public class BasicKubernetesManifestCustomizer implements
KubernetesManifestCust
private List<V1Toleration> tolerations;
}
+ @Getter
+ private RuntimeOpts runtimeOpts = new RuntimeOpts();
+
@Override
public void initialize(Map<String, Object> config) {
+ if (config != null) {
+ RuntimeOpts opts =
ObjectMapperFactory.getThreadLocal().convertValue(config, RuntimeOpts.class);
+ if (opts != null) {
+ runtimeOpts = opts.toBuilder().build();
+ }
+ } else {
+ log.warn("initialize with null config");
+ }
}
@Override
public String customizeNamespace(Function.FunctionDetails funcDetails,
String currentNamespace) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
+ opts = mergeRuntimeOpts(runtimeOpts, opts);
if (!StringUtils.isEmpty(opts.getJobNamespace())) {
return opts.getJobNamespace();
} else {
@@ -75,6 +96,7 @@ public class BasicKubernetesManifestCustomizer implements
KubernetesManifestCust
@Override
public String customizeName(Function.FunctionDetails funcDetails, String
currentName) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
+ opts = mergeRuntimeOpts(runtimeOpts, opts);
if (!StringUtils.isEmpty(opts.getJobName())) {
return opts.getJobName();
} else {
@@ -85,24 +107,27 @@ public class BasicKubernetesManifestCustomizer implements
KubernetesManifestCust
@Override
public V1Service customizeService(Function.FunctionDetails funcDetails,
V1Service service) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
+ opts = mergeRuntimeOpts(runtimeOpts, opts);
service.setMetadata(updateMeta(opts, service.getMetadata()));
return service;
}
@Override
public V1StatefulSet customizeStatefulSet(Function.FunctionDetails
funcDetails, V1StatefulSet statefulSet) {
- RuntimeOpts opts = getOptsFromDetails(funcDetails);
+ RuntimeOpts opts = mergeRuntimeOpts(runtimeOpts,
getOptsFromDetails(funcDetails));
statefulSet.setMetadata(updateMeta(opts, statefulSet.getMetadata()));
V1PodTemplateSpec pt = statefulSet.getSpec().getTemplate();
pt.setMetadata(updateMeta(opts, pt.getMetadata()));
V1PodSpec ps = pt.getSpec();
- if (opts.getNodeSelectorLabels() != null &&
opts.getNodeSelectorLabels().size() > 0) {
- opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
- }
- if (opts.getTolerations() != null && opts.getTolerations().size() > 0)
{
- opts.getTolerations().forEach(ps::addTolerationsItem);
+ if (ps != null) {
+ if (opts.getNodeSelectorLabels() != null &&
opts.getNodeSelectorLabels().size() > 0) {
+ opts.getNodeSelectorLabels().forEach(ps::putNodeSelectorItem);
+ }
+ if (opts.getTolerations() != null && opts.getTolerations().size()
> 0) {
+ opts.getTolerations().forEach(ps::addTolerationsItem);
+ }
+ ps.getContainers().forEach(container ->
updateContainerResources(container, opts));
}
- ps.getContainers().forEach(container ->
updateContainerResources(container, opts));
return statefulSet;
}
@@ -113,10 +138,10 @@ public class BasicKubernetesManifestCustomizer implements
KubernetesManifestCust
Map<String, Quantity> limits = resourceRequirements.getLimits();
Map<String, Quantity> requests =
resourceRequirements.getRequests();
for (String resource : RESOURCES) {
- if (limits.containsKey(resource)) {
+ if (limits != null && limits.containsKey(resource)) {
containerResources.putLimitsItem(resource,
limits.get(resource));
}
- if (requests.containsKey(resource)) {
+ if (requests != null && requests.containsKey(resource)) {
containerResources.putRequestsItem(resource,
requests.get(resource));
}
}
@@ -143,4 +168,76 @@ public class BasicKubernetesManifestCustomizer implements
KubernetesManifestCust
return meta;
}
+ public static RuntimeOpts mergeRuntimeOpts(RuntimeOpts oriOpts,
RuntimeOpts newOpts) {
+ RuntimeOpts mergedOpts = oriOpts.toBuilder().build();
+ if (mergedOpts.getExtraLabels() == null) {
+ mergedOpts.setExtraLabels(new HashMap<>());
+ }
+ if (mergedOpts.getExtraAnnotations() == null) {
+ mergedOpts.setExtraAnnotations(new HashMap<>());
+ }
+ if (mergedOpts.getNodeSelectorLabels() == null) {
+ mergedOpts.setNodeSelectorLabels(new HashMap<>());
+ }
+ if (mergedOpts.getTolerations() == null) {
+ mergedOpts.setTolerations(new ArrayList<>());
+ }
+ if (mergedOpts.getResourceRequirements() == null) {
+ mergedOpts.setResourceRequirements(new V1ResourceRequirements());
+ }
+
+ if (!StringUtils.isEmpty(newOpts.getJobName())) {
+ mergedOpts.setJobName(newOpts.getJobName());
+ }
+ if (!StringUtils.isEmpty(newOpts.getJobNamespace())) {
+ mergedOpts.setJobNamespace(newOpts.getJobNamespace());
+ }
+ if (newOpts.getExtraLabels() != null &&
!newOpts.getExtraLabels().isEmpty()) {
+ newOpts.getExtraLabels().forEach((key, labelsItem) -> {
+ if (!mergedOpts.getExtraLabels().containsKey(key)) {
+ log.debug("extra label {} has been changed to {}", key,
labelsItem);
+ }
+ mergedOpts.getExtraLabels().put(key, labelsItem);
+ });
+ }
+ if (newOpts.getExtraAnnotations() != null &&
!newOpts.getExtraAnnotations().isEmpty()) {
+ newOpts.getExtraAnnotations().forEach((key, annotationsItem) -> {
+ if (!mergedOpts.getExtraAnnotations().containsKey(key)) {
+ log.debug("extra annotation {} has been changed to {}",
key, annotationsItem);
+ }
+ mergedOpts.getExtraAnnotations().put(key, annotationsItem);
+ });
+ }
+ if (newOpts.getNodeSelectorLabels() != null &&
!newOpts.getNodeSelectorLabels().isEmpty()) {
+ newOpts.getNodeSelectorLabels().forEach((key, nodeSelectorItem) ->
{
+ if (!mergedOpts.getNodeSelectorLabels().containsKey(key)) {
+ log.debug("node selector label {} has been changed to {}",
key, nodeSelectorItem);
+ }
+ mergedOpts.getNodeSelectorLabels().put(key, nodeSelectorItem);
+ });
+ }
+
+ if (newOpts.getResourceRequirements() != null) {
+ V1ResourceRequirements mergedResourcesRequirements =
mergedOpts.getResourceRequirements();
+ V1ResourceRequirements newResourcesRequirements =
newOpts.getResourceRequirements();
+
+ Map<String, Quantity> limits =
newResourcesRequirements.getLimits();
+ Map<String, Quantity> requests =
newResourcesRequirements.getRequests();
+ for (String resource : RESOURCES) {
+ if (limits != null && limits.containsKey(resource)) {
+ mergedResourcesRequirements.putLimitsItem(resource,
limits.get(resource));
+ }
+ if (requests != null && requests.containsKey(resource)) {
+ mergedResourcesRequirements.putRequestsItem(resource,
requests.get(resource));
+ }
+ }
+ mergedOpts.setResourceRequirements(mergedResourcesRequirements);
+ }
+
+ if (newOpts.getTolerations() != null &&
!newOpts.getTolerations().isEmpty()) {
+ mergedOpts.getTolerations().addAll(newOpts.getTolerations());
+ }
+ return mergedOpts;
+ }
+
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizerTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizerTest.java
new file mode 100644
index 0000000..1b38306
--- /dev/null
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizerTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.kubernetes;
+
+import com.google.gson.Gson;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Unit test of {@link BasicKubernetesManifestCustomizer}.
+ */
+public class BasicKubernetesManifestCustomizerTest {
+
+ @Test
+ public void TestInitializeWithNullData() {
+ BasicKubernetesManifestCustomizer customizer = new
BasicKubernetesManifestCustomizer();
+ customizer.initialize(null);
+ assertNotEquals(customizer.getRuntimeOpts(), null);
+ assertNull(customizer.getRuntimeOpts().getExtraLabels());
+ assertNull(customizer.getRuntimeOpts().getExtraAnnotations());
+ assertNull(customizer.getRuntimeOpts().getNodeSelectorLabels());
+ assertNull(customizer.getRuntimeOpts().getTolerations());
+ assertNull(customizer.getRuntimeOpts().getResourceRequirements());
+ }
+
+ @Test
+ public void TestInitializeWithData() {
+ BasicKubernetesManifestCustomizer customizer = new
BasicKubernetesManifestCustomizer();
+ Map<String, Object> confs = new HashMap<>();
+ confs.put("jobNamespace", "custom-ns");
+ confs.put("jobName", "custom-name");
+ customizer.initialize(confs);
+ assertNotEquals(customizer.getRuntimeOpts(), null);
+ assertEquals(customizer.getRuntimeOpts().getJobName(), "custom-name");
+ assertEquals(customizer.getRuntimeOpts().getJobNamespace(),
"custom-ns");
+ }
+
+ @Test
+ public void TestMergeRuntimeOpts() {
+ Map<String, Object> configs = new
Gson().fromJson(KubernetesRuntimeTest.createRuntimeCustomizerConfig(),
HashMap.class);
+ BasicKubernetesManifestCustomizer customizer = new
BasicKubernetesManifestCustomizer();
+ customizer.initialize(configs);
+ BasicKubernetesManifestCustomizer.RuntimeOpts newOpts = new
BasicKubernetesManifestCustomizer.RuntimeOpts();
+ newOpts.setJobName("merged-name");
+ newOpts.setTolerations(Collections.emptyList());
+ V1Toleration toleration = new V1Toleration();
+ toleration.setKey("merge-key");
+ toleration.setEffect("NoSchedule");
+ toleration.setOperator("Equal");
+ toleration.setTolerationSeconds(6000L);
+ newOpts.setTolerations(Collections.singletonList(toleration));
+ V1ResourceRequirements resourceRequirements = new
V1ResourceRequirements();
+ resourceRequirements.putLimitsItem("cpu", new Quantity("20"));
+ resourceRequirements.putLimitsItem("memory", new Quantity("10240"));
+ newOpts.setResourceRequirements(resourceRequirements);
+ newOpts.setNodeSelectorLabels(Collections.singletonMap("disktype",
"ssd"));
+ newOpts.setExtraAnnotations(Collections.singletonMap("functiontype",
"sink"));
+ newOpts.setExtraLabels(Collections.singletonMap("functiontype",
"sink"));
+ BasicKubernetesManifestCustomizer.RuntimeOpts mergedOpts =
BasicKubernetesManifestCustomizer.mergeRuntimeOpts(
+ customizer.getRuntimeOpts(), newOpts);
+
+ assertEquals(mergedOpts.getJobName(), "merged-name");
+ assertEquals(mergedOpts.getTolerations().size(), 2);
+ assertEquals(mergedOpts.getExtraAnnotations().size(), 2);
+ assertEquals(mergedOpts.getExtraLabels().size(), 2);
+ assertEquals(mergedOpts.getNodeSelectorLabels().size(), 2);
+
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("cpu").getNumber().intValue(),
20);
+
assertEquals(mergedOpts.getResourceRequirements().getLimits().get("memory").getNumber().intValue(),
10240);
+ }
+}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index a281187..dec1014 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -36,7 +36,6 @@ import
org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
-import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import
org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
@@ -58,7 +57,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
/**
- * Unit test of {@link ThreadRuntime}.
+ * Unit test of {@link KubernetesRuntimeFactory}.
*/
public class KubernetesRuntimeFactoryTest {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index ebc4878..5c1c901 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.functions.runtime.kubernetes;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.protobuf.util.JsonFormat;
@@ -216,6 +217,8 @@ public class KubernetesRuntimeTest {
workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
workerConfig.setAuthenticationEnabled(false);
+ manifestCustomizer.ifPresent(runtimeCustomizer ->
runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));
+
factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
return factory;
}
@@ -689,41 +692,7 @@ public class KubernetesRuntimeTest {
public void testBasicKubernetesManifestCustomizer() throws Exception {
InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA,
false, (fb) -> {
- JsonObject configObj = new JsonObject();
- configObj.addProperty("jobNamespace", "custom-ns");
- configObj.addProperty("jobName", "custom-name");
-
- JsonObject extraAnn = new JsonObject();
- extraAnn.addProperty("annotation", "test");
- configObj.add("extraAnnotations", extraAnn);
-
- JsonObject extraLabel = new JsonObject();
- extraLabel.addProperty("label", "test");
- configObj.add("extraLabels", extraLabel);
-
- JsonObject nodeLabels = new JsonObject();
- nodeLabels.addProperty("selector", "test");
- configObj.add("nodeSelectorLabels", nodeLabels);
-
- JsonArray tolerations = new JsonArray();
- JsonObject toleration = new JsonObject();
- toleration.addProperty("key", "test");
- toleration.addProperty("value", "test");
- toleration.addProperty("effect", "test");
- tolerations.add(toleration);
- configObj.add("tolerations", tolerations);
-
- JsonObject resourceRequirements = new JsonObject();
- JsonObject requests = new JsonObject();
- JsonObject limits = new JsonObject();
- requests.addProperty("cpu", 1);
- requests.addProperty("memory", "4G");
- limits.addProperty("cpu", 2);
- limits.addProperty("memory", "8G");
- resourceRequirements.add("requests", requests);
- resourceRequirements.add("limits", limits);
- configObj.add("resourceRequirements", resourceRequirements);
-
+ JsonObject configObj = createRuntimeCustomizerConfig();
return fb.setCustomRuntimeOptions(configObj.toString());
}));
@@ -893,4 +862,182 @@ public class KubernetesRuntimeTest {
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
}
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String
extraDepsDir, int percentMemoryPadding,
+ double
cpuOverCommitRatio, double memoryOverCommitRatio,
+ String
manifestCustomizerClassName,
+ Map<String,
Object> runtimeCustomizerConfig) throws Exception {
+ KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+ doNothing().when(factory).setupClient();
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new
KubernetesRuntimeFactoryConfig();
+ kubernetesRuntimeFactoryConfig.setK8Uri(null);
+ kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+ kubernetesRuntimeFactoryConfig.setJobName(null);
+ kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+ kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
+ kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+ kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+ kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+ kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+ kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+
kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+
kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+ kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+
kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(percentMemoryPadding);
+
kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(cpuOverCommitRatio);
+
kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(memoryOverCommitRatio);
+ kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+ kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+ kubernetesRuntimeFactoryConfig.setGrpcPort(4332);
+ kubernetesRuntimeFactoryConfig.setMetricsPort(4331);
+
kubernetesRuntimeFactoryConfig.setNarExtractionDirectory(narExtractionDirectory);
+
workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+
ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig,
Map.class));
+ workerConfig.setFunctionInstanceMinResources(null);
+ workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+ workerConfig.setAuthenticationEnabled(false);
+ workerConfig.setRuntimeCustomizerConfig(runtimeCustomizerConfig);
+
workerConfig.setRuntimeCustomizerClassName(manifestCustomizerClassName);
+
+ Optional<RuntimeCustomizer> manifestCustomizer = Optional.empty();
+ if
(!org.apache.commons.lang3.StringUtils.isEmpty(workerConfig.getRuntimeCustomizerClassName()))
{
+ manifestCustomizer =
Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
+
manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
+ }
+
+ factory.initialize(workerConfig, null, new
TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+ return factory;
+ }
+
+ public static JsonObject createRuntimeCustomizerConfig() {
+ JsonObject configObj = new JsonObject();
+ configObj.addProperty("jobNamespace", "custom-ns");
+ configObj.addProperty("jobName", "custom-name");
+
+ JsonObject extraAnn = new JsonObject();
+ extraAnn.addProperty("annotation", "test");
+ configObj.add("extraAnnotations", extraAnn);
+
+ JsonObject extraLabel = new JsonObject();
+ extraLabel.addProperty("label", "test");
+ configObj.add("extraLabels", extraLabel);
+
+ JsonObject nodeLabels = new JsonObject();
+ nodeLabels.addProperty("selector", "test");
+ configObj.add("nodeSelectorLabels", nodeLabels);
+
+ JsonArray tolerations = new JsonArray();
+ JsonObject toleration = new JsonObject();
+ toleration.addProperty("key", "test");
+ toleration.addProperty("value", "test");
+ toleration.addProperty("effect", "test");
+ tolerations.add(toleration);
+ configObj.add("tolerations", tolerations);
+
+ JsonObject resourceRequirements = new JsonObject();
+ JsonObject requests = new JsonObject();
+ JsonObject limits = new JsonObject();
+ requests.addProperty("cpu", "1");
+ requests.addProperty("memory", "4G");
+ limits.addProperty("cpu", "2");
+ limits.addProperty("memory", "8G");
+ resourceRequirements.add("requests", requests);
+ resourceRequirements.add("limits", limits);
+ configObj.add("resourceRequirements", resourceRequirements);
+ return configObj;
+ }
+
+ @Test
+ public void
testBasicKubernetesManifestCustomizerWithRuntimeCustomizerConfig() throws
Exception {
+ InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+ Map<String, Object> configs = new
Gson().fromJson(createRuntimeCustomizerConfig(), HashMap.class);
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
+
"org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer",
configs);
+
+ verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
+ KubernetesRuntime container = factory.createContainer(config,
userJarFile, userJarFile, 30l);
+ V1StatefulSet spec = container.createStatefulSet();
+ assertEquals(spec.getMetadata().getAnnotations().get("annotation"),
"test");
+ assertEquals(spec.getMetadata().getLabels().get("label"), "test");
+
assertEquals(spec.getSpec().getTemplate().getSpec().getNodeSelector().get("selector"),
"test");
+ List<V1Toleration> tols =
spec.getSpec().getTemplate().getSpec().getTolerations();
+ // we add three by default, plus our custom
+ assertEquals(tols.size(), 4);
+ assertEquals(tols.get(3).getKey(), "test");
+ assertEquals(tols.get(3).getValue(), "test");
+ assertEquals(tols.get(3).getEffect(), "test");
+
+ V1Service serviceSpec = container.createService();
+ assertEquals(serviceSpec.getMetadata().getNamespace(), "custom-ns");
+ assertEquals(serviceSpec.getMetadata().getName(),
"custom-name-2deb2c2b");
+
assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"),
"test");
+ assertEquals(serviceSpec.getMetadata().getLabels().get("label"),
"test");
+
+ List<V1Container> containers =
spec.getSpec().getTemplate().getSpec().getContainers();
+ containers.forEach(c -> {
+ V1ResourceRequirements resources = c.getResources();
+ Map<String, Quantity> limits = resources.getLimits();
+ Map<String, Quantity> requests = resources.getRequests();
+ assertEquals(requests.get("cpu").getNumber(), new BigDecimal(1) );
+ assertEquals(limits.get("cpu").getNumber(), new BigDecimal(2) );
+ assertEquals(requests.get("memory").getNumber(), new
BigDecimal(4000000000L) );
+ assertEquals(limits.get("memory").getNumber(), new
BigDecimal(8000000000L) );
+ });
+
+ }
+
+
+ @Test
+ public void
testBasicKubernetesManifestCustomizerWithRuntimeCustomizerConfigOverwrite()
throws Exception {
+ InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA,
false, (fb) -> {
+ JsonObject configObj = new JsonObject();
+ configObj.addProperty("jobNamespace", "custom-ns-overwrite");
+ configObj.addProperty("jobName", "custom-name-overwrite");
+ return fb.setCustomRuntimeOptions(configObj.toString());
+ }));
+
+ Map<String, Object> configs = new
Gson().fromJson(createRuntimeCustomizerConfig(), HashMap.class);
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
+
"org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer",
configs);
+
+ verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
+ KubernetesRuntime container = factory.createContainer(config,
userJarFile, userJarFile, 30l);
+ V1StatefulSet spec = container.createStatefulSet();
+ assertEquals(spec.getMetadata().getAnnotations().get("annotation"),
"test");
+ assertEquals(spec.getMetadata().getLabels().get("label"), "test");
+
assertEquals(spec.getSpec().getTemplate().getSpec().getNodeSelector().get("selector"),
"test");
+ List<V1Toleration> tols =
spec.getSpec().getTemplate().getSpec().getTolerations();
+ // we add three by default, plus our custom
+ assertEquals(tols.size(), 4);
+ assertEquals(tols.get(3).getKey(), "test");
+ assertEquals(tols.get(3).getValue(), "test");
+ assertEquals(tols.get(3).getEffect(), "test");
+
+ V1Service serviceSpec = container.createService();
+ assertEquals(serviceSpec.getMetadata().getNamespace(),
"custom-ns-overwrite");
+ assertEquals(serviceSpec.getMetadata().getName(),
"custom-name-overwrite-7757f1ff");
+
assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"),
"test");
+ assertEquals(serviceSpec.getMetadata().getLabels().get("label"),
"test");
+
+ List<V1Container> containers =
spec.getSpec().getTemplate().getSpec().getContainers();
+ containers.forEach(c -> {
+ V1ResourceRequirements resources = c.getResources();
+ Map<String, Quantity> limits = resources.getLimits();
+ Map<String, Quantity> requests = resources.getRequests();
+ assertEquals(requests.get("cpu").getNumber(), new BigDecimal(1) );
+ assertEquals(limits.get("cpu").getNumber(), new BigDecimal(2) );
+ assertEquals(requests.get("memory").getNumber(), new
BigDecimal(4000000000L) );
+ assertEquals(limits.get("memory").getNumber(), new
BigDecimal(8000000000L) );
+ });
+
+ }
}