This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d654d5e [pulsar-function] fix possible deadlock on broker-function
service startup (#9499)
d654d5e is described below
commit d654d5e0d2661592654204a0594fcb0a73f14a4e
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Feb 9 08:25:37 2021 -0800
[pulsar-function] fix possible deadlock on broker-function service startup
(#9499)
### Motivation
A standalone Pulsar broker and function service can deadlock, which also
sometimes causes failures in function unit tests. The function service tries
to create a subscription, which blocks the ZooKeeper (zk) thread and can cause
a deadlock. Below is the thread dump taken when the deadlock happens and
function service startup fails.
So, remove the blocking call when creating a subscription using the admin API.
```
"pulsar-load-manager-139-1" #717 prio=5 os_prio=31 tid=0x00007f8a68698000
nid=0x3610f waiting on condition [0x00007000156e9000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000792a07020> (a
java.util.concurrent.CompletableFuture$Signaller)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at
org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
at
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationFromZK(SimpleLoadManagerImpl.java:391)
at
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationDouble(SimpleLoadManagerImpl.java:401)
at
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getLoadBalancerBrokerOverloadedThresholdPercentage(SimpleLoadManagerImpl.java:450)
at
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.generateLoadReportForcefully(SimpleLoadManagerImpl.java:1139)
- locked <0x000000078e53f2d0> (a java.util.HashSet)
at
org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.writeLoadReportOnZookeeper(SimpleLoadManagerImpl.java:1295)
at
org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask.run(LoadReportUpdaterTask.java:39)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"pulsar-ordered-OrderedExecutor-1-0-EventThread" #621 daemon prio=5
os_prio=31 tid=0x00007f89fd8e1800 nid=0x2fe07 waiting on condition
[0x000070000f7ce000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000792ecb8b0> (a
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:2060)
at
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$69(PersistentTopicsBase.java:2034)
at
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$676/1709042625.accept(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$5(ZooKeeperCache.java:249)
at
org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$154/5998675.processResult(Unknown
Source)
at
org.apache.bookkeeper.zookeeper.ZooKeeperClient$15$1.processResult(ZooKeeperClient.java:879)
at
org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:583)
at
org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510)
"main" #1 prio=5 os_prio=31 tid=0x00007f8aa0013000 nid=0x1603 waiting on
condition [0x0000700004160000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007930efe68> (a
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:178)
at
org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:297)
at
org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:632)
at
org.glassfish.jersey.client.JerseyInvocation$$Lambda$672/237755480.call(Unknown
Source)
at
org.glassfish.jersey.client.JerseyInvocation.call(JerseyInvocation.java:654)
at
org.glassfish.jersey.client.JerseyInvocation.lambda$runInScope$3(JerseyInvocation.java:648)
at
org.glassfish.jersey.client.JerseyInvocation$$Lambda$673/197134325.call(Unknown
Source)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
at
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
at
org.glassfish.jersey.client.JerseyInvocation.runInScope(JerseyInvocation.java:648)
at
org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:631)
at
org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:434)
at
org.glassfish.jersey.client.JerseyInvocation$Builder.put(JerseyInvocation.java:318)
at
org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:1143)
at
org.apache.pulsar.functions.worker.PulsarWorkerService.start(PulsarWorkerService.java:454)
at
org.apache.pulsar.broker.PulsarService.startWorkerService(PulsarService.java:1343)
at
org.apache.pulsar.broker.PulsarService.start(PulsarService.java:671)
at
org.apache.pulsar.io.PulsarFunctionE2ETest.setup(PulsarFunctionE2ETest.java:209)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
```
---
.../broker/admin/impl/PersistentTopicsBase.java | 52 ++++++++++++----------
1 file changed, 28 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a55f683..82a0e5c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2124,30 +2124,34 @@ public class PersistentTopicsBase extends AdminResource
{
asyncResponse.resume(new RestException(Status.CONFLICT,
"Subscription already exists for topic"));
return;
}
- PersistentSubscription subscription = (PersistentSubscription)
topic
- .createSubscription(subscriptionName, InitialPosition.Latest,
replicated).get();
- // Mark the cursor as "inactive" as it was created without a real
consumer connected
- subscription.deactivateCursor();
-
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(),
targetMessageId.getEntryId()))
- .thenRun(() -> {
- log.info("[{}][{}] Successfully created subscription
{} at message id {}", clientAppId(),
- topicName, subscriptionName, targetMessageId);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable t = (ex instanceof CompletionException ?
ex.getCause() : ex);
- log.warn("[{}][{}] Failed to create subscription {} at
message id {}", clientAppId(), topicName,
- subscriptionName, targetMessageId, t);
- if (t instanceof SubscriptionInvalidCursorPosition) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Unable to find position for position
specified: " + t.getMessage()));
- } else if (t instanceof SubscriptionBusyException) {
- asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
- "Failed for Subscription Busy: " +
t.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, t);
- }
- return null;
- });
+ topic.createSubscription(subscriptionName, InitialPosition.Latest,
replicated).thenApply(subscription -> {
+ // Mark the cursor as "inactive" as it was created without a
real consumer connected
+ ((PersistentSubscription) subscription).deactivateCursor();
+
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(),
targetMessageId.getEntryId()))
+ .thenRun(() -> {
+ log.info("[{}][{}] Successfully created
subscription {} at message id {}", clientAppId(),
+ topicName, subscriptionName,
targetMessageId);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable t = (ex instanceof CompletionException ?
ex.getCause() : ex);
+ log.warn("[{}][{}] Failed to create subscription
{} at message id {}", clientAppId(),
+ topicName, subscriptionName,
targetMessageId, t);
+ if (t instanceof
SubscriptionInvalidCursorPosition) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Unable to find position for position
specified: " + t.getMessage()));
+ } else if (t instanceof SubscriptionBusyException)
{
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " +
t.getMessage()));
+ } else {
+
resumeAsyncResponseExceptionally(asyncResponse, t);
+ }
+ return null;
+ });
+ return null;
+ }).exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+ return null;
+ });
} catch (Throwable e) {
log.warn("[{}][{}] Failed to create subscription {} at message id
{}", clientAppId(), topicName,
subscriptionName, targetMessageId, e);