This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 247a608 Fixed concurrent modification exception on function worker stop (#4006)
247a608 is described below
commit 247a60866dcdd1fa5e2865bebb4409bdb1ffa3e5
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 8 14:39:05 2019 -0700
Fixed concurrent modification exception on function worker stop (#4006)
---
.../pulsar/functions/worker/FunctionRuntimeManager.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 4618e88..e4acc70 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -58,6 +58,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -91,9 +92,9 @@ public class FunctionRuntimeManager implements AutoCloseable{
private RuntimeFactory runtimeFactory;
private MembershipManager membershipManager;
-
+
private final PulsarAdmin functionAdmin;
-
+
@Getter
private WorkerService workerService;
@@ -424,7 +425,10 @@ public class FunctionRuntimeManager implements
AutoCloseable{
final String workerId = this.workerConfig.getWorkerId();
Map<String, Assignment> assignments =
workerIdToAssignments.get(workerId);
if (assignments != null) {
- assignments.values().forEach(assignment -> {
+ // Take a copy of the map since the stopFunction will modify the same map
+ // and invalidate the iterator
+ Map<String, Assignment> copiedAssignments = new
TreeMap<>(assignments);
+ copiedAssignments.values().forEach(assignment -> {
String fullyQualifiedInstanceId =
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
try {
stopFunction(fullyQualifiedInstanceId, false);
@@ -681,7 +685,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
this.setAssignment(assignment);
}
}
-
+
public synchronized void deleteAssignment(String fullyQualifiedInstanceId)
{
FunctionRuntimeInfo functionRuntimeInfo =
_getFunctionRuntimeInfo(fullyQualifiedInstanceId);
if (functionRuntimeInfo != null) {
@@ -713,7 +717,7 @@ public class FunctionRuntimeManager implements
AutoCloseable{
}
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
-
+
String workerId = null;
for(Entry<String, Map<String, Assignment>> workerAssignments :
workerIdToAssignments.entrySet()) {
if(workerAssignments.getValue().remove(fullyQualifiedInstanceId)!=null) {