This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 90764e7a030 [fix][txn] Always send correct transaction id in end txn response (#19137)
90764e7a030 is described below

commit 90764e7a03034589764343202196508e7950584e
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Jan 10 07:14:47 2023 +0100

    [fix][txn] Always send correct transaction id in end txn response (#19137)
---
 .../pulsar/broker/service/PulsarCommandSender.java |   2 +-
 .../broker/service/PulsarCommandSenderImpl.java    |   7 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  44 ++--
 .../pulsar/broker/service/ServerCnxTest.java       | 284 ++++++++++++++++++++-
 .../broker/service/utils/ClientChannelHelper.java  |  35 ++-
 .../client/impl/TransactionMetaStoreHandler.java   |  62 +++--
 .../apache/pulsar/common/protocol/Commands.java    |  16 +-
 7 files changed, 382 insertions(+), 68 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 50579b3d741..f25703757d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -87,7 +87,7 @@ public interface PulsarCommandSender {
 
     void sendNewTxnResponse(long requestId, TxnID txnID, long tcID);
 
-    void sendNewTxnErrorResponse(long requestId, long txnID, ServerError 
error, String message);
+    void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error, 
String message);
 
     void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index b5f4d17801c..e7cc25b2c3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -331,8 +331,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
     }
 
     @Override
-    public void sendNewTxnErrorResponse(long requestId, long txnID, 
ServerError error, String message) {
-        BaseCommand command = Commands.newTxnResponse(requestId, txnID, error, 
message);
+    public void sendNewTxnErrorResponse(long requestId, long tcID, ServerError 
error, String message) {
+        BaseCommand command = Commands.newTxnResponse(requestId, tcID, error, 
message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
@@ -352,7 +352,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
 
     @Override
     public void sendEndTxnErrorResponse(long requestId, TxnID txnID, 
ServerError error, String message) {
-        BaseCommand command = Commands.newEndTxnResponse(requestId, 
txnID.getMostSigBits(), error, message);
+        BaseCommand command = Commands.newEndTxnResponse(requestId, 
txnID.getLeastSigBits(),
+                txnID.getMostSigBits(), error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9e80e98064a..a61468c6469 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2406,7 +2406,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     } else {
                         ex = handleTxnException(ex, 
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
 
-                        
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
+                        
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+                                txnID.getLeastSigBits(),
+                                txnID.getMostSigBits(),
                                 BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
@@ -2451,7 +2453,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final long requestId = command.getRequestId();
         final String topic = command.getTopic();
         final int txnAction = command.getTxnAction().getValue();
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
         final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
 
         if (log.isDebugEnabled()) {
@@ -2487,7 +2489,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                         ServerError.ServiceNotReady,
                                         "The topic " + topic + " does not 
exist in broker.",
-                                        txnID.getMostSigBits(), 
txnID.getLeastSigBits()));
+                                        txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                             } else {
                                 log.warn("handleEndTxnOnPartition fail ! The 
topic {} has not been created, "
                                                 + "txnId: [{}], txnAction: 
[{}]",
@@ -2496,13 +2498,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                         txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
-                    log.error("handleEndTxnOnPartition fail ! topic {}, "
-                                    + "txnId: [{}], txnAction: [{}]", topic, 
txnID,
-                            TxnAction.valueOf(txnAction), e.getCause());
-                    ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
-                            requestId, ServerError.ServiceNotReady,
-                            e.getMessage(), txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
-                    return null;
+                            log.error("handleEndTxnOnPartition fail ! topic 
{}, "
+                                            + "txnId: [{}], txnAction: [{}]", 
topic, txnID,
+                                    TxnAction.valueOf(txnAction), 
e.getCause());
+                            
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+                                    requestId, ServerError.ServiceNotReady,
+                                    e.getMessage(), txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                            return null;
                 });
             }
         }).exceptionally(e -> {
@@ -2556,7 +2558,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                 requestId, txnidLeastBits, txnidMostBits,
                                 BrokerServiceException.getClientErrorCode(e),
-                                "Handle end txn on subscription failed."));
+                                "Handle end txn on subscription failed: " + 
e.getMessage()));
                         return;
                     }
                     ctx.writeAndFlush(
@@ -2569,7 +2571,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             if (b) {
                                 log.error("handleEndTxnOnSubscription fail! 
The topic {} does not exist in broker, "
                                                 + "subscription: {}, txnId: 
[{}], txnAction: [{}]", topic, subName,
-                                        new TxnID(txnidMostBits, 
txnidLeastBits), TxnAction.valueOf(txnAction));
+                                        txnID, TxnAction.valueOf(txnAction));
                                 
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                         requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
                                         ServerError.ServiceNotReady,
@@ -2582,13 +2584,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                         txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
-                    log.error("handleEndTxnOnSubscription fail ! topic {}, 
subscription: {}"
-                                    + "txnId: [{}], txnAction: [{}]", topic, 
subName,
-                            txnID, TxnAction.valueOf(txnAction), e.getCause());
-                    ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                            requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
-                            ServerError.ServiceNotReady, e.getMessage()));
-                    return null;
+                            log.error("handleEndTxnOnSubscription fail ! topic 
{}, subscription: {}"
+                                            + "txnId: [{}], txnAction: [{}]", 
topic, subName,
+                                    txnID, TxnAction.valueOf(txnAction), 
e.getCause());
+                            
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                    requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
+                                    ServerError.ServiceNotReady, 
e.getMessage()));
+                            return null;
                 });
             }
         }).exceptionally(e -> {
@@ -2598,7 +2600,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                     requestId, txnidLeastBits, txnidMostBits,
                     ServerError.ServiceNotReady,
-                    "Handle end txn on subscription failed."));
+                    "Handle end txn on subscription failed: " + 
e.getMessage()));
             return null;
         });
     }
@@ -2655,7 +2657,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     } else {
                         ex = handleTxnException(ex, 
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
 
-                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, 
txnID.getLeastSigBits(),
                                 txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 97b58d99e3d..1a98822340f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,6 +22,9 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
@@ -76,13 +79,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -92,13 +95,19 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
@@ -114,6 +123,8 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -151,7 +162,6 @@ public class ServerCnxTest {
     private ClientChannelHelper clientChannelHelper;
     private PulsarService pulsar;
     private MetadataStoreExtended store;
-    private ConfigurationCacheService configCacheService;
     private NamespaceResources namespaceResources;
     protected NamespaceService namespaceService;
     private final int currentProtocolVersion = 
ProtocolVersion.values()[ProtocolVersion.values().length - 1]
@@ -2059,7 +2069,7 @@ public class ServerCnxTest {
                     return false;
                 }
             };
-            Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), 
Mockito.any())).thenReturn(existingConsumerFuture);
+            Mockito.when(consumers.putIfAbsent(anyLong(), 
Mockito.any())).thenReturn(existingConsumerFuture);
             // do test: delay complete after execute 'isDone()' many times
             // Why is the design so complicated, see: 
https://github.com/apache/pulsar/pull/15051
             try (MockedStatic<ConcurrentLongHashMap> theMock = 
Mockito.mockStatic(ConcurrentLongHashMap.class)) {
@@ -2098,12 +2108,12 @@ public class ServerCnxTest {
         }
         // case3: exists existingConsumerFuture, already complete and exception
         CompletableFuture existingConsumerFuture = 
Mockito.mock(CompletableFuture.class);
-        Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), 
Mockito.any())).thenReturn(existingConsumerFuture);
+        Mockito.when(consumers.putIfAbsent(anyLong(), 
Mockito.any())).thenReturn(existingConsumerFuture);
         // make consumerFuture delay finish
         Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
         // when sync get return, future will return success value.
         Mockito.when(existingConsumerFuture.get()).thenThrow(new 
NullPointerException());
-        Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), 
Mockito.any())).
+        Mockito.when(existingConsumerFuture.get(anyLong(), Mockito.any())).
                 thenThrow(new NullPointerException());
         
Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
         
Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new 
NullPointerException());
@@ -2326,4 +2336,268 @@ public class ServerCnxTest {
         assertEquals(((CommandPartitionedTopicMetadataResponse) 
response).getError(), ServerError.ServiceNotReady);
         channel.finish();
     }
+
+    @Test(timeOut = 30000)
+    public void sendAddPartitionToTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+                List.of("tenant/ns/topic1"));
+        channel.writeInbound(clientCommand);
+        CommandAddPartitionToTxnResponse response = 
(CommandAddPartitionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddPartitionToTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+                List.of("tenant/ns/topic1"));
+        channel.writeInbound(clientCommand);
+        CommandAddPartitionToTxnResponse response = 
(CommandAddPartitionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddSubscriptionToTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        final Subscription sub = new Subscription();
+        sub.setTopic("topic1");
+        sub.setSubscription("sub1");
+        ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+                List.of(sub));
+        channel.writeInbound(clientCommand);
+        CommandAddSubscriptionToTxnResponse response = 
(CommandAddSubscriptionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddSubscriptionToTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        final Subscription sub = new Subscription();
+        sub.setTopic("topic1");
+        sub.setSubscription("sub1");
+        ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+                List.of(sub));
+        channel.writeInbound(clientCommand);
+        CommandAddSubscriptionToTxnResponse response = 
(CommandAddSubscriptionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = 
Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+                TxnAction.COMMIT));
+        channel.writeInbound(clientCommand);
+        CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = 
Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+                TxnAction.COMMIT));
+        channel.writeInbound(clientCommand);
+        CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnPartitionResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        
doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class),
 anyInt(), anyLong());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+                successTopicName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnPartitionResponse response = 
(CommandEndTxnOnPartitionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnPartitionResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        doReturn(CompletableFuture.failedFuture(new RuntimeException("server 
error"))).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+                successTopicName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnPartitionResponse response = 
(CommandEndTxnOnPartitionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnSubscription() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        final org.apache.pulsar.broker.service.Subscription sub = 
mock(org.apache.pulsar.broker.service.Subscription.class);
+        doReturn(sub).when(topic).getSubscription(any());
+        doReturn(CompletableFuture.completedFuture(null))
+                .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+        ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+                successTopicName, successSubName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnSubscriptionResponse response = 
(CommandEndTxnOnSubscriptionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnSubscriptionFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = 
mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+
+        final org.apache.pulsar.broker.service.Subscription sub = 
mock(org.apache.pulsar.broker.service.Subscription.class);
+        doReturn(sub).when(topic).getSubscription(any());
+        doReturn(CompletableFuture.failedFuture(new RuntimeException("server 
error")))
+                .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+        ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+                successTopicName, successSubName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnSubscriptionResponse response = 
(CommandEndTxnOnSubscriptionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "Handle end txn on subscription 
failed: server error");
+
+        channel.finish();
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 33d954f7bfa..2dc56282a79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.broker.service.utils;
 
 import java.util.Queue;
-
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
@@ -168,6 +172,35 @@ public class ClientChannelHelper {
         protected void 
handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
             queue.offer(new 
CommandPartitionedTopicMetadataResponse().copyFrom(response));
         }
+
+        @Override
+        protected void handleAddPartitionToTxnResponse(
+                CommandAddPartitionToTxnResponse 
commandAddPartitionToTxnResponse) {
+            queue.offer(new 
CommandAddPartitionToTxnResponse().copyFrom(commandAddPartitionToTxnResponse));
+        }
+
+        @Override
+        protected void handleAddSubscriptionToTxnResponse(
+                CommandAddSubscriptionToTxnResponse 
commandAddSubscriptionToTxnResponse) {
+            queue.offer(new 
CommandAddSubscriptionToTxnResponse().copyFrom(commandAddSubscriptionToTxnResponse));
+        }
+
+        @Override
+        protected void handleEndTxnResponse(CommandEndTxnResponse 
commandEndTxnResponse) {
+            queue.offer(new 
CommandEndTxnResponse().copyFrom(commandEndTxnResponse));
+        }
+
+        @Override
+        protected void handleEndTxnOnPartitionResponse(
+                CommandEndTxnOnPartitionResponse 
commandEndTxnOnPartitionResponse) {
+            queue.offer(new 
CommandEndTxnOnPartitionResponse().copyFrom(commandEndTxnOnPartitionResponse));
+        }
+
+        @Override
+        protected void handleEndTxnOnSubscriptionResponse(
+                CommandEndTxnOnSubscriptionResponse 
commandEndTxnOnSubscriptionResponse) {
+            queue.offer(new 
CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
+        }
     };
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 885feb09cd4..9e56c00bbb7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -216,9 +216,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     }
 
     void handleNewTxnResponse(CommandNewTxnResponse response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
              error = response.getError();
              message = response.getMessage();
@@ -226,14 +226,13 @@ public class TransactionMetaStoreHandler extends 
HandlerState
             error = null;
             message = null;
         }
-        TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
-        long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
+        final long requestId = response.getRequestId();
         internalPinnedExecutor.execute(() -> {
             OpForTxnIdCallBack op = (OpForTxnIdCallBack) 
pendingRequests.remove(requestId);
             if (op == null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got new txn response for timeout {} - {}", 
txnID.getMostSigBits(),
-                            txnID.getLeastSigBits());
+                    LOG.debug("Got new txn response for transaction {}", txnID);
                 }
                 return;
             }
@@ -300,9 +299,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     }
 
     void 
handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse 
response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
             error = response.getError();
             message = response.getMessage();
@@ -310,14 +309,13 @@ public class TransactionMetaStoreHandler extends 
HandlerState
             error = null;
             message = null;
         }
-        TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
-        long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
+        final long requestId = response.getRequestId();
         internalPinnedExecutor.execute(() -> {
             OpForVoidCallBack op = (OpForVoidCallBack) 
pendingRequests.remove(requestId);
             if (op == null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got add publish partition to txn response for 
timeout {} - {}", txnID.getMostSigBits(),
-                            txnID.getLeastSigBits());
+                    LOG.debug("Got add publish partition to txn response for transaction {}", txnID);
                 }
                 return;
             }
@@ -351,8 +349,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
                     return;
                 }
-                LOG.error("{} for request {} error {} with txnID {}.", 
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(),
-                        requestId, error, txnID);
+                LOG.error("{} for request {}, transaction {}, error: {}",
+                        BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), 
requestId, txnID, error);
 
             }
 
@@ -384,9 +382,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     }
 
     public void 
handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse 
response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
             error = response.getError();
             message = response.getMessage();
@@ -394,7 +392,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState
             error = null;
             message = null;
         }
-        long requestId = response.getRequestId();
+        final long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
         internalPinnedExecutor.execute(() -> {
             OpForVoidCallBack op = (OpForVoidCallBack) 
pendingRequests.remove(requestId);
             if (op == null) {
@@ -410,8 +409,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                 }
                 op.callback.complete(null);
             } else {
-                LOG.error("Add subscription to txn failed for request {} error 
{}.",
-                        requestId, error);
+                LOG.error("Add subscription to txn failed for request {}, transaction {}, error: {}",
+                        requestId, txnID, error);
                 if (checkIfNeedRetryByError(error, message, op)) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Get a response for {} request {} error 
TransactionCoordinatorNotFound and try it"
@@ -465,9 +464,9 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     }
 
     void handleEndTxnResponse(CommandEndTxnResponse response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
             error = response.getError();
             message = response.getMessage();
@@ -475,21 +474,20 @@ public class TransactionMetaStoreHandler extends 
HandlerState
             error = null;
             message = null;
         }
-        TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
-        long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
+        final long requestId = response.getRequestId();
         internalPinnedExecutor.execute(() -> {
             OpForVoidCallBack op = (OpForVoidCallBack) 
pendingRequests.remove(requestId);
             if (op == null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got end txn response for timeout {} - {}", 
txnID.getMostSigBits(),
-                            txnID.getLeastSigBits());
+                    LOG.debug("Got end txn response for transaction but no requests pending for txn {}", txnID);
                 }
                 return;
             }
 
             if (!hasError) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got end txn response success for request {}", 
requestId);
+                    LOG.debug("Got end txn response success for request {}, txn {}", requestId, txnID);
                 }
                 op.callback.complete(null);
             } else {
@@ -516,8 +514,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
                     return;
                 }
-                LOG.error("Got {} response for request {} error {}", 
BaseCommand.Type.END_TXN.name(),
-                        requestId, error);
+                LOG.error("Got {} response for request {}, transaction {}, 
error: {}",
+                        BaseCommand.Type.END_TXN.name(), requestId, txnID, error);
 
             }
             onResponse(op);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index fdb94c17795..4d4d64f7fe9 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1367,12 +1367,16 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newAddPartitionToTxnResponse(long requestId, long 
txnIdMostBits, ServerError error,
-           String errorMsg) {
+    public static ByteBuf newAddPartitionToTxnResponse(long requestId,
+                                                       long txnIdLeastBits,
+                                                       long txnIdMostBits,
+                                                       ServerError error,
+                                                       String errorMsg) {
         BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN_RESPONSE);
         CommandAddPartitionToTxnResponse response = 
cmd.setAddPartitionToTxnResponse()
                 .setRequestId(requestId)
                 .setError(error)
+                .setTxnidLeastBits(txnIdLeastBits)
                 .setTxnidMostBits(txnIdMostBits);
 
         if (errorMsg != null) {
@@ -1401,12 +1405,13 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long 
txnIdMostBits, ServerError error,
-          String errorMsg) {
+    public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long 
txnIdLeastBits,
+                                                          long txnIdMostBits, 
ServerError error, String errorMsg) {
         BaseCommand cmd = localCmd(Type.ADD_SUBSCRIPTION_TO_TXN_RESPONSE);
         CommandAddSubscriptionToTxnResponse response = 
cmd.setAddSubscriptionToTxnResponse()
                 .setRequestId(requestId)
                 .setTxnidMostBits(txnIdMostBits)
+                .setTxnidLeastBits(txnIdLeastBits)
                 .setError(error);
         if (errorMsg != null) {
             response.setMessage(errorMsg);
@@ -1432,11 +1437,12 @@ public class Commands {
         return cmd;
     }
 
-    public static BaseCommand newEndTxnResponse(long requestId, long 
txnIdMostBits,
+    public static BaseCommand newEndTxnResponse(long requestId, long 
txnIdLeastBits, long txnIdMostBits,
                                                 ServerError error, String 
errorMsg) {
         BaseCommand cmd = localCmd(Type.END_TXN_RESPONSE);
         CommandEndTxnResponse response = cmd.setEndTxnResponse()
                 .setRequestId(requestId)
+                .setTxnidLeastBits(txnIdLeastBits)
                 .setTxnidMostBits(txnIdMostBits)
                 .setError(error);
         if (errorMsg != null) {


Reply via email to