This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5ea98b46c631db5928c956eae5481a0ea1dab22a Author: Jiwei Guo <[email protected]> AuthorDate: Sun Mar 27 22:07:03 2022 +0800 [fix][broker] Fix topic policy reader close bug. (#14897) ### Motivation https://github.com/apache/pulsar/issues/14896 is flaky. After diving into the code, I found that it is a bug in closing the topic policy reader. We should check `ex.getCause` instead of `ex`, because the `AlreadyClosedException` arrives wrapped in a `CompletionException`. Stacktrace: ``` 2022-03-27T10:42:16,795+0800 [broker-client-shared-internal-executor-58-1] WARN org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService - Read more topic polices exception, read again. java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Consumer already closed at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_291] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_291] at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_291] at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) ~[?:1.8.0_291] at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) ~[?:1.8.0_291] at org.apache.pulsar.client.impl.MultiTopicsReaderImpl.readNextAsync(MultiTopicsReaderImpl.java:140) ~[classes/:?] 
``` (cherry picked from commit 85c1ba50cb155111f969403a092affd536d9dcb9) --- .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java | 4 +++- .../apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 964bdde..1c01814 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -364,7 +365,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic notifyListener(msg); readMorePolicies(reader); } else { - if (ex instanceof PulsarClientException.AlreadyClosedException) { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof PulsarClientException.AlreadyClosedException) { log.error("Read more topic policies exception, close the read now!", ex); cleanCacheAndCloseReader( reader.getSystemTopic().getTopicName().getNamespaceObject(), false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 7dd02bd..bbd3cae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -74,6 +74,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { Assert.assertEquals(admin.topics().getPartitionedTopicList(ns).size(), 1); Assert.assertEquals(partitions, PARTITIONS); Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS); + reader.close(); } @Test(timeOut = 1000 * 60) @@ -97,6 +98,10 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { .subscribeAsync()); } FutureUtil.waitForAll(futureList).get(); + // Close all the consumers after check + for (CompletableFuture<Consumer<byte[]>> consumer : futureList) { + consumer.join().close(); + } } }
