This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95ed9b9 Stop all functions gracefully on closing worker-service
(#2548)
95ed9b9 is described below
commit 95ed9b91af20bcdd396ee34d3ea6cc94a52fea31
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Sep 13 14:50:35 2018 -0700
Stop all functions gracefully on closing worker-service (#2548)
### Motivation
Right now, if stopping
[WorkerService](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java#L189)
doesn't stop functions and all the threads stayed alive event `WorkerService`
is stopped.
### Modifications
Stop all function resource gracefully while stopping worker service.
### Result
Function threads will not stay alive while stopping worker-service.
---
.../functions/worker/FunctionRuntimeManager.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index ee6eeec..43cd27b 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -389,6 +389,25 @@ public class FunctionRuntimeManager implements
AutoCloseable{
return Response.status(Status.OK).build();
}
+ /**
+ * It stops all functions instances owned by current worker
+ * @throws Exception
+ */
+ public void stopAllOwnedFunctions() throws Exception {
+ final String workerId = this.workerConfig.getWorkerId();
+ Map<String, Assignment> assignments =
workerIdToAssignments.get(workerId);
+ if (assignments != null) {
+ assignments.values().forEach(assignment -> {
+ String fullyQualifiedInstanceId =
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+ try {
+ stopFunction(fullyQualifiedInstanceId, false);
+ } catch (Exception e) {
+ log.warn("Failed to stop function {} - {}",
fullyQualifiedInstanceId, e.getMessage());
+ }
+ });
+ }
+ }
+
private void stopFunction(String fullyQualifiedInstanceId, boolean
restart) throws Exception {
log.info("[{}] {}..", restart ? "restarting" : "stopping",
fullyQualifiedInstanceId);
FunctionRuntimeInfo functionRuntimeInfo =
this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
@@ -647,6 +666,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
@Override
public void close() throws Exception {
+ stopAllOwnedFunctions();
this.functionActioner.close();
this.functionAssignmentTailer.close();
if (runtimeFactory != null) {