This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 73a0258276fdf6bc67571ec8d32eeb3e5845c313 Author: iantowey <[email protected]> AuthorDate: Wed Jun 3 21:00:08 2026 +0100 [improve][functions] Allow customizing Kubernetes service domain suffix in Function Worker (#25872) Co-authored-by: Ian <[email protected]> (cherry picked from commit cc9fddcd02939ba6c735d88a2bfe702a22fb2ff4) --- conf/functions_worker.yml | 3 +++ .../runtime/kubernetes/KubernetesRuntime.java | 12 ++++++----- .../kubernetes/KubernetesRuntimeFactory.java | 4 +++- .../kubernetes/KubernetesRuntimeFactoryConfig.java | 7 ++++++- .../runtime/kubernetes/KubernetesRuntimeTest.java | 23 ++++++++++++++++++++++ 5 files changed, 42 insertions(+), 7 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 6f995576ebd..794cfba1d51 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -198,6 +198,9 @@ functionRuntimeFactoryConfigs: # # The Kubernetes pod name to run the function instances. It is set to # # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this setting is left to be empty # jobName: +# # Optional domain suffix to use when the Function Worker constructs the gRPC address to connect to function instances. +# # If left blank, it defaults to `.svc.cluster.local`. Set this if your Function Worker is outside the cluster and connects via an external Gateway/Ingress. +# kubernetesServiceDomainSuffix: # # the docker image to run function instance. by default it is `apachepulsar/pulsar` # pulsarDockerImageName: # # the docker image to run function instance according to different configurations provided by users. diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 7a69b822cbd..3e460f7b97a 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -153,11 +153,13 @@ public class KubernetesRuntime implements Runtime { private final Optional<KubernetesManifestCustomizer> manifestCustomizer; private String functionInstanceClassPath; private String downloadDirectory; + private final String kubernetesServiceDomainSuffix; KubernetesRuntime(AppsV1Api appsClient, CoreV1Api coreClient, String jobNamespace, String jobName, + String kubernetesServiceDomainSuffix, Map<String, String> customLabels, Boolean installUserCodeDependencies, String pythonDependencyRepository, @@ -196,6 +198,7 @@ public class KubernetesRuntime implements Runtime { this.instanceConfig = instanceConfig; this.jobNamespace = jobNamespace; this.jobName = jobName; + this.kubernetesServiceDomainSuffix = kubernetesServiceDomainSuffix; this.customLabels = customLabels; this.functionDockerImages = functionDockerImages; this.pulsarDockerImageName = pulsarDockerImageName; @@ -320,7 +323,6 @@ public class KubernetesRuntime implements Runtime { channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()]; stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails() .getParallelism()]; - String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName); for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) { String address = getServiceUrl(jobName, jobNamespace, i); @@ -1194,11 +1196,11 @@ public class KubernetesRuntime implements Runtime { final String shortHash = DigestUtils.sha1Hex(jobNameBase).toLowerCase().substring(0, 8); return convertedJobName + "-" + shortHash; } - - private static String getServiceUrl(String jobName, String jobNamespace, int instanceId) { - return String.format("%s-%d.%s.%s.svc.cluster.local", jobName, instanceId, jobName, jobNamespace); + @VisibleForTesting + String getServiceUrl(String jobName, String jobNamespace, int instanceId) { + String suffix = isNotBlank(kubernetesServiceDomainSuffix) ? kubernetesServiceDomainSuffix : "svc.cluster.local"; + return String.format("%s-%d.%s.%s.%s", jobName, instanceId, jobName, jobNamespace, suffix); } - public static void doChecks(Function.FunctionDetails functionDetails, String overridenJobName) { final String jobName = createJobName(functionDetails, overridenJobName); if (!jobName.equals(jobName.toLowerCase())) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index bbb6e3992a0..cba13acd5b0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -101,6 +101,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private String functionInstanceClassPath; private String downloadDirectory; private int gracePeriodSeconds; + private String kubernetesServiceDomainSuffix; @ToString.Exclude @EqualsAndHashCode.Exclude @@ -178,7 +179,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { if (!Paths.get(this.downloadDirectory).isAbsolute()) { this.downloadDirectory = this.pulsarRootDir + "/" + this.downloadDirectory; } - + this.kubernetesServiceDomainSuffix = factoryConfig.getKubernetesServiceDomainSuffix(); this.submittingInsidePod = factoryConfig.getSubmittingInsidePod(); this.installUserCodeDependencies = factoryConfig.getInstallUserCodeDependencies(); this.pythonDependencyRepository = factoryConfig.getPythonDependencyRepository(); @@ -318,6 +319,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { // get the namespace for this function overriddenNamespace, overriddenName, + kubernetesServiceDomainSuffix, customLabels, installUserCodeDependencies, pythonDependencyRepository, diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java index 43cdc035076..ea196923497 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java @@ -47,7 +47,12 @@ public class KubernetesRuntimeFactoryConfig { doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`" ) protected String pulsarDockerImageName; - + @FieldContext( + doc = "Optional domain suffix to use when the Function Worker constructs the gRPC address " + + "to connect to function instances. If left blank, it defaults to `.svc.cluster.local`. " + + "Set this if your Function Worker is outside the cluster and connects via an external Gateway/Ingress." + ) + protected String kubernetesServiceDomainSuffix; @FieldContext( doc = "The function docker images used to run function instance according to different " + "configurations provided by users. By default it is `apachepulsar/pulsar`" diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index f8069efe299..585c7e92009 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -341,6 +341,29 @@ public class KubernetesRuntimeTest { return config; } + @Test + public void testGetServiceUrl() throws Exception { + factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0); + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true); + + KubernetesRuntime container1 = factory.createContainer( + config, userJarFile, userJarFile, null, null, 30L); + assertEquals(container1.getServiceUrl("my-job", "my-namespace", 0), + "my-job-0.my-job.my-namespace.svc.cluster.local"); + + KubernetesRuntimeFactory factory2 = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0); + java.lang.reflect.Field field = KubernetesRuntimeFactory.class.getDeclaredField( + "kubernetesServiceDomainSuffix"); + field.setAccessible(true); + field.set(factory2, "custom.gateway.internal"); + + KubernetesRuntime container2 = factory2.createContainer( + config, userJarFile, userJarFile, null, null, 30L); + assertEquals(container2.getServiceUrl("my-job", "my-namespace", 0), + "my-job-0.my-job.my-namespace.custom.gateway.internal"); + } + + @Test public void testRamPadding() throws Exception { verifyRamPadding(0, 1000, 1000);
