devinbost edited a comment on issue #6054:
URL: https://github.com/apache/pulsar/issues/6054#issuecomment-839495126
I traced the call chain along which acks should be sent, and I filled
in some additional steps:
When `ProducerImpl` sends messages, it builds `newSend` command
instances, which are picked up by `ServerCnx.handleSend(..)`. That goes to
`Producer.publishMessage(..)`, then `PersistentTopic.publishMessage(..)`,
which calls `asyncAddEntry(headersAndPayload, publishContext)`, which calls
`[PersistentTopic].ledger.asyncAddEntry(headersAndPayload, (int) publishContext.getNumberOfMessages(), this, publishContext)`.
That creates the `OpAddEntry` and calls
`internalAsyncAddEntry(addOperation)` on a different thread, which adds the
`OpAddEntry` to `[ManagedLedgerImpl].pendingAddEntries`.
From somewhere (it's not clear to me exactly where yet) we call
`OpAddEntry.safeRun()`, which polls `pendingAddEntries`, gets the callback on
the `OpAddEntry` (which is the `PersistentTopic` instance), and calls
`[PersistentTopic].addComplete(lastEntry, data.asReadOnly(), ctx)`. That calls
`publishContext.completed(..)` on `Producer.MessagePublishContext`, which calls
`Producer.ServerCnx.execute(this)` on the `MessagePublishContext`, which calls
`MessagePublishContext.run()`. That triggers
`Producer.ServerCnx.getCommandSender().sendSendReceiptResponse(..)`
[SEND_RECEIPT], which writes a `newSendReceiptCommand` to the channel.
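To make the broker-side shape easier to follow, here is a minimal sketch of the pattern described above: an add operation is queued together with its callback and context, and a later completion step polls the queue and invokes the callback. All names here (`AddPipelineSketch`, `PendingAdd`, `completeNext`) are hypothetical and are not the Pulsar or managed-ledger classes; the real code is asynchronous and spans several threads.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the "queue the op, complete it later" pattern.
class AddPipelineSketch {
    // Plays the role of the add-entry callback (the topic, in the text above).
    interface AddCallback {
        void addComplete(long entryId, Object ctx);
    }

    // Plays the role of a pending add operation: a callback plus its context.
    static final class PendingAdd {
        final AddCallback callback;
        final Object ctx;

        PendingAdd(AddCallback callback, Object ctx) {
            this.callback = callback;
            this.ctx = ctx;
        }
    }

    private final Queue<PendingAdd> pendingAdds = new ConcurrentLinkedQueue<>();
    private long nextEntryId = 0;

    // Enqueue the operation; nothing is reported to the caller yet.
    void asyncAdd(AddCallback callback, Object ctx) {
        pendingAdds.add(new PendingAdd(callback, ctx));
    }

    // Simulates the storage layer confirming the oldest pending write.
    // Returns false when there is nothing left to complete.
    boolean completeNext() {
        PendingAdd op = pendingAdds.poll();
        if (op == null) {
            return false;
        }
        op.callback.addComplete(nextEntryId++, op.ctx);
        return true;
    }
}
```

In this sketch the callback only fires if something eventually calls `completeNext()`, which mirrors the open question above about where `OpAddEntry.safeRun()` is invoked from.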
From there, the client gets the `SEND_RECEIPT` command from `PulsarDecoder`,
which calls `[ClientCnx].handleSendReceipt(..)`, which calls
`[ProducerImpl].ackReceived(..)`, which calls `releaseSemaphoreForSendOp(..)`
to release the semaphore.
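The client-side half of this chain can be sketched as a permit that is taken on send and only released when the matching receipt arrives. This is a simplified illustration with hypothetical names (`PendingSendSketch`, `trySend`), not the actual `ProducerImpl` code, and it assumes receipts arrive in send order.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for a producer's pending-message flow control.
class PendingSendSketch {
    private final Semaphore permits;
    private final Queue<Long> pending = new ArrayDeque<>();

    PendingSendSketch(int maxPending) {
        this.permits = new Semaphore(maxPending);
    }

    // Takes one permit per message. Returns false if none is free
    // (a real client might block here instead).
    synchronized boolean trySend(long sequenceId) {
        if (!permits.tryAcquire()) {
            return false;
        }
        pending.add(sequenceId);
        return true;
    }

    // Called when a send receipt arrives; releases the permit taken by trySend.
    // Returns false if the receipt does not match the oldest pending message.
    synchronized boolean ackReceived(long sequenceId) {
        Long head = pending.peek();
        if (head == null || head != sequenceId) {
            return false;
        }
        pending.poll();
        permits.release();
        return true;
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

With a limit of one pending message, a second `trySend` fails until `ackReceived` runs for the first. If the receipt never arrives, the permit stays held, which is why the broker-side path that writes the receipt matters here.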
So, I added a lot of logging, and to my great surprise, I'm getting an NPE
after adding these debug lines to `OpAddEntry.createOpAddEntry(..)`:
```
private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
    log.debug("Running OpAddEntry.createOpAddEntry(..)");
    OpAddEntry op = RECYCLER.get();
    log.debug("1. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);
    op.ml = ml;
    op.ledger = null;
    op.data = data.retain();
    op.dataLength = data.readableBytes();
    op.callback = callback;
    op.ctx = ctx;
    op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
    op.closeWhenDone = false;
    op.entryId = -1;
    op.startTime = System.nanoTime();
    op.state = State.OPEN;
    ml.mbean.addAddEntrySample(op.dataLength);
    log.debug("2. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);
    return op;
}
```
Here's the stack trace:
```
2021-05-12T06:14:19,569 [pulsar-io-28-32] DEBUG org.apache.bookkeeper.mledger.impl.OpAddEntry - Running OpAddEntry.createOpAddEntry(..)
2021-05-12T06:14:19,569 [pulsar-io-28-32] WARN org.apache.pulsar.broker.service.ServerCnx - [/10.20.20.160:37696] Got exception java.lang.NullPointerException
    at org.apache.bookkeeper.mledger.impl.OpAddEntry.toString(OpAddEntry.java:354)
    at org.apache.bookkeeper.mledger.impl.OpAddEntry.createOpAddEntry(OpAddEntry.java:100)
    at org.apache.bookkeeper.mledger.impl.OpAddEntry.create(OpAddEntry.java:81)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncAddEntry(ManagedLedgerImpl.java:689)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncAddEntry(PersistentTopic.java:428)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:404)
    at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:224)
    at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:161)
    at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1372)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1376)
    at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1265)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1302)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```
The only place we could be throwing an NPE is here:
`log.debug("1. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null
? op.toString() : null);`
So, `op` isn't null until the null check passes?
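One reading of the trace, offered as a hypothesis: the top frame is `OpAddEntry.toString(OpAddEntry.java:354)`, so the exception is raised inside `toString()` itself. That suggests `op` was non-null and that something `toString()` dereferences was still null on the freshly obtained instance, since the first debug line runs before any field is assigned. The sketch below reproduces that shape with a hypothetical class; the field name and the `toString()` body are assumptions for illustration, not the actual `OpAddEntry` code.

```java
// Hypothetical stand-in for an object handed out by a recycler with fields unset.
class RecycledOpSketch {
    // Not yet assigned on a fresh instance, so it is null.
    String ledgerName;

    @Override
    public String toString() {
        // Assumed for illustration: dereferences a field without a null check.
        return "RecycledOpSketch{ledgerNameLength=" + ledgerName.length() + "}";
    }

    // Same guard as the debug line: protects against a null op,
    // but not against null fields used inside toString().
    static String describe(RecycledOpSketch op) {
        return op != null ? op.toString() : null;
    }
}
```

Calling `RecycledOpSketch.describe(new RecycledOpSketch())` throws a `NullPointerException` from inside `toString()` even though the null check on `op` passed; once `ledgerName` is assigned, the same call returns normally. If this matches what happens in `OpAddEntry`, moving the first debug line after the field assignments, or null-guarding the fields inside `toString()`, would avoid the exception.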