This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 034f6ba   Cleanup consumer subscriptions and fix graceful shutdown for 
functions/sinks (#3299)
034f6ba is described below

commit 034f6ba04e9c48abec1517668cb4fa46efdf02bc
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Jan 14 09:54:23 2019 -0800

     Cleanup consumer subscriptions and fix graceful shutdown for 
functions/sinks (#3299)
    
    * Cleanup consumer subscriptions and fix graceful shutdown for functions
    
    * cleaning up
    
    * removing testing files
    
    * add unit tests
    
    * adding integration testing
    
    * refactoring
    
    * refactoring and adding tests
    
    * cleaning up
---
 .../worker/PulsarWorkerAssignmentTest.java         |   2 +
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 168 ++++++++++++++++++++-
 .../pulsar/common/functions/FunctionConfig.java    |   2 +
 .../org/apache/pulsar/common/io/SinkConfig.java    |   3 +-
 .../pulsar/functions/instance/InstanceUtils.java   |  12 ++
 .../functions/instance/JavaInstanceRunnable.java   |   4 +-
 .../functions/source/PulsarSourceConfig.java       |   1 +
 .../instance/src/main/python/Function_pb2.py       |  47 +++---
 .../instance/src/main/python/function_stats.py     |   2 +-
 .../instance/src/main/python/python_instance.py    |  22 ++-
 .../src/main/python/python_instance_main.py        |   4 +-
 pulsar-functions/instance/src/main/python/util.py  |   4 +-
 .../proto/src/main/proto/Function.proto            |   1 +
 .../pulsar/functions/runtime/ProcessRuntime.java   |  29 +++-
 .../pulsar/functions/runtime/ThreadRuntime.java    |   2 +
 .../functions/utils/FunctionConfigUtils.java       |   5 +
 .../pulsar/functions/utils/SinkConfigUtils.java    |   6 +
 .../pulsar/functions/worker/FunctionAction.java    |   3 +-
 .../pulsar/functions/worker/FunctionActioner.java  |  85 +++++++++--
 .../functions/worker/FunctionRuntimeManager.java   |  34 ++++-
 .../pulsar/functions/worker/WorkerService.java     |   8 +-
 .../functions/worker/FunctionActionerTest.java     |   5 +-
 .../worker/FunctionRuntimeManagerTest.java         |  25 +--
 .../functions/worker/MembershipManagerTest.java    |  32 ++--
 .../integration/functions/PulsarFunctionsTest.java |  20 +++
 25 files changed, 439 insertions(+), 87 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 5cdf5a4..4b4597e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import com.google.gson.Gson;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -213,6 +214,7 @@ public class PulsarWorkerAssignmentTest {
             }
         }, 5, 150);
         // validate pulsar sink consumer has started on the topic
+        log.info("admin.topics().getStats(sinkTopic): {}", new 
Gson().toJson(admin.topics().getStats(sinkTopic)));
         
assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(),
 1);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 94e7bfa..f04fede 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -20,11 +20,13 @@ package org.apache.pulsar.io;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.gson.Gson;
 import lombok.ToString;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.client.admin.BrokerStats;
@@ -42,11 +44,13 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -119,6 +123,7 @@ public class PulsarFunctionE2ETest {
     private final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
     private final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
+    private final String TLS_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem";
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
 
@@ -148,14 +153,24 @@ public class PulsarFunctionE2ETest {
         config.setBrokerServicePort(brokerServicePort);
         config.setBrokerServicePortTls(brokerServiceTlsPort);
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setTlsAllowInsecureConnection(true);
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         config.setAuthenticationEnabled(true);
         config.setAuthenticationProviders(providers);
+
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config.setTlsAllowInsecureConnection(true);
+        config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        config.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        config.setBrokerClientTlsEnabled(true);
+
+
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
@@ -170,7 +185,7 @@ public class PulsarFunctionE2ETest {
         authTls.configure(authParams);
 
         admin = spy(
-                
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
+                
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                         
.allowTlsInsecureConnection(true).authentication(authTls).build());
 
         brokerStatsClient = admin.brokerStats();
@@ -211,6 +226,7 @@ public class PulsarFunctionE2ETest {
     }
 
     private WorkerService createPulsarFunctionWorker(ServiceConfiguration 
config) {
+
         workerConfig = new WorkerConfig();
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
@@ -237,7 +253,7 @@ public class PulsarFunctionE2ETest {
                 String.format("tlsCertFile:%s,tlsKeyFile:%s", 
TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
         workerConfig.setUseTls(true);
         workerConfig.setTlsAllowInsecureConnection(true);
-        workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+        workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
 
         workerConfig.setAuthenticationEnabled(true);
         workerConfig.setAuthorizationEnabled(true);
@@ -260,6 +276,7 @@ public class PulsarFunctionE2ETest {
         
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setOutput(sinkTopic);
+        functionConfig.setCleanupSubscription(true);
         return functionConfig;
     }
 
@@ -283,6 +300,7 @@ public class PulsarFunctionE2ETest {
         
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         sinkConfig.setInputs(Collections.singleton(sourceTopic));
         sinkConfig.setSourceSubscriptionName(subName);
+        sinkConfig.setCleanupSubscription(true);
         return sinkConfig;
     }
     /**
@@ -350,6 +368,20 @@ public class PulsarFunctionE2ETest {
         
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
                 totalMsgs);
 
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
     }
 
     @Test(timeOut = 20000)
@@ -535,6 +567,21 @@ public class PulsarFunctionE2ETest {
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
         assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertTrue(m.value > 0.0);
+
+
+        // delete functions
+        admin.sink().deleteSink(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
     }
 
     @Test(timeOut = 20000)
@@ -945,6 +992,20 @@ public class PulsarFunctionE2ETest {
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, 
namespacePortion));
         assertEquals(m.tags.get("fqfn"), 
FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, 
functionName));
         assertEquals(m.value, (double) totalMsgs);
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
     }
 
     @Test(timeOut = 20000)
@@ -1012,6 +1073,20 @@ public class PulsarFunctionE2ETest {
         assertEquals((int)count, totalMsgs);
         assertEquals((int) success, totalMsgs);
         assertEquals(ownerWorkerId, workerId);
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
     }
 
     @Test(dataProvider = "validRoleName")
@@ -1110,6 +1185,93 @@ public class PulsarFunctionE2ETest {
         producer.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testFunctionAutomaticSubCleanup() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespacePortion);
+        functionConfig.setName(functionName);
+        functionConfig.setParallelism(1);
+        functionConfig.setInputs(Collections.singleton(sourceTopic));
+        
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        functionConfig.setOutput(sinkTopic);
+        functionConfig.setCleanupSubscription(true);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+
+        admin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar source consumer has started on the topic
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        int totalMsgs = 10;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, 
propertyValue).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(
+                        InstanceUtils.getDefaultSubscriptionName(tenant, 
namespacePortion, functionName));
+                return subStats.unackedMessages == 0 && 
subStats.msgThroughputOut == totalMsgs;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 200);
+
+        FunctionStatus functionStatus = 
admin.functions().getFunctionStatus(tenant, namespacePortion,
+                functionName);
+
+        int numInstances = functionStatus.getNumInstances();
+        assertEquals(numInstances, 1);
+
+        FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status
+                = functionStatus.getInstances().get(0).getStatus();
+
+        double count = status.getNumReceived();
+        double success = status.getNumSuccessfullyProcessed();
+        String ownerWorkerId = status.getWorkerId();
+        assertEquals((int)count, totalMsgs);
+        assertEquals((int) success, totalMsgs);
+        assertEquals(ownerWorkerId, workerId);
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+    }
+
+
     public static String getPrometheusMetrics(int metricsPort) throws 
IOException {
         StringBuilder result = new StringBuilder();
         URL url = new URL(String.format("http://%s:%s/metrics", 
InetAddress.getLocalHost().getHostAddress(), metricsPort));
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index bf27155..68a9a7d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -93,4 +93,6 @@ public class FunctionConfig {
     private Long timeoutMs;
     private String jar;
     private String py;
+    // Whether the subscriptions the functions created/used should be deleted 
when the functions is deleted
+    private Boolean cleanupSubscription;
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index b85de06..abb2067 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -66,6 +66,7 @@ public class SinkConfig {
     private Resources resources;
     private Boolean autoAck;
     private Long timeoutMs;
-
     private String archive;
+    // Whether the subscriptions the functions created/used should be deleted 
when the functions is deleted
+    private Boolean cleanupSubscription;
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 88a9df3..9db47cf 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 
 import net.jodah.typetools.TypeResolver;
@@ -107,6 +108,17 @@ public class InstanceUtils {
         return SINK;
     }
 
+    public static String getDefaultSubscriptionName(String tenant, String 
namespace, String name) {
+        return FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, 
name);
+    }
+
+    public static String getDefaultSubscriptionName(Function.FunctionDetails 
functionDetails) {
+        return getDefaultSubscriptionName(
+                functionDetails.getTenant(),
+                functionDetails.getNamespace(),
+                functionDetails.getName());
+    }
+
     public static Map<String, String> getProperties(Utils.ComponentType 
componentType,
                                                     String fullyQualifiedName, 
int instanceId) {
         Map<String, String> properties = new HashMap<>();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fde9628..668b8bd 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -299,7 +299,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     private void loadJars() throws Exception {
         try {
-            log.info("jarFile: {}", jarFile);
+            log.info("Load JAR: {}", jarFile);
             // Let's first try to treat it as a nar archive
             fnCache.registerFunctionInstanceWithArchive(
                 instanceConfig.getFunctionId(),
@@ -634,7 +634,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
             pulsarSourceConfig.setSubscriptionName(
                     StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? 
sourceSpec.getSubscriptionName()
-                            : 
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+                            : 
InstanceUtils.getDefaultSubscriptionName(instanceConfig.getFunctionDetails()));
             pulsarSourceConfig.setProcessingGuarantees(
                     FunctionConfig.ProcessingGuarantees.valueOf(
                             
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index cf843be..ba7de68 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,6 +37,7 @@ public class PulsarSourceConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     SubscriptionType subscriptionType;
     private String subscriptionName;
+    // Whether the subscriptions the functions created/used should be deleted 
when the functions is deleted
     private Integer maxMessageRetries = -1;
     private String deadLetterTopic;
 
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 3979d49..f5141d4 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 
\x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
+  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 
\x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1744,
-  serialized_end=1823,
+  serialized_start=1773,
+  serialized_end=1852,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1825,
-  serialized_end=1869,
+  serialized_start=1854,
+  serialized_end=1898,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -420,8 +420,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1073,
-  serialized_end=1134,
+  serialized_start=1102,
+  serialized_end=1163,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -457,8 +457,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1136,
-  serialized_end=1206,
+  serialized_start=1165,
+  serialized_end=1235,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -538,6 +538,13 @@ _SOURCESPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='cleanupSubscription', 
full_name='proto.SourceSpec.cleanupSubscription', index=10,
+      number=11, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -551,7 +558,7 @@ _SOURCESPEC = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=722,
-  serialized_end=1206,
+  serialized_end=1235,
 )
 
 
@@ -623,8 +630,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1209,
-  serialized_end=1354,
+  serialized_start=1238,
+  serialized_end=1383,
 )
 
 
@@ -661,8 +668,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1356,
-  serialized_end=1428,
+  serialized_start=1385,
+  serialized_end=1457,
 )
 
 
@@ -713,8 +720,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1431,
-  serialized_end=1592,
+  serialized_start=1460,
+  serialized_end=1621,
 )
 
 
@@ -751,8 +758,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1594,
-  serialized_end=1675,
+  serialized_start=1623,
+  serialized_end=1704,
 )
 
 
@@ -789,8 +796,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1677,
-  serialized_end=1742,
+  serialized_start=1706,
+  serialized_end=1771,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = 
_PROCESSINGGUARANTEES
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py 
b/pulsar-functions/instance/src/main/python/function_stats.py
index d3dc322..34793a5 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -103,7 +103,7 @@ class Stats(object):
     self._stat_total_received_1min = 
self.stat_total_received_1min.labels(*self.metrics_labels)
 
     # start time for windowed metrics
-    util.FixedTimer(60, self.reset).start()
+    util.FixedTimer(60, self.reset, name="windowed-metrics-timer").start()
 
   def get_total_received(self):
     return self._stat_total_received._value.get();
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index cfd9eae..d86173b 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -42,7 +42,6 @@ import InstanceCommunication_pb2
 
 from functools import partial
 from collections import namedtuple
-from threading import Timer
 from function_stats import Stats
 
 Log = log.Log
@@ -111,8 +110,6 @@ class PythonInstance(object):
       os.kill(os.getpid(), signal.SIGKILL)
       sys.exit(1)
 
-    Timer(self.expected_healthcheck_interval, 
self.process_spawner_health_check_timer).start()
-
   def run(self):
     # Setup consumers and input deserializers
     mode = pulsar._pulsar.ConsumerType.Shared
@@ -187,7 +184,8 @@ class PythonInstance(object):
     # start proccess spawner health check timer
     self.last_health_check_ts = time.time()
     if self.expected_healthcheck_interval > 0:
-      Timer(self.expected_healthcheck_interval, 
self.process_spawner_health_check_timer).start()
+      timer = util.FixedTimer(self.expected_healthcheck_interval, 
self.process_spawner_health_check_timer, name="health-check-timer")
+      timer.start()
 
   def actual_execution(self):
     Log.debug("Started Thread for executing the function")
@@ -384,3 +382,19 @@ class PythonInstance(object):
   def join(self):
     self.queue.put(InternalQuitMessage(True), True)
     self.execution_thread.join()
+    self.close()
+
+  def close(self):
+    Log.info("Closing python instance...")
+    if self.producer:
+      self.producer.close()
+
+    if self.consumers:
+      for consumer in self.consumers.values():
+        try:
+          consumer.close()
+        except:
+          pass
+
+    if self.pulsar_client:
+      self.pulsar_client.close()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index b7b1bfc..c93d6e1 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -31,6 +31,7 @@ import time
 import zipfile
 import json
 import inspect
+import threading
 
 import pulsar
 
@@ -193,7 +194,8 @@ def main():
     time.sleep(1)
 
   pyinstance.join()
-  sys.exit(1)
+  # make sure to close all non-daemon threads before this!
+  sys.exit(0)
 
 if __name__ == '__main__':
   main()
diff --git a/pulsar-functions/instance/src/main/python/util.py 
b/pulsar-functions/instance/src/main/python/util.py
index 0978f39..390aed1 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -76,10 +76,12 @@ def get_properties(fullyQualifiedName, instanceId):
 
 class FixedTimer():
 
-    def __init__(self, t, hFunction):
+    def __init__(self, t, hFunction, name="timer-thread"):
         self.t = t
         self.hFunction = hFunction
         self.thread = Timer(self.t, self.handle_function)
+        self.thread.setName(name)
+        self.thread.setDaemon(True)
 
     def handle_function(self):
         self.hFunction()
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 8d93764..cb5021b 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -98,6 +98,7 @@ message SourceSpec {
      * already present in the server */
     string builtin = 8;
     string subscriptionName = 9;
+    bool cleanupSubscription = 11;
 }
 
 message SinkSpec {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index d3046ba..14e68cc 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -179,18 +179,39 @@ class ProcessRuntime implements Runtime {
     }
 
     @Override
-    public void stop() {
+    public void stop() throws InterruptedException {
         if (timer != null) {
             timer.cancel(false);
         }
-        if (process != null) {
-            process.destroyForcibly();
-        }
         if (channel != null) {
             channel.shutdown();
         }
         channel = null;
         stub = null;
+
+        // kill process
+        if (process != null) {
+            process.destroy();
+            int i = 0;
+            // gracefully terminate at first
+            while(process.isAlive()) {
+                Thread.sleep(100);
+                if (i > 100) {
+                    break;
+                }
+                i++;
+            }
+
+            // forcibly kill after timeout
+            if (process.isAlive()) {
+                log.warn("Process for instance {} did not exit within timeout. 
Forcibly killing process...",
+                        Utils.getFullyQualifiedInstanceId(
+                                
instanceConfig.getFunctionDetails().getTenant(),
+                                
instanceConfig.getFunctionDetails().getNamespace(),
+                                instanceConfig.getFunctionDetails().getName(), 
instanceConfig.getInstanceId()));
+                process.destroyForcibly();
+            }
+        }
     }
 
     @Override
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index be049c3..2bd4644 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -124,6 +124,8 @@ class ThreadRuntime implements Runtime {
             } catch (InterruptedException e) {
                 // ignore this
             }
+            // make sure JavaInstanceRunnable is closed
+            this.javaInstanceRunnable.close();
         }
     }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 2459cfd..04300db 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -118,6 +118,11 @@ public class FunctionConfigUtils {
         if (functionConfig.getTimeoutMs() != null) {
             sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
         }
+        if (functionConfig.getCleanupSubscription() != null) {
+            
sourceSpecBuilder.setCleanupSubscription(functionConfig.getCleanupSubscription());
+        } else {
+            sourceSpecBuilder.setCleanupSubscription(true);
+        }
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
         // Setup sink
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 413070a..a972947 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -151,6 +151,12 @@ public class SinkConfigUtils {
             sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
         }
 
+        if (sinkConfig.getCleanupSubscription() != null) {
+            
sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
+        } else {
+            sourceSpecBuilder.setCleanupSubscription(true);
+        }
+
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
         // set up sink spec
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
index 23a8154..ded8268 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
@@ -31,7 +31,8 @@ public class FunctionAction {
 
     public enum Action {
         START,
-        STOP
+        STOP,
+        TERMINATE
     }
 
     private Action action;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 46343a5..2ab4828 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.common.functions.Utils.FILE;
 import static org.apache.pulsar.common.functions.Utils.HTTP;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
@@ -37,10 +38,13 @@ import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
+import com.google.gson.Gson;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -50,8 +54,12 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder;
@@ -78,35 +86,43 @@ public class FunctionActioner implements AutoCloseable {
     private volatile boolean running;
     private Thread actioner;
     private final ConnectorsManager connectorsManager;
+    private final PulsarAdmin pulsarAdmin;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
                             LinkedBlockingQueue<FunctionAction> actionQueue,
-                            ConnectorsManager connectorsManager) {
+                            ConnectorsManager connectorsManager, PulsarAdmin 
pulsarAdmin) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
         this.actionQueue = actionQueue;
         this.connectorsManager = connectorsManager;
+        this.pulsarAdmin = pulsarAdmin;
         actioner = new Thread(() -> {
             log.info("Starting Actioner Thread...");
             while(running) {
                 try {
                     FunctionAction action = actionQueue.poll(1, 
TimeUnit.SECONDS);
                     if (action == null) continue;
-                    if (action.getAction() == FunctionAction.Action.START) {
-                        try {
-                            startFunction(action.getFunctionRuntimeInfo());
-                        } catch (Exception ex) {
-                            FunctionDetails details = 
action.getFunctionRuntimeInfo().getFunctionInstance()
-                                    
.getFunctionMetaData().getFunctionDetails();
-                            log.info("{}/{}/{} Error starting function", 
details.getTenant(), details.getNamespace(),
-                                    details.getName(), ex);
-                            
action.getFunctionRuntimeInfo().setStartupException(ex);
-                        }
-                    } else {
-                        stopFunction(action.getFunctionRuntimeInfo());
+                    switch (action.getAction()) {
+                        case START:
+                            try {
+                                startFunction(action.getFunctionRuntimeInfo());
+                            } catch (Exception ex) {
+                                FunctionDetails details = 
action.getFunctionRuntimeInfo().getFunctionInstance()
+                                        
.getFunctionMetaData().getFunctionDetails();
+                                log.info("{}/{}/{} Error starting function", 
details.getTenant(), details.getNamespace(),
+                                        details.getName(), ex);
+                                
action.getFunctionRuntimeInfo().setStartupException(ex);
+                            }
+                            break;
+                        case STOP:
+                            stopFunction(action.getFunctionRuntimeInfo());
+                            break;
+                        case TERMINATE:
+                            terminateFunction(action.getFunctionRuntimeInfo());
+                            break;
                     }
                 } catch (InterruptedException ex) {
                 }
@@ -266,6 +282,49 @@ public class FunctionActioner implements AutoCloseable {
         }
     }
 
+    private void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+        FunctionDetails details = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                .getFunctionDetails();
+        log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), 
details.getNamespace(), details.getName(),
+                functionRuntimeInfo.getFunctionInstance().getInstanceId());
+
+        stopFunction(functionRuntimeInfo);
+        //cleanup subscriptions
+        if (details.getSource().getCleanupSubscription()) {
+            Map<String, Function.ConsumerSpec> consumerSpecMap = 
details.getSource().getInputSpecsMap();
+            consumerSpecMap.entrySet().forEach(new Consumer<Map.Entry<String, 
Function.ConsumerSpec>>() {
+                @Override
+                public void accept(Map.Entry<String, Function.ConsumerSpec> 
stringConsumerSpecEntry) {
+
+                    Function.ConsumerSpec consumerSpec = 
stringConsumerSpecEntry.getValue();
+                    String topic = stringConsumerSpecEntry.getKey();
+                    String subscriptionName = functionRuntimeInfo
+                            .getFunctionInstance().getFunctionMetaData()
+                            
.getFunctionDetails().getSource().getSubscriptionName();
+                    // if user specified subscription name is empty use 
default subscription name
+                    if (isBlank(subscriptionName)) {
+                        subscriptionName =  
InstanceUtils.getDefaultSubscriptionName(
+                                functionRuntimeInfo.getFunctionInstance()
+                                        
.getFunctionMetaData().getFunctionDetails());
+                    }
+
+                    try {
+                        if (consumerSpec.getIsRegexPattern()) {
+                            
pulsarAdmin.namespaces().unsubscribeNamespace(TopicName.get(topic).getNamespace(),
 subscriptionName);
+                        } else {
+                            pulsarAdmin.topics().deleteSubscription(topic, 
subscriptionName);
+                        }
+                    } catch (PulsarAdminException e) {
+                        log.warn("Failed to cleanup {} subscription for {}", 
subscriptionName,
+                                FunctionDetailsUtils.getFullyQualifiedName(
+                                        
functionRuntimeInfo.getFunctionInstance()
+                                                
.getFunctionMetaData().getFunctionDetails()), e);
+                    }
+                }
+            });
+        }
+    }
+
     private String getDownloadPackagePath(FunctionMetaData functionMetaData, 
int instanceId) {
         return StringUtils.join(
                 new String[]{
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 152c1db..fdd62d7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
@@ -102,8 +103,11 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     @Getter
     boolean isInitializePhase = false;
 
+    private final FunctionMetaDataManager functionMetaDataManager;
+
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService 
workerService, Namespace dlogNamespace,
-            MembershipManager membershipManager, ConnectorsManager 
connectorsManager) throws Exception {
+                                  MembershipManager membershipManager, 
ConnectorsManager connectorsManager,
+                                  FunctionMetaDataManager 
functionMetaDataManager) throws Exception {
         this.workerConfig = workerConfig;
         this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
@@ -168,9 +172,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         this.actionQueue = new LinkedBlockingQueue<>();
 
         this.functionActioner = new FunctionActioner(this.workerConfig, 
runtimeFactory,
-                dlogNamespace, actionQueue, connectorsManager);
+                dlogNamespace, actionQueue, connectorsManager, 
workerService.getBrokerAdmin());
 
         this.membershipManager = membershipManager;
+        this.functionMetaDataManager = functionMetaDataManager;
     }
 
     /**
@@ -636,7 +641,17 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     public synchronized void deleteAssignment(String fullyQualifiedInstanceId) 
{
         FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
         if (functionRuntimeInfo != null) {
-            this.insertStopAction(functionRuntimeInfo);
+            Function.FunctionDetails functionDetails = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
+
+            // check if this is part of a function delete operation or update 
operation
+            // TODO could be a race condition here if functionMetaDataTailer 
somehow does not receive the functionMeta before the 
functionAssignmentsTailer gets the assignment for the function.
+            if 
(this.functionMetaDataManager.containsFunction(functionDetails.getTenant(), 
functionDetails.getNamespace(), functionDetails.getName())) {
+                // function still exists thus probably an update or stop 
operation
+                this.insertStopAction(functionRuntimeInfo);
+            } else {
+                // function doesn't exist anymore thus we should terminate
+                this.insertTerminateAction(functionRuntimeInfo);
+            }
             this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
         }
         
@@ -729,6 +744,19 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
     }
 
+    void insertTerminateAction(FunctionRuntimeInfo functionRuntimeInfo) {
+        if (!this.isInitializePhase) {
+            FunctionAction functionAction = new FunctionAction();
+            functionAction.setAction(FunctionAction.Action.TERMINATE);
+            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
+            try {
+                actionQueue.put(functionAction);
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Interrupted while putting action");
+            }
+        }
+    }
+
     private Assignment findAssignment(String tenant, String namespace, String 
functionName, int instanceId) {
         String fullyQualifiedInstanceId
                 = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, 
namespace, functionName, instanceId);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 43404ce..30d5cbd 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -163,10 +163,7 @@ public class WorkerService {
 
             // create function runtime manager
             this.functionRuntimeManager = new FunctionRuntimeManager(
-                    this.workerConfig, this, this.dlogNamespace, 
this.membershipManager, connectorsManager);
-
-            // initialize function runtime manager
-            this.functionRuntimeManager.initialize();
+                    this.workerConfig, this, this.dlogNamespace, 
this.membershipManager, connectorsManager, functionMetaDataManager);
 
             // Setting references to managers in scheduler
             
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
@@ -176,6 +173,9 @@ public class WorkerService {
             // initialize function metadata manager
             this.functionMetaDataManager.initialize();
 
+            // initialize function runtime manager
+            this.functionRuntimeManager.initialize();
+
             authenticationService = new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
 
             // Starting cluster services
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index a4926e3..65ead73 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.runtime.Runtime;
@@ -69,7 +70,7 @@ public class FunctionActionerTest {
 
         @SuppressWarnings("resource")
         FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
-                new ConnectorsManager(workerConfig));
+                new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
         Runtime runtime = mock(Runtime.class);
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
@@ -112,7 +113,7 @@ public class FunctionActionerTest {
 
         @SuppressWarnings("resource")
         FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
-                new ConnectorsManager(workerConfig));
+                new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
 
         // (1) test with file url. functionActioner should be able to consider 
file-url and it should be able to call
         // RuntimeSpawner
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 651e5d0..17d6642 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -85,8 +85,8 @@ public class FunctionRuntimeManagerTest {
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
+                mock(ConnectorsManager.class),
+                mock(FunctionMetaDataManager.class)));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -179,8 +179,8 @@ public class FunctionRuntimeManagerTest {
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
+                mock(ConnectorsManager.class),
+                mock(FunctionMetaDataManager.class)));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -225,8 +225,8 @@ public class FunctionRuntimeManagerTest {
                 .get("worker-2").get("test-tenant/test-namespace/func-2:0"), 
assignment2);
 
         verify(functionRuntimeManager, 
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager).insertStopAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+        verify(functionRuntimeManager, 
times(1)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+        verify(functionRuntimeManager).insertTerminateAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
             public boolean matches(Object o) {
                 if (o instanceof FunctionRuntimeInfo) {
@@ -244,7 +244,7 @@ public class FunctionRuntimeManagerTest {
         Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
         Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
                 new FunctionAction()
-                        .setAction(FunctionAction.Action.STOP)
+                        .setAction(FunctionAction.Action.TERMINATE)
                         .setFunctionRuntimeInfo(new 
FunctionRuntimeInfo().setFunctionInstance(
                                 
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                         .build()))));
@@ -277,8 +277,8 @@ public class FunctionRuntimeManagerTest {
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
+                mock(ConnectorsManager.class),
+                mock(FunctionMetaDataManager.class)));
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -323,6 +323,9 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.processAssignment(assignment3);
 
         verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
+        // make sure terminate is not called since this is an update operation
+        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+
         verify(functionRuntimeManager).insertStopAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
             public boolean matches(Object o) {
@@ -470,8 +473,8 @@ public class FunctionRuntimeManagerTest {
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
+                mock(ConnectorsManager.class),
+                mock(FunctionMetaDataManager.class)));
 
 
         functionRuntimeManager.initialize();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 678d338..527557b 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -138,15 +138,15 @@ public class MembershipManagerTest {
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-        
+
+        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
-        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
+                mock(ConnectorsManager.class),
+                functionMetaDataManager));
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, pulsarClient));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -209,16 +209,16 @@ public class MembershipManagerTest {
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-        
+
+        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
+                mock(ConnectorsManager.class),
+                functionMetaDataManager));
 
-        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -305,15 +305,15 @@ public class MembershipManagerTest {
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-        
+
+        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
-        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
+                mock(ConnectorsManager.class),
+                functionMetaDataManager));
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -380,15 +380,15 @@ public class MembershipManagerTest {
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(workerConfig).when(workerService).getWorkerConfig();
         
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-        
+
+        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
                 workerConfig,
                 workerService,
                 mock(Namespace.class),
                 mock(MembershipManager.class),
-                mock(ConnectorsManager.class)
-        ));
-        FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
+                mock(ConnectorsManager.class),
+                functionMetaDataManager));
         MembershipManager membershipManager = spy(new 
MembershipManager(workerService, mockPulsarClient()));
 
         List<WorkerInfo> workerInfoList = new LinkedList<>();
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0469d80..23b7259 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -751,6 +752,10 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
         // get function info
         getFunctionInfoNotFound(functionName);
+
+        // make sure subscriptions are cleaned up
+        checkSubscriptionsCleanup(inputTopicName);
+
     }
 
     private static void submitExclamationFunction(Runtime runtime,
@@ -943,6 +948,21 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         }
     }
 
+    private static void checkSubscriptionsCleanup(String topic) throws 
Exception {
+        try {
+            ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "stats",
+                     topic);
+            TopicStats topicStats = new Gson().fromJson(result.getStdout(), 
TopicStats.class);
+            assertEquals(topicStats.subscriptions.size(), 0);
+
+        } catch (ContainerExecException e) {
+            fail("Command should have exited with non-zero");
+        }
+    }
+
     private static void getFunctionStatus(String functionName, int 
numMessages) throws Exception {
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
             PulsarCluster.ADMIN_SCRIPT,

Reply via email to