This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a1ae52218403e3eb261e03a664e63b9828e4d368
Author: Jiwei Guo <techno...@apache.org>
AuthorDate: Tue Dec 12 17:09:17 2023 +0800

    [fix][fn] Fix Deadlock in Functions Worker LeaderService (#21711)
    
    Fixes #21501
    
    ### Motivation
    
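    `LeaderService.isLeader()` does not need to be `synchronized`: it only reads the
    `isLeader` flag, which this change declares `volatile` so the read still sees the latest
    write. Keeping the method synchronized causes a lock-ordering deadlock. One thread holds
    the `LeaderService` lock in `becameInactive()` while waiting for the
    `FunctionMetaDataManager` lock in `giveupLeadership()`. Another thread holds the
    `FunctionMetaDataManager` lock in `updateFunctionOnLeader()` while waiting for the
    `LeaderService` lock in `isLeader()`.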
    
    Stack traces of the two deadlocked threads:
    ```
    "pulsar-external-listener-44525-1":
            at 
org.apache.pulsar.functions.worker.FunctionMetaDataManager.giveupLeadership(FunctionMetaDataManager.java)
            - waiting to lock <0x0000100013535c90> (a 
org.apache.pulsar.functions.worker.FunctionMetaDataManager)
            at 
org.apache.pulsar.functions.worker.LeaderService.becameInactive(LeaderService.java:167)
            - locked <0x000010001344c6d8> (a 
org.apache.pulsar.functions.worker.LeaderService)
            at 
org.apache.pulsar.client.impl.ConsumerImpl.lambda$activeConsumerChanged$27(ConsumerImpl.java:1136)
            at 
org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$2606/0x00007f854ce9cb10.run(Unknown
 Source)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.8.1/ThreadPoolExecutor.java:1136)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.8.1/ThreadPoolExecutor.java:635)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(java.base@17.0.8.1/Thread.java:833)
    "pulsar-web-44514-6":
            at 
org.apache.pulsar.functions.worker.LeaderService.isLeader(LeaderService.java)
            - waiting to lock <0x000010001344c6d8> (a 
org.apache.pulsar.functions.worker.LeaderService)
            at 
org.apache.pulsar.functions.worker.SchedulerManager.scheduleInternal(SchedulerManager.java:200)
            at 
org.apache.pulsar.functions.worker.SchedulerManager.schedule(SchedulerManager.java:229)
            at 
org.apache.pulsar.functions.worker.FunctionMetaDataManager.updateFunctionOnLeader(FunctionMetaDataManager.java:251)
            - locked <0x0000100013535c90> (a 
org.apache.pulsar.functions.worker.FunctionMetaDataManager)
            at 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.internalProcessFunctionRequest(ComponentImpl.java:1775)
            at 
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.updateRequest(ComponentImpl.java:996)
            at 
org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:222)
            at 
org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:196)
    ```
---
 .../pulsar/functions/worker/PulsarFunctionTlsTest.java   | 16 ++++++++++++++++
 .../apache/pulsar/functions/worker/LeaderService.java    |  4 ++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 246d980d617..a06a504af00 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.functions.worker;
 
 import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
@@ -33,6 +35,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -41,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -236,6 +240,18 @@ public class PulsarFunctionTlsTest {
 
             log.info(" -------- Start test function : {}", functionName);
 
+            int finalI = i;
+            Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1, 
TimeUnit.SECONDS).untilAsserted(() -> {
+                final PulsarWorkerService workerService = 
((PulsarWorkerService) fnWorkerServices[finalI]);
+                final LeaderService leaderService = 
workerService.getLeaderService();
+                assertNotNull(leaderService);
+                if (leaderService.isLeader()) {
+                    assertTrue(true);
+                } else {
+                    final WorkerInfo workerInfo = 
workerService.getMembershipManager().getLeader();
+                    assertTrue(workerInfo != null && 
!workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId()));
+                }
+            });
             pulsarAdmins[i].functions().createFunctionWithUrl(
                 functionConfig, jarFilePathUrl
             );
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index 7f035b5562f..e7816f06aac 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -41,7 +41,7 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
     private ConsumerImpl<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
-    private boolean isLeader = false;
+    private volatile boolean isLeader = false;
 
     static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
 
@@ -172,7 +172,7 @@ public class LeaderService implements AutoCloseable, 
ConsumerEventListener {
         }
     }
 
-    public synchronized boolean isLeader() {
+    public boolean isLeader() {
         return isLeader;
     }
 

Reply via email to