sijie closed pull request #2201: function-instance lookup: retrieve function
instance owner's workerId
URL: https://github.com/apache/incubator-pulsar/pull/2201
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 78fd2e2e40..a7b34ae857 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -104,6 +104,7 @@
final String tenant = "external-repl-prop";
String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
String primaryHost;
+ String workerId;
private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
private final int brokerWebServicePort = PortManager.nextFreePort();
@@ -220,9 +221,9 @@ private WorkerService
createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig.setWorkerPort(workerServicePort);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+ this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname +
"-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
- workerConfig
- .setWorkerId("c-" + config.getClusterName() + "-fw-" +
hostname + "-" + workerConfig.getWorkerPort());
+ workerConfig.setWorkerId(workerId);
workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
workerConfig.setClientAuthenticationParameters(
@@ -368,8 +369,10 @@ public void testPulsarSinkStats() throws Exception {
double count =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount();
double success =
metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount();
+ String ownerWorkerId = stats.getWorkerId();
Assert.assertEquals((int) count, totalMsgs);
Assert.assertEquals((int) success, totalMsgs);
+ Assert.assertEquals(ownerWorkerId, workerId);
}
protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String functionName, String sinkTopic, String
subscriptionName) {
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 1a80f5ecf6..0078bc2544 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -50,6 +50,8 @@ message FunctionStatus {
int64 lastInvocationTime = 13;
string instanceId = 14;
MetricsData metrics = 15;
+ // owner of function-instance
+ string workerId = 16;
}
message FunctionStatusList {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index bc6dbe3686..05a79d8da5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -239,9 +239,10 @@ public synchronized void
removeAssignments(Collection<Assignment> assignments) {
*/
public InstanceCommunication.FunctionStatus
getFunctionInstanceStatus(String tenant, String namespace,
String functionName, int instanceId) {
- String workerId = this.workerConfig.getWorkerId();
-
Assignment assignment = this.findAssignment(tenant, namespace,
functionName, instanceId);
+ final String assignedWorkerId = assignment.getWorkerId();
+ final String workerId = this.workerConfig.getWorkerId();
+
if (assignment == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
@@ -252,13 +253,16 @@ public synchronized void
removeAssignments(Collection<Assignment> assignments) {
InstanceCommunication.FunctionStatus functionStatus = null;
// If I am running worker
- if (assignment.getWorkerId().equals(workerId)) {
+ if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo =
this.getFunctionRuntimeInfo(
Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
if (runtimeSpawner != null) {
try {
- functionStatus =
functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get();
+ InstanceCommunication.FunctionStatus.Builder
functionStatusBuilder = InstanceCommunication.FunctionStatus
+
.newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get());
+ functionStatusBuilder.setWorkerId(assignedWorkerId);
+ functionStatus = functionStatusBuilder.build();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
@@ -270,6 +274,7 @@ public synchronized void
removeAssignments(Collection<Assignment> assignments) {
if (functionRuntimeInfo.getStartupException() != null) {
functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
}
+ functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
}
} else {
@@ -306,6 +311,7 @@ public synchronized void
removeAssignments(Collection<Assignment> assignments) {
log.warn("Got invalid function status response from {}",
workerInfo, e);
throw new RuntimeException(e);
}
+ functionStatusBuilder.setWorkerId(assignedWorkerId);
functionStatus = functionStatusBuilder.build();
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services