This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 59e0cfb  Ack response implement (#8161)
59e0cfb is described below

commit 59e0cfb580e4e6d22e1ea8284f5866c1c5a6fd07
Author: congbo <[email protected]>
AuthorDate: Wed Oct 14 15:40:08 2020 +0800

    Ack response implement (#8161)
    
    ## Motivation
    Now acknowledge messages will not get response, but acknowledge messages 
with transaction will check the acknowledge whether conflict with other 
transaction in pendingack. We can commit or abort the transaction after we get 
the response.
    
    ## Modification
    - We add requestId field in ack protocol and it type is optional.
    - Server handle the ack command, if the command carry the requestId, server 
will return the response.
    - In normal, we don't need to get response. Unless we ack with transaction.
    
    ## Compatibility
    This PR is for transaction, and don't change any client api, so we don't 
need to think about the compatibility question.
---
 .../broker/service/BrokerServiceException.java     |   3 +
 .../org/apache/pulsar/broker/service/Consumer.java |  28 ++---
 .../apache/pulsar/broker/service/ServerCnx.java    |  14 ++-
 .../service/persistent/PersistentSubscription.java |  14 +--
 .../persistent/PersistentSubscriptionTest.java     |  19 ++--
 .../client/impl/ConsumerAckResponseTest.java       |  95 ++++++++++++++++
 .../TransactionEndToEndTest.java}                  |  10 +-
 .../pulsar/client/api/PulsarClientException.java   |  62 ++++++++++
 pulsar-client-cpp/include/pulsar/Result.h          |   1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |   3 +
 pulsar-client-cpp/lib/Result.cc                    |   3 +
 .../pulsar/client/impl/BatchMessageAcker.java      |   4 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  18 +++
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 126 ++++++++++++++++++++-
 .../PersistentAcknowledgmentsGroupingTracker.java  |   4 +-
 .../impl/conf/ConsumerConfigurationData.java       |   4 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 117 +++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  40 +++----
 .../pulsar/common/protocol/PulsarDecoder.java      |  11 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |   4 +
 21 files changed, 518 insertions(+), 65 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 7ec97ff..d4bcd8a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -22,6 +22,7 @@ import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 
 /**
@@ -214,6 +215,8 @@ public class BrokerServiceException extends Exception {
             return ServerError.InvalidTxnStatus;
         } else if (t instanceof NotAllowedException) {
             return ServerError.NotAllowedError;
+        } else if (t instanceof TransactionConflictException) {
+            return ServerError.TransactionConflict;
         } else {
             if (checkCauseIfUnknown) {
                 return getClientErrorCode(t.getCause(), false);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 0edb24d..0e93d35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
@@ -60,6 +61,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
@@ -388,24 +390,22 @@ public class Consumer {
         });
     }
 
-    void messageAcked(CommandAck ack) {
+    CompletableFuture<Void> messageAcked(CommandAck ack) {
         this.lastAckedTimestamp = System.currentTimeMillis();
         Map<String,Long> properties = Collections.emptyMap();
         if (ack.getPropertiesCount() > 0) {
             properties = ack.getPropertiesList().stream()
-                .collect(Collectors.toMap((e) -> e.getKey(),
-                                          (e) -> e.getValue()));
+                .collect(Collectors.toMap(PulsarApi.KeyLongValue::getKey,
+                        PulsarApi.KeyLongValue::getValue));
         }
 
         if (ack.getAckType() == AckType.Cumulative) {
             if (ack.getMessageIdCount() != 1) {
                 log.warn("[{}] [{}] Received multi-message ack", subscription, 
consumerId);
-                return;
             }
 
             if (Subscription.isIndividualAckMode(subType)) {
                 log.warn("[{}] [{}] Received cumulative ack on shared 
subscription, ignoring", subscription, consumerId);
-                return;
             }
             PositionImpl position = PositionImpl.earliest;
             if (ack.getMessageIdCount() == 1) {
@@ -418,7 +418,7 @@ public class Consumer {
             }
             List<Position> positionsAcked = 
Collections.singletonList(position);
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
-                transactionAcknowledge(ack.getTxnidMostBits(), 
ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
+                return transactionAcknowledge(ack.getTxnidMostBits(), 
ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
             } else {
                 subscription.acknowledgeMessage(positionsAcked, 
AckType.Cumulative, properties);
             }
@@ -445,24 +445,24 @@ public class Consumer {
                 }
             }
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
-                transactionAcknowledge(ack.getTxnidMostBits(), 
ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
+                return transactionAcknowledge(ack.getTxnidMostBits(), 
ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
             } else {
                 subscription.acknowledgeMessage(positionsAcked, 
AckType.Individual, properties);
             }
         }
+
+        return CompletableFuture.completedFuture(null);
     }
 
-    private void transactionAcknowledge(long txnidMostBits, long 
txnidLeastBits,
+    private CompletableFuture<Void> transactionAcknowledge(long txnidMostBits, 
long txnidLeastBits,
                                         List<Position> positionList, AckType 
ackType) {
         if (subscription instanceof PersistentSubscription) {
             TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
-            try {
-                ((PersistentSubscription) 
subscription).acknowledgeMessage(txnID, positionList, ackType);
-            } catch (TransactionConflictException e) {
-                log.error("Transaction acknowledge failed for txn " + txnID, 
e);
-            }
+            return ((PersistentSubscription) 
subscription).acknowledgeMessage(txnID, positionList, ackType);
         } else {
-            log.error("Transaction acknowledge only support the 
`PersistentSubscription`.");
+            String error = "Transaction acknowledge only support the 
`PersistentSubscription`.";
+            log.error(error);
+            return FutureUtil.failedFuture(new 
TransactionConflictException(error));
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aa72c7a..505ecf2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1264,7 +1264,19 @@ public class ServerCnx extends PulsarHandler {
         CompletableFuture<Consumer> consumerFuture = 
consumers.get(ack.getConsumerId());
 
         if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
-            consumerFuture.getNow(null).messageAcked(ack);
+            consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> {
+                        if (ack.hasRequestId()) {
+                            ctx.writeAndFlush(Commands.newAckResponse(
+                                    ack.getRequestId(), null, null, 
ack.getConsumerId()));
+                        }
+                    }).exceptionally(e -> {
+                        if (ack.hasRequestId()) {
+                            
ctx.writeAndFlush(Commands.newAckResponse(ack.getRequestId(),
+                                    
BrokerServiceException.getClientErrorCode(e),
+                                    e.getMessage(), ack.getConsumerId()));
+                        }
+                        return null;
+                    });
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 90be062..20ba166 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -67,7 +67,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsS
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -416,7 +415,7 @@ public class PersistentSubscription implements Subscription 
{
      *  cumulative ack or try to single ack message already acked by any 
ongoing transaction.
      * @throws IllegalArgumentException if try to cumulative ack but passed in 
multiple positions.
      */
-    public synchronized void acknowledgeMessage(TxnID txnId, List<Position> 
positions, AckType ackType) throws TransactionConflictException {
+    public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID 
txnId, List<Position> positions, AckType ackType) {
         checkArgument(txnId != null, "TransactionID can not be null.");
         if (AckType.Cumulative == ackType) {
             // Check if another transaction is already using cumulative ack on 
this subscription.
@@ -425,14 +424,14 @@ public class PersistentSubscription implements 
Subscription {
                                   " try to cumulative ack message while 
transaction:" + this.pendingCumulativeAckTxnId +
                                   " already cumulative acked messages.";
                 log.error(errorMsg);
-                throw new TransactionConflictException(errorMsg);
+                return FutureUtil.failedFuture(new 
TransactionConflictException(errorMsg));
             }
 
             if (positions.size() != 1) {
                 String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
                                   " invalid cumulative ack received with 
multiple message ids.";
                 log.error(errorMsg);
-                throw new IllegalArgumentException(errorMsg);
+                return FutureUtil.failedFuture(new 
TransactionConflictException(errorMsg));
             }
 
             Position position = positions.get(0);
@@ -443,7 +442,7 @@ public class PersistentSubscription implements Subscription 
{
                         " try to cumulative ack position: " + position + " 
within range of cursor's " +
                         "markDeletePosition: " + 
cursor.getMarkDeletedPosition();
                 log.error(errorMsg);
-                throw new TransactionConflictException(errorMsg);
+                return FutureUtil.failedFuture(new 
TransactionConflictException(errorMsg));
             }
 
             if (log.isDebugEnabled()) {
@@ -481,7 +480,7 @@ public class PersistentSubscription implements Subscription 
{
                     String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
                                       " try to ack message:" + position + " in 
pending ack status.";
                     log.error(errorMsg);
-                    throw new TransactionConflictException(errorMsg);
+                    return FutureUtil.failedFuture(new 
TransactionConflictException(errorMsg));
                 }
 
                 // If try to ack message already acked by committed 
transaction or normal acknowledge, throw exception.
@@ -489,13 +488,14 @@ public class PersistentSubscription implements 
Subscription {
                     String errorMsg = "[" + topicName + "][" + subName + "] 
Transaction:" + txnId +
                             " try to ack message:" + position + " already 
acked before.";
                     log.error(errorMsg);
-                    throw new TransactionConflictException(errorMsg);
+                    return FutureUtil.failedFuture(new 
TransactionConflictException(errorMsg));
                 }
 
                 pendingAckMessageForCurrentTxn.add(position);
                 this.pendingAckMessages.add(position);
             }
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     private final MarkDeleteCallback markDeleteCallback = new 
MarkDeleteCallback() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 9d75762..4379375 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -62,7 +63,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.compaction.Compactor;
-import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
@@ -165,7 +165,7 @@ public class PersistentSubscriptionTest {
     }
 
     @Test
-    public void testCanAcknowledgeAndCommitForTransaction() throws 
TransactionConflictException {
+    public void testCanAcknowledgeAndCommitForTransaction() {
         List<Position> expectedSinglePositions = new ArrayList<>();
         expectedSinglePositions.add(new PositionImpl(1, 1));
         expectedSinglePositions.add(new PositionImpl(1, 3));
@@ -208,7 +208,7 @@ public class PersistentSubscriptionTest {
     }
 
     @Test
-    public void testCanAcknowledgeAndAbortForTransaction() throws 
TransactionConflictException, BrokerServiceException {
+    public void testCanAcknowledgeAndAbortForTransaction() throws 
BrokerServiceException, InterruptedException {
         List<Position> positions = new ArrayList<>();
         positions.add(new PositionImpl(2, 1));
         positions.add(new PositionImpl(2, 3));
@@ -242,10 +242,10 @@ public class PersistentSubscriptionTest {
 
         // Can not single ack message already acked.
         try {
-            persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Individual);
+            persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Individual).get();
             fail("Single acknowledge for transaction2 should fail. ");
-        } catch (TransactionConflictException e) {
-            
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
 " +
+        } catch (ExecutionException e) {
+            
assertEquals(e.getCause().getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
 " +
                     "Transaction:(1,2) try to ack message:2:1 in pending ack 
status.");
         }
 
@@ -254,11 +254,10 @@ public class PersistentSubscriptionTest {
 
         // Can not cumulative ack message for another txn.
         try {
-            persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Cumulative);
+            persistentSubscription.acknowledgeMessage(txnID2, positions, 
AckType.Cumulative).get();
             fail("Cumulative acknowledge for transaction2 should fail. ");
-        } catch (TransactionConflictException e) {
-            System.out.println(e.getMessage());
-            
assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
 " +
+        } catch (ExecutionException e) {
+            
assertEquals(e.getCause().getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName]
 " +
                 "Transaction:(1,2) try to cumulative ack message while 
transaction:(1,1) already cumulative acked messages.");
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
new file mode 100644
index 0000000..55fe046
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+
+public class ConsumerAckResponseTest extends ProducerConsumerBase {
+
+    private final static TransactionImpl transaction = 
mock(TransactionImpl.class);
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+        doReturn(1L).when(transaction).getTxnIdLeastBits();
+        doReturn(1L).when(transaction).getTxnIdMostBits();
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        doReturn(completableFuture).when(transaction).registerAckOp(any());
+        doNothing().when(transaction).registerAckedTopic(any(), any());
+
+        Thread.sleep(1000 * 3);
+    }
+
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testAckResponse() throws PulsarClientException, 
InterruptedException, ExecutionException {
+        String topic = "testAckResponse";
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) 
pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscribe();
+        producer.send(1);
+        producer.send(2);
+        try {
+            consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), 
transaction).get();
+            fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof 
PulsarClientException.TransactionConflictException);
+        }
+        Message<Integer> message = consumer.receive();
+        consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
similarity index 98%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 0da8cca..f9a5c44 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.transaction;
+package org.apache.pulsar.client.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -57,7 +57,7 @@ import org.testng.annotations.Test;
  * End to end transaction test.
  */
 @Slf4j
-public class EndToEndTest extends TransactionTestBase {
+public class TransactionEndToEndTest extends TransactionTestBase {
 
     private final static int TOPIC_PARTITION = 3;
 
@@ -248,9 +248,9 @@ public class EndToEndTest extends TransactionTestBase {
                 Message<byte[]> message = consumer.receive();
                 Assert.assertNotNull(message);
                 log.info("receive msgId: {}", message.getMessageId());
-                consumer.acknowledgeAsync(message.getMessageId(), txn);
+                consumer.acknowledgeAsync(message.getMessageId(), txn).get();
             }
-            Thread.sleep(2000);
+            Thread.sleep(1000);
 
             consumer.redeliverUnacknowledgedMessages();
 
@@ -266,7 +266,7 @@ public class EndToEndTest extends TransactionTestBase {
             for (int i = 0; i < messageCnt; i++) {
                 message = consumer.receive(2, TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
-                consumer.acknowledgeAsync(message.getMessageId(), commitTxn);
+                consumer.acknowledgeAsync(message.getMessageId(), 
commitTxn).get();
                 log.info("receive msgId: {}", message.getMessageId());
             }
 
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 597e0d5..457d87c 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -763,6 +763,58 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    /**
+     * Consumer assign exception thrown by Pulsar client.
+     */
+    public static class MessageAcknowledgeException extends 
PulsarClientException {
+
+        /**
+         * Constructs an {@code MessageAcknowledgeException} with the 
specified cause.
+         *
+         * @param t
+         *        The cause (which is saved for later retrieval by the
+         *        {@link #getCause()} method).  (A null value is permitted,
+         *        and indicates that the cause is nonexistent or unknown.)
+         */
+        public MessageAcknowledgeException(Throwable t) {
+            super(t);
+        }
+
+        /**
+         * Constructs an {@code MessageAcknowledgeException} with the 
specified detail message.
+         * @param msg The detail message.
+         */
+        public MessageAcknowledgeException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
+     * Consumer assign exception thrown by Pulsar client.
+     */
+    public static class TransactionConflictException extends 
PulsarClientException {
+
+        /**
+         * Constructs an {@code TransactionConflictException} with the 
specified cause.
+         *
+         * @param t
+         *        The cause (which is saved for later retrieval by the
+         *        {@link #getCause()} method).  (A null value is permitted,
+         *        and indicates that the cause is nonexistent or unknown.)
+         */
+        public TransactionConflictException(Throwable t) {
+            super(t);
+        }
+
+        /**
+         * Constructs an {@code TransactionConflictException} with the 
specified detail message.
+         * @param msg The detail message.
+         */
+        public TransactionConflictException(String msg) {
+            super(msg);
+        }
+    }
+
     // wrap an exception to enriching more info messages.
     public static Throwable wrap(Throwable t, String msg) {
         msg += "\n" + t.getMessage();
@@ -821,6 +873,10 @@ public class PulsarClientException extends IOException {
             return new CryptoException(msg);
         } else if (t instanceof ConsumerAssignException) {
             return new ConsumerAssignException(msg);
+        } else if (t instanceof MessageAcknowledgeException) {
+            return new MessageAcknowledgeException(msg);
+        } else if (t instanceof TransactionConflictException) {
+            return new TransactionConflictException(msg);
         } else if (t instanceof PulsarClientException) {
             return new PulsarClientException(msg);
         } else if (t instanceof CompletionException) {
@@ -906,6 +962,10 @@ public class PulsarClientException extends IOException {
             return new CryptoException(msg);
         } else if (cause instanceof ConsumerAssignException) {
             return new ConsumerAssignException(msg);
+        } else if (cause instanceof MessageAcknowledgeException) {
+            return new MessageAcknowledgeException(msg);
+        } else if (cause instanceof TransactionConflictException) {
+            return new TransactionConflictException(msg);
         } else if (cause instanceof TopicDoesNotExistException) {
             return new TopicDoesNotExistException(msg);
         } else {
@@ -936,6 +996,8 @@ public class PulsarClientException extends IOException {
                 || t instanceof ChecksumException
                 || t instanceof CryptoException
                 || t instanceof ConsumerAssignException
+                || t instanceof MessageAcknowledgeException
+                || t instanceof TransactionConflictException
                 || t instanceof ProducerBusyException
                 || t instanceof ConsumerBusyException) {
             return false;
diff --git a/pulsar-client-cpp/include/pulsar/Result.h 
b/pulsar-client-cpp/include/pulsar/Result.h
index 6dd7e45..01a2474 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -83,6 +83,7 @@ enum Result
     ResultTransactionCoordinatorNotFoundError,       /// Transaction 
coordinator not found
     ResultInvalidTxnStatusError,                     /// Invalid txn status 
error
     ResultNotAllowedError,                           /// Not allowed
+    ResultTransactionConflict,                       /// Transaction ack 
conflict
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index ae44b52..22cc420 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -120,6 +120,9 @@ static Result getResult(ServerError serverError) {
 
         case NotAllowedError:
             return ResultNotAllowedError;
+
+        case TransactionConflict:
+            return ResultTransactionConflict;
     }
     // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
     // ServerError and miss them in the switch above we would like to get 
notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 3c1c2a8..5f074a4 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -147,6 +147,9 @@ const char* strResult(Result result) {
 
         case ResultNotAllowedError:
             return "ResultNotAllowedError";
+
+        case ResultTransactionConflict:
+            return "ResultTransactionConflict";
     };
     // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
     // ServerError and miss them in the switch above we would like to get 
notified. Adding
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index e34d1a1..e0b8197 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -63,6 +63,10 @@ class BatchMessageAcker {
         return bitSet.isEmpty();
     }
 
+    public synchronized int getBitSetSize() {
+        return bitSet.size();
+    }
+
     public synchronized boolean ackCumulative(int batchIndex) {
         // +1 since to argument is exclusive
         bitSet.clear(0, batchIndex + 1);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 936b27f..056b75a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
@@ -90,6 +91,7 @@ import 
org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -401,6 +403,20 @@ public class ClientCnx extends PulsarHandler {
     }
 
     @Override
+    protected void handleAckResponse(CommandAckResponse ackResponse) {
+        checkArgument(state == State.Ready);
+        checkArgument(ackResponse.getRequestId() >= 0);
+        long consumerId = ackResponse.getConsumerId();
+        if (!ackResponse.hasError()) {
+            consumers.get(consumerId).ackReceipt(ackResponse.getRequestId());
+        } else {
+            consumers.get(consumerId).ackError(ackResponse.getRequestId(),
+                    getPulsarClientException(ackResponse.getError(), 
ackResponse.getMessage()));
+        }
+    }
+
+
+    @Override
     protected void handleMessage(CommandMessage cmdMessage, ByteBuf 
headersAndPayload) {
         checkArgument(state == State.Ready);
 
@@ -1029,6 +1045,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.ConsumerAssignException(errorMsg);
         case NotAllowedError:
             return new PulsarClientException.NotAllowedException(errorMsg);
+        case TransactionConflict:
+            return new 
PulsarClientException.TransactionConflictException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index db1b661..8c67f2c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -448,7 +448,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             // committed
             txn.registerAckedTopic(getTopic(), subscription);
             // register the ackFuture as part of the transaction
-            return txn.registerAckOp(ackFuture);
+            txn.registerAckOp(ackFuture);
+            return ackFuture;
         } else {
             return ackFuture;
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index bfbc6ed..5b6bcfa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -77,7 +77,9 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import 
org.apache.pulsar.client.api.PulsarClientException.MessageAcknowledgeException;
 import 
org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -87,6 +89,7 @@ import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
@@ -103,8 +106,10 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,6 +191,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final boolean createTopicIfDoesNotExist;
 
+    private final ConcurrentLongHashMap<OpForAckCallBack> ackRequests;
+
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
@@ -328,6 +335,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
+        this.ackRequests = new ConcurrentLongHashMap<>(16, 1);
 
         grabCnx();
     }
@@ -535,6 +543,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return FutureUtil.failedFuture(exception);
         }
 
+        if (txn != null) {
+            return doAcknowledgeForResponse(messageId, ackType, null, 
properties,
+                    new TxnID(txn.getTxnIdMostBits(), 
txn.getTxnIdLeastBits()));
+        }
+
         if (messageId instanceof BatchMessageIdImpl) {
             BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
             if (ackType == AckType.Cumulative && txn != null) {
@@ -1037,6 +1050,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             possibleSendToDeadLetterTopicMessages.clear();
         }
 
+        if (!ackRequests.isEmpty()) {
+            ackRequests.forEach((key, value) -> {
+                value.callback
+                        .completeExceptionally(new 
MessageAcknowledgeException("Consumer has closed!"));
+                value.recycle();
+            });
+
+            ackRequests.clear();
+        }
+
         acknowledgmentsGroupingTracker.close();
         if (batchReceiveTimeout != null) {
             batchReceiveTimeout.cancel();
@@ -2344,6 +2367,107 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         pendingChunckedMessageCount--;
     }
 
+    private CompletableFuture<Void> doAcknowledgeForResponse(MessageId 
messageId, AckType ackType,
+                                                             ValidationError 
validationError,
+                                                             Map<String, Long> 
properties, TxnID txnID) {
+        CompletableFuture<Void> callBack = new CompletableFuture<>();
+        BitSetRecyclable bitSetRecyclable = null;
+        long ledgerId;
+        long entryId;
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            bitSetRecyclable = BitSetRecyclable.create();
+            ledgerId = batchMessageId.getLedgerId();
+            entryId = batchMessageId.getEntryId();
+            if (ackType == AckType.Cumulative) {
+                batchMessageId.ackCumulative();
+                bitSetRecyclable.set(0, 
batchMessageId.getAcker().getBitSetSize());
+                bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1);
+            } else {
+                batchMessageId.ackIndividual();
+                bitSetRecyclable.set(0, 
batchMessageId.getAcker().getBitSetSize());
+                bitSetRecyclable.clear(batchMessageId.getBatchIndex());
+            }
+        } else {
+            MessageIdImpl singleMessage = (MessageIdImpl) messageId;
+            ledgerId = singleMessage.getLedgerId();
+            entryId = singleMessage.getEntryId();
+        }
+        long requestId = client.newRequestId();
+        ByteBuf cmd = Commands.newAck(consumerId, ledgerId, entryId,
+                bitSetRecyclable, ackType,
+                validationError, properties, txnID.getLeastSigBits(), 
txnID.getMostSigBits(), requestId);
+        OpForAckCallBack op = OpForAckCallBack.create(cmd, callBack, messageId,
+                new TxnID(txnID.getMostSigBits(), txnID.getLeastSigBits()));
+        ackRequests.put(requestId, op);
+        unAckedMessageTracker.remove(messageId);
+        cmd.retain();
+        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        return callBack;
+    }
+
+    protected void ackReceipt(long requestId) {
+        OpForAckCallBack callBackOp = ackRequests.remove(requestId);
+        if (callBackOp == null || callBackOp.callback.isDone()) {
+            log.info("Ack request has been handled requestId : {}", requestId);
+        } else {
+            callBackOp.callback.complete(null);
+            if (log.isDebugEnabled()) {
+                log.debug("MessageId : {} has ack by TxnId : {}", 
callBackOp.messageId.getLedgerId() + ":"
+                        + callBackOp.messageId.getEntryId(), 
callBackOp.txnID.toString());
+            }
+        }
+    }
+
+    protected void ackError(long requestId, PulsarClientException 
pulsarClientException) {
+        OpForAckCallBack callBackOp = ackRequests.remove(requestId);
+        if (callBackOp == null || callBackOp.callback.isDone()) {
+            log.info("Ack request has been handled requestId : {}", requestId);
+        } else {
+            callBackOp.callback.completeExceptionally(pulsarClientException);
+            if (log.isDebugEnabled()) {
+                log.debug("MessageId : {} has ack by TxnId : {}", 
callBackOp.messageId, callBackOp.txnID);
+            }
+            callBackOp.recycle();
+        }
+    }
+
+    private static class OpForAckCallBack {
+        protected ByteBuf cmd;
+        protected CompletableFuture<Void> callback;
+        protected MessageIdImpl messageId;
+        protected TxnID txnID;
+
+        static OpForAckCallBack create(ByteBuf cmd, CompletableFuture<Void> 
callback,
+                                       MessageId messageId, TxnID txnID) {
+            OpForAckCallBack op = RECYCLER.get();
+            op.callback = callback;
+            op.cmd = cmd;
+            op.messageId = (MessageIdImpl) messageId;
+            op.txnID = txnID;
+            return op;
+        }
+
+        private OpForAckCallBack(Recycler.Handle<OpForAckCallBack> 
recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        void recycle() {
+            if (cmd != null) {
+                ReferenceCountUtil.safeRelease(cmd);
+            }
+            recyclerHandle.recycle(this);
+        }
+
+        private final Recycler.Handle<OpForAckCallBack> recyclerHandle;
+        private static final Recycler<OpForAckCallBack> RECYCLER = new 
Recycler<OpForAckCallBack>() {
+            @Override
+            protected OpForAckCallBack newObject(Handle<OpForAckCallBack> 
handle) {
+                return new OpForAckCallBack(handle);
+            }
+        };
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
 
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f6dfb84..62f9f58 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -313,7 +313,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
         }
 
         final ByteBuf cmd = Commands.newAck(consumer.consumerId, 
msgId.ledgerId, msgId.entryId, bitSet, ackType,
-                null, properties, txnidLeastBits, txnidMostBits);
+                null, properties, txnidLeastBits, txnidMostBits, -1);
         bitSet.recycle();
         cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
         return true;
@@ -534,7 +534,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
         } else {
             ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), 
msgId.getEntryId(), lastCumulativeAckSet,
-                    ackType, validationError, map, txnidLeastBits, 
txnidMostBits);
+                    ackType, validationError, map, txnidLeastBits, 
txnidMostBits, -1);
             if (flush) {
                 cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
             } else {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 7f5d65b..e1f6556 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -84,10 +84,10 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
     private long tickDurationMillis = 1000;
 
     private int priorityLevel = 0;
-    
+
     // max pending chunked message to avoid sitting incomplete message into 
the queue and memory
     private int maxPendingChuckedMessage = 10;
-    
+
     private boolean autoAckOldestChunkedMessageOnQueueFull = false;
 
     private long expireTimeOfIncompleteChunkedMessageMillis = 60 * 1000;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 65c4a93..9516c25 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -83,6 +83,7 @@ public final class PulsarApi {
     TransactionCoordinatorNotFound(20, 20),
     InvalidTxnStatus(21, 21),
     NotAllowedError(22, 22),
+    TransactionConflict(23, 23),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -108,6 +109,7 @@ public final class PulsarApi {
     public static final int TransactionCoordinatorNotFound_VALUE = 20;
     public static final int InvalidTxnStatus_VALUE = 21;
     public static final int NotAllowedError_VALUE = 22;
+    public static final int TransactionConflict_VALUE = 23;
     
     
     public final int getNumber() { return value; }
@@ -137,6 +139,7 @@ public final class PulsarApi {
         case 20: return TransactionCoordinatorNotFound;
         case 21: return InvalidTxnStatus;
         case 22: return NotAllowedError;
+        case 23: return TransactionConflict;
         default: return null;
       }
     }
@@ -19071,6 +19074,10 @@ public final class PulsarApi {
     // optional uint64 txnid_most_bits = 7 [default = 0];
     boolean hasTxnidMostBits();
     long getTxnidMostBits();
+    
+    // optional uint64 request_id = 8;
+    boolean hasRequestId();
+    long getRequestId();
   }
   public static final class CommandAck extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -19290,6 +19297,16 @@ public final class PulsarApi {
       return txnidMostBits_;
     }
     
+    // optional uint64 request_id = 8;
+    public static final int REQUEST_ID_FIELD_NUMBER = 8;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
     private void initFields() {
       consumerId_ = 0L;
       ackType_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType.Individual;
@@ -19298,6 +19315,7 @@ public final class PulsarApi {
       properties_ = java.util.Collections.emptyList();
       txnidLeastBits_ = 0L;
       txnidMostBits_ = 0L;
+      requestId_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -19357,6 +19375,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeUInt64(7, txnidMostBits_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeUInt64(8, requestId_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -19393,6 +19414,10 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt64Size(7, txnidMostBits_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(8, requestId_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -19520,6 +19545,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000020);
         txnidMostBits_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000040);
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
       
@@ -19583,6 +19610,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000010;
         }
         result.txnidMostBits_ = txnidMostBits_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.requestId_ = requestId_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -19624,6 +19655,9 @@ public final class PulsarApi {
         if (other.hasTxnidMostBits()) {
           setTxnidMostBits(other.getTxnidMostBits());
         }
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
         return this;
       }
       
@@ -19718,6 +19752,11 @@ public final class PulsarApi {
               txnidMostBits_ = input.readUInt64();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              requestId_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -20013,6 +20052,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional uint64 request_id = 8;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000080;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandAck)
     }
     
@@ -20046,6 +20106,10 @@ public final class PulsarApi {
     // optional string message = 5;
     boolean hasMessage();
     String getMessage();
+    
+    // optional uint64 request_id = 6;
+    boolean hasRequestId();
+    long getRequestId();
   }
   public static final class CommandAckResponse extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -20154,12 +20218,23 @@ public final class PulsarApi {
       }
     }
     
+    // optional uint64 request_id = 6;
+    public static final int REQUEST_ID_FIELD_NUMBER = 6;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
     private void initFields() {
       consumerId_ = 0L;
       txnidLeastBits_ = 0L;
       txnidMostBits_ = 0L;
       error_ = 
org.apache.pulsar.common.api.proto.PulsarApi.ServerError.UnknownError;
       message_ = "";
+      requestId_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -20197,6 +20272,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBytes(5, getMessageBytes());
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeUInt64(6, requestId_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -20225,6 +20303,10 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBytesSize(5, getMessageBytes());
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(6, requestId_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -20348,6 +20430,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000008);
         message_ = "";
         bitField0_ = (bitField0_ & ~0x00000010);
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -20401,6 +20485,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000010;
         }
         result.message_ = message_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.requestId_ = requestId_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -20422,6 +20510,9 @@ public final class PulsarApi {
         if (other.hasMessage()) {
           setMessage(other.getMessage());
         }
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
         return this;
       }
       
@@ -20484,6 +20575,11 @@ public final class PulsarApi {
               message_ = input.readBytes();
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              requestId_ = input.readUInt64();
+              break;
+            }
           }
         }
       }
@@ -20613,6 +20709,27 @@ public final class PulsarApi {
         
       }
       
+      // optional uint64 request_id = 6;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000020;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
       // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandAckResponse)
     }
     
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index f9b2ca1..5b01e40 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1140,12 +1140,12 @@ public class Commands {
 
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, 
BitSetRecyclable ackSet, AckType ackType,
                                  ValidationError validationError, Map<String, 
Long> properties) {
-        return newAck(consumerId, ledgerId, entryId, ackSet, ackType, 
validationError, properties, -1, -1);
+        return newAck(consumerId, ledgerId, entryId, ackSet, ackType, 
validationError, properties, -1, -1, -1);
     }
 
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, 
BitSetRecyclable ackSet, AckType ackType,
                                  ValidationError validationError, Map<String, 
Long> properties, long txnIdLeastBits,
-                                 long txnIdMostBits) {
+                                 long txnIdMostBits, long requestId) {
         CommandAck.Builder ackBuilder = CommandAck.newBuilder();
         ackBuilder.setConsumerId(consumerId);
         ackBuilder.setAckType(ackType);
@@ -1154,6 +1154,7 @@ public class Commands {
         messageIdDataBuilder.setEntryId(entryId);
         if (ackSet != null) {
             
messageIdDataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet.toLongArray()));
+            ackSet.recycle();
         }
         MessageIdData messageIdData = messageIdDataBuilder.build();
         ackBuilder.addMessageId(messageIdData);
@@ -1166,6 +1167,10 @@ public class Commands {
         if (txnIdLeastBits >= 0) {
             ackBuilder.setTxnidLeastBits(txnIdLeastBits);
         }
+
+        if (requestId >= 0) {
+            ackBuilder.setRequestId(requestId);
+        }
         for (Map.Entry<String, Long> e : properties.entrySet()) {
             ackBuilder.addProperties(
                     
KeyLongValue.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build());
@@ -1180,34 +1185,25 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newAckResponse(long consumerId, long txnIdLeastBits, 
long txnIdMostBits) {
+    public static ByteBuf newAckResponse(long requestId, ServerError error, 
String errorMsg, long consumerId) {
         CommandAckResponse.Builder commandAckResponseBuilder = 
CommandAckResponse.newBuilder();
         commandAckResponseBuilder.setConsumerId(consumerId);
-        commandAckResponseBuilder.setTxnidLeastBits(txnIdLeastBits);
-        commandAckResponseBuilder.setTxnidMostBits(txnIdMostBits);
-        CommandAckResponse commandAckResponse = 
commandAckResponseBuilder.build();
-
-        ByteBuf res = serializeWithSize(
-            
BaseCommand.newBuilder().setType(Type.ACK_RESPONSE).setAckResponse(commandAckResponse));
-        commandAckResponseBuilder.recycle();
-        commandAckResponse.recycle();
+        commandAckResponseBuilder.setRequestId(requestId);
 
-        return res;
-    }
+        if (error != null) {
+            commandAckResponseBuilder.setError(error);
+        }
 
-    public static ByteBuf newAckErrorResponse(ServerError error, String 
errorMsg, long consumerId) {
-        CommandAckResponse.Builder ackErrorBuilder = 
CommandAckResponse.newBuilder();
-        ackErrorBuilder.setConsumerId(consumerId);
-        ackErrorBuilder.setError(error);
         if (errorMsg != null) {
-            ackErrorBuilder.setMessage(errorMsg);
+            commandAckResponseBuilder.setMessage(errorMsg);
         }
 
-        CommandAckResponse response = ackErrorBuilder.build();
-        ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK_RESPONSE).setAckResponse(response));
+        CommandAckResponse commandAckResponse = 
commandAckResponseBuilder.build();
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+                
.setType(Type.ACK_RESPONSE).setAckResponse(commandAckResponseBuilder));
 
-        ackErrorBuilder.recycle();
-        response.recycle();
+        commandAckResponseBuilder.recycle();
+        commandAckResponse.recycle();
 
         return res;
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 06462b7..123d2dc 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxn;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandAddPartitionToTxnResponse;
@@ -148,6 +149,12 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 cmd.getAck().recycle();
                 break;
 
+            case ACK_RESPONSE:
+                checkArgument(cmd.hasAckResponse());
+                handleAckResponse(cmd.getAckResponse());
+                cmd.getAckResponse().recycle();
+                break;
+
             case CLOSE_CONSUMER:
                 checkArgument(cmd.hasCloseConsumer());
                 safeInterceptCommand(cmd);
@@ -555,6 +562,10 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleAckResponse(CommandAckResponse ackResponse) {
+        throw new UnsupportedOperationException();
+    }
+
     protected void handleFlow(CommandFlow flow) {
         throw new UnsupportedOperationException();
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 9c4b913..e740b11 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -201,6 +201,8 @@ enum ServerError {
     TransactionCoordinatorNotFound = 20; // Transaction coordinator not found 
error
     InvalidTxnStatus = 21; // Invalid txn status error
     NotAllowedError = 22; // Not allowed error
+
+    TransactionConflict = 23; // Ack with transaction conflict
 }
 
 enum AuthMethod {
@@ -508,6 +510,7 @@ message CommandAck {
 
     optional uint64 txnid_least_bits = 6 [default = 0];
     optional uint64 txnid_most_bits = 7 [default = 0];
+    optional uint64 request_id = 8;
 }
 
 message CommandAckResponse {
@@ -516,6 +519,7 @@ message CommandAckResponse {
     optional uint64 txnid_most_bits = 3 [default = 0];
     optional ServerError error = 4;
     optional string message = 5;
+    optional uint64 request_id = 6;
 }
 
 // changes on active consumer

Reply via email to