devinbost commented on issue #6054: URL: https://github.com/apache/pulsar/issues/6054#issuecomment-873367594
I mapped out more of the ack flow, so I will add it to what I documented [here](https://github.com/apache/pulsar/issues/6054#issuecomment-839495126) and make it more readable. The bracket notation below specifies an instance of the class (to distinguish it from a static method call).

When `ProducerImpl` sends the messages, it builds `newSend` command instances, which are handled on the broker as follows:

1. `ServerCnx.handleSend(..)` picks up the command and calls `Producer.publishMessage(..)`.
2. That calls `PersistentTopic.publishMessage(..)`, which calls `asyncAddEntry(headersAndPayload, publishContext)`.
3. That calls `[PersistentTopic].ledger.asyncAddEntry(headersAndPayload, (int) publishContext.getNumberOfMessages(), this, publishContext)`, which creates the `OpAddEntry` and calls `internalAsyncAddEntry(addOperation)` on a different thread.
4. `internalAsyncAddEntry(addOperation)` adds the `OpAddEntry` to `[ManagedLedgerImpl].pendingAddEntries`.

From somewhere (it's not clear to me exactly where yet) we call `OpAddEntry.safeRun()`, which starts the ack path:

1. `OpAddEntry.safeRun()` polls `pendingAddEntries`, gets the callback on the `OpAddEntry` (which is the `PersistentTopic` instance), and calls `[PersistentTopic].addComplete(lastEntry, data.asReadOnly(), ctx)`.
2. That calls `publishContext.completed(..)` on `Producer.MessagePublishContext`, which calls `Producer.ServerCnx.execute(this)` on the `MessagePublishContext`.
3. That calls `MessagePublishContext.run()`, which triggers `Producer.ServerCnx.getCommandSender().sendSendReceiptResponse(..)` [SEND_RECEIPT], which writes a `newSendReceiptCommand` to the channel.

From there, the client gets the `SEND_RECEIPT` command from `PulsarDecoder`, which calls `[ClientCnx].handleSendReceipt(..)`, which calls `[ProducerImpl].ackReceived(..)`, which calls `releaseSemaphoreForSendOp(..)` to release the semaphore. The semaphore doesn't block until `maxPendingMessages` is reached, which is 1000 by default.

After a rollover, `pendingAddEntries` is also polled. (Could this access race with the other path that's polling `pendingAddEntries`?) Here's how this happens:

1. Something triggers `[ManagedLedgerImpl].createComplete(..)`, which calls `[ManagedLedgerImpl].updateLedgersListAfterRollover(MetaStoreCallback<Void>)`.
2. That calls `[MetaStoreImpl].asyncUpdateLedgerIds(..)`, which calls `callback.operationComplete(..)` on the `MetaStoreCallback<Void>`.
3. That calls `[ManagedLedgerImpl].updateLedgersIdsComplete(..)`, which polls `pendingAddEntries` and calls `op.initiate()` on each `OpAddEntry` in `pendingAddEntries`.
4. `op.initiate()` calls `ledger.asyncAddEntry(..)`, which calls `[LedgerHandle].asyncAddEntry(..)`, which calls `[LedgerHandle].doAsyncAddEntry(op)`, which adds the `op` to `pendingAddOps` on the `LedgerHandle` instance.
5. That calls `[PendingAddOp].cb.addCompleteWithLatency(..)`, which calls the callback on `[PendingAddOp]`, which is `[OpAddEntry]`, which calls `addCompleteWithLatency` on the `AddCallback` interface, which calls `[OpAddEntry].addComplete(..)`.
6. `[OpAddEntry].addComplete(..)`, if it succeeded: changes its state and recycles it if completed or if there was a failure, it triggers `[OpAddEntry].safeRun()`.
7. `[OpAddEntry].safeRun()` removes itself from `[ManagedLedgerImpl].pendingAddEntries` and triggers `cb.addComplete(..)` on `[PersistentTopic]`, which triggers `publishContext.completed(..)` on `[Producer.MessagePublishContext]`, which triggers `producer.publishOperationCompleted()`, which decrements `pendingPublishAcks`.
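
To make the client-side backpressure concrete, here is a minimal sketch of a semaphore that is taken on send and released when a receipt arrives. It is a simplified model, not the actual `ProducerImpl` code: the class `PendingSendModel` and its methods `trySend()` and `availablePermits()` are hypothetical names, and it uses `tryAcquire()` so the demo stays single-threaded, whereas a blocking producer would presumably wait for a permit instead.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical, simplified model of producer-side backpressure.
// It is NOT the real ProducerImpl; it only mirrors the acquire/release pairing.
class PendingSendModel {
    private final Semaphore permits;
    private final Queue<Long> pending = new ArrayDeque<>(); // sequence ids awaiting a receipt
    private long nextSequenceId = 0;

    PendingSendModel(int maxPendingMessages) {
        this.permits = new Semaphore(maxPendingMessages);
    }

    // Takes one permit per send, or returns false once the limit is reached.
    // (Assumption: a blocking producer would call acquire() here instead.)
    synchronized boolean trySend() {
        if (!permits.tryAcquire()) {
            return false;
        }
        pending.add(nextSequenceId++);
        return true;
    }

    // Models the effect of a SEND_RECEIPT: drop the oldest pending send
    // and release its permit. Returns false if nothing was pending.
    synchronized boolean ackReceived() {
        if (pending.poll() == null) {
            return false;
        }
        permits.release();
        return true;
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        PendingSendModel producer = new PendingSendModel(2); // tiny limit for the demo
        System.out.println(producer.trySend()); // true
        System.out.println(producer.trySend()); // true
        System.out.println(producer.trySend()); // false: limit reached
        producer.ackReceived();                 // receipt arrives, permit released
        System.out.println(producer.trySend()); // true again
    }
}
```

The point of the sketch is that a permit only comes back when a receipt is processed, so if receipts stop arriving the producer stalls once `maxPendingMessages` sends are outstanding.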

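On the question of two paths polling `pendingAddEntries`, the following sketch shows what two concurrent pollers do to a shared queue. It assumes the queue behaves like a `java.util.concurrent.ConcurrentLinkedQueue`; whether the real field has that type is not established in this comment. The class `TwoPollersModel`, the method `drainWithTwoThreads`, and the thread names are hypothetical, and the ops are plain integers rather than `OpAddEntry` instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: two threads polling one thread-safe queue.
// It does not reproduce ManagedLedgerImpl; it only shows poll() semantics.
class TwoPollersModel {

    // Fills a queue with `total` ops, drains it from two threads, and returns
    // {number of distinct ops seen, number of ops seen more than once}.
    static int[] drainWithTwoThreads(int total) {
        ConcurrentLinkedQueue<Integer> pending = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < total; i++) {
            pending.add(i);
        }

        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger duplicates = new AtomicInteger();

        Runnable poller = () -> {
            Integer op;
            while ((op = pending.poll()) != null) {
                if (!seen.add(op)) {
                    duplicates.incrementAndGet(); // the same op was handed out twice
                }
            }
        };

        Thread completionPath = new Thread(poller, "completion-path");
        Thread rolloverPath = new Thread(poller, "rollover-path");
        completionPath.start();
        rolloverPath.start();
        try {
            completionPath.join();
            rolloverPath.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return new int[] { seen.size(), duplicates.get() };
    }

    public static void main(String[] args) {
        int[] result = drainWithTwoThreads(10_000);
        System.out.println("distinct ops seen: " + result[0]);
        System.out.println("ops seen twice:    " + result[1]);
    }
}
```

Under that assumption each op is handed to exactly one poller, so double-processing through `poll()` alone is unlikely. The sketch says nothing about ordering across the two paths or about the state each path expects an op to be in, which is where a real race would more plausibly show up.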