Shawyeok opened a new issue, #21971:
URL: https://github.com/apache/pulsar/issues/21971

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   In summary, `jackson-annotations:2.12.0` or later is now required for 
`pulsar-client 3.0.x`, and this also applies to versions 3.1.x and 3.2.x.
   
   My colleague experienced an issue where the caller thread was blocked at the 
`ConsumerBuilder#subscribe` method. Below is the stack trace:
   ```
   "main" #1 prio=5 os_prio=0 cpu=18400.66ms elapsed=241.20s 
tid=0x00007f0fd0017800 nid=0x10 waiting on condition  [0x00007f0fd7f17000]
      java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x00000000f10134d8> (a 
java.util.concurrent.CompletableFuture$Signaller)
        at 
java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
        at 
java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
        at 
java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
        at 
java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
        at 
java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
        at 
org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)
   ```
   
   The worst part is the absence of any error message; the main thread simply 
gets blocked indefinitely. After investigating, I identified the root cause 
mentioned above.
   
   A full example to reproduce this issue can be found here: 
https://github.com/Shawyeok/pulsarClientSubscribeStuck
   
   Below is the stack trace of the swallowed error (a `NoClassDefFoundError` 
caused by a `ClassNotFoundException`), which never shows up in the logs:
   <details>
   <summary>Stacktrace</summary>
   <pre>
   java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonKey
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.hasAsKey(JacksonAnnotationIntrospector.java:1129)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasAsKey(AnnotationIntrospectorPair.java:619)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addFields(POJOPropertiesCollector.java:501)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:426)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueAccessor(POJOPropertiesCollector.java:272)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueAccessor(BasicBeanDescription.java:258)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:391)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:174)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1501)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1449)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:550)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:828)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:308)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1572)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter._writeValueAndClose(ObjectWriter.java:1273)
        at 
org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter.writeValueAsString(ObjectWriter.java:1140)
        at 
org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.init(ConsumerStatsRecorderImpl.java:113)
        at 
org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.<init>(ConsumerStatsRecorderImpl.java:105)
        at 
org.apache.pulsar.client.impl.ConsumerImpl.<init>(ConsumerImpl.java:294)
        at 
org.apache.pulsar.client.impl.ConsumerImpl.newConsumerImpl(ConsumerImpl.java:252)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.createInternalConsumer(MultiTopicsConsumerImpl.java:1123)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$doSubscribeTopicPartitions$43(MultiTopicsConsumerImpl.java:1044)
        at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
        at 
java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doSubscribeTopicPartitions(MultiTopicsConsumerImpl.java:1056)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$subscribeTopicPartitions$42(MultiTopicsConsumerImpl.java:1001)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
        at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.subscribeTopicPartitions(MultiTopicsConsumerImpl.java:999)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.subscribeAsync(MultiTopicsConsumerImpl.java:991)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$createPartitionedConsumer$39(MultiTopicsConsumerImpl.java:957)
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at 
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.createPartitionedConsumer(MultiTopicsConsumerImpl.java:957)
        at 
org.apache.pulsar.client.impl.PulsarClientImpl.lambda$doSingleTopicSubscribeAsync$5(PulsarClientImpl.java:527)
        at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$getPartitionedTopicMetadata$8(BinaryProtoLookupService.java:239)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at 
org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:669)
        at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:144)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at 
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at 
org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at 
org.apache.pulsar.shade.io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at 
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at 
org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at 
org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at 
org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at 
org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.ClassNotFoundException: 
com.fasterxml.jackson.annotation.JsonKey
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 85 more
   </pre>
   </details>
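   
   To confirm whether an application classpath is affected, one option is to 
probe for the class named in the trace. The following is only a minimal sketch: 
the class name `JacksonAnnotationsCheck` and the helper `isClassPresent` are 
made up for illustration and are not part of Pulsar or Jackson.
   ```java
   public class JacksonAnnotationsCheck {

       // Hypothetical helper: returns true if the named class can be found
       // by the class loader that loaded this checker (no initialization).
       static boolean isClassPresent(String className) {
           try {
               Class.forName(className, false,
                       JacksonAnnotationsCheck.class.getClassLoader());
               return true;
           } catch (ClassNotFoundException | LinkageError e) {
               return false;
           }
       }

       public static void main(String[] args) {
           // The class reported as missing in the stack trace above.
           String probe = "com.fasterxml.jackson.annotation.JsonKey";
           System.out.println(probe + " present: " + isClassPresent(probe));
       }
   }
   ```
   If this reports `false` when run with the application's classpath, the 
resolved `jackson-annotations` is most likely older than 2.12.0.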
   
   ### Solution
   
   The `ClassNotFoundException` is thrown from the `doSubscribeTopicPartitions` 
method call, which is nested inside a `whenComplete` block. An exception thrown 
inside that callback only fails the future returned by `whenComplete`, and that 
future is discarded, so nothing reports the error and `subscribeResult` never 
completes. The relevant code can be found here: 
https://github.com/apache/pulsar/blob/434da8b5cc76a08215319be7ab066e1d1c23f6c4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1000-L1009
   
   To address this issue, we could replace the `whenComplete` with a 
combination of `thenAccept` and `exceptionally`. Here's how it could be 
rewritten:
   ```java
   client.preProcessSchemaBeforeSubscribe(client, schema, topicName)
         .thenAccept(schema -> {
             doSubscribeTopicPartitions(schema, subscribeResult, topicName,
                     numPartitions, createIfDoesNotExist);
         }).exceptionally(cause -> {
             // Also reached when doSubscribeTopicPartitions itself throws.
             subscribeResult.completeExceptionally(cause);
             return null; // exceptionally requires a return value
         });
   ```
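   
   The difference between the two shapes can be reproduced without Pulsar. 
This is a minimal sketch using only `java.util.concurrent`; 
`WhenCompleteSwallowDemo` and `failingStep` are hypothetical stand-ins, with 
`failingStep` playing the role of `doSubscribeTopicPartitions` failing before 
it touches the result future.
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class WhenCompleteSwallowDemo {

       // Stand-in for a step that throws before completing the result future.
       static void failingStep(String value, CompletableFuture<Void> result) {
           throw new NoClassDefFoundError("com/fasterxml/jackson/annotation/JsonKey");
       }

       // Current shape: the callback throws and the stage returned by
       // whenComplete is discarded, so the error is never observed.
       static CompletableFuture<Void> withWhenComplete() {
           CompletableFuture<Void> result = new CompletableFuture<>();
           CompletableFuture.completedFuture("schema").whenComplete((value, cause) -> {
               if (cause == null) {
                   failingStep(value, result);
               } else {
                   result.completeExceptionally(cause);
               }
           });
           return result;
       }

       // Proposed shape: a failure in the callback reaches exceptionally,
       // which fails the result future.
       static CompletableFuture<Void> withThenAcceptExceptionally() {
           CompletableFuture<Void> result = new CompletableFuture<>();
           CompletableFuture.completedFuture("schema")
                   .thenAccept(value -> failingStep(value, result))
                   .exceptionally(cause -> {
                       result.completeExceptionally(cause);
                       return null;
                   });
           return result;
       }

       public static void main(String[] args) throws Exception {
           try {
               withWhenComplete().get(200, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               System.out.println("whenComplete: result still pending");
           }
           System.out.println("thenAccept + exceptionally: failed = "
                   + withThenAcceptExceptionally().isCompletedExceptionally());
       }
   }
   ```
   In the first variant a blocking `get()` without a timeout would park 
forever, which matches the thread dump in the Motivation section. In the 
second variant the caller sees the failure immediately.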
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   Here are related discussions:
   - https://github.com/apache/pulsar/pull/19458#discussion_r1102070399
   - https://github.com/apache/pulsar/pull/19458#issuecomment-1635371635
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

