This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8c3445a Fix: Add kubernetes namespace to function instance url (#4701)
8c3445a is described below
commit 8c3445ad6746df93fef80d2c661374cdab00bc38
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Jul 20 23:21:22 2019 -0700
Fix: Add kubernetes namespace to function instance url (#4701)
### Motivation
Currently, if the kubernetes namespace set to deploy functions in is
different than the one in which brokers/workers reside, get status and stats
doesn't work because the url for instances does not specify the namespace.
---
.../org/apache/pulsar/functions/runtime/KubernetesRuntime.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index b06e405..28f37f3 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -250,9 +250,9 @@ public class KubernetesRuntime implements Runtime {
if (channel == null && stub == null) {
channel = new
ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
stub = new
InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
+ String jobName =
createJobName(instanceConfig.getFunctionDetails());
for (int i = 0; i <
instanceConfig.getFunctionDetails().getParallelism(); ++i) {
- String address =
createJobName(instanceConfig.getFunctionDetails()) + "-" +
- i + "." +
createJobName(instanceConfig.getFunctionDetails());
+ String address = getServiceUrl(jobName, jobNamespace, i);
channel[i] = ManagedChannelBuilder.forAddress(address,
GRPC_PORT)
.usePlaintext(true)
.build();
@@ -993,6 +993,10 @@ public class KubernetesRuntime implements Runtime {
return "pf-" + tenant + "-" + namespace + "-" + functionName;
}
+ private static String getServiceUrl(String jobName, String jobNamespace,
int instanceId) {
+ return String.format("%s-%d.%s.%s.svc.cluster.local", jobName,
instanceId, jobName, jobNamespace);
+ }
+
public static void doChecks(Function.FunctionDetails functionDetails) {
final String jobName = createJobName(functionDetails);
if (!jobName.equals(jobName.toLowerCase())) {