This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d97b57c  Fixed deadlock between create function and leader initialization (#7508)
d97b57c is described below

commit d97b57c776e21103e7e32cb7167eb50ae0c2c677
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Jul 11 08:37:04 2020 -0700

    Fixed deadlock between create function and leader initialization (#7508)
---
 .../functions/worker/FunctionMetaDataManager.java  | 68 ++++++++++++----------
 1 file changed, 37 insertions(+), 31 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index abe2b38..4ec1ef5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -199,45 +199,51 @@ public class FunctionMetaDataManager implements AutoCloseable {
      * @throws IllegalStateException if we are not the leader
      * @throws IllegalArgumentException if the request is out of date.
      */
-    public synchronized void updateFunctionOnLeader(FunctionMetaData 
functionMetaData, boolean delete)
+    public void updateFunctionOnLeader(FunctionMetaData functionMetaData, 
boolean delete)
             throws IllegalStateException, IllegalArgumentException {
-        if (exclusiveLeaderProducer == null) {
-            throw new IllegalStateException("Not the leader");
-        }
         boolean needsScheduling;
-        if (delete) {
-            needsScheduling = proccessDeregister(functionMetaData);
-        } else {
-            needsScheduling = processUpdate(functionMetaData);
-        }
-        byte[] toWrite;
-        if (workerConfig.getUseCompactedMetadataTopic()) {
+        synchronized (this) {
+            if (exclusiveLeaderProducer == null) {
+                throw new IllegalStateException("Not the leader");
+            }
+
             if (delete) {
-                toWrite = "".getBytes();
+                needsScheduling = proccessDeregister(functionMetaData);
             } else {
-                toWrite = functionMetaData.toByteArray();
+                needsScheduling = processUpdate(functionMetaData);
             }
-        } else {
-            Request.ServiceRequest serviceRequest = 
Request.ServiceRequest.newBuilder()
-                    .setServiceRequestType(delete ? 
Request.ServiceRequest.ServiceRequestType.DELETE : 
Request.ServiceRequest.ServiceRequestType.UPDATE)
-                    .setFunctionMetaData(functionMetaData)
-                    .setWorkerId(workerConfig.getWorkerId())
-                    .setRequestId(UUID.randomUUID().toString())
-                    .build();
-            toWrite = serviceRequest.toByteArray();
-        }
-        try {
-            TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage()
-                    .value(toWrite)
-                    .property(versionTag, 
Long.toString(functionMetaData.getVersion()));
+            byte[] toWrite;
             if (workerConfig.getUseCompactedMetadataTopic()) {
-                builder = 
builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
+                if (delete) {
+                    toWrite = "".getBytes();
+                } else {
+                    toWrite = functionMetaData.toByteArray();
+                }
+            } else {
+                Request.ServiceRequest serviceRequest = 
Request.ServiceRequest.newBuilder()
+                        .setServiceRequestType(delete ? 
Request.ServiceRequest.ServiceRequestType.DELETE
+                                : 
Request.ServiceRequest.ServiceRequestType.UPDATE)
+                        .setFunctionMetaData(functionMetaData)
+                        .setWorkerId(workerConfig.getWorkerId())
+                        .setRequestId(UUID.randomUUID().toString())
+                        .build();
+                toWrite = serviceRequest.toByteArray();
             }
-            lastMessageSeen = builder.send();
-        } catch (Exception e) {
-            log.error("Could not write into Function Metadata topic", e);
-            throw new IllegalStateException("Internal Error updating function 
at the leader", e);
+            try {
+                TypedMessageBuilder builder = 
exclusiveLeaderProducer.newMessage()
+                        .value(toWrite)
+                        .property(versionTag, 
Long.toString(functionMetaData.getVersion()));
+                if (workerConfig.getUseCompactedMetadataTopic()) {
+                    builder = 
builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
+                }
+                lastMessageSeen = builder.send();
+            } catch (Exception e) {
+                log.error("Could not write into Function Metadata topic", e);
+                throw new IllegalStateException("Internal Error updating 
function at the leader", e);
+            }
+
         }
+
         if (needsScheduling) {
             this.schedulerManager.schedule();
         }

Reply via email to