This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3396065a3a6 [fix][fn] Fix Deadlock in Functions Worker LeaderService
(#21711)
3396065a3a6 is described below
commit 3396065a3a62af3e3586700f0bbfcbff93716b48
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Dec 12 17:09:17 2023 +0800
[fix][fn] Fix Deadlock in Functions Worker LeaderService (#21711)
Fixes #21501
### Motivation
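The `isLeader` method in `LeaderService` does not need to be `synchronized`.
In the stack trace below, one thread holds the `LeaderService` monitor in
`becameInactive` while waiting for the `FunctionMetaDataManager` monitor, and
another thread holds the `FunctionMetaDataManager` monitor in
`updateFunctionOnLeader` while waiting to enter `isLeader`.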
See the deadlock stack trace:
```
"pulsar-external-listener-44525-1":
at
org.apache.pulsar.functions.worker.FunctionMetaDataManager.giveupLeadership(FunctionMetaDataManager.java)
- waiting to lock <0x0000100013535c90> (a
org.apache.pulsar.functions.worker.FunctionMetaDataManager)
at
org.apache.pulsar.functions.worker.LeaderService.becameInactive(LeaderService.java:167)
- locked <0x000010001344c6d8> (a
org.apache.pulsar.functions.worker.LeaderService)
at
org.apache.pulsar.client.impl.ConsumerImpl.lambda$activeConsumerChanged$27(ConsumerImpl.java:1136)
at
org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$2606/0x00007f854ce9cb10.run(Unknown
Source)
at
java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:833)
"pulsar-web-44514-6":
at
org.apache.pulsar.functions.worker.LeaderService.isLeader(LeaderService.java)
- waiting to lock <0x000010001344c6d8> (a
org.apache.pulsar.functions.worker.LeaderService)
at
org.apache.pulsar.functions.worker.SchedulerManager.scheduleInternal(SchedulerManager.java:200)
at
org.apache.pulsar.functions.worker.SchedulerManager.schedule(SchedulerManager.java:229)
at
org.apache.pulsar.functions.worker.FunctionMetaDataManager.updateFunctionOnLeader(FunctionMetaDataManager.java:251)
- locked <0x0000100013535c90> (a
org.apache.pulsar.functions.worker.FunctionMetaDataManager)
at
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.internalProcessFunctionRequest(ComponentImpl.java:1775)
at
org.apache.pulsar.functions.worker.rest.api.ComponentImpl.updateRequest(ComponentImpl.java:996)
at
org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.registerFunction(FunctionsImpl.java:222)
at
org.apache.pulsar.broker.admin.impl.FunctionsBase.registerFunction(FunctionsBase.java:196)
```
---
.../pulsar/functions/worker/PulsarFunctionTlsTest.java | 16 ++++++++++++++++
.../apache/pulsar/functions/worker/LeaderService.java | 4 ++--
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 1e8b26beee3..9882b15450e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.functions.worker;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
@@ -33,6 +35,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -41,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -242,6 +246,18 @@ public class PulsarFunctionTlsTest {
log.info(" -------- Start test function : {}", functionName);
+ int finalI = i;
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).pollInterval(1,
TimeUnit.SECONDS).untilAsserted(() -> {
+ final PulsarWorkerService workerService =
((PulsarWorkerService) fnWorkerServices[finalI]);
+ final LeaderService leaderService =
workerService.getLeaderService();
+ assertNotNull(leaderService);
+ if (leaderService.isLeader()) {
+ assertTrue(true);
+ } else {
+ final WorkerInfo workerInfo =
workerService.getMembershipManager().getLeader();
+ assertTrue(workerInfo != null &&
!workerInfo.getWorkerId().equals(workerService.getWorkerConfig().getWorkerId()));
+ }
+ });
pulsarAdmins[i].functions().createFunctionWithUrl(
functionConfig, jarFilePathUrl
);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index 7f035b5562f..e7816f06aac 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -41,7 +41,7 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
private ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
- private boolean isLeader = false;
+ private volatile boolean isLeader = false;
static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
@@ -172,7 +172,7 @@ public class LeaderService implements AutoCloseable,
ConsumerEventListener {
}
}
- public synchronized boolean isLeader() {
+ public boolean isLeader() {
return isLeader;
}