This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c5a6a0b78b3538098eb04466c8242128144011a9 Author: Xiangying Meng <[email protected]> AuthorDate: Tue Jun 21 15:33:23 2022 +0800 [fix][txn] Fix NPE when ack message with transaction at cnx = null (#16142) Fix https://github.com/apache/pulsar/issues/16124 ## Motivation When a channel is inactive, connectHandler will set the cnx = null and reconnect. At this time, consumers use transaction to ack messages will report NPE. ## Modification Return exception when cnx = null. **Why not use a queue to store operations?** 1. If we use a queue to store op, we need to take care of the timeout of the op. And the lock is required. 2. If the connection time is long or there is a BUG client that has not been connected, the client will crash. (cherry picked from commit 53cc84a580dd747685905e1d11b8e19c0e59a614) --- .../pulsar/broker/transaction/TransactionTest.java | 41 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 9 ++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 3f6ea313652..68e73d895e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -51,6 +51,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.util.Bytes; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -87,6 +88,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -109,6 +111,7 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; +import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -992,6 +995,44 @@ public class TransactionTest extends TransactionTestBase { transaction.commit().get(); } + @Test + public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception { + String topic = NAMESPACE1 + "/testGetConnectExceptionForAckMsgWhenCnxIsNull"; + @Cleanup + Producer<byte[]> producer = pulsarClient + .newProducer(Schema.BYTES) + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient + .newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + + for (int i = 0; i < 10; i++) { + producer.newMessage().value(Bytes.toBytes(i)).send(); + } + ClientCnx cnx = Whitebox.invokeMethod(consumer, "cnx"); + Whitebox.invokeMethod(consumer, "connectionClosed", cnx); + + Message<byte[]> message = consumer.receive(); + Transaction transaction = pulsarClient + .newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + + try { + consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); + fail(); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof PulsarClientException.ConnectException); + } + } + + @Test public void testPendingAckBatchMessageCommit() throws Exception { String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b4ee5a2e784..9422877ef63 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2644,7 +2644,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } else { unAckedMessageTracker.remove(messageId); } - return cnx().newAckForReceipt(cmd, requestId); + ClientCnx cnx = cnx(); + if (cnx == null) { + return FutureUtil.failedFuture(new PulsarClientException + .ConnectException("Failed to ack message [" + messageId + "] " + + "for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState())); + } else { + return cnx.newAckForReceipt(cmd, requestId); + } } public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {
