liangyepianzhou opened a new issue, #16124:
URL: https://github.com/apache/pulsar/issues/16124

   **Describe the bug**
   When consumer reconnects to broker due to channelInactive, the cnx will be 
null, but doTransactionAcknowledgeForResponse will call 
`cnx().newAckForReceipt(cmd, requested);` and then NullPointerException will be 
reported.
   * connectionClosed
   ``` java
          if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
          ...
          state.setState(State.Connecting);
          ....
          }
   ```
   * doAcknowledge
   ```java
           if (getState() != State.Ready && getState() != State.Connecting) {
           ...
           }
           if (txn != null) {
               return doTransactionAcknowledgeForResponse(messageId, ackType, 
null, properties,
                       new TxnID(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits()));
           }
           return 
acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, 
ackType, properties);
   
   ```
   * doTransactionAcknowledgeForResponse
   ```java
   ...
   return cnx().newAckForReceipt(cmd, requested);
   ```
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. new a consumer 
   2. restart broker
   3. use the consumer to ack message with a transaction.
   4. See error
   
   Or you can use this  test to reproduce the error.
   ```java
       public void testName() throws Exception {
           String topic = NAMESPACE1 + "/test1";
           @Cleanup
           Producer<byte[]> producer = pulsarClient
                   .newProducer(Schema.BYTES)
                   .topic(topic)
                   .sendTimeout(0, TimeUnit.SECONDS)
                   .create();
   
           @Cleanup
           Consumer<byte[]> consumer = pulsarClient
                   .newConsumer()
                   .topic(topic)
                   .subscriptionName("sub")
                   .subscribe();
           consumer.getSubscription();
   
           for (int i = 0; i < 10; i++) {
               producer.newMessage().value(Bytes.toBytes(i)).send();
           }
           Method method = ConsumerImpl.class.getDeclaredMethod("cnx");
           method.setAccessible(true);
           ClientCnx cnx = (ClientCnx) method.invoke(consumer);
           Method method1 = 
ConsumerImpl.class.getDeclaredMethod("connectionClosed", ClientCnx.class);
           method1.setAccessible(true);
           method1.invoke(consumer, cnx);
   
           for (int i = 0; i <10 ; i++) {
               Message<byte[]> message = consumer.receive();
               Transaction transaction = pulsarClient
                       .newTransaction()
                       .withTransactionTimeout(5, TimeUnit.SECONDS)
                       .build().get();
               consumer.acknowledgeAsync(message.getMessageId(), transaction);
               transaction.commit().get();
           }
       }
   ```
   **Screenshots**
   The calling logic can be judged according to the log.
   1. channelInactive
   2. connectionClosed
   3. doAcknowledge
   4. doTransactionAcknowledgeForResponse
   
   <img width="1788" alt="image" 
src="https://user-images.githubusercontent.com/55571188/174433454-36e792ba-8fb5-4559-a3fe-9a4c7246745f.png";>
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to