This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 3a3c6f3645e [fix][fn] Make KubernetesRuntime translate characters in
function tenant, namespace, and name during function removal to avoid label
errors (#19584)
3a3c6f3645e is described below
commit 3a3c6f3645e2c18df47f11dd7a4e7ab932ba7d26
Author: csthomas1 <[email protected]>
AuthorDate: Tue Jun 13 00:35:49 2023 -0400
[fix][fn] Make KubernetesRuntime translate characters in function tenant,
namespace, and name during function removal to avoid label errors (#19584)
Co-authored-by: tison <[email protected]>
(cherry picked from commit 9340d6e24302a5c5c177955a8cf2a585739e5558)
---
.../runtime/kubernetes/KubernetesRuntime.java | 7 +-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 106 ++++++++++++++++-----
2 files changed, 88 insertions(+), 25 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index d79502d6757..0380ec8594f 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -660,10 +660,11 @@ public class KubernetesRuntime implements Runtime {
.numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS *
2)
.supplier(() -> {
+ Map<String, String> validLabels =
getLabels(instanceConfig.getFunctionDetails());
String labels =
String.format("tenant=%s,namespace=%s,name=%s",
- instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace(),
- instanceConfig.getFunctionDetails().getName());
+ validLabels.get("tenant"),
+ validLabels.get("namespace"),
+ validLabels.get("name"));
V1PodList response;
try {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 75bce227c3e..39ec1fa0e18 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -27,13 +27,26 @@ import com.google.protobuf.util.JsonFormat;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1Toleration;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import okhttp3.Call;
+import okhttp3.Response;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -49,25 +62,26 @@ import
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
import static
org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -182,7 +196,7 @@ public class KubernetesRuntimeTest {
public void setup() {
System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
}
-
+
@AfterMethod(alwaysRun = true)
public void tearDown() {
if (null != this.factory) {
@@ -548,9 +562,9 @@ public class KubernetesRuntimeTest {
assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(),
RESOURCES.getCpu());
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(),
RESOURCES.getCpu());
}
-
+
@Test
- public void testCreateJobName() throws Exception {
+ public void testCreateJobName() throws Exception {
verifyCreateJobNameWithBackwardCompatibility();
verifyCreateJobNameWithUpperCaseFunctionName();
verifyCreateJobNameWithDotFunctionName();
@@ -652,28 +666,28 @@ public class KubernetesRuntimeTest {
assertEquals(jobName,
"pf-tenant-namespace-test-function-name-b5a215ad");
KubernetesRuntime.doChecks(functionDetails, null);
}
-
+
private void verifyCreateJobNameWithOverriddenK8sPodName() throws
Exception {
final FunctionDetails functionDetails =
createFunctionDetails("clazz.testfunction");
final String jobName =
KubernetesRuntime.createJobName(functionDetails, "custom-k8s-pod-name");
assertEquals(jobName, "custom-k8s-pod-name-dedfc7cf");
KubernetesRuntime.doChecks(functionDetails, "custom-k8s-pod-name");
}
-
+
private void verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks()
throws Exception {
final FunctionDetails functionDetails =
createFunctionDetails("clazz.testfunction");
final String jobName =
KubernetesRuntime.createJobName(functionDetails, "invalid_pod*name");
assertEquals(jobName, "invalid-pod-name-af8c3a6c");
KubernetesRuntime.doChecks(functionDetails, "invalid_pod*name");
}
-
+
private void
verifyCreateJobNameWithOverriddenK8sPodNameNoCollisionWithSameName() throws
Exception {
final String CUSTOM_JOB_NAME = "custom-name";
final String FUNCTION_NAME = "clazz.testfunction";
-
+
final FunctionDetails functionDetails1 =
createFunctionDetails(FUNCTION_NAME);
final String jobName1 =
KubernetesRuntime.createJobName(functionDetails1, CUSTOM_JOB_NAME);
-
+
// create a second function with the same name, but in different
tenant/namespace to make sure collision does not
// happen. If tenant, namespace, and function name are the same
kubernetes handles collision issues
FunctionDetails.Builder functionDetailsBuilder =
FunctionDetails.newBuilder();
@@ -683,7 +697,7 @@ public class KubernetesRuntimeTest {
functionDetailsBuilder.setName(FUNCTION_NAME);
final FunctionDetails functionDetails2 =
functionDetailsBuilder.build();
final String jobName2 =
KubernetesRuntime.createJobName(functionDetails2, CUSTOM_JOB_NAME);
-
+
// create a third function with different name but in same
tenant/namespace to make sure
// collision does not happen. If tenant, namespace, and function name
are the same kubernetes handles collision issues
final FunctionDetails functionDetails3 =
createFunctionDetails(FUNCTION_NAME + "-extra");
@@ -694,14 +708,14 @@ public class KubernetesRuntimeTest {
assertEquals(jobName2, CUSTOM_JOB_NAME + "-c66edfe1");
KubernetesRuntime.doChecks(functionDetails2, CUSTOM_JOB_NAME);
-
+
assertEquals(jobName3, CUSTOM_JOB_NAME + "-0fc9c728");
KubernetesRuntime.doChecks(functionDetails3, CUSTOM_JOB_NAME);
}
-
+
private void verifyCreateJobNameWithNameOverMaxCharLimit() throws
Exception {
final FunctionDetails functionDetails =
createFunctionDetails("clazz.testfunction");
- assertThrows(RuntimeException.class, () ->
KubernetesRuntime.doChecks(functionDetails,
+ assertThrows(RuntimeException.class, () ->
KubernetesRuntime.doChecks(functionDetails,
"custom-k8s-pod-name-over-kuberenetes-max-character-limit-123456789"));
}
@@ -743,7 +757,7 @@ public class KubernetesRuntimeTest {
V1Service serviceSpec = container.createService();
assertEquals(serviceSpec.getMetadata().getNamespace(), "default");
- assertEquals(serviceSpec.getMetadata().getName(), "pf-" + TEST_TENANT
+ "-" +
+ assertEquals(serviceSpec.getMetadata().getName(), "pf-" + TEST_TENANT
+ "-" +
TEST_NAMESPACE + "-" + TEST_NAME);
}
@@ -1216,4 +1230,52 @@ public class KubernetesRuntimeTest {
.contains("--metrics_port 0"));
}
}
+
+ @Test
+ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars()
throws Exception {
+ InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA,
false,
+ (fb) ->
fb.setTenant("c:tenant").setNamespace("c:ns").setName("c:fn")));
+
+ CoreV1Api coreApi = mock(CoreV1Api.class);
+ AppsV1Api appsApi = mock(AppsV1Api.class);
+
+ Call successfulCall = mock(Call.class);
+ Response okResponse = mock(Response.class);
+ when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK);
+ when(okResponse.isSuccessful()).thenReturn(true);
+ when(okResponse.message()).thenReturn("");
+ when(successfulCall.execute()).thenReturn(okResponse);
+
+ final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s",
"c-tenant", "c-ns", "c-fn");
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+ factory.setCoreClient(coreApi);
+ factory.setAppsClient(appsApi);
+
+ ArgumentMatcher<String> hasTranslatedFunctionName = (String t) ->
t.startsWith(expectedFunctionNamePrefix);
+
+ when(appsApi.deleteNamespacedStatefulSetCall(
+ argThat(hasTranslatedFunctionName),
+ anyString(), isNull(), isNull(), anyInt(), isNull(),
anyString(), any(), isNull())).thenReturn(successfulCall);
+
+ ApiException notFoundException = mock(ApiException.class);
+
when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND);
+ when(appsApi.readNamespacedStatefulSet(
+ argThat(hasTranslatedFunctionName), anyString(),
isNull())).thenThrow(notFoundException);
+
+ V1PodList podList = mock(V1PodList.class);
+ when(podList.getItems()).thenReturn(Collections.emptyList());
+
+ String expectedLabels =
String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn");
+
+ when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(),
isNull(), isNull(),
+ eq(expectedLabels), isNull(), isNull(), isNull(), isNull(),
isNull())).thenReturn(podList);
+ KubernetesRuntime kr = factory.createContainer(config, "/test/code",
"code.yml",
+ Long.MIN_VALUE);
+ kr.deleteStatefulSet();
+
+ verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(),
isNull(), isNull(),
+ eq(expectedLabels), isNull(), isNull(), isNull(), isNull(),
isNull());
+ }
}