This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4af6b2620b09433505728ef696c2bbaf684858c7 Author: Enrico Olivelli <[email protected]> AuthorDate: Wed Apr 27 09:47:54 2022 +0200 PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335) Sometimes the CommandSubscribe object has already been released and it triggers this error: 17:36:40.676 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] WARN org.apache.pulsar.broker.service.ServerCnx - [/192.168.1.111:50688][persistent://public/default/test-cb4105f6-f850-4bdf-9e79-66d4ac42658c][13b9ee68-4ee4-4845-b955-77420b8b6a29] Failed to create consumer: consumerId=0, refCnt: 0 java.util.concurrent.CompletionException: io.netty.util.IllegalReferenceCountException: refCnt: 0 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?] at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1081) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.pulsar.broker.service.BrokerService.lambda(BrokerService.java:1419) ~[pulsar-broker-2.10.0.jar:2.10.0] at java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:815) ~[?:?] at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:799) ~[?:?] at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2121) ~[?:?] at org.apache.pulsar.broker.service.BrokerService.openLedgerComplete(BrokerService.java:1405) ~[pulsar-broker-2.10.0.jar:2.10.0] at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda(ManagedLedgerFactoryImpl.java:425) ~[managed-ledger-2.10.0.jar:2.10.0] at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) ~[?:?] at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.initializeComplete(ManagedLedgerFactoryImpl.java:392) ~[managed-ledger-2.10.0.jar:2.10.0] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:525) ~[managed-ledger-2.10.0.jar:2.10.0] at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:515) ~[managed-ledger-2.10.0.jar:2.10.0] at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda(MetaStoreImpl.java:167) ~[managed-ledger-2.10.0.jar:2.10.0] at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) [?:?] at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) [?:?] at java.util.concurrent.CompletableFuture.run(CompletableFuture.java:478) [?:?] at org.apache.bookkeeper.common.util.OrderedExecutor.run(OrderedExecutor.java:203) [bookkeeper-common-4.14.4.jar:4.14.4] at java.util.concurrent.Executors.call(Executors.java:515) [?:?] at java.util.concurrent.FutureTask.run2168(FutureTask.java:264) [?:?] at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?] at java.util.concurrent.ScheduledThreadPoolExecutor.run(ScheduledThreadPoolExecutor.java:304) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628) [?:?] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.74.Final.jar:4.1.74.Final] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final] at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final] at io.netty.buffer.UnsafeByteBufUtil.getBytes(UnsafeByteBufUtil.java:481) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final] at io.netty.buffer.PooledUnsafeDirectByteBuf.getBytes(PooledUnsafeDirectByteBuf.java:130) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final] at io.netty.buffer.PooledSlicedByteBuf.getBytes(PooledSlicedByteBuf.java:235) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final] at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final] at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final] at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250) ~[pulsar-common-2.10.0.jar:2.10.0] at org.apache.pulsar.common.api.proto.KeyValue.getKey(KeyValue.java:19) ~[pulsar-common-2.10.0.jar:2.10.0] at java.util.stream.Collectors.lambda(Collectors.java:1658) ~[?:?] at java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[?:?] at java.util.ArrayList.forEachRemaining(ArrayList.java:1511) ~[?:?] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?] at java.util.stream.ReduceOps.evaluateSequential(ReduceOps.java:913) ~[?:?] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?] at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?] at org.apache.pulsar.broker.service.SubscriptionOption.getPropertiesMap(SubscriptionOption.java:57) ~[pulsar-broker-2.10.0.jar:2.10.0] at org.apache.pulsar.broker.service.ServerCnx.lambda(ServerCnx.java:1047) ~[pulsar-broker-2.10.0.jar:2.10.0] at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1072) ~[?:?] ... 28 more (cherry picked from commit e78d9f1ac546c150f4068c148e5ffe95c2ddf1f9) --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index dd78eec80b7..46691273e31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1016,6 +1016,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { boolean createTopicIfDoesNotExist = forceTopicCreation && service.isAllowAutoTopicCreation(topicName.toString()); + final long consumerEpoch; + if (subscribe.hasConsumerEpoch()) { + consumerEpoch = subscribe.getConsumerEpoch(); + } else { + consumerEpoch = DEFAULT_CONSUMER_EPOCH; + } + Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap( + subscribe.getSubscriptionPropertiesList()); service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { @@ -1037,10 +1045,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { new SubscriptionNotFoundException( "Subscription does not exist")); } - long consumerEpoch = DEFAULT_CONSUMER_EPOCH; - if (subscribe.hasConsumerEpoch()) { - consumerEpoch = subscribe.getConsumerEpoch(); - } + SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this) .subscriptionName(subscriptionName) .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel) @@ -1049,8 +1054,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { .initialPosition(initialPosition) .startMessageRollbackDurationSec(startMessageRollbackDurationSec) .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta) - .subscriptionProperties(SubscriptionOption.getPropertiesMap( - subscribe.getSubscriptionPropertiesList())) + .subscriptionProperties(subscriptionProperties) .consumerEpoch(consumerEpoch) .build(); if (schema != null) {
