This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ea98b46c631db5928c956eae5481a0ea1dab22a
Author: Jiwei Guo <[email protected]>
AuthorDate: Sun Mar 27 22:07:03 2022 +0800

    [fix][broker] Fix topic policy reader close bug. (#14897)
    
    ### Motivation
    
    https://github.com/apache/pulsar/issues/14896 is flaky, after diving into 
the codes, I find it's a bug about closing topic policy reader.  We should use 
`ex.getCause` instead of `ex`.
    
    Stacktrace:
    ```
    2022-03-27T10:42:16,795+0800 [broker-client-shared-internal-executor-58-1] 
WARN  org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService - 
Read more topic polices exception, read again.
    java.util.concurrent.CompletionException: 
org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: 
Consumer already closed
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_291]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_291]
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) 
~[?:1.8.0_291]
        at 
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
 ~[?:1.8.0_291]
        at 
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) 
~[?:1.8.0_291]
        at 
org.apache.pulsar.client.impl.MultiTopicsReaderImpl.readNextAsync(MultiTopicsReaderImpl.java:140)
 ~[classes/:?]
    
    ```
    
    (cherry picked from commit 85c1ba50cb155111f969403a092affd536d9dcb9)
---
 .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java  | 4 +++-
 .../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java    | 5 +++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 964bdde..1c01814 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -364,7 +365,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 notifyListener(msg);
                 readMorePolicies(reader);
             } else {
-                if (ex instanceof 
PulsarClientException.AlreadyClosedException) {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
                     log.error("Read more topic policies exception, close the 
read now!", ex);
                     cleanCacheAndCloseReader(
                             
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 7dd02bd..bbd3cae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -74,6 +74,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 
1);
         Assert.assertEquals(partitions, PARTITIONS);
         Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
+        reader.close();
     }
 
     @Test(timeOut = 1000 * 60)
@@ -97,6 +98,10 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
                     .subscribeAsync());
         }
         FutureUtil.waitForAll(futureList).get();
+        // Close all the consumers after check
+        for (CompletableFuture<Consumer<byte[]>> consumer : futureList) {
+            consumer.join().close();
+        }
     }
 
 }

Reply via email to