This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c3445a  Fix: Add kubernetes namespace to function instance url (#4701)
8c3445a is described below

commit 8c3445ad6746df93fef80d2c661374cdab00bc38
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Jul 20 23:21:22 2019 -0700

    Fix: Add kubernetes namespace to function instance url (#4701)
    
    ### Motivation
    
    
    Currently, if the Kubernetes namespace in which functions are deployed is 
different from the one in which the brokers/workers reside, get status and get 
stats do not work because the URL for function instances does not specify the namespace.
---
 .../org/apache/pulsar/functions/runtime/KubernetesRuntime.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index b06e405..28f37f3 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -250,9 +250,9 @@ public class KubernetesRuntime implements Runtime {
         if (channel == null && stub == null) {
             channel = new 
ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
             stub = new 
InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
+            String jobName = 
createJobName(instanceConfig.getFunctionDetails());
             for (int i = 0; i < 
instanceConfig.getFunctionDetails().getParallelism(); ++i) {
-                String address = 
createJobName(instanceConfig.getFunctionDetails()) + "-" +
-                        i + "." + 
createJobName(instanceConfig.getFunctionDetails());
+                String address = getServiceUrl(jobName, jobNamespace, i);
                 channel[i] = ManagedChannelBuilder.forAddress(address, 
GRPC_PORT)
                         .usePlaintext(true)
                         .build();
@@ -993,6 +993,10 @@ public class KubernetesRuntime implements Runtime {
         return "pf-" + tenant + "-" + namespace + "-" + functionName;
     }
 
+    private static String getServiceUrl(String jobName, String jobNamespace, 
int instanceId) {
+        return String.format("%s-%d.%s.%s.svc.cluster.local", jobName, 
instanceId, jobName, jobNamespace);
+    }
+
     public static void doChecks(Function.FunctionDetails functionDetails) {
         final String jobName = createJobName(functionDetails);
         if (!jobName.equals(jobName.toLowerCase())) {

Reply via email to