This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e231d22 [k8s] convert to valid pod name part with k8s function
runtime (#4996)
e231d22 is described below
commit e231d22e5b04266348b5146c869a0b019a71725a
Author: Rui Fu <[email protected]>
AuthorDate: Fri Aug 30 05:43:50 2019 +0800
[k8s] convert to valid pod name part with k8s function runtime (#4996)
### Motivation
k8s runtime use tenant, namespace, functionName to create k8s pod name, but
functionName might not meet the k8s pod name rule.
`"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"`, which
will cause pod create failed.
### Modifications
This PR create a new function called `toValidPodName` which convert a
string to valid pod name part, including convert to lower case and replace all
non-char part to "-".
---
.../functions/runtime/KubernetesRuntime.java | 17 +++-
.../functions/runtime/KubernetesRuntimeTest.java | 97 ++++++++++++++++++++++
2 files changed, 112 insertions(+), 2 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 4f9b77a..a5c6d1a 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -52,6 +52,7 @@ import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -1015,14 +1016,26 @@ public class KubernetesRuntime implements Runtime {
return ports;
}
- private static String createJobName(Function.FunctionDetails
functionDetails) {
+ public static String createJobName(Function.FunctionDetails
functionDetails) {
return createJobName(functionDetails.getTenant(),
functionDetails.getNamespace(),
functionDetails.getName());
}
+ private static String toValidPodName(String ori) {
+ return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-");
+ }
+
private static String createJobName(String tenant, String namespace,
String functionName) {
- return "pf-" + tenant + "-" + namespace + "-" + functionName;
+ final String jobNameContent = String.format("%s-%s-%s", tenant,
namespace,functionName);
+ final String jobName = "pf-" + jobNameContent;
+ final String convertedJobName = toValidPodName(jobName);
+ if (jobName.equals(convertedJobName)) {
+ return jobName;
+ }
+ // toValidPodName may cause naming collisions, add a short hash here
to avoid it
+ final String shortHash =
DigestUtils.sha1Hex(jobNameContent).toLowerCase().substring(0, 8);
+ return convertedJobName + "-" + shortHash;
}
private static String getServiceUrl(String jobName, String jobNamespace,
int instanceId) {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index e0214b2..812c003 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -49,6 +49,7 @@ import static
org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
/**
* Unit test of {@link ThreadRuntime}.
@@ -448,4 +449,100 @@ public class KubernetesRuntimeTest {
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(),
RESOURCES.getCpu());
}
+ @Test
+ public void testCreateJobName() throws Exception {
+ verifyCreateJobNameWithBackwardCompatibility();
+ verifyCreateJobNameWithUpperCaseFunctionName();
+ verifyCreateJobNameWithDotFunctionName();
+ verifyCreateJobNameWithDotAndUpperCaseFunctionName();
+ verifyCreateJobNameWithInvalidMarksFunctionName();
+ verifyCreateJobNameWithCollisionalFunctionName();
+ verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName();
+ }
+
+ FunctionDetails createFunctionDetails(final String functionName) {
+ FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
+ functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+ functionDetailsBuilder.setTenant(TEST_TENANT);
+ functionDetailsBuilder.setNamespace(TEST_NAMESPACE);
+ functionDetailsBuilder.setName(functionName);
+
functionDetailsBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
+ functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder()
+ .setTopic(TEST_NAME + "-output")
+
.setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer")
+ .setClassName("org.pulsar.pulsar.TestSink")
+ .setTypeClassName(String.class.getName())
+ .build());
+ functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
+ functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
+ .setSubscriptionType(Function.SubscriptionType.FAILOVER)
+ .putAllInputSpecs(topicsToSchema)
+ .setClassName("org.pulsar.pulsar.TestSource")
+ .setTypeClassName(String.class.getName()));
+ functionDetailsBuilder.setSecretsMap("SomeMap");
+ functionDetailsBuilder.setResources(RESOURCES);
+ return functionDetailsBuilder.build();
+ }
+
+ // used for backward compatibility test
+ private String bcCreateJobName(String tenant, String namespace, String
functionName) {
+ return "pf-" + tenant + "-" + namespace + "-" + functionName;
+ }
+
+ private void verifyCreateJobNameWithBackwardCompatibility() throws
Exception {
+ final FunctionDetails functionDetails =
createFunctionDetails(TEST_NAME);
+ final String bcJobName = bcCreateJobName(functionDetails.getTenant(),
functionDetails.getNamespace(), functionDetails.getName());
+ final String jobName =
KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(bcJobName, jobName);
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithUpperCaseFunctionName() throws
Exception {
+ FunctionDetails functionDetails =
createFunctionDetails("UpperCaseFunction");
+ final String jobName =
KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName,
"pf-tenant-namespace-uppercasefunction-f0c5ca9a");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithDotFunctionName() throws Exception {
+ final FunctionDetails functionDetails =
createFunctionDetails("clazz.testfunction");
+ final String jobName =
KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithDotAndUpperCaseFunctionName() throws
Exception {
+ final FunctionDetails functionDetails =
createFunctionDetails("Clazz.TestFunction");
+ final String jobName =
KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName,
"pf-tenant-namespace-clazz.testfunction-92ec5bf6");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithInvalidMarksFunctionName() throws
Exception {
+ final FunctionDetails functionDetails =
createFunctionDetails("test_function*name");
+ final String jobName =
KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName,
"pf-tenant-namespace-test-function-name-b5a215ad");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithCollisionalFunctionName() throws
Exception {
+ final FunctionDetails functionDetail1 =
createFunctionDetails("testfunction");
+ final FunctionDetails functionDetail2 =
createFunctionDetails("testFunction");
+ final String jobName1 =
KubernetesRuntime.createJobName(functionDetail1);
+ final String jobName2 =
KubernetesRuntime.createJobName(functionDetail2);
+ assertNotEquals(jobName1, jobName2);
+ KubernetesRuntime.doChecks(functionDetail1);
+ KubernetesRuntime.doChecks(functionDetail2);
+ }
+
+ private void
verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName() throws
Exception {
+ final FunctionDetails functionDetail1 =
createFunctionDetails("test_function*name");
+ final FunctionDetails functionDetail2 =
createFunctionDetails("test+function*name");
+ final String jobName1 =
KubernetesRuntime.createJobName(functionDetail1);
+ final String jobName2 =
KubernetesRuntime.createJobName(functionDetail2);
+ assertNotEquals(jobName1, jobName2);
+ KubernetesRuntime.doChecks(functionDetail1);
+ KubernetesRuntime.doChecks(functionDetail2);
+ }
+
}