This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4af6b2620b09433505728ef696c2bbaf684858c7
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Apr 27 09:47:54 2022 +0200

    PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
    
    Sometimes the CommandSubscribe object has already been released and it 
triggers this error:
    17:36:40.676 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] WARN  
org.apache.pulsar.broker.service.ServerCnx - 
[/192.168.1.111:50688][persistent://public/default/test-cb4105f6-f850-4bdf-9e79-66d4ac42658c][13b9ee68-4ee4-4845-b955-77420b8b6a29]
 Failed to create consumer: consumerId=0, refCnt: 0
    java.util.concurrent.CompletionException: 
io.netty.util.IllegalReferenceCountException: refCnt: 0
            at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
 ~[?:?]
            at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
 ~[?:?]
            at 
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1081) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
~[?:?]
            at 
org.apache.pulsar.broker.service.BrokerService.lambda(BrokerService.java:1419) 
~[pulsar-broker-2.10.0.jar:2.10.0]
            at 
java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:815) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:799) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2121) 
~[?:?]
            at 
org.apache.pulsar.broker.service.BrokerService.openLedgerComplete(BrokerService.java:1405)
 ~[pulsar-broker-2.10.0.jar:2.10.0]
            at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda(ManagedLedgerFactoryImpl.java:425)
 ~[managed-ledger-2.10.0.jar:2.10.0]
            at 
java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) ~[?:?]
            at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
            at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
~[?:?]
            at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.initializeComplete(ManagedLedgerFactoryImpl.java:392)
 ~[managed-ledger-2.10.0.jar:2.10.0]
            at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:525)
 ~[managed-ledger-2.10.0.jar:2.10.0]
            at 
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:515)
 ~[managed-ledger-2.10.0.jar:2.10.0]
            at 
org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda(MetaStoreImpl.java:167) 
~[managed-ledger-2.10.0.jar:2.10.0]
            at 
java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) 
[?:?]
            at 
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) [?:?]
            at 
java.util.concurrent.CompletableFuture.run(CompletableFuture.java:478) [?:?]
            at 
org.apache.bookkeeper.common.util.OrderedExecutor.run(OrderedExecutor.java:203) 
[bookkeeper-common-4.14.4.jar:4.14.4]
            at java.util.concurrent.Executors.call(Executors.java:515) [?:?]
            at java.util.concurrent.FutureTask.run2168(FutureTask.java:264) 
[?:?]
            at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
            at 
java.util.concurrent.ScheduledThreadPoolExecutor.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
            at 
java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628) [?:?]
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-common-4.1.74.Final.jar:4.1.74.Final]
            at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
            at 
io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454) 
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383) 
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.buffer.UnsafeByteBufUtil.getBytes(UnsafeByteBufUtil.java:481) 
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.buffer.PooledUnsafeDirectByteBuf.getBytes(PooledUnsafeDirectByteBuf.java:130)
 ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.buffer.PooledSlicedByteBuf.getBytes(PooledSlicedByteBuf.java:235) 
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270) 
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at 
io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246) 
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at 
org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250)
 ~[pulsar-common-2.10.0.jar:2.10.0]
            at 
org.apache.pulsar.common.api.proto.KeyValue.getKey(KeyValue.java:19) 
~[pulsar-common-2.10.0.jar:2.10.0]
            at java.util.stream.Collectors.lambda(Collectors.java:1658) ~[?:?]
            at 
java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[?:?]
            at java.util.ArrayList.forEachRemaining(ArrayList.java:1511) ~[?:?]
            at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
            at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) 
~[?:?]
            at 
java.util.stream.ReduceOps.evaluateSequential(ReduceOps.java:913) ~[?:?]
            at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
            at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
            at 
org.apache.pulsar.broker.service.SubscriptionOption.getPropertiesMap(SubscriptionOption.java:57)
 ~[pulsar-broker-2.10.0.jar:2.10.0]
            at 
org.apache.pulsar.broker.service.ServerCnx.lambda(ServerCnx.java:1047) 
~[pulsar-broker-2.10.0.jar:2.10.0]
            at 
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1072) 
~[?:?]
            ... 28 more
    
    (cherry picked from commit e78d9f1ac546c150f4068c148e5ffe95c2ddf1f9)
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index dd78eec80b7..46691273e31 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1016,6 +1016,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 boolean createTopicIfDoesNotExist = forceTopicCreation
                         && 
service.isAllowAutoTopicCreation(topicName.toString());
 
+                final long consumerEpoch;
+                if (subscribe.hasConsumerEpoch()) {
+                    consumerEpoch = subscribe.getConsumerEpoch();
+                } else {
+                    consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+                }
+                Optional<Map<String, String>> subscriptionProperties = 
SubscriptionOption.getPropertiesMap(
+                        subscribe.getSubscriptionPropertiesList());
                 service.getTopic(topicName.toString(), 
createTopicIfDoesNotExist)
                         .thenCompose(optTopic -> {
                             if (!optTopic.isPresent()) {
@@ -1037,10 +1045,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                                 new 
SubscriptionNotFoundException(
                                                         "Subscription does not 
exist"));
                             }
-                            long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
-                            if (subscribe.hasConsumerEpoch()) {
-                                consumerEpoch = subscribe.getConsumerEpoch();
-                            }
+
                             SubscriptionOption option = 
SubscriptionOption.builder().cnx(ServerCnx.this)
                                     .subscriptionName(subscriptionName)
                                     
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
@@ -1049,8 +1054,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     .initialPosition(initialPosition)
                                     
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
                                     
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-                                    
.subscriptionProperties(SubscriptionOption.getPropertiesMap(
-                                            
subscribe.getSubscriptionPropertiesList()))
+                                    
.subscriptionProperties(subscriptionProperties)
                                     .consumerEpoch(consumerEpoch)
                                     .build();
                             if (schema != null) {

Reply via email to