This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 98eeaf5 function-instance lookup: retrieve function instance owner's
workerId (#2201)
98eeaf5 is described below
commit 98eeaf5fb8344e021e1ff3a20c6c36759a6644bd
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Jul 19 09:50:46 2018 -0700
function-instance lookup: retrieve function instance owner's workerId
(#2201)
### Motivation
for administrative purpose, we need to know worker-owner of the
function-instance. so, adding workerId along with function-stats to find out
owner of the instance.
---
.../test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java | 7 +++++--
.../proto/src/main/proto/InstanceCommunication.proto | 2 ++
.../pulsar/functions/worker/FunctionRuntimeManager.java | 14 ++++++++++----
3 files changed, 17 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 78fd2e2..a7b34ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -104,6 +104,7 @@ public class PulsarSinkE2ETest {
final String tenant = "external-repl-prop";
String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
String primaryHost;
+ String workerId;
private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
private final int brokerWebServicePort = PortManager.nextFreePort();
@@ -220,9 +221,9 @@ public class PulsarSinkE2ETest {
workerConfig.setWorkerPort(workerServicePort);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+ this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname +
"-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
- workerConfig
- .setWorkerId("c-" + config.getClusterName() + "-fw-" +
hostname + "-" + workerConfig.getWorkerPort());
+ workerConfig.setWorkerId(workerId);
workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
@@ -368,8 +369,10 @@ public class PulsarSinkE2ETest {
double count =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
double success =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
+ String ownerWorkerId = stats.getWorkerId();
Assert.assertEquals((int) count, totalMsgs);
Assert.assertEquals((int) success, totalMsgs);
+ Assert.assertEquals(ownerWorkerId, workerId);
}
protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String functionName, String sinkTopic, String
subscriptionName) {
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 1a80f5e..0078bc2 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -50,6 +50,8 @@ message FunctionStatus {
int64 lastInvocationTime = 13;
string instanceId = 14;
MetricsData metrics = 15;
+ // owner of function-instance
+ string workerId = 16;
}
message FunctionStatusList {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bc6dbe3..05a79d8 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -239,9 +239,10 @@ public class FunctionRuntimeManager implements
AutoCloseable{
*/
public InstanceCommunication.FunctionStatus
getFunctionInstanceStatus(String tenant, String namespace,
String functionName, int instanceId) {
- String workerId = this.workerConfig.getWorkerId();
-
Assignment assignment = this.findAssignment(tenant, namespace,
functionName, instanceId);
+ final String assignedWorkerId = assignment.getWorkerId();
+ final String workerId = this.workerConfig.getWorkerId();
+
if (assignment == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
@@ -252,13 +253,16 @@ public class FunctionRuntimeManager implements
AutoCloseable{
InstanceCommunication.FunctionStatus functionStatus = null;
// If I am running worker
- if (assignment.getWorkerId().equals(workerId)) {
+ if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo =
this.getFunctionRuntimeInfo(
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
try {
- functionStatus =
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get();
+ InstanceCommunication.FunctionStatus.Builder
functionStatusBuilder = InstanceCommunication.FunctionStatus
+
.newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get());
+ functionStatusBuilder.setWorkerId(assignedWorkerId);
+ functionStatus = functionStatusBuilder.build();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -270,6 +274,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
if (functionRuntimeInfo.getStartupException() != null) {
functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
}
+ functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
}
} else {
@@ -306,6 +311,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
log.warn("Got invalid function status response from {}",
workerInfo, e);
throw new RuntimeException(e);
}
+ functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
}