This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d631c0  fix bug in FunctionRuntimeManager involving not cleaning up 
old invalid assignments (#2223)
4d631c0 is described below

commit 4d631c0608fdeb605dd39822e2abc2cc57c7b37b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Jul 25 11:37:48 2018 -0700

    fix bug in FunctionRuntimeManager involving not cleaning up old invalid 
assignments (#2223)
    
    ### Motivation
    
    Old, invalid assignments of functions are not cleaned up and are left over 
after a function is rescheduled.
    
    Getting the status of a function relies on assignments to route the 
get-status request to the correct worker running the function.  If the 
assignments are not correct, get-status requests will be routed incorrectly.
    
    Also fixed a test that was asserting the incorrect value.  The test would 
have caught this problem if it had been asserting the correct value.
---
 .../pulsar/functions/worker/FunctionRuntimeManager.java      | 12 ++++++++++--
 .../pulsar/functions/worker/FunctionRuntimeManagerTest.java  |  4 +---
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 05a79d8..5e1995e 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -82,7 +82,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
     private MembershipManager membershipManager;
     private final ConnectorsManager connectorsManager;
 
-
     public FunctionRuntimeManager(WorkerConfig workerConfig,
                                   PulsarClient pulsarClient,
                                   Namespace dlogNamespace,
@@ -354,6 +353,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
      * @param assignmentsUpdate the assignment update
      */
     public synchronized void processAssignmentUpdate(MessageId messageId, 
AssignmentsUpdate assignmentsUpdate) {
+
         if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) {
 
             Map<String, Assignment> assignmentMap = new HashMap<>();
@@ -430,8 +430,16 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                         
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                         this.insertStartAction(newFunctionRuntimeInfo);
                         this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
-                        this.setAssignment(assignment);
                     }
+
+                    // find existing assignment
+                    Assignment existing_assignment = 
this.findAssignment(assignment);
+                    if (existing_assignment != null) {
+                        // delete old assignment that could have old data
+                        this.deleteAssignment(existing_assignment);
+                    }
+                    // set to newest assignment
+                    this.setAssignment(assignment);
                 }
             }
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 8ab7473..4f618c4 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -378,12 +378,10 @@ public class FunctionRuntimeManagerTest {
                                         .build()))));
 
         
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2);
-
-        
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 2);
+        
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-1:0"), 
assignment1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment3);
     }
-
 }

Reply via email to