This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d631c0 fix bug in FunctionRuntimeManager involving not cleaning up
old invalid assignments (#2223)
4d631c0 is described below
commit 4d631c0608fdeb605dd39822e2abc2cc57c7b37b
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Jul 25 11:37:48 2018 -0700
fix bug in FunctionRuntimeManager involving not cleaning up old invalid
assignments (#2223)
### Motivation
Old, invalid assignments of functions are not cleaned up and are left over
after a function is rescheduled.
Getting the status of a function relies on assignments to route the get-status
request to the correct worker running the function. If the assignments are not
correct, get-status requests will be routed incorrectly.
Also fixed a test that was asserting an incorrect value. The test would
have caught this problem if it had been asserting the correct value.
---
.../pulsar/functions/worker/FunctionRuntimeManager.java | 12 ++++++++++--
.../pulsar/functions/worker/FunctionRuntimeManagerTest.java | 4 +---
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 05a79d8..5e1995e 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -82,7 +82,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
private MembershipManager membershipManager;
private final ConnectorsManager connectorsManager;
-
public FunctionRuntimeManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
Namespace dlogNamespace,
@@ -354,6 +353,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
* @param assignmentsUpdate the assignment update
*/
public synchronized void processAssignmentUpdate(MessageId messageId,
AssignmentsUpdate assignmentsUpdate) {
+
if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) {
Map<String, Assignment> assignmentMap = new HashMap<>();
@@ -430,8 +430,16 @@ public class FunctionRuntimeManager implements
AutoCloseable{
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
this.insertStartAction(newFunctionRuntimeInfo);
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId,
newFunctionRuntimeInfo);
- this.setAssignment(assignment);
}
+
+ // find existing assignment
+ Assignment existing_assignment =
this.findAssignment(assignment);
+ if (existing_assignment != null) {
+ // delete old assignment that could have old data
+ this.deleteAssignment(existing_assignment);
+ }
+ // set to newest assignment
+ this.setAssignment(assignment);
}
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 8ab7473..4f618c4 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -378,12 +378,10 @@ public class FunctionRuntimeManagerTest {
.build()))));
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2);
-
-
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 2);
+
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-1:0"),
assignment1);
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
.get("worker-1").get("test-tenant/test-namespace/func-2:0"),
assignment3);
}
-
}