This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 49fc5e5  [Function] avoid creating assignment snapshot and publish 
individual assigment msg (#2549)
49fc5e5 is described below

commit 49fc5e508a996cfe59949effbcaf0abfa46028ce
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Sep 18 15:22:12 2018 -0700

    [Function] avoid creating assignment snapshot and publish individual 
assigment msg (#2549)
    
    Fix: Compaction with last deleted keys not completing compaction
    
    Delete assignment with empty payload
---
 conf/functions_worker.yml                          |   2 +
 .../pulsar/compaction/TwoPhaseCompactor.java       |   2 +-
 .../apache/pulsar/compaction/CompactionTest.java   |   2 +-
 .../worker/PulsarWorkerAssignmentTest.java         | 370 +++++++++++++++++++++
 .../proto/src/main/proto/Request.proto             |   5 -
 .../functions/worker/FunctionAssignmentTailer.java |  47 ++-
 .../functions/worker/FunctionRuntimeManager.java   | 283 +++++++---------
 .../pulsar/functions/worker/SchedulerManager.java  | 126 ++++---
 .../pulsar/functions/worker/WorkerConfig.java      |   5 +-
 .../pulsar/functions/worker/WorkerService.java     |  15 +-
 .../worker/scheduler/RoundRobinScheduler.java      |  10 +-
 .../worker/FunctionRuntimeManagerTest.java         |  40 +--
 .../functions/worker/MembershipManagerTest.java    |   3 +
 .../functions/worker/SchedulerManagerTest.java     | 298 +++++++++--------
 14 files changed, 791 insertions(+), 417 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 444b7fb..0c7b8af 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -44,6 +44,8 @@ rescheduleTimeoutMs: 60000
 initialBrokerReconnectMaxRetries: 60
 assignmentWriteMaxRetries: 60
 instanceLivenessCheckFreqMs: 30000
+# Frequency how often worker performs compaction on function-topics
+topicCompactionFrequencySec: 1800
 metricsSamplingPeriodSec: 60
 # Enforce authentication
 authenticationEnabled: false
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 425e049..612b336 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -359,4 +359,4 @@ public class TwoPhaseCompactor extends Compactor {
             this.latestForKey = latestForKey;
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 32c93b4..8e7b292 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1268,4 +1268,4 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
new file mode 100644
index 0000000..fe28a51
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+
+import jersey.repackaged.com.google.common.collect.Lists;
+
+/**
+ * Test Pulsar sink on function
+ *
+ */
+public class PulsarWorkerAssignmentTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    PulsarService pulsar;
+    PulsarAdmin admin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    WorkerService functionsWorkerService;
+    final String tenant = "external-repl-prop";
+    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String primaryHost;
+    String workerId;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarWorkerAssignmentTest.class);
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, 
PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort;
+        String brokerWeServiceUrl = "http://127.0.0.1:" + brokerWebServicePort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setWebServicePort(brokerWebServicePort);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(brokerServicePort);
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+
+        functionsWorkerService = createPulsarFunctionWorker(config);
+        Optional<WorkerService> functionWorkerService = 
Optional.of(functionsWorkerService);
+        pulsar = new PulsarService(config, functionWorkerService);
+        pulsar.start();
+
+        admin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerWeServiceUrl).build());
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", 
InetAddress.getLocalHost().getHostName(), brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(brokerServiceUrl);
+        admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
+        
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        Thread.sleep(100);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsarClient.close();
+        admin.close();
+        functionsWorkerService.stop();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration 
config) {
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new 
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + 
config.getBrokerServicePort());
+        workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + 
config.getWebServicePort());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + 
"-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+        workerConfig.setTopicCompactionFrequencySec(1);
+
+        return new WorkerService(workerConfig);
+    }
+
+    @Test
+    public void testFunctionAssignments() throws Exception {
+
+        final String namespacePortion = "assignment-test";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String functionName = "assign";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails.Builder functionDetailsBuilder = 
createFunctionDetails(jarFilePathUrl, tenant, namespacePortion,
+                functionName, "my.*", sinkTopic, subscriptionName);
+        functionDetailsBuilder.setParallelism(2);
+        FunctionDetails functionDetails = functionDetailsBuilder.build();
+
+        // (1) Create function with 2 instance
+        admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sinkTopic).subscriptions.size() 
== 1
+                        && 
admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers
+                                .size() == 2;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate 2 instances have been started
+        assertEquals(admin.topics().getStats(sinkTopic).subscriptions.size(), 
1);
+        
assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(),
 2);
+
+        // (2) Update function with 1 instance
+        functionDetailsBuilder.setParallelism(1);
+        functionDetails = functionDetailsBuilder.build();
+        // try to update function to test: update-function functionality
+        admin.functions().updateFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sinkTopic).subscriptions.size() 
== 1
+                        && 
admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers
+                                .size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar sink consumer has started on the topic
+        
assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(),
 1);
+    }
+
+    @Test(timeOut=20000)
+    public void testFunctionAssignmentsWithRestart() throws Exception {
+
+        final String namespacePortion = "assignment-test";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + 
"/my-topic1";
+        final String baseFunctionName = "assign-restart";
+        final String subscriptionName = "test-sub";
+        final int totalFunctions = 5;
+        final int parallelism = 2;
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+        final FunctionRuntimeManager runtimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
+
+        String jarFilePathUrl = Utils.FILE + ":"
+                + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        FunctionDetails.Builder functionDetailsBuilder = null;
+        // (1) Register functions with 2 instances
+        for (int i = 0; i < totalFunctions; i++) {
+            String functionName = baseFunctionName + i;
+            functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                    "my.*", sinkTopic, subscriptionName);
+            functionDetailsBuilder.setParallelism(parallelism);
+            // set-auto-ack prop =true
+            functionDetailsBuilder.setAutoAck(true);
+            FunctionDetails functionDetails = functionDetailsBuilder.build();
+            admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        }
+        retryStrategically((test) -> {
+            try {
+                Map<String, Assignment> assgn = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+                return assgn.size() == (totalFunctions * parallelism);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // Validate registered assignments
+        Map<String, Assignment> assignments = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+        assertEquals(assignments.size(), (totalFunctions * parallelism));
+
+        // (2) Update function with prop=auto-ack and Delete 2 functions
+        for (int i = 0; i < totalFunctions; i++) {
+            String functionName = baseFunctionName + i;
+            functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, 
tenant, namespacePortion, functionName,
+                    "my.*", sinkTopic, subscriptionName);
+            functionDetailsBuilder.setParallelism(parallelism);
+            // set-auto-ack prop =false
+            functionDetailsBuilder.setAutoAck(false);
+            FunctionDetails functionDetails = functionDetailsBuilder.build();
+            admin.functions().updateFunctionWithUrl(functionDetails, 
jarFilePathUrl);
+        }
+
+        int totalDeletedFunction = 2;
+        for (int i = (totalFunctions - 1); i >= (totalFunctions - 
totalDeletedFunction); i--) {
+            String functionName = baseFunctionName + i;
+            admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+        }
+        retryStrategically((test) -> {
+            try {
+                Map<String, Assignment> assgn = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+                return assgn.size() == ((totalFunctions - 
totalDeletedFunction) * parallelism);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // Validate registered assignments
+        assignments = 
runtimeManager.getCurrentAssignments().values().iterator().next();
+        assertEquals(assignments.size(), ((totalFunctions - 
totalDeletedFunction) * parallelism));
+
+        // (3) Restart worker service and check registered functions
+        URI dlUri = functionsWorkerService.getDlogUri();
+        functionsWorkerService.stop();
+        functionsWorkerService = new WorkerService(workerConfig);
+        functionsWorkerService.start(dlUri);
+        FunctionRuntimeManager runtimeManager2 = 
functionsWorkerService.getFunctionRuntimeManager();
+        retryStrategically((test) -> {
+            try {
+                Map<String, Assignment> assgn = 
runtimeManager2.getCurrentAssignments().values().iterator().next();
+                return assgn.size() == ((totalFunctions - 
totalDeletedFunction) * parallelism);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // Validate registered assignments
+        assignments = 
runtimeManager2.getCurrentAssignments().values().iterator().next();
+        assertEquals(assignments.size(), ((totalFunctions - 
totalDeletedFunction) * parallelism));
+
+        // validate updated function prop = auto-ack=false and instnaceid
+        for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
+            String functionName = baseFunctionName + i;
+            assertFalse(admin.functions().getFunction(tenant, 
namespacePortion, functionName).getAutoAck());
+        }
+    }
+
+    protected static FunctionDetails.Builder createFunctionDetails(String 
jarFile, String tenant, String namespace,
+            String functionName, String sourceTopic, String sinkTopic, String 
subscriptionName) {
+
+        File file = new File(jarFile);
+        try {
+            Reflections.loadJar(file);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException("Failed to load user jar " + file, e);
+        }
+        String sourceTopicPattern = String.format("persistent://%s/%s/%s", 
tenant, namespace, sourceTopic);
+        Class<?> typeArg = byte[].class;
+
+        FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+        functionDetailsBuilder.setTenant(tenant);
+        functionDetailsBuilder.setNamespace(namespace);
+        functionDetailsBuilder.setName(functionName);
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setParallelism(1);
+        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+
+        // set source spec
+        // source spec classname should be empty so that the default pulsar 
source will be used
+        SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        sourceSpecBuilder.setTypeClassName(typeArg.getName());
+        sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
+        sourceSpecBuilder.setSubscriptionName(subscriptionName);
+        sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, "");
+        functionDetailsBuilder.setAutoAck(true);
+        functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+        // set up sink spec
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+        // sinkSpecBuilder.setClassName(PulsarSink.class.getName());
+        sinkSpecBuilder.setTopic(sinkTopic);
+        Map<String, Object> sinkConfigMap = Maps.newHashMap();
+        sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
+        sinkSpecBuilder.setTypeClassName(typeArg.getName());
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
+        return functionDetailsBuilder;
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-functions/proto/src/main/proto/Request.proto 
b/pulsar-functions/proto/src/main/proto/Request.proto
index c8e31d7..5743469 100644
--- a/pulsar-functions/proto/src/main/proto/Request.proto
+++ b/pulsar-functions/proto/src/main/proto/Request.proto
@@ -37,8 +37,3 @@ message ServiceRequest {
     FunctionMetaData functionMetaData = 3;
     string workerId = 4;
 }
-
-message AssignmentsUpdate {
-    repeated Assignment assignments = 1;
-    uint64 version = 2;
-}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 366eaba..3ad6c7c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -18,14 +18,16 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.util.function.Function;
+
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.proto.Function.Assignment;
 
-import java.io.IOException;
-import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class FunctionAssignmentTailer
@@ -34,15 +36,16 @@ public class FunctionAssignmentTailer
         private final FunctionRuntimeManager functionRuntimeManager;
         private final Reader<byte[]> reader;
 
-    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager,
-                Reader<byte[]> reader)
+    public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager)
             throws PulsarClientException {
-            this.functionRuntimeManager = functionRuntimeManager;
-            this.reader = reader;
-        }
+        this.functionRuntimeManager = functionRuntimeManager;
 
-    public void start() {
+        this.reader = 
functionRuntimeManager.getWorkerService().getClient().newReader()
+                
.topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
+                .startMessageId(MessageId.earliest).create();
+    }
 
+    public void start() {
         receiveOne();
     }
 
@@ -65,29 +68,21 @@ public class FunctionAssignmentTailer
 
     @Override
     public void accept(Message<byte[]> msg) {
-
-        // check if latest
-        boolean hasMessageAvailable;
-        try {
-            hasMessageAvailable = this.reader.hasMessageAvailable();
-        } catch (PulsarClientException e) {
-            throw new RuntimeException(e);
-        }
-        if (!hasMessageAvailable) {
-            Request.AssignmentsUpdate assignmentsUpdate;
+        if(msg.getData()==null || (msg.getData().length==0)) {
+            log.info("Received assignment delete: {}", msg.getKey());
+            this.functionRuntimeManager.deleteAssignment(msg.getKey());
+        } else {
+            Assignment assignment;
             try {
-                assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(msg.getData());
+                assignment = Assignment.parseFrom(msg.getData());
             } catch (IOException e) {
                 log.error("[{}] Received bad assignment update at message {}", 
reader.getTopic(), msg.getMessageId(),
                         e);
                 // TODO: find a better way to handle bad request
                 throw new RuntimeException(e);
             }
-            if (log.isDebugEnabled()) {
-                log.debug("Received assignment update: {}", assignmentsUpdate);
-            }
-
-            
this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), 
assignmentsUpdate);
+            log.info("Received assignment update: {}", assignment);
+            this.functionRuntimeManager.processAssignment(assignment);    
         }
         // receive next request
         receiveOne();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 43cd27b..e634130 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,48 +18,45 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
-import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.Response.Status;
+import com.google.common.annotations.VisibleForTesting;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * This class managers all aspects of functions assignments and running of 
function assignments for this worker
@@ -78,13 +75,12 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new 
ConcurrentHashMap<>();
 
     @VisibleForTesting
+    @Getter
     final WorkerConfig workerConfig;
 
     @VisibleForTesting
     LinkedBlockingQueue<FunctionAction> actionQueue;
 
-    private long currentAssignmentVersion = 0;
-
     private final FunctionAssignmentTailer functionAssignmentTailer;
 
     private FunctionActioner functionActioner;
@@ -92,22 +88,19 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     private RuntimeFactory runtimeFactory;
 
     private MembershipManager membershipManager;
-    private final ConnectorsManager connectorsManager;
     
     private final PulsarAdmin functionAdmin;
+    
+    @Getter
+    private WorkerService workerService;
 
     public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService 
workerService, Namespace dlogNamespace,
             MembershipManager membershipManager, ConnectorsManager 
connectorsManager) throws Exception {
         this.workerConfig = workerConfig;
-        this.connectorsManager = connectorsManager;
+        this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
 
-        Reader<byte[]> reader = workerService.getClient().newReader()
-                .topic(this.workerConfig.getFunctionAssignmentTopic())
-                .startMessageId(MessageId.earliest)
-                .create();
-
-        this.functionAssignmentTailer = new FunctionAssignmentTailer(this, 
reader);
+        this.functionAssignmentTailer = new FunctionAssignmentTailer(this);
 
         AuthenticationConfig authConfig = AuthenticationConfig.builder()
                 
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
@@ -222,13 +215,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         return assignments;
     }
 
-    /**
-     * get the current version number of assignments
-     * @return assignments version number
-     */
-    public synchronized long getCurrentAssignmentVersion() {
-        return new Long(this.currentAssignmentVersion);
-    }
 
     /**
      * Removes a collection of assignments
@@ -461,105 +447,108 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     /**
      * Process an assignment update from the assignment topic
      * @param messageId the message id of the update assignment
-     * @param assignmentsUpdate the assignment update
+     * @param newAssignment the assignment
      */
-    public synchronized void processAssignmentUpdate(MessageId messageId, 
AssignmentsUpdate assignmentsUpdate) {
+    public synchronized void processAssignment(Assignment newAssignment) {
 
-        if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) {
+        Map<String, Assignment> existingAssignmentMap = new HashMap<>();
+        for (Map<String, Assignment> entry : 
this.workerIdToAssignments.values()) {
+            existingAssignmentMap.putAll(entry);
+        }
 
-            Map<String, Assignment> assignmentMap = new HashMap<>();
-            for (Assignment assignment : 
assignmentsUpdate.getAssignmentsList()) {
-                assignmentMap.put(
-                        
Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
-                        assignment);
+        if 
(existingAssignmentMap.containsKey(Utils.getFullyQualifiedInstanceId(newAssignment.getInstance())))
 {
+            updateAssignment(newAssignment);
+        } else {
+            addAssignment(newAssignment);
+        }
+    }
+
+    private void updateAssignment(Assignment assignment) {
+        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        Assignment existingAssignment = this.findAssignment(assignment);
+        // potential updates need to happen
+        if (!existingAssignment.equals(assignment)) {
+            FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+            //stop function
+            if (functionRuntimeInfo != null) {
+                this.insertStopAction(functionRuntimeInfo);
             }
-            Map<String, Assignment> existingAssignmentMap = new HashMap<>();
-            for (Map<String, Assignment> entry : 
this.workerIdToAssignments.values()) {
-                existingAssignmentMap.putAll(entry);
+            // still assigned to me, need to restart
+            if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
+                //start again
+                FunctionRuntimeInfo newFunctionRuntimeInfo = new 
FunctionRuntimeInfo();
+                
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
+                this.insertStartAction(newFunctionRuntimeInfo);
+                this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
             }
 
-            Map<String, Assignment> assignmentsToAdd = diff(assignmentMap, 
existingAssignmentMap);
-
-            Map<String, Assignment> assignmentsToDelete = 
diff(existingAssignmentMap, assignmentMap);
-
-            Map<String, Assignment> existingAssignments = 
inCommon(assignmentMap, existingAssignmentMap);
-
-            // functions to add
-            for (Map.Entry<String, Assignment> assignmentEntry : 
assignmentsToAdd.entrySet()) {
-                String fullyQualifiedInstanceId = assignmentEntry.getKey();
-                Assignment assignment = assignmentEntry.getValue();
-
-                //add new function
-                this.setAssignment(assignment);
-
-                //Assigned to me
-                if 
(assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
-                    if 
(!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
-                        this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
new FunctionRuntimeInfo()
-                                
.setFunctionInstance(assignment.getInstance()));
-
-                    } else {
-                        //Somehow this function is already started
-                        log.warn("Function {} already running. Going to 
restart function.",
-                                
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-                        
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
-                    }
-                    FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-                    this.insertStartAction(functionRuntimeInfo);
-                }
+            // find existing assignment
+            Assignment existing_assignment = this.findAssignment(assignment);
+            if (existing_assignment != null) {
+                // delete old assignment that could have old data
+                this.deleteAssignment(existing_assignment);
             }
+            // set to newest assignment
+            this.setAssignment(assignment);
+        }
+    }
+   
+    public synchronized void deleteAssignment(String fullyQualifiedInstanceId) 
{
+        FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+        if (functionRuntimeInfo != null) {
+            this.insertStopAction(functionRuntimeInfo);
+            this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
+        }
+        
+        String workerId = null;
+        for(Entry<String, Map<String, Assignment>> workerAssignments : 
workerIdToAssignments.entrySet()) {
+            
if(workerAssignments.getValue().remove(fullyQualifiedInstanceId)!=null) {
+                workerId = workerAssignments.getKey();
+                break;
+            }
+        }
+        Map<String, Assignment> worker;
+        if (workerId != null && ((worker = 
workerIdToAssignments.get(workerId)) != null && worker.isEmpty())) {
+            this.workerIdToAssignments.remove(workerId);
+        }
+    }
 
-            // functions to delete
-            for (Map.Entry<String, Assignment> assignmentEntry : 
assignmentsToDelete.entrySet()) {
-                String fullyQualifiedInstanceId = assignmentEntry.getKey();
-                Assignment assignment = assignmentEntry.getValue();
-
-                FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-                if (functionRuntimeInfo != null) {
-                    this.insertStopAction(functionRuntimeInfo);
-                    this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
-                }
-                this.deleteAssignment(assignment);
+    @VisibleForTesting
+    void deleteAssignment(Assignment assignment) {
+        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        Map<String, Assignment> assignmentMap = 
this.workerIdToAssignments.get(assignment.getWorkerId());
+        if (assignmentMap != null) {
+            if (assignmentMap.containsKey(fullyQualifiedInstanceId)) {
+                assignmentMap.remove(fullyQualifiedInstanceId);
             }
+            if (assignmentMap.isEmpty()) {
+                this.workerIdToAssignments.remove(assignment.getWorkerId());
+            }
+        }
+    }
 
-            // functions to update
-            for (Map.Entry<String, Assignment> assignmentEntry : 
existingAssignments.entrySet()) {
-                String fullyQualifiedInstanceId = assignmentEntry.getKey();
-                Assignment assignment = assignmentEntry.getValue();
-                Assignment existingAssignment = 
this.findAssignment(assignment);
-                // potential updates need to happen
-                if (!existingAssignment.equals(assignment)) {
-                    FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
-                    //stop function
-                    if (functionRuntimeInfo != null) {
-                        this.insertStopAction(functionRuntimeInfo);
-                    }
-                    // still assigned to me, need to restart
-                    if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
-                        //start again
-                        FunctionRuntimeInfo newFunctionRuntimeInfo = new 
FunctionRuntimeInfo();
-                        
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
-                        this.insertStartAction(newFunctionRuntimeInfo);
-                        this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
-                    }
+    private void addAssignment(Assignment assignment) {
+        String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
 
-                    // find existing assignment
-                    Assignment existing_assignment = 
this.findAssignment(assignment);
-                    if (existing_assignment != null) {
-                        // delete old assignment that could have old data
-                        this.deleteAssignment(existing_assignment);
-                    }
-                    // set to newest assignment
-                    this.setAssignment(assignment);
-                }
-            }
+        //add new function
+        this.setAssignment(assignment);
 
-            // set as current assignment
-            this.currentAssignmentVersion = assignmentsUpdate.getVersion();
+        //Assigned to me
+        if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
+            if 
(!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
+                this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new 
FunctionRuntimeInfo()
+                        .setFunctionInstance(assignment.getInstance()));
 
-        } else {
-            log.debug("Received out of date assignment update: {}", 
assignmentsUpdate);
+            } else {
+                //Somehow this function is already started
+                log.warn("Function {} already running. Going to restart 
function.",
+                        
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
+                
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
+            }
+            FunctionRuntimeInfo functionRuntimeInfo = 
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
+            this.insertStartAction(functionRuntimeInfo);
         }
+        
     }
 
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
@@ -642,20 +631,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                 assignment);
     }
 
-    @VisibleForTesting
-    void deleteAssignment(Assignment assignment) {
-        Map<String, Assignment> assignmentMap = 
this.workerIdToAssignments.get(assignment.getWorkerId());
-        if (assignmentMap != null) {
-            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
-            if (assignmentMap.containsKey(fullyQualifiedInstanceId)) {
-                assignmentMap.remove(fullyQualifiedInstanceId);
-            }
-            if (assignmentMap.isEmpty()) {
-                this.workerIdToAssignments.remove(assignment.getWorkerId());
-            }
-        }
-    }
-
     private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
         this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
     }
@@ -674,28 +649,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
     }
 
-    private Map<String, Assignment> diff(Map<String, Assignment> 
assignmentMap1, Map<String, Assignment> assignmentMap2) {
-        Map<String, Assignment> result = new HashMap<>();
-        for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) {
-            if (!assignmentMap2.containsKey(entry.getKey())) {
-                result.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return result;
-    }
-
-    private Map<String, Assignment> inCommon(Map<String, Assignment> 
assignmentMap1, Map<String, Assignment> assignmentMap2) {
-
-        Map<String, Assignment> result = new HashMap<>();
-        for (Map.Entry<String, Assignment> entry : assignmentMap1.entrySet()) {
-            if (assignmentMap2.containsKey(entry.getKey())) {
-                result.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return result;
-    }
-
     private FunctionRuntimeInfo getFunctionRuntimeInfo(String 
fullyQualifiedInstanceId) {
         return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
     }
+
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index ed00958..db7785a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -23,30 +23,29 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
 @Slf4j
 public class SchedulerManager implements AutoCloseable {
 
@@ -65,16 +64,22 @@ public class SchedulerManager implements AutoCloseable {
 
     private final Producer<byte[]> producer;
 
-    private final ExecutorService executorService;
+    private final ScheduledExecutorService executorService;
+    
+    private final PulsarAdmin admin;
+    
+    AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
+    private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; 
 
-    public SchedulerManager(WorkerConfig workerConfig, PulsarClient 
pulsarClient) {
+    public SchedulerManager(WorkerConfig workerConfig, PulsarClient 
pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) {
         this.workerConfig = workerConfig;
+        this.admin = admin;
         this.scheduler = 
Reflections.createInstance(workerConfig.getSchedulerClassName(), 
IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
 
         try {
             this.producer = 
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
-                    
.enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
+                    
.enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
                     sendTimeout(0, TimeUnit.MILLISECONDS).create();
         } catch (PulsarClientException e) {
             log.error("Failed to create producer to function assignment topic "
@@ -82,9 +87,9 @@ public class SchedulerManager implements AutoCloseable {
             throw new RuntimeException(e);
         }
 
-        this.executorService =
-                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
-                        new LinkedBlockingQueue<>());
+        this.executorService = executor;
+        
+        scheduleCompaction(executor, 
workerConfig.getTopicCompactionFrequencySec());
     }
 
     public Future<?> schedule() {
@@ -92,13 +97,31 @@ public class SchedulerManager implements AutoCloseable {
             synchronized (SchedulerManager.this) {
                 boolean isLeader = membershipManager.isLeader();
                 if (isLeader) {
-                    invokeScheduler();
+                    try {
+                        invokeScheduler();
+                    } catch (Exception e) {
+                        log.warn("Failed to invoke scheduler", e);
+                        schedule();
+                    }
                 }
             }
         });
     }
 
-    private void invokeScheduler() {
+    private void scheduleCompaction(ScheduledExecutorService executor, long 
scheduleFrequencySec) {
+        if (executor != null) {
+            executor.scheduleWithFixedDelay(() -> {
+                if (membershipManager.isLeader() && isCompactionNeeded.get()) {
+                    compactAssignmentTopic();
+                    isCompactionNeeded.set(false);
+                }
+            }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
+        }
+    }
+    
+    @VisibleForTesting
+    public void invokeScheduler() {
+        
         List<String> currentMembership = 
this.membershipManager.getCurrentMembership()
                 .stream().map(workerInfo -> 
workerInfo.getWorkerId()).collect(Collectors.toList());
 
@@ -111,12 +134,16 @@ public class SchedulerManager implements AutoCloseable {
         while (it.hasNext()) {
             Map.Entry<String, Map<String, Assignment>> 
workerIdToAssignmentEntry = it.next();
             Map<String, Assignment> functionMap = 
workerIdToAssignmentEntry.getValue();
+
             // remove instances that don't exist anymore
-            functionMap.entrySet().removeIf(
-                    entry -> {
-                        String fullyQualifiedInstanceId = entry.getKey();
-                        return 
!allInstances.containsKey(fullyQualifiedInstanceId);
-                    });
+            functionMap.entrySet().removeIf(entry -> {
+                String fullyQualifiedInstanceId = entry.getKey();
+                boolean deleted = 
!allInstances.containsKey(fullyQualifiedInstanceId);
+                if (deleted) {
+                    publishNewAssignment(entry.getValue().toBuilder().build(), 
true);
+                }
+                return deleted;
+            });
 
             // update assignment instances in case attributes of a function 
gets updated
             for (Map.Entry<String, Assignment> entry : functionMap.entrySet()) 
{
@@ -143,36 +170,40 @@ public class SchedulerManager implements AutoCloseable {
         List<Assignment> assignments = this.scheduler.schedule(
                 needsAssignment, currentAssignments, currentMembership);
 
-        log.debug("New assignments computed: {}", assignments);
+        if (log.isDebugEnabled()) {
+            log.debug("New assignments computed: {}", assignments);
+        }
 
-        long assignmentVersion = 
this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .setVersion(assignmentVersion)
-                .addAllAssignments(assignments)
-                .build();
+        isCompactionNeeded.set(!assignments.isEmpty());
 
-        CompletableFuture<MessageId> messageIdCompletableFuture = 
producer.sendAsync(assignmentsUpdate.toByteArray());
-        try {
-            messageIdCompletableFuture.get();
-        } catch (InterruptedException | ExecutionException e) {
-            log.error("Failed to send assignment update", e);
-            throw new RuntimeException(e);
+        for(Assignment assignment : assignments) {
+            publishNewAssignment(assignment, false);
         }
+        
+    }
 
-        // wait for assignment update to go throw the pipeline
-        int retries = 0;
-        while (this.functionRuntimeManager.getCurrentAssignmentVersion() < 
assignmentVersion) {
-            if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) {
-                log.warn("Max number of retries reached for waiting for 
assignment to propagate. Will continue now.");
-                break;
-            }
-            log.info("Waiting for assignments to propagate...");
+    public void compactAssignmentTopic() {
+        if (this.admin != null) {
             try {
-                Thread.sleep(500);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                
this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic());
+            } catch (PulsarAdminException e) {
+                log.error("Failed to trigger compaction {}", e);
+                executorService.schedule(() -> compactAssignmentTopic(), 
DEFAULT_ADMIN_API_BACKOFF_SEC,
+                        TimeUnit.SECONDS);
             }
-            retries++;
+        }
+    }
+
+    private void publishNewAssignment(Assignment assignment, boolean deleted) {
+        try {
+            String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            // Publish an empty payload under the instance-id key so that the compactor can
+            // delete this key and skip delivery of messages for this instance-id.
+            producer.newMessage().key(fullyQualifiedInstanceId)
+                    .value(deleted ? "".getBytes() : 
assignment.toByteArray()).sendAsync().get();
+        } catch (Exception e) {
+            log.error("Failed to {} assignment update {}", assignment, deleted 
? "send" : "deleted", e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -224,6 +255,5 @@ public class SchedulerManager implements AutoCloseable {
         } catch (PulsarClientException e) {
             log.warn("Failed to shutdown scheduler manager assignment 
producer", e);
         }
-        this.executorService.shutdown();
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0f695a9..a2524c6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -74,6 +74,9 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     private long instanceLivenessCheckFreqMs;
     private String clientAuthenticationPlugin;
     private String clientAuthenticationParameters;
+    // How often (in seconds) the worker triggers compaction on function topics
+    private long topicCompactionFrequencySec = 30 * 60; // 30 minutes
+    private int metricsSamplingPeriodSec = 60;
     /***** --- TLS --- ****/
     // Enable TLS
     private boolean tlsEnabled = false;
@@ -88,8 +91,6 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     private boolean tlsRequireTrustedClientCertOnConnect = false;
     private boolean useTls = false;
     private boolean tlsHostnameVerificationEnable = false;
-    
-    private int metricsSamplingPeriodSec = 60;
     // Enforce authentication
     private boolean authenticationEnabled = false;
     // Autentication provider name list, which is a list of class names
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 7fc0cc9..488bcd7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
@@ -68,11 +69,15 @@ public class WorkerService {
     private PulsarAdmin brokerAdmin;
     private PulsarAdmin functionAdmin;
     private final MetricsGenerator metricsGenerator;
+    private final ScheduledExecutorService executor;
+    @VisibleForTesting
+    private URI dlogUri;
 
     public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.statsUpdater = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-stats-updater"));
+        this.executor = Executors.newScheduledThreadPool(10, new 
DefaultThreadFactory("pulsar-worker"));
         this.metricsGenerator = new MetricsGenerator(this.statsUpdater);
     }
 
@@ -98,12 +103,13 @@ public class WorkerService {
         }
 
         // create the dlog namespace for storing function packages
+        this.dlogUri = dlogUri;
         DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig);
         try {
             this.dlogNamespace = NamespaceBuilder.newBuilder()
                     .conf(dlogConf)
                     .clientId("function-worker-" + workerConfig.getWorkerId())
-                    .uri(dlogUri)
+                    .uri(this.dlogUri)
                     .build();
         } catch (Exception e) {
             log.error("Failed to initialize dlog namespace {} for storing 
function packages",
@@ -128,7 +134,8 @@ public class WorkerService {
             log.info("Created Pulsar client");
 
             //create scheduler manager
-            this.schedulerManager = new SchedulerManager(this.workerConfig, 
this.client);
+            this.schedulerManager = new SchedulerManager(this.workerConfig, 
this.client, this.brokerAdmin,
+                    this.executor);
 
             //create function meta data manager
             this.functionMetaDataManager = new FunctionMetaDataManager(
@@ -232,6 +239,10 @@ public class WorkerService {
         if (null != this.functionAdmin) {
             this.functionAdmin.close();
         }
+        
+        if(this.executor != null) {
+            this.executor.shutdown();
+        }
     }
 
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 58c1a9a..817962d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -22,6 +22,8 @@ import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Instance;
 
+import com.google.common.collect.Lists;
+
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -47,19 +49,17 @@ public class RoundRobinScheduler implements IScheduler {
             
workerIdToAssignment.get(existingAssignment.getWorkerId()).add(existingAssignment);
         }
 
+        List<Assignment> newAssignments = Lists.newArrayList();
         for (Instance unassignedFunctionInstance : 
unassignedFunctionInstances) {
             String heartBeatWorkerId = 
checkHeartBeatFunction(unassignedFunctionInstance);
             String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : 
findNextWorker(workerIdToAssignment);
             Assignment newAssignment = 
Assignment.newBuilder().setInstance(unassignedFunctionInstance)
                     .setWorkerId(workerId).build();
             workerIdToAssignment.get(workerId).add(newAssignment);
+            newAssignments.add(newAssignment);
         }
 
-        List<Assignment> assignments
-                = workerIdToAssignment.entrySet().stream()
-                .flatMap(entry -> 
entry.getValue().stream()).collect(Collectors.toList());
-
-        return assignments;
+        return newAssignments;
     }
 
     private static String checkHeartBeatFunction(Instance funInstance) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 85a2122..5e1ed02 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -37,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
@@ -86,6 +87,8 @@ public class FunctionRuntimeManagerTest {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -123,12 +126,8 @@ public class FunctionRuntimeManagerTest {
         assignments.add(assignment1);
         assignments.add(assignment2);
 
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .addAllAssignments(assignments)
-                .setVersion(1)
-                .build();
-
-        functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, 
assignmentsUpdate);
+        functionRuntimeManager.processAssignment(assignment1);
+        functionRuntimeManager.processAssignment(assignment2);
 
         verify(functionRuntimeManager, 
times(2)).setAssignment(any(Function.Assignment.class));
         verify(functionRuntimeManager, 
times(0)).deleteAssignment(any(Function.Assignment.class));
@@ -183,6 +182,7 @@ public class FunctionRuntimeManagerTest {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -205,6 +205,7 @@ public class FunctionRuntimeManagerTest {
                 Function.FunctionDetails.newBuilder()
                         
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build();
 
+        // Delete this assignment
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
@@ -221,23 +222,18 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.setAssignment(assignment2);
         reset(functionRuntimeManager);
 
-        List<Function.Assignment> assignments = new LinkedList<>();
-        assignments.add(assignment2);
-
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .addAllAssignments(assignments)
-                .setVersion(1)
-                .build();
-
         functionRuntimeManager.functionRuntimeInfoMap.put(
                 "test-tenant/test-namespace/func-1:0", new 
FunctionRuntimeInfo().setFunctionInstance(
                         
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
                                 .build()));
 
-        functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, 
assignmentsUpdate);
+        functionRuntimeManager.processAssignment(assignment1);
+        functionRuntimeManager.processAssignment(assignment2);
 
+        
functionRuntimeManager.deleteAssignment(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()));
+        
         verify(functionRuntimeManager, 
times(0)).setAssignment(any(Function.Assignment.class));
-        verify(functionRuntimeManager, 
times(1)).deleteAssignment(any(Function.Assignment.class));
+        verify(functionRuntimeManager, 
times(1)).deleteAssignment(any(String.class));
 
         
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
@@ -284,6 +280,7 @@ public class FunctionRuntimeManagerTest {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -327,14 +324,6 @@ public class FunctionRuntimeManagerTest {
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(0).build())
                 .build();
-        List<Function.Assignment> assignments = new LinkedList<>();
-        assignments.add(assignment1);
-        assignments.add(assignment3);
-
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.newBuilder()
-                .addAllAssignments(assignments)
-                .setVersion(1)
-                .build();
 
         functionRuntimeManager.functionRuntimeInfoMap.put(
                 "test-tenant/test-namespace/func-1:0", new 
FunctionRuntimeInfo().setFunctionInstance(
@@ -345,7 +334,8 @@ public class FunctionRuntimeManagerTest {
                         
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
                                 .build()));
 
-        functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, 
assignmentsUpdate);
+        functionRuntimeManager.processAssignment(assignment1);
+        functionRuntimeManager.processAssignment(assignment3);
 
         verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
         verify(functionRuntimeManager).insertStopAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 2753bf1..c849fd3 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -131,6 +131,7 @@ public class MembershipManagerTest {
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
@@ -202,6 +203,7 @@ public class MembershipManagerTest {
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
         doReturn(pulsarClient).when(workerService).getClient();
@@ -296,6 +298,7 @@ public class MembershipManagerTest {
         ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
         WorkerService workerService = mock(WorkerService.class);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 19977bd..00e4b6e 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -28,15 +28,19 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
 
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -46,6 +50,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Request;
@@ -53,9 +58,14 @@ import 
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -67,6 +77,8 @@ public class SchedulerManagerTest {
     private MembershipManager membershipManager;
     private CompletableFuture<MessageId> completableFuture;
     private Producer producer;
+    private TypedMessageBuilder<byte[]> message;
+    private ScheduledExecutorService executor;
 
     private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
@@ -94,8 +106,12 @@ public class SchedulerManagerTest {
         producer = mock(Producer.class);
         completableFuture = spy(new CompletableFuture<>());
         completableFuture.complete(MessageId.earliest);
-        byte[] bytes = any();
-        when(producer.sendAsync(bytes)).thenReturn(completableFuture);
+        //byte[] bytes = any();
+        message = mock(TypedMessageBuilder.class);
+        when(producer.newMessage()).thenReturn(message);
+        when(message.key(anyString())).thenReturn(message);
+        when(message.value(any())).thenReturn(message);
+        when(message.sendAsync()).thenReturn(completableFuture);
 
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
@@ -109,7 +125,9 @@ public class SchedulerManagerTest {
         PulsarClient pulsarClient = mock(PulsarClient.class);
         when(pulsarClient.newProducer()).thenReturn(builder);
 
-        schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient));
+        this.executor = Executors
+                .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("worker-test"));
+        schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient, null, executor));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
         functionMetaDataManager = mock(FunctionMetaDataManager.class);
         membershipManager = mock(MembershipManager.class);
@@ -118,6 +136,11 @@ public class SchedulerManagerTest {
         schedulerManager.setMembershipManager(membershipManager);
     }
 
+    @AfterMethod
+    public void stop() {
+        this.executor.shutdown();
+    }
+    
     @Test
     public void testSchedule() throws Exception {
 
@@ -143,9 +166,6 @@ public class SchedulerManagerTest {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -159,7 +179,9 @@ public class SchedulerManagerTest {
         // i am leader
         doReturn(true).when(membershipManager).isLeader();
         callSchedule();
-        verify(producer, times(1)).sendAsync(any(byte[].class));
+        List<Invocation> invocations = 
getMethodInvocationDetails(schedulerManager,
+                SchedulerManager.class.getMethod("invokeScheduler"));
+        Assert.assertEquals(invocations.size(), 1);
     }
 
     @Test
@@ -187,9 +209,6 @@ public class SchedulerManagerTest {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -200,17 +219,8 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
-        byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
-        Assert.assertEquals(
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        .addAssignments(assignment1).build(),
-                assignmentsUpdate);
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 0);
     }
 
     @Test
@@ -243,9 +253,6 @@ public class SchedulerManagerTest {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -256,23 +263,20 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 1);
-
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignments: {}", assignments);
         Function.Assignment assignment2 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(0).build())
                 .build();
-        Assert.assertEquals(
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2).build(),
-                assignmentsUpdate);
+        Assert.assertEquals(assignment2, assignments);
 
     }
 
@@ -288,7 +292,7 @@ public class SchedulerManagerTest {
         // simulate function2 got removed
         Function.FunctionMetaData function2 = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
-                        
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
+                        
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1))
                 .build();
         functionMetaDataList.add(function1);
         
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
@@ -300,6 +304,7 @@ public class SchedulerManagerTest {
                         
.setFunctionMetaData(function1).setInstanceId(0).build())
                 .build();
 
+        // Delete this assignment
         Function.Assignment assignment2 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
@@ -309,14 +314,12 @@ public class SchedulerManagerTest {
         Map<String, Map<String, Function.Assignment>> currentAssignments = new 
HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
         
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()),
 assignment1);
+        //TODO: delete this assignment
         
assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()),
 assignment2);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -327,19 +330,16 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 1);
-
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignments: {}", assignments);
 
-        Assert.assertEquals(
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        .addAssignments(assignment1).build(),
-                assignmentsUpdate);
+        Assert.assertEquals(0, send.length);
     }
 
     @Test
@@ -373,9 +373,6 @@ public class SchedulerManagerTest {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -386,25 +383,21 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
-                Object.class));
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 1);
-
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignments: {}", assignments);
 
         Function.Assignment assignment2 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(0).build())
                 .build();
-        Assert.assertEquals(
-                assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2).build()
-                );
+        Assert.assertEquals(assignments, assignment2);
 
         // scale up
 
@@ -435,17 +428,23 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 4);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 2);
+        
+        Set<Assignment> allAssignments = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
-        send = (byte[]) invocations.get(1).getRawArguments()[0];
-        assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send);
-        Assert.assertEquals(assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 
+ 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2Scaled1)
-                        
.addAssignments(assignment2Scaled2).addAssignments(assignment2Scaled3).build()
-                );
+        assertTrue(allAssignments.contains(assignment2Scaled1));
+        assertTrue(allAssignments.contains(assignment2Scaled2));
+        assertTrue(allAssignments.contains(assignment2Scaled3));
     }
 
     @Test
@@ -479,9 +478,6 @@ public class SchedulerManagerTest {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -492,14 +488,24 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 3);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
+
+        log.info("assignments: {}", assignments);
+        
+        Set<Assignment> allAssignments = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
 
         Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -516,13 +522,11 @@ public class SchedulerManagerTest {
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
-        Assert.assertEquals(
-                assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2_1)
-                        
.addAssignments(assignment2_2).addAssignments(assignment2_3).build()
-        );
-
+        
+        assertTrue(allAssignments.contains(assignment2_1));
+        assertTrue(allAssignments.contains(assignment2_2));
+        assertTrue(allAssignments.contains(assignment2_3));
+        
         // scale down
 
         Function.FunctionMetaData function2Scaled = 
Function.FunctionMetaData.newBuilder()
@@ -542,17 +546,23 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 4);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 2);
+        send = (byte[]) invocations.get(0).getRawArguments()[0];
+        assignments = Assignment.parseFrom(send);
+        
+        Set<Assignment> allAssignments2 = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
 
-        send = (byte[]) invocations.get(1).getRawArguments()[0];
-        assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send);
-        Assert.assertEquals(assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 
+ 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2Scaled)
-                        .build()
-        );
+        assertTrue(allAssignments2.contains(assignment2Scaled));
     }
 
     @Test
@@ -582,9 +592,6 @@ public class SchedulerManagerTest {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        // set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000));
         workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000));
@@ -595,20 +602,20 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer,
-                Producer.class.getMethod("sendAsync", Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
-        byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
-
-        List<Assignment> assignmentList = 
assignmentsUpdate.getAssignmentsList();
-        Assert.assertEquals(assignmentList.size(), 2);
-        for (Assignment assignment : assignmentList) {
-            String functionName = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName();
-            String assignedWorkerId = assignment.getWorkerId();
-            Assert.assertEquals(functionName, assignedWorkerId);
-        }
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 2);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
+        invocations.forEach(invocation -> {
+            try {
+                Assignment assignment = 
Assignment.parseFrom((byte[])invocation.getRawArguments()[0]);
+                String functionName = 
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName();
+                String assignedWorkerId = assignment.getWorkerId();
+                Assert.assertEquals(functionName, assignedWorkerId);
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
     
     @Test
@@ -644,9 +651,6 @@ public class SchedulerManagerTest {
         currentAssignments.put("worker-1", assignmentEntry1);
         
doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
-        //set version
-        
doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion();
-
         // single node
         List<WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 
5000));
@@ -657,14 +661,14 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(producer, 
Producer.class.getMethod("sendAsync",
+        List<Invocation> invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 3);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 1);
-
         byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Request.AssignmentsUpdate assignmentsUpdate = 
Request.AssignmentsUpdate.parseFrom(send);
+        Assignment assignments = Assignment.parseFrom(send);
 
-        log.info("assignmentsUpdate: {}", assignmentsUpdate);
+        log.info("assignmentsUpdate: {}", assignments);
 
         Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -681,13 +685,27 @@ public class SchedulerManagerTest {
                 .setInstance(Function.Instance.newBuilder()
                         
.setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
-        Assert.assertEquals(
-                assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1)
-                        
.addAssignments(assignment1).addAssignments(assignment2_1)
-                        
.addAssignments(assignment2_2).addAssignments(assignment2_3).build()
-        );
-
+        
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 3);
+        invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
+                Object.class));
+        send = (byte[]) invocations.get(0).getRawArguments()[0];
+        assignments = Assignment.parseFrom(send);
+        
+        Set<Assignment> allAssignments = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                
allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        
+        assertTrue(allAssignments.contains(assignment2_1));
+        assertTrue(allAssignments.contains(assignment2_2));
+        assertTrue(allAssignments.contains(assignment2_3));
+        
         // scale down
 
         Function.FunctionMetaData function2Updated = 
Function.FunctionMetaData.newBuilder()
@@ -718,28 +736,32 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync",
+        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 6);
+        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        Assert.assertEquals(invocations.size(), 2);
-
-        send = (byte[]) invocations.get(1).getRawArguments()[0];
-        assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send);
-        Assert.assertEquals(assignmentsUpdate,
-                Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1)
-                        .addAssignments(assignment1).addAssignments(assignment2Updated1)
-                        .addAssignments(assignment2Updated2)
-                        .addAssignments(assignment2Updated3)
-                        .build()
-        );
+        send = (byte[]) invocations.get(0).getRawArguments()[0];
+        assignments = Assignment.parseFrom(send);
+        
+        Set<Assignment> allAssignments2 = Sets.newHashSet();
+        invocations.forEach(invocation -> {
+            try {
+                allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0]));
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        
+        assertTrue(allAssignments2.contains(assignment2Updated1));
+        assertTrue(allAssignments2.contains(assignment2Updated2));
+        assertTrue(allAssignments2.contains(assignment2Updated3));
     }
 
     private void callSchedule() throws NoSuchMethodException, InterruptedException,
             TimeoutException, ExecutionException {
-        long intialVersion = functionRuntimeManager.getCurrentAssignmentVersion();
         Future<?> complete = schedulerManager.schedule();
 
         complete.get(30, TimeUnit.SECONDS);
-        doReturn(intialVersion + 1).when(functionRuntimeManager).getCurrentAssignmentVersion();
     }
 
     private List<Invocation> getMethodInvocationDetails(Object o, Method method) throws NoSuchMethodException {

Reply via email to