This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d654d5e  [pulsar-function] fix possible deadlock on broker-function service startup (#9499)
d654d5e is described below

commit d654d5e0d2661592654204a0594fcb0a73f14a4e
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Feb 9 08:25:37 2021 -0800

    [pulsar-function] fix possible deadlock on broker-function service startup (#9499)
    
    ### Motivation
    
    A standalone Pulsar broker and its function service can deadlock during
    startup, which sometimes also causes the function unit tests to fail. The
    Pulsar function service tries to create a subscription, and that call
    blocks the ZooKeeper (zk) thread, which can cause the deadlock. The thread
    dump below was taken when the deadlock happened and function service
    startup failed.
    
    So, remove the blocking call made while creating a subscription through
    the admin API.
    
    
    
    ```
    
    "pulsar-load-manager-139-1" #717 prio=5 os_prio=31 tid=0x00007f8a68698000 nid=0x3610f waiting on condition [0x00007000156e9000]
       java.lang.Thread.State: TIMED_WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x0000000792a07020> (a java.util.concurrent.CompletableFuture$Signaller)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
            at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709)
            at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
            at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788)
            at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
            at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
            at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationFromZK(SimpleLoadManagerImpl.java:391)
            at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationDouble(SimpleLoadManagerImpl.java:401)
            at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getLoadBalancerBrokerOverloadedThresholdPercentage(SimpleLoadManagerImpl.java:450)
            at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.generateLoadReportForcefully(SimpleLoadManagerImpl.java:1139)
            - locked <0x000000078e53f2d0> (a java.util.HashSet)
            at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.writeLoadReportOnZookeeper(SimpleLoadManagerImpl.java:1295)
            at org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask.run(LoadReportUpdaterTask.java:39)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:748)
    
    
    
    "pulsar-ordered-OrderedExecutor-1-0-EventThread" #621 daemon prio=5 os_prio=31 tid=0x00007f89fd8e1800 nid=0x2fe07 waiting on condition [0x000070000f7ce000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x0000000792ecb8b0> (a java.util.concurrent.CompletableFuture$Signaller)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
            at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
            at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
            at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:2060)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$69(PersistentTopicsBase.java:2034)
            at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$676/1709042625.accept(Unknown Source)
            at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
            at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
            at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$5(ZooKeeperCache.java:249)
            at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$154/5998675.processResult(Unknown Source)
            at org.apache.bookkeeper.zookeeper.ZooKeeperClient$15$1.processResult(ZooKeeperClient.java:879)
            at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:583)
            at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510)
    
    
    "main" #1 prio=5 os_prio=31 tid=0x00007f8aa0013000 nid=0x1603 waiting on condition [0x0000700004160000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000007930efe68> (a java.util.concurrent.CompletableFuture$Signaller)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
            at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
            at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
            at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
            at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:178)
            at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:297)
            at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:632)
            at org.glassfish.jersey.client.JerseyInvocation$$Lambda$672/237755480.call(Unknown Source)
            at org.glassfish.jersey.client.JerseyInvocation.call(JerseyInvocation.java:654)
            at org.glassfish.jersey.client.JerseyInvocation.lambda$runInScope$3(JerseyInvocation.java:648)
            at org.glassfish.jersey.client.JerseyInvocation$$Lambda$673/197134325.call(Unknown Source)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
            at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
            at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
            at org.glassfish.jersey.client.JerseyInvocation.runInScope(JerseyInvocation.java:648)
            at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:631)
            at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:434)
            at org.glassfish.jersey.client.JerseyInvocation$Builder.put(JerseyInvocation.java:318)
            at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:1143)
            at org.apache.pulsar.functions.worker.PulsarWorkerService.start(PulsarWorkerService.java:454)
            at org.apache.pulsar.broker.PulsarService.startWorkerService(PulsarService.java:1343)
            at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:671)
            at org.apache.pulsar.io.PulsarFunctionE2ETest.setup(PulsarFunctionE2ETest.java:209)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    ```
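
The pattern in the dump (a callback that runs on the ZooKeeper event thread and then parks in `CompletableFuture.get()`) can be reproduced in isolation. The following is only a minimal sketch, not Pulsar code: a single-threaded executor stands in for the event thread, and the class name, `createSubscription`, and both helper methods are hypothetical. It assumes the awaited future can only be completed by work queued on that same thread, which is presumably what happens in the broker. The blocking variant uses a timed `get()` so the demo reports the stall instead of hanging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class EventThreadDeadlockSketch {

    // Hypothetical stand-in for an async operation whose result is produced
    // by a task queued on the event thread itself.
    static CompletableFuture<String> createSubscription(ExecutorService eventThread) {
        return CompletableFuture.supplyAsync(() -> "subscription", eventThread);
    }

    // Blocking style: the callback waits on the future while occupying the
    // only thread that could complete it, so the wait can never succeed.
    static boolean blockingCallbackCompletes(ExecutorService eventThread) throws Exception {
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        eventThread.execute(() -> {
            try {
                // An untimed get() here would park forever; the timeout only
                // exists so that this sketch terminates.
                createSubscription(eventThread).get(200, TimeUnit.MILLISECONDS);
                outcome.complete(true);
            } catch (TimeoutException e) {
                outcome.complete(false);
            } catch (Exception e) {
                outcome.completeExceptionally(e);
            }
        });
        return outcome.get(5, TimeUnit.SECONDS);
    }

    // Non-blocking style: the callback registers a continuation and returns,
    // which frees the event thread to run the queued work.
    static boolean chainedCallbackCompletes(ExecutorService eventThread) throws Exception {
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        eventThread.execute(() ->
                createSubscription(eventThread)
                        .thenAccept(subscription -> outcome.complete(true))
                        .exceptionally(ex -> {
                            outcome.completeExceptionally(ex);
                            return null;
                        }));
        return outcome.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println("blocking get() completes: " + blockingCallbackCompletes(eventThread));
            System.out.println("chained callback completes: " + chainedCallbackCompletes(eventThread));
        } finally {
            eventThread.shutdownNow();
        }
    }
}
```

Under these assumptions the blocking variant should report `false` and the chained variant `true`. The change below follows the second style: it replaces `.get()` with a continuation on the future returned by `createSubscription`.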
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 52 ++++++++++++----------
 1 file changed, 28 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a55f683..82a0e5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2124,30 +2124,34 @@ public class PersistentTopicsBase extends AdminResource {
                 asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
                 return;
             }
-            PersistentSubscription subscription = (PersistentSubscription) topic
-                .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
-            // Mark the cursor as "inactive" as it was created without a real consumer connected
-            subscription.deactivateCursor();
-            subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()))
-                    .thenRun(() -> {
-                        log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
-                                topicName, subscriptionName, targetMessageId);
-                        asyncResponse.resume(Response.noContent().build());
-                    }).exceptionally(ex -> {
-                        Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
-                        log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
-                                subscriptionName, targetMessageId, t);
-                        if (t instanceof SubscriptionInvalidCursorPosition) {
-                            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                                    "Unable to find position for position specified: " + t.getMessage()));
-                        } else if (t instanceof SubscriptionBusyException) {
-                            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                                    "Failed for Subscription Busy: " + t.getMessage()));
-                        } else {
-                            resumeAsyncResponseExceptionally(asyncResponse, t);
-                        }
-                        return null;
-                    });
+            topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated).thenApply(subscription -> {
+                // Mark the cursor as "inactive" as it was created without a real consumer connected
+                ((PersistentSubscription) subscription).deactivateCursor();
+                subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()))
+                        .thenRun(() -> {
+                            log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
+                                    topicName, subscriptionName, targetMessageId);
+                            asyncResponse.resume(Response.noContent().build());
+                        }).exceptionally(ex -> {
+                            Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
+                            log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(),
+                                    topicName, subscriptionName, targetMessageId, t);
+                            if (t instanceof SubscriptionInvalidCursorPosition) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Unable to find position for position specified: " + t.getMessage()));
+                            } else if (t instanceof SubscriptionBusyException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Failed for Subscription Busy: " + t.getMessage()));
+                            } else {
+                                resumeAsyncResponseExceptionally(asyncResponse, t);
+                            }
+                            return null;
+                        });
+                return null;
+            }).exceptionally(ex -> {
+                resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+                return null;
+            });
         } catch (Throwable e) {
             log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
                     subscriptionName, targetMessageId, e);
