eolivelli opened a new issue #12508:
URL: https://github.com/apache/pulsar/issues/12508
**Describe the bug**
For an AVRO message, Message.getValue() needs to download the schema
from the Registry, which is a blocking operation that sends a request to
the Broker itself.
If you call Message.getValue() in a callback attached to a CompletableFuture
returned by the Pulsar Client, such as Reader.readNextAsync(), there is a good
chance of creating a deadlock.
This is my code:
```
private CompletableFuture<?> readNextMessageIfAvailable(Reader<Op> reader,
                                                        Consumer<Op> consumer) {
    return reader
        .hasMessageAvailableAsync()
        .thenCompose(hasMessageAvailable -> {
            if (hasMessageAvailable == null || !hasMessageAvailable) {
                return CompletableFuture.completedFuture(null);
            } else {
                CompletableFuture<Message<Op>> opMessage = reader.readNextAsync();
                return opMessage.thenCompose(msg -> {
                    Op value = msg.getValue(); // <------ DEADLOCK !!!
                    consumer.accept(value);
                    // Keep reading until no more messages are available.
                    return readNextMessageIfAvailable(reader, consumer);
                });
            }
        });
}
```
In this code, msg.getValue() runs in a callback fired on completion of
readNextAsync() or of hasMessageAvailableAsync(), and that completion may
happen on the ClientCnx thread.
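To illustrate why the callback can land on the connection thread, here is a minimal sketch that uses only the JDK. The thread name `fake-io-thread` and the class name are made up for the illustration and stand in for the Pulsar client IO thread. A callback registered with `thenCompose` before the future completes runs on whichever thread calls `complete()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadDemo {

    // Returns the name of the thread that executed the thenCompose callback.
    static String callbackThreadName() {
        // Hypothetical stand-in for the client's IO thread.
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io-thread"));
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            // The callback is registered while the future is still incomplete.
            CompletableFuture<String> ranOn = response.thenCompose(
                    ignored -> CompletableFuture.completedFuture(
                            Thread.currentThread().getName()));
            // The "IO thread" completes the future, so it also runs the callback.
            io.execute(() -> response.complete("reply"));
            return ranOn.join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints "fake-io-thread"
    }
}
```

Any blocking call made inside that callback therefore blocks the thread that delivers responses.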
This is the stacktrace:
```
"pulsar-client-io-54-1" #152 prio=5 os_prio=31 cpu=89.65ms elapsed=7.12s tid=0x00007fa8bee81000 nid=0x29e03 waiting on condition [0x000070000f2fa000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
    - parking to wait for <0x000000079d15a990> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
    at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
    at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
    at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
    at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaInfoByVersion(AbstractMultiVersionReader.java:119)
    at org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader.loadReader(MultiVersionAvroReader.java:47)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:52)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader$1.load(AbstractMultiVersionReader.java:49)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    - locked <0x000000079d10d5e0> (a com.google.common.cache.LocalCache$StrongAccessEntry)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
    at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:90)
    at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:67)
    at org.apache.pulsar.client.impl.MessageImpl.decode(MessageImpl.java:471)
    at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:449)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage.lambda$readNextMessageIfAvailable$0(PulsarSchemaStorage.java:143)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage$$Lambda$1421/0x0000000800abd840.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1106)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2235)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage.lambda$readNextMessageIfAvailable$1(PulsarSchemaStorage.java:142)
    at io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.PulsarSchemaStorage$$Lambda$1369/0x0000000800a99840.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire([email protected]/CompletableFuture.java:1072)
    at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$hasMessageAvailableAsync$48(ConsumerImpl.java:1925)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$1367/0x0000000800a99040.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
    at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
    at org.apache.pulsar.client.impl.ConsumerImpl.lambda$internalGetLastMessageIdAsync$53(ConsumerImpl.java:2042)
    at org.apache.pulsar.client.impl.ConsumerImpl$$Lambda$1364/0x0000000800a98440.accept(Unknown Source)
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
    at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
    at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
    at org.apache.pulsar.client.impl.ClientCnx.handleGetLastMessageIdSuccess(ClientCnx.java:491)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:298)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
```
There is a workaround, which is to use "thenComposeAsync" instead of
"thenCompose". Apart from this specific case, though, the real problem is that
`Message.getValue()` is a blocking operation that performs a network request
(and that request is queued on the same event loop that triggered the call).
Fixing this is not trivial: Message.getValue() calls into the Schema
implementation, in this case AbstractStructSchema, which in turn needs to
download the Schema, so the behavior ultimately depends on the Schema
implementation.
The problem was originally reported by @vroyer in the context of Pulsar IO.
My case is different, but in any case this problem affects any attempt to
implement a fully non-blocking API for Pulsar.
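The mechanism and the workaround can be reproduced without Pulsar. The sketch below is a JDK-only simulation: the single-threaded `io` executor plays the role of the client event loop, the inner `schema` future plays the role of the schema download, and all names are hypothetical. A timeout replaces the indefinite wait so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class EventLoopBlockingDemo {

    // offload=false mimics thenCompose; offload=true mimics thenComposeAsync.
    static String run(boolean offload) {
        ExecutorService io = Executors.newSingleThreadExecutor();  // simulated event loop
        ExecutorService app = Executors.newSingleThreadExecutor(); // application thread
        try {
            CompletableFuture<String> message = new CompletableFuture<>();
            Function<String, CompletionStage<String>> decode = msg -> {
                // Simulated schema download: only the io thread can deliver the reply.
                CompletableFuture<String> schema = new CompletableFuture<>();
                io.execute(() -> schema.complete("schema"));
                try {
                    // Blocking wait, like the get() seen in the stack trace.
                    String s = schema.get(500, TimeUnit.MILLISECONDS);
                    return CompletableFuture.completedFuture(msg + "+" + s);
                } catch (TimeoutException e) {
                    // The reply is queued behind the thread that is waiting for it.
                    return CompletableFuture.completedFuture("stalled");
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            };
            CompletableFuture<String> result = offload
                    ? message.thenComposeAsync(decode, app)
                    : message.thenCompose(decode);
            io.execute(() -> message.complete("msg"));
            return result.join();
        } finally {
            io.shutdownNow();
            app.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints "stalled"
        System.out.println(run(true));  // prints "msg+schema"
    }
}
```

Passing an explicit executor to `thenComposeAsync` keeps the blocking call off the simulated event loop. It does not make the decode step itself non-blocking; it only moves the wait to another thread.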
**To Reproduce**
Use the sample code above.
**Expected behavior**
No deadlock
**Proposal**
One possibility is to force loading of the Schema inside the readNextAsync()
method and complete the CompletableFuture only when the schema is fully
loaded. This way Message.getValue() won't block anymore.
This will require changing the Schema API to support asynchronous decoding,
or at least asynchronous pre-fetching of the schema.
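As a rough sketch of the proposal, the read future could be chained with an asynchronous schema lookup so that it completes only once the schema is available. Everything below is hypothetical and only illustrates the shape of the idea: `AsyncSchemaLoader`, `RawMessage`, `ReadyMessage`, and `readNextWithSchema` do not exist in the Pulsar API, and strings stand in for real payloads and schemas.

```java
import java.util.concurrent.CompletableFuture;

final class SchemaPrefetchSketch {

    /** Hypothetical async schema lookup; not part of the current Schema API. */
    interface AsyncSchemaLoader {
        CompletableFuture<String> loadAsync(long schemaVersion);
    }

    /** Hypothetical undecoded message: payload plus the schema version it was written with. */
    static final class RawMessage {
        final long schemaVersion;
        final String payload;

        RawMessage(long schemaVersion, String payload) {
            this.schemaVersion = schemaVersion;
            this.payload = payload;
        }
    }

    /** Hypothetical message whose schema is already resolved, so getValue() never blocks. */
    static final class ReadyMessage {
        private final RawMessage raw;
        private final String schema;

        ReadyMessage(RawMessage raw, String schema) {
            this.raw = raw;
            this.schema = schema;
        }

        String getValue() {
            return raw.payload + " decoded with " + schema;
        }
    }

    // The returned future completes only after the schema has been fetched.
    static CompletableFuture<ReadyMessage> readNextWithSchema(
            CompletableFuture<RawMessage> rawRead, AsyncSchemaLoader loader) {
        return rawRead.thenCompose(raw ->
                loader.loadAsync(raw.schemaVersion)
                      .thenApply(schema -> new ReadyMessage(raw, schema)));
    }

    public static void main(String[] args) {
        AsyncSchemaLoader loader =
                version -> CompletableFuture.completedFuture("schema-v" + version);
        CompletableFuture<RawMessage> rawRead =
                CompletableFuture.completedFuture(new RawMessage(7, "payload"));
        // prints "payload decoded with schema-v7"
        System.out.println(readNextWithSchema(rawRead, loader).join().getValue());
    }
}
```

With this shape, no callback on the read future ever has to wait for the schema, whichever thread it runs on.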