This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 3a3c6f3645e [fix][fn] Make KubernetesRuntime translate characters in function tenant, namespace, and name during function removal to avoid label errors (#19584)
3a3c6f3645e is described below

commit 3a3c6f3645e2c18df47f11dd7a4e7ab932ba7d26
Author: csthomas1 <[email protected]>
AuthorDate: Tue Jun 13 00:35:49 2023 -0400

    [fix][fn] Make KubernetesRuntime translate characters in function tenant, namespace, and name during function removal to avoid label errors (#19584)
    
    Co-authored-by: tison <[email protected]>
    (cherry picked from commit 9340d6e24302a5c5c177955a8cf2a585739e5558)
---
 .../runtime/kubernetes/KubernetesRuntime.java      |   7 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 106 ++++++++++++++++-----
 2 files changed, 88 insertions(+), 25 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index d79502d6757..0380ec8594f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -660,10 +660,11 @@ public class KubernetesRuntime implements Runtime {
                 .numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
                 
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS * 
2)
                 .supplier(() -> {
+                    Map<String, String> validLabels = 
getLabels(instanceConfig.getFunctionDetails());
                     String labels = 
String.format("tenant=%s,namespace=%s,name=%s",
-                            instanceConfig.getFunctionDetails().getTenant(),
-                            instanceConfig.getFunctionDetails().getNamespace(),
-                            instanceConfig.getFunctionDetails().getName());
+                            validLabels.get("tenant"),
+                            validLabels.get("namespace"),
+                            validLabels.get("name"));
 
                     V1PodList response;
                     try {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 75bce227c3e..39ec1fa0e18 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -27,13 +27,26 @@ import com.google.protobuf.util.JsonFormat;
 import io.kubernetes.client.openapi.apis.AppsV1Api;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1PodList;
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1Toleration;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import okhttp3.Call;
+import okhttp3.Response;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -49,25 +62,26 @@ import 
org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
 import static 
org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
 import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
@@ -182,7 +196,7 @@ public class KubernetesRuntimeTest {
     public void setup() {
         System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
     }
-    
+
     @AfterMethod(alwaysRun = true)
     public void tearDown() {
         if (null != this.factory) {
@@ -548,9 +562,9 @@ public class KubernetesRuntimeTest {
         
assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(),
 RESOURCES.getCpu());
         
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(),
 RESOURCES.getCpu());
     }
-    
+
     @Test
-    public void testCreateJobName() throws Exception {    
+    public void testCreateJobName() throws Exception {
         verifyCreateJobNameWithBackwardCompatibility();
         verifyCreateJobNameWithUpperCaseFunctionName();
         verifyCreateJobNameWithDotFunctionName();
@@ -652,28 +666,28 @@ public class KubernetesRuntimeTest {
         assertEquals(jobName, 
"pf-tenant-namespace-test-function-name-b5a215ad");
         KubernetesRuntime.doChecks(functionDetails, null);
     }
-    
+
     private void verifyCreateJobNameWithOverriddenK8sPodName() throws 
Exception {
         final FunctionDetails functionDetails = 
createFunctionDetails("clazz.testfunction");
         final String jobName = 
KubernetesRuntime.createJobName(functionDetails, "custom-k8s-pod-name");
         assertEquals(jobName, "custom-k8s-pod-name-dedfc7cf");
         KubernetesRuntime.doChecks(functionDetails, "custom-k8s-pod-name");
     }
-    
+
     private void verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks() 
throws Exception {
         final FunctionDetails functionDetails = 
createFunctionDetails("clazz.testfunction");
         final String jobName = 
KubernetesRuntime.createJobName(functionDetails, "invalid_pod*name");
         assertEquals(jobName, "invalid-pod-name-af8c3a6c");
         KubernetesRuntime.doChecks(functionDetails, "invalid_pod*name");
     }
-    
+
     private void 
verifyCreateJobNameWithOverriddenK8sPodNameNoCollisionWithSameName() throws 
Exception {
         final String CUSTOM_JOB_NAME = "custom-name";
         final String FUNCTION_NAME = "clazz.testfunction";
-        
+
        final FunctionDetails functionDetails1 = 
createFunctionDetails(FUNCTION_NAME);
         final String jobName1 = 
KubernetesRuntime.createJobName(functionDetails1, CUSTOM_JOB_NAME);
-        
+
         // create a second function with the same name, but in different 
tenant/namespace to make sure collision does not
         // happen. If tenant, namespace, and function name are the same 
kubernetes handles collision issues
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
@@ -683,7 +697,7 @@ public class KubernetesRuntimeTest {
         functionDetailsBuilder.setName(FUNCTION_NAME);
         final FunctionDetails functionDetails2 = 
functionDetailsBuilder.build();
         final String jobName2 = 
KubernetesRuntime.createJobName(functionDetails2, CUSTOM_JOB_NAME);
-        
+
         // create a third function with different name but in same 
tenant/namespace to make sure
         // collision does not happen. If tenant, namespace, and function name 
are the same kubernetes handles collision issues
         final FunctionDetails functionDetails3 = 
createFunctionDetails(FUNCTION_NAME + "-extra");
@@ -694,14 +708,14 @@ public class KubernetesRuntimeTest {
 
         assertEquals(jobName2, CUSTOM_JOB_NAME + "-c66edfe1");
         KubernetesRuntime.doChecks(functionDetails2, CUSTOM_JOB_NAME);
-        
+
         assertEquals(jobName3, CUSTOM_JOB_NAME + "-0fc9c728");
         KubernetesRuntime.doChecks(functionDetails3, CUSTOM_JOB_NAME);
     }
-    
+
     private void verifyCreateJobNameWithNameOverMaxCharLimit() throws 
Exception {
         final FunctionDetails functionDetails = 
createFunctionDetails("clazz.testfunction");
-        assertThrows(RuntimeException.class, () -> 
KubernetesRuntime.doChecks(functionDetails, 
+        assertThrows(RuntimeException.class, () -> 
KubernetesRuntime.doChecks(functionDetails,
                        
"custom-k8s-pod-name-over-kuberenetes-max-character-limit-123456789"));
     }
 
@@ -743,7 +757,7 @@ public class KubernetesRuntimeTest {
 
         V1Service serviceSpec = container.createService();
         assertEquals(serviceSpec.getMetadata().getNamespace(), "default");
-        assertEquals(serviceSpec.getMetadata().getName(), "pf-" + TEST_TENANT 
+ "-" + 
+        assertEquals(serviceSpec.getMetadata().getName(), "pf-" + TEST_TENANT 
+ "-" +
                        TEST_NAMESPACE + "-" + TEST_NAME);
     }
 
@@ -1216,4 +1230,52 @@ public class KubernetesRuntimeTest {
                     .contains("--metrics_port 0"));
         }
     }
+
+    @Test
+    public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() 
throws Exception {
+        InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+        
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, 
false,
+                (fb) -> 
fb.setTenant("c:tenant").setNamespace("c:ns").setName("c:fn")));
+
+        CoreV1Api coreApi = mock(CoreV1Api.class);
+        AppsV1Api appsApi = mock(AppsV1Api.class);
+
+        Call successfulCall = mock(Call.class);
+        Response okResponse = mock(Response.class);
+        when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK);
+        when(okResponse.isSuccessful()).thenReturn(true);
+        when(okResponse.message()).thenReturn("");
+        when(successfulCall.execute()).thenReturn(okResponse);
+
+        final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", 
"c-tenant", "c-ns", "c-fn");
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+        factory.setCoreClient(coreApi);
+        factory.setAppsClient(appsApi);
+
+        ArgumentMatcher<String> hasTranslatedFunctionName = (String t) -> 
t.startsWith(expectedFunctionNamePrefix);
+
+        when(appsApi.deleteNamespacedStatefulSetCall(
+                argThat(hasTranslatedFunctionName),
+                anyString(), isNull(), isNull(), anyInt(), isNull(), 
anyString(), any(), isNull())).thenReturn(successfulCall);
+
+        ApiException notFoundException = mock(ApiException.class);
+        
when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND);
+        when(appsApi.readNamespacedStatefulSet(
+                argThat(hasTranslatedFunctionName), anyString(), 
isNull())).thenThrow(notFoundException);
+
+        V1PodList podList = mock(V1PodList.class);
+        when(podList.getItems()).thenReturn(Collections.emptyList());
+
+        String expectedLabels = 
String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn");
+
+        when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), 
isNull(), isNull(),
+                eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), 
isNull())).thenReturn(podList);
+        KubernetesRuntime kr = factory.createContainer(config, "/test/code", 
"code.yml",
+                Long.MIN_VALUE);
+        kr.deleteStatefulSet();
+
+        verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), 
isNull(), isNull(),
+                eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), 
isNull());
+    }
 }

Reply via email to