Shawyeok opened a new issue, #21971: URL: https://github.com/apache/pulsar/issues/21971
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Motivation

In summary, `jackson-annotations:2.12.0` or later is now required for `pulsar-client 3.0.x`, and this also applies to versions 3.1.x and 3.2.x.

My colleague experienced an issue where the caller thread was blocked at the `ConsumerBuilder#subscribe` method. Below is the stack trace:

```
"main" #1 prio=5 os_prio=0 cpu=18400.66ms elapsed=241.20s tid=0x00007f0fd0017800 nid=0x10 waiting on condition [0x00007f0fd7f17000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
    - parking to wait for <0x00000000f10134d8> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
    at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
    at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
    at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
    at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
    at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)
```

The worst part is the absence of any error message; the main thread simply gets blocked indefinitely. After investigating, I identified the root cause mentioned above.
A full example to reproduce this issue can be found here: https://github.com/Shawyeok/pulsarClientSubscribeStuck

Below is the missing stack trace for `ClassNotFoundException`:

<details>
<summary>Stacktrace</summary>
<pre>
java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonKey
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.hasAsKey(JacksonAnnotationIntrospector.java:1129)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.hasAsKey(AnnotationIntrospectorPair.java:619)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addFields(POJOPropertiesCollector.java:501)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:426)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueAccessor(POJOPropertiesCollector.java:272)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueAccessor(BasicBeanDescription.java:258)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:391)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:174)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1501)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1449)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:550)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:828)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:308)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1572)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter._writeValueAndClose(ObjectWriter.java:1273)
    at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectWriter.writeValueAsString(ObjectWriter.java:1140)
    at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.init(ConsumerStatsRecorderImpl.java:113)
    at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.<init>(ConsumerStatsRecorderImpl.java:105)
    at org.apache.pulsar.client.impl.ConsumerImpl.<init>(ConsumerImpl.java:294)
    at org.apache.pulsar.client.impl.ConsumerImpl.newConsumerImpl(ConsumerImpl.java:252)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.createInternalConsumer(MultiTopicsConsumerImpl.java:1123)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$doSubscribeTopicPartitions$43(MultiTopicsConsumerImpl.java:1044)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.doSubscribeTopicPartitions(MultiTopicsConsumerImpl.java:1056)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$subscribeTopicPartitions$42(MultiTopicsConsumerImpl.java:1001)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.subscribeTopicPartitions(MultiTopicsConsumerImpl.java:999)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.subscribeAsync(MultiTopicsConsumerImpl.java:991)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.lambda$createPartitionedConsumer$39(MultiTopicsConsumerImpl.java:957)
    at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
    at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
    at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.createPartitionedConsumer(MultiTopicsConsumerImpl.java:957)
    at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$doSingleTopicSubscribeAsync$5(PulsarClientImpl.java:527)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire$$$capture(CompletableFuture.java:646)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$getPartitionedTopicMetadata$8(BinaryProtoLookupService.java:239)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:669)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:144)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.pulsar.shade.io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonKey
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 85 more
</pre>
</details>

### Solution

The `NoClassDefFoundError` (caused by the `ClassNotFoundException` above) is thrown from the `doSubscribeTopicPartitions` call, which runs inside a `whenComplete` callback. An exception thrown inside that callback only fails the stage returned by `whenComplete`, and that stage is discarded, so `subscribeResult` never completes. The relevant code can be found here: https://github.com/apache/pulsar/blob/434da8b5cc76a08215319be7ab066e1d1c23f6c4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1000-L1009

To address this issue, we could replace the `whenComplete` with a combination of `thenAccept` and `exceptionally`. Here's how it could be rewritten:

```java
client.preProcessSchemaBeforeSubscribe(client, schema, topicName)
        .thenAccept(schema -> {
            doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
        }).exceptionally(cause -> {
            // Reached both when schema pre-processing fails and when doSubscribeTopicPartitions throws.
            subscribeResult.completeExceptionally(cause);
            return null; // exceptionally takes a Function, so it must return a value
        });
```

### Alternatives

_No response_

### Anything else?

Here are related discussions:

- https://github.com/apache/pulsar/pull/19458#discussion_r1102070399
- https://github.com/apache/pulsar/pull/19458#issuecomment-1635371635

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
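
The failure mode described under "Solution" can be reproduced without Pulsar. The following is a minimal sketch using only `java.util.concurrent.CompletableFuture`; the class `SwallowedExceptionDemo` and the method `failingStep` are hypothetical stand-ins (not Pulsar code) for `doSubscribeTopicPartitions` failing with a linkage error, and `result` plays the role of `subscribeResult`.

```java
import java.util.concurrent.CompletableFuture;

public class SwallowedExceptionDemo {

    // Hypothetical stand-in for doSubscribeTopicPartitions hitting a missing class.
    static void failingStep(String value) {
        throw new NoClassDefFoundError("com/fasterxml/jackson/annotation/JsonKey");
    }

    // Pattern from the issue: the callback throws, so `result` is never completed.
    static CompletableFuture<Void> withWhenComplete() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture.completedFuture("schema")
                .whenComplete((value, cause) -> {
                    if (cause == null) {
                        // The error only fails the stage returned by whenComplete,
                        // which nobody observes here.
                        failingStep(value);
                    } else {
                        result.completeExceptionally(cause);
                    }
                });
        return result;
    }

    // Proposed pattern: a failure in thenAccept flows into exceptionally.
    static CompletableFuture<Void> withThenAcceptExceptionally() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture.completedFuture("schema")
                .thenAccept(value -> failingStep(value))
                .exceptionally(cause -> {
                    // `cause` arrives wrapped in a CompletionException.
                    result.completeExceptionally(cause);
                    return null;
                });
        return result;
    }

    public static void main(String[] args) {
        // A caller blocking on get() would hang on the first future.
        System.out.println("whenComplete done: " + withWhenComplete().isDone());
        System.out.println("thenAccept/exceptionally failed: "
                + withThenAcceptExceptionally().isCompletedExceptionally());
    }
}
```

In this sketch the first future stays incomplete, which corresponds to the blocked `ConsumerBuilder#subscribe` call, while the second is completed exceptionally so a caller would see the error instead of waiting indefinitely.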
