liangyepianzhou opened a new issue, #16124:
URL: https://github.com/apache/pulsar/issues/16124
**Describe the bug**
When consumer reconnects to broker due to channelInactive, the cnx will be
null, but doTransactionAcknowledgeForResponse will call
`cnx().newAckForReceipt(cmd, requested);` and then NullPointerException will be
reported.
* connectionClosed
``` java
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
...
state.setState(State.Connecting);
....
}
```
* doAcknowledge
```java
if (getState() != State.Ready && getState() != State.Connecting) {
...
}
if (txn != null) {
return doTransactionAcknowledgeForResponse(messageId, ackType,
null, properties,
new TxnID(txn.getTxnIdMostBits(),
txn.getTxnIdLeastBits()));
}
return
acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId,
ackType, properties);
```
* doTransactionAcknowledgeForResponse
```java
...
return cnx().newAckForReceipt(cmd, requested);
```
**To Reproduce**
Steps to reproduce the behavior:
1. new a consumer
2. restart broker
3. use the consumer to ack message with a transaction.
4. See error
Or you can use this test to reproduce the error.
```java
public void testName() throws Exception {
String topic = NAMESPACE1 + "/test1";
@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer(Schema.BYTES)
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();
consumer.getSubscription();
for (int i = 0; i < 10; i++) {
producer.newMessage().value(Bytes.toBytes(i)).send();
}
Method method = ConsumerImpl.class.getDeclaredMethod("cnx");
method.setAccessible(true);
ClientCnx cnx = (ClientCnx) method.invoke(consumer);
Method method1 =
ConsumerImpl.class.getDeclaredMethod("connectionClosed", ClientCnx.class);
method1.setAccessible(true);
method1.invoke(consumer, cnx);
for (int i = 0; i <10 ; i++) {
Message<byte[]> message = consumer.receive();
Transaction transaction = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
consumer.acknowledgeAsync(message.getMessageId(), transaction);
transaction.commit().get();
}
}
```
**Screenshots**
The calling logic can be judged according to the log.
1. channelInactive
2. connectionClosed
3. doAcknowledge
4. doTransactionAcknowledgeForResponse
<img width="1788" alt="image"
src="https://user-images.githubusercontent.com/55571188/174433454-36e792ba-8fb5-4559-a3fe-9a4c7246745f.png">
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]