devinbost commented on issue #6054:
URL: https://github.com/apache/pulsar/issues/6054#issuecomment-839495126


I traced the call chain for where acks should be getting sent, and I filled in some additional steps:
   
When `ProducerImpl` sends messages, it builds `newSend` command instances, which get picked up by `ServerCnx.handleSend(..)`. That goes to `Producer.publishMessage(..)` and then to `PersistentTopic.publishMessage(..)`, which calls `asyncAddEntry(headersAndPayload, publishContext)`. That calls `[PersistentTopic].ledger.asyncAddEntry(headersAndPayload, (int) publishContext.getNumberOfMessages(), this, publishContext)`, which creates the `OpAddEntry` and calls `internalAsyncAddEntry(addOperation)` on a different thread. That method adds the `OpAddEntry` to `[ManagedLedgerImpl].pendingAddEntries`.
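To make the enqueue half of the chain easier to follow, here is a minimal sketch of the same call order. Every class here is a hypothetical stand-in (not the real Pulsar `Producer`, `PersistentTopic`, `ManagedLedgerImpl` or `OpAddEntry`), the publish context is simplified to the sequence id, and `awaitPendingWrites()` is a helper that exists only so the sketch can be run and checked.

```java
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the publish path.
// These are NOT the real Pulsar classes; they only mirror the order of the calls.
public class SendPathSketch {

    // Stand-in for OpAddEntry: one pending write to the ledger.
    static final class OpAddEntry {
        final byte[] data;
        final Object ctx; // the publish context, needed later to send the receipt

        OpAddEntry(byte[] data, Object ctx) {
            this.data = data;
            this.ctx = ctx;
        }
    }

    // Stand-in for ManagedLedgerImpl.
    static final class ManagedLedger {
        final Queue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>();
        final ExecutorService executor = Executors.newSingleThreadExecutor();

        void asyncAddEntry(byte[] data, Object ctx) {
            OpAddEntry op = new OpAddEntry(data, ctx);
            // Hand off to another thread, like internalAsyncAddEntry(addOperation).
            executor.execute(() -> internalAsyncAddEntry(op));
        }

        private void internalAsyncAddEntry(OpAddEntry op) {
            pendingAddEntries.add(op); // stays queued until the write completes
        }

        // Sketch-only helper: wait for the hand-off thread to finish its work.
        void awaitPendingWrites() {
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Stand-in for PersistentTopic.
    static final class PersistentTopic {
        final ManagedLedger ledger = new ManagedLedger();

        void publishMessage(byte[] headersAndPayload, Object publishContext) {
            ledger.asyncAddEntry(headersAndPayload, publishContext);
        }
    }

    // Stand-in for Producer; ServerCnx.handleSend(..) would call publishMessage(..).
    static final class Producer {
        final PersistentTopic topic;

        Producer(PersistentTopic topic) {
            this.topic = topic;
        }

        void publishMessage(long sequenceId, byte[] payload) {
            // The sketch uses the sequence id as a stand-in publish context.
            topic.publishMessage(payload, sequenceId);
        }
    }

    public static void main(String[] args) {
        PersistentTopic topic = new PersistentTopic();
        new Producer(topic).publishMessage(1L, "hello".getBytes(StandardCharsets.UTF_8));
        topic.ledger.awaitPendingWrites();
        System.out.println("pending adds: " + topic.ledger.pendingAddEntries.size()); // pending adds: 1
    }
}
```

The point the sketch makes is that nothing on this path sends an ack: the message simply sits in `pendingAddEntries` until something completes it.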
   
From somewhere (it's not clear to me exactly where yet), `OpAddEntry.safeRun()` gets called. It polls `pendingAddEntries`, gets the callback on the `OpAddEntry` (which is the `PersistentTopic` instance), and calls `[PersistentTopic].addComplete(lastEntry, data.asReadOnly(), ctx)`. That calls `publishContext.completed(..)` on `Producer.MessagePublishContext`, which calls `Producer.ServerCnx.execute(this)` on the `MessagePublishContext`, which in turn invokes `MessagePublishContext.run()`. That triggers `Producer.ServerCnx.getCommandSender().sendSendReceiptResponse(..)` [SEND_RECEIPT], which writes a `newSendReceiptCommand` to the channel.
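The completion half can be sketched the same way. Again, all classes are hypothetical stand-ins whose method names only echo the traced ones; the connection runs tasks inline instead of on an event loop, the "channel" is just a list of strings, and `safeRun` is modelled as a static helper that takes the queue and an entry id, which is a simplification and not the real signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

// Hypothetical, simplified model of the completion path that ends in SEND_RECEIPT.
// These are NOT the real Pulsar classes.
public class ReceiptPathSketch {

    interface AddEntryCallback {
        void addComplete(long entryId, Object ctx);
    }

    // Stand-in for the connection: records what would be written to the channel.
    static final class ServerCnx {
        final List<String> written = new ArrayList<>();
        // Runs tasks inline; a real connection would hand them to its event loop.
        final Executor executor = Runnable::run;

        void execute(Runnable task) {
            executor.execute(task);
        }

        void sendSendReceiptResponse(long sequenceId, long entryId) {
            written.add("SEND_RECEIPT seq=" + sequenceId + " entry=" + entryId);
        }
    }

    // Stand-in for Producer.MessagePublishContext.
    static final class MessagePublishContext implements Runnable {
        final ServerCnx cnx;
        final long sequenceId;
        long entryId = -1;

        MessagePublishContext(ServerCnx cnx, long sequenceId) {
            this.cnx = cnx;
            this.sequenceId = sequenceId;
        }

        void completed(long entryId) {
            this.entryId = entryId;
            cnx.execute(this); // schedules run() on the connection
        }

        @Override
        public void run() {
            cnx.sendSendReceiptResponse(sequenceId, entryId);
        }
    }

    // Stand-in for PersistentTopic, which is the callback stored on each OpAddEntry.
    static final class PersistentTopic implements AddEntryCallback {
        @Override
        public void addComplete(long entryId, Object ctx) {
            ((MessagePublishContext) ctx).completed(entryId);
        }
    }

    // Stand-in for OpAddEntry.
    static final class OpAddEntry {
        final AddEntryCallback callback;
        final Object ctx;

        OpAddEntry(AddEntryCallback callback, Object ctx) {
            this.callback = callback;
            this.ctx = ctx;
        }

        // Plays the role of safeRun(): take the oldest pending add and fire its callback.
        static void safeRun(Queue<OpAddEntry> pendingAddEntries, long entryId) {
            OpAddEntry op = pendingAddEntries.poll();
            if (op != null) {
                op.callback.addComplete(entryId, op.ctx);
            }
        }
    }

    public static void main(String[] args) {
        ServerCnx cnx = new ServerCnx();
        Queue<OpAddEntry> pending = new ConcurrentLinkedQueue<>();
        pending.add(new OpAddEntry(new PersistentTopic(), new MessagePublishContext(cnx, 1L)));
        OpAddEntry.safeRun(pending, 0L); // pretend the write was stored as entry 0
        System.out.println(cnx.written); // [SEND_RECEIPT seq=1 entry=0]
    }
}
```

In this model, if `safeRun` is never reached the receipt is never written, which is the symptom being chased here.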
   
So I added a lot of logging, and to my great surprise, I started getting an NPE after adding these debug lines to `OpAddEntry.createOpAddEntry(..)`:
   
```java
    private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
        log.debug("Running OpAddEntry.createOpAddEntry(..)");
        OpAddEntry op = RECYCLER.get();
        // Logged before any field of the recycled op is assigned below
        log.debug("1. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);
        op.ml = ml;
        op.ledger = null;
        op.data = data.retain();
        op.dataLength = data.readableBytes();
        op.callback = callback;
        op.ctx = ctx;
        op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
        op.closeWhenDone = false;
        op.entryId = -1;
        op.startTime = System.nanoTime();
        op.state = State.OPEN;
        ml.mbean.addAddEntrySample(op.dataLength);
        // Logged after all fields have been assigned
        log.debug("2. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);
        return op;
    }
```
   
   Here's the stack trace:
   
```
2021-05-12T06:14:19,569 [pulsar-io-28-32] DEBUG org.apache.bookkeeper.mledger.impl.OpAddEntry - Running OpAddEntry.createOpAddEntry(..)
2021-05-12T06:14:19,569 [pulsar-io-28-32] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.20.20.160:37696] Got exception java.lang.NullPointerException
    at org.apache.bookkeeper.mledger.impl.OpAddEntry.toString(OpAddEntry.java:354)
    at org.apache.bookkeeper.mledger.impl.OpAddEntry.createOpAddEntry(OpAddEntry.java:100)
    at org.apache.bookkeeper.mledger.impl.OpAddEntry.create(OpAddEntry.java:81)
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncAddEntry(ManagedLedgerImpl.java:689)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncAddEntry(PersistentTopic.java:428)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:404)
    at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:224)
    at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:161)
    at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1372)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1376)
    at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1265)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1302)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```
   
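According to the trace (`createOpAddEntry(OpAddEntry.java:100)`), the NPE comes from this statement:
`log.debug("1. In OpAddEntry.createOpAddEntry, OpAddEntry is {}", op != null ? op.toString() : null);`

But the top frame is `OpAddEntry.toString(OpAddEntry.java:354)`, so `op` itself is not null: the null check passes, and the exception is thrown from inside `toString()`. At that point `op` has just come out of `RECYCLER.get()` and none of its fields have been assigned yet in this call, so anything `toString()` dereferences may still be null.

This looks like the same problem as https://github.com/apache/pulsar/issues/10433.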
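Here is a hypothetical sketch of that situation. `Op`, `Ledger` and the pool below are stand-ins, not Pulsar's `OpAddEntry` or its `RECYCLER`, and the sketch assumes that the pool clears fields when an object is recycled. It shows that a guard like `op != null ? op.toString() : null` cannot help when the NPE comes from inside `toString()`: the object is non-null, but a field it dereferences has not been set yet.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a pooled object whose toString() dereferences a field that
// is still null right after the object is taken from the pool.
public class PooledToStringSketch {

    static final class Ledger {
        final String name;

        Ledger(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    static final class Op {
        Ledger ml; // null until the factory assigns it

        // Unsafe: assumes ml is set, so it throws NullPointerException on a fresh object.
        String unsafeToString() {
            return "Op{ml=" + ml.getName() + "}";
        }

        // Null-safe. The parentheses matter: without them, "Op{ml=" + ml != null ? ...
        // compares the concatenated String to null, which is always true.
        @Override
        public String toString() {
            return "Op{ml=" + (ml != null ? ml.getName() : "null") + "}";
        }

        void reset() {
            ml = null;
        }
    }

    // Minimal stand-in for an object pool: hands out reset instances.
    static final Deque<Op> POOL = new ArrayDeque<>();

    static Op get() {
        Op op = POOL.poll();
        return op != null ? op : new Op();
    }

    static void recycle(Op op) {
        op.reset();
        POOL.push(op);
    }

    public static void main(String[] args) {
        Op op = get();
        System.out.println(op != null); // true: the null check passes
        System.out.println(op);         // Op{ml=null}
        try {
            op.unsafeToString();
        } catch (NullPointerException e) {
            System.out.println("NPE thrown inside unsafeToString(), not by the null check");
        }
        op.ml = new Ledger("my-ledger");
        System.out.println(op.unsafeToString()); // Op{ml=my-ledger}
        recycle(op);
    }
}
```

Whether the real `OpAddEntry.toString()` fails for this exact reason is not established here; the sketch only shows why the NPE can appear even though `op` passes the null check.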
   

