This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9340d6e2430 [fix][fn] Make KubernetesRuntime translate characters in
function tenant, namespace, and name during function removal to avoid label
errors (#19584)
9340d6e2430 is described below
commit 9340d6e24302a5c5c177955a8cf2a585739e5558
Author: csthomas1 <[email protected]>
AuthorDate: Tue Jun 13 00:35:49 2023 -0400
[fix][fn] Make KubernetesRuntime translate characters in function tenant,
namespace, and name during function removal to avoid label errors (#19584)
Co-authored-by: tison <[email protected]>
---
.../runtime/kubernetes/KubernetesRuntime.java | 7 +-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 86 +++++++++++++++++++---
2 files changed, 78 insertions(+), 15 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index d0e36ecb48c..72c72cf164e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -684,10 +684,11 @@ public class KubernetesRuntime implements Runtime {
.numRetries(KubernetesRuntimeFactory.numRetries * 2)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs * 2)
.supplier(() -> {
+ Map<String, String> validLabels = getLabels(instanceConfig.getFunctionDetails());
String labels = String.format("tenant=%s,namespace=%s,name=%s",
- instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace(),
- instanceConfig.getFunctionDetails().getName());
+ validLabels.get("tenant"),
+ validLabels.get("namespace"),
+ validLabels.get("name"));
V1PodList response;
try {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 31f82adfe8c..7fa279bc1d2 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -24,16 +24,32 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.protobuf.util.JsonFormat;
import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1Toleration;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import okhttp3.Call;
+import okhttp3.Response;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.JavaVersion;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -49,27 +65,26 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.ArgumentMatcher;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
@@ -1290,4 +1305,51 @@ public class KubernetesRuntimeTest {
.contains("--metrics_port 0"));
}
}
+
+ @Test
+ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exception {
+ InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+ config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false,
+ (fb) -> fb.setTenant("c:tenant").setNamespace("c:ns").setName("c:fn")));
+
+ CoreV1Api coreApi = mock(CoreV1Api.class);
+ AppsV1Api appsApi = mock(AppsV1Api.class);
+
+ Call successfulCall = mock(Call.class);
+ Response okResponse = mock(Response.class);
+ when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK);
+ when(okResponse.isSuccessful()).thenReturn(true);
+ when(okResponse.message()).thenReturn("");
+ when(successfulCall.execute()).thenReturn(okResponse);
+
+ final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", "c-tenant", "c-ns", "c-fn");
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+ factory.setCoreClient(coreApi);
+ factory.setAppsClient(appsApi);
+
+ ArgumentMatcher<String> hasTranslatedFunctionName = (String t) -> t.startsWith(expectedFunctionNamePrefix);
+
+ when(appsApi.deleteNamespacedStatefulSetCall(
+ argThat(hasTranslatedFunctionName),
+ anyString(), isNull(), isNull(), anyInt(), isNull(), anyString(), any(), isNull())).thenReturn(successfulCall);
+
+ ApiException notFoundException = mock(ApiException.class);
+ when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND);
+ when(appsApi.readNamespacedStatefulSet(
+ argThat(hasTranslatedFunctionName), anyString(), isNull())).thenThrow(notFoundException);
+
+ V1PodList podList = mock(V1PodList.class);
+ when(podList.getItems()).thenReturn(Collections.emptyList());
+
+ String expectedLabels = String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn");
+
+ when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(),
+ eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull())).thenReturn(podList);
+ KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE);
+ kr.deleteStatefulSet();
+
+ verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(),
+ eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull());
+ }
}