This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2439eda30fe08978f4254d3698972374155852b0
Author: congbo <[email protected]>
AuthorDate: Mon Aug 9 23:44:47 2021 +0800

    [Transaction] Fix transaction buffer client handle endTxn op when topic or 
sub have been deleted. (#11304)
    
    Currently, when a topic has been deleted, the broker does not handle
    `endTxnOnTopic` and `endTxnOnSub`.
    Likewise, when a subscription has been deleted, the broker does not
    handle `endTxnOnSub`.
    
    When the topic or subscription has been deleted, we should return success
    to the transaction coordinator (TC) for the `endTxnOnTopic` and
    `endTxnOnSub` operations.
    
    When the topic does not exist in this broker, we should check whether it
    has ever been created. If it has not been created, we should return success.
    
    (cherry picked from commit a6e66dd38bbb9edc552fe6877d0d600bd6d5506b)
---
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |  11 ++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   5 +
 .../apache/bookkeeper/mledger/impl/MetaStore.java  |  12 ++
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     |   6 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 167 ++++++++++------
 .../broker/transaction/TransactionTestBase.java    |   5 +-
 .../buffer/TransactionBufferClientTest.java        | 219 ++++++++++-----------
 .../apache/pulsar/common/protocol/Commands.java    |   5 +-
 8 files changed, 247 insertions(+), 183 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 4e103d0..cfc1d3d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -160,4 +161,14 @@ public interface ManagedLedgerFactory {
      */
     void shutdown() throws InterruptedException, ManagedLedgerException;
 
+    /**
+     * Check managed ledger has been initialized before.
+     *
+     * @param ledgerName {@link String}
+     * @return a future represents the result of the operation.
+     *         an instance of {@link Boolean} is returned
+     *         if the operation succeeds.
+     */
+    CompletableFuture<Boolean> asyncExists(String ledgerName);
+
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 16d577a..699dc68 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -525,6 +525,11 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     }
 
     @Override
+    public CompletableFuture<Boolean> asyncExists(String ledgerName) {
+        return store.asyncExists(ledgerName);
+    }
+
+    @Override
     public ManagedLedgerInfo getManagedLedgerInfo(String name) throws 
InterruptedException, ManagedLedgerException {
         class Result {
             ManagedLedgerInfo info = null;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
index 8b99203..9f1563b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java
@@ -19,6 +19,8 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -129,4 +131,14 @@ public interface MetaStore {
      * @throws MetaStoreException
      */
     Iterable<String> getManagedLedgers() throws MetaStoreException;
+
+    /**
+     * Check ledger exists.
+     *
+     * @param ledgerName {@link String}
+     * @return a future represents the result of the operation.
+     *         an instance of {@link Boolean} is returned
+     *         if the operation succeeds.
+     */
+    CompletableFuture<Boolean> asyncExists(String ledgerName);
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 9558d11..a295114 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -23,6 +23,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
 import io.netty.buffer.ByteBuf;
@@ -257,6 +258,11 @@ public class MetaStoreImpl implements MetaStore {
         }
     }
 
+    @Override
+    public CompletableFuture<Boolean> asyncExists(String path) {
+        return store.exists(PREFIX + path);
+    }
+
     //
     // update timestamp if missing or 0
     // 3 cases - timestamp does not exist for ledgers serialized before
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e0a056a..ac50c53 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1981,43 +1981,68 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final String topic = command.getTopic();
         final int txnAction = command.getTxnAction().getValue();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: 
[{}]", topic,
                     txnID, txnAction);
         }
-        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopics().get(TopicName.get(topic).toString());
-        if (topicFuture != null) {
-            topicFuture.whenComplete((optionalTopic, t) -> {
-                if (!optionalTopic.isPresent()) {
-                    log.error("handleEndTxnOnPartition fail ! The topic {} 
does not exist in broker, "
-                            + "txnId: [{}], txnAction: [{}]", topic, txnID, 
TxnAction.valueOf(txnAction));
-                    ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
-                            requestId, ServerError.ServiceNotReady,
-                            "Topic " + topic + " is not found."));
-                    return;
-                }
-                optionalTopic.get().endTxn(txnID, txnAction, 
command.getTxnidLeastBitsOfLowWatermark())
+        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(TopicName.get(topic).toString());
+        topicFuture.thenAccept(optionalTopic -> {
+            if (optionalTopic.isPresent()) {
+                optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
                         .whenComplete((ignored, throwable) -> {
                             if (throwable != null) {
-                                log.error("Handle endTxnOnPartition {} 
failed.", topic, throwable);
+                                log.error("handleEndTxnOnPartition fail!, 
topic {}, txnId: [{}], "
+                                        + "txnAction: [{}]", topic, txnID, 
TxnAction.valueOf(txnAction), throwable);
                                 
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                                         requestId, 
BrokerServiceException.getClientErrorCode(throwable),
-                                        throwable.getMessage()));
+                                        throwable.getMessage(),
+                                        txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                                 return;
                             }
                             
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                     txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                         });
-            });
-        } else {
-            log.error("handleEndTxnOnPartition faile ! The topic {} does not 
exist in broker, "
-                    + "txnId: [{}], txnAction: [{}]", topic, txnID, 
TxnAction.valueOf(txnAction));
-            ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                    requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
-                    ServerError.ServiceNotReady,
-                    "The topic " + topic + " is not exist in broker."));
-        }
+
+            } else {
+                getBrokerService().getManagedLedgerFactory()
+                        
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
+                        .thenAccept((b) -> {
+                            if (b) {
+                                log.error("handleEndTxnOnPartition fail ! The 
topic {} does not exist in broker, "
+                                                + "txnId: [{}], txnAction: 
[{}]", topic,
+                                        txnID, TxnAction.valueOf(txnAction));
+                                
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                                        ServerError.ServiceNotReady,
+                                        "The topic " + topic + " does not 
exist in broker.",
+                                        txnID.getMostSigBits(), 
txnID.getLeastSigBits()));
+                            } else {
+                                log.warn("handleEndTxnOnPartition fail ! The 
topic {} has not been created, "
+                                                + "txnId: [{}], txnAction: 
[{}]",
+                                        topic, txnID, 
TxnAction.valueOf(txnAction));
+                                
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                                        txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                            }
+                        }).exceptionally(e -> {
+                    log.error("handleEndTxnOnPartition fail ! topic {} , "
+                                    + "txnId: [{}], txnAction: [{}]", topic, 
txnID,
+                            TxnAction.valueOf(txnAction), e.getCause());
+                    ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+                            requestId, ServerError.ServiceNotReady,
+                            e.getMessage(), txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                    return null;
+                });
+            }
+        }).exceptionally(e -> {
+            log.error("handleEndTxnOnPartition fail ! topic {} , "
+                            + "txnId: [{}], txnAction: [{}]", topic, txnID,
+                    TxnAction.valueOf(txnAction), e.getCause());
+            ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+                    requestId, ServerError.ServiceNotReady,
+                    e.getMessage(), txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+            return null;
+        });
     }
 
     @Override
@@ -2028,70 +2053,82 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final String topic = command.getSubscription().getTopic();
         final String subName = command.getSubscription().getSubscription();
         final int txnAction = command.getTxnAction().getValue();
+        final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
+        final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: 
[{}]", topic,
+            log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], 
txnAction: [{}]", topic, subName,
                     new TxnID(txnidMostBits, txnidLeastBits), txnAction);
         }
 
-        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopics().get(TopicName.get(topic).toString());
-        if (topicFuture != null) {
-            topicFuture.thenAccept(optionalTopic -> {
-
-                if (!optionalTopic.isPresent()) {
-                    log.error("handleEndTxnOnSubscription fail! The topic {} 
does not exist in broker, txnId: "
-                                    + "[{}], txnAction: [{}]", topic,
-                            new TxnID(txnidMostBits, txnidLeastBits), 
TxnAction.valueOf(txnAction));
-                    ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                            requestId, txnidLeastBits, txnidMostBits,
-                            ServerError.ServiceNotReady,
-                            "The topic " + topic + " is not exist in 
broker."));
-                    return;
-                }
-
+        CompletableFuture<Optional<Topic>> topicFuture = 
service.getTopicIfExists(TopicName.get(topic).toString());
+        topicFuture.thenAccept(optionalTopic -> {
+            if (optionalTopic.isPresent()) {
                 Subscription subscription = 
optionalTopic.get().getSubscription(subName);
                 if (subscription == null) {
-                    log.error("Topic {} subscription {} is not exist.", 
optionalTopic.get().getName(), subName);
-                    ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                            requestId, txnidLeastBits, txnidMostBits,
-                            ServerError.ServiceNotReady,
-                            "Topic " + optionalTopic.get().getName()
-                                    + " subscription " + subName + " is not 
exist."));
+                    log.warn("handleEndTxnOnSubscription fail! "
+                                    + "topic {} subscription {} does not 
exist. txnId: [{}], txnAction: [{}]",
+                            optionalTopic.get().getName(), subName, txnID, 
TxnAction.valueOf(txnAction));
+                    ctx.writeAndFlush(
+                            
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
                     return;
                 }
 
                 CompletableFuture<Void> completableFuture =
-                        subscription.endTxn(txnidMostBits, txnidLeastBits, 
txnAction,
-                                command.getTxnidLeastBitsOfLowWatermark());
-                completableFuture.whenComplete((ignored, throwable) -> {
-                    if (throwable != null) {
-                        log.error("Handle end txn on subscription failed for 
request {}", requestId, throwable);
+                        subscription.endTxn(txnidMostBits, txnidLeastBits, 
txnAction, lowWaterMark);
+                completableFuture.whenComplete((ignored, e) -> {
+                    if (e != null) {
+                        log.error("handleEndTxnOnSubscription fail ! topic: {} 
, subscription: {}"
+                                        + "txnId: [{}], txnAction: [{}]", 
topic, subName,
+                                txnID, TxnAction.valueOf(txnAction), 
e.getCause());
                         
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                 requestId, txnidLeastBits, txnidMostBits,
-                                
BrokerServiceException.getClientErrorCode(throwable),
+                                BrokerServiceException.getClientErrorCode(e),
                                 "Handle end txn on subscription failed."));
                         return;
                     }
                     ctx.writeAndFlush(
                             
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
                 });
-            }).exceptionally(e -> {
-                log.error("Handle end txn on subscription failed for request 
{}", requestId, e);
-                ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                        requestId, txnidLeastBits, txnidMostBits,
-                        ServerError.ServiceNotReady,
-                        "Handle end txn on subscription failed."));
-                return null;
-            });
-        } else {
-            log.error("handleEndTxnOnSubscription fail! The topic {} does not 
exist in broker, txnId: "
-                    + "[{}], txnAction: [{}]", topic,
-                    new TxnID(txnidMostBits, txnidLeastBits), 
TxnAction.valueOf(txnAction));
+            } else {
+                getBrokerService().getManagedLedgerFactory()
+                        
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
+                        .thenAccept((b) -> {
+                            if (b) {
+                                log.error("handleEndTxnOnSubscription fail! 
The topic {} does not exist in broker, "
+                                                + "subscription: {} ,txnId: 
[{}], txnAction: [{}]", topic, subName,
+                                        new TxnID(txnidMostBits, 
txnidLeastBits), TxnAction.valueOf(txnAction));
+                                
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                        requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
+                                        ServerError.ServiceNotReady,
+                                        "The topic " + topic + " does not 
exist in broker."));
+                            } else {
+                                log.warn("handleEndTxnOnSubscription fail ! 
The topic {} has not been created, "
+                                                + "subscription: {} txnId: 
[{}], txnAction: [{}]",
+                                        topic, subName, txnID, 
TxnAction.valueOf(txnAction));
+                                
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
+                                        txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                            }
+                        }).exceptionally(e -> {
+                    log.error("handleEndTxnOnSubscription fail ! topic {} , 
subscription: {}"
+                                    + "txnId: [{}], txnAction: [{}]", topic, 
subName,
+                            txnID, TxnAction.valueOf(txnAction), e.getCause());
+                    ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                            requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
+                            ServerError.ServiceNotReady, e.getMessage()));
+                    return null;
+                });
+            }
+        }).exceptionally(e -> {
+            log.error("handleEndTxnOnSubscription fail ! topic: {} , 
subscription: {}"
+                            + "txnId: [{}], txnAction: [{}]", topic, subName,
+                    txnID, TxnAction.valueOf(txnAction), e.getCause());
             ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                     requestId, txnidLeastBits, txnidMostBits,
                     ServerError.ServiceNotReady,
-                    "The topic " + topic + " is not exist in broker."));
-        }
+                    "Handle end txn on subscription failed."));
+            return null;
+        });
     }
 
     private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, 
SchemaData schema) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 2c3cb4d..67ecd87 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -21,8 +21,9 @@ package org.apache.pulsar.broker.transaction;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
 import java.util.ArrayList;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 6081580..08de910 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -18,16 +18,10 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doReturn;
-
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Cleanup;
@@ -37,15 +31,12 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
-import 
org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
@@ -56,13 +47,12 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import java.lang.reflect.Field;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -74,76 +64,38 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 @Test(groups = "broker")
-public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
+public class TransactionBufferClientTest extends TransactionTestBase {
 
     private static final Logger log = 
LoggerFactory.getLogger(TransactionBufferClientTest.class);
     private TransactionBufferClient tbClient;
     TopicName partitionedTopicName = TopicName.get("persistent", "public", 
"test", "tb-client");
     int partitions = 10;
-    BrokerService[] brokerServices;
-    private final static String namespace = "public/test";
-
-    private EventLoopGroup eventLoopGroup;
+    private static final String namespace = "public/test";
 
     @Override
-    protected void afterSetup() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", 
ClusterData.builder().serviceUrl(pulsarServices[0].getWebServiceAddress()).build());
-        pulsarAdmins[0].tenants().createTenant("public", new 
TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
-        pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
-        
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
 partitions);
-        String subName = "test";
-        
pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(),
 subName, MessageId.latest);
+    @BeforeClass(alwaysRun = true)
+    protected void setup() throws Exception {
+        setBrokerCount(3);
+        internalSetup();
+        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
+        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(namespace, 10);
+        
admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
 partitions);
         pulsarClient.newConsumer()
                 .topic(partitionedTopicName.getPartitionedTopicName())
-                .subscriptionName(subName).subscribe();
-        tbClient = TransactionBufferClientImpl.create(
-                ((PulsarClientImpl) pulsarClient),
+                .subscriptionName("test").subscribe();
+        tbClient = TransactionBufferClientImpl.create(pulsarClient,
                 new HashedWheelTimer(new 
DefaultThreadFactory("transaction-buffer")));
     }
 
     @Override
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
-        if (tbClient != null) {
-            tbClient.close();
-        }
-        if (brokerServices != null) {
-            for (BrokerService bs : brokerServices) {
-                bs.close();
-            }
-            brokerServices = null;
-        }
-        super.cleanup();
-        eventLoopGroup.shutdownGracefully().get();
-    }
-
-    @Override
-    protected void afterPulsarStart() throws Exception {
-        eventLoopGroup = new NioEventLoopGroup();
-        brokerServices = new BrokerService[pulsarServices.length];
-        AtomicLong atomicLong = new AtomicLong(0);
-        for (int i = 0; i < pulsarServices.length; i++) {
-            Subscription mockSubscription = mock(Subscription.class);
-            Mockito.when(mockSubscription.endTxn(Mockito.anyLong(),
-                    Mockito.anyLong(), Mockito.anyInt(), Mockito.anyLong()))
-                    .thenReturn(CompletableFuture.completedFuture(null));
-
-            Topic mockTopic = mock(Topic.class);
-            Mockito.when(mockTopic.endTxn(any(), Mockito.anyInt(), anyLong()))
-                    .thenReturn(CompletableFuture.completedFuture(null));
-            
Mockito.when(mockTopic.getSubscription(any())).thenReturn(mockSubscription);
-
-            ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topicMap =
-                    mock(ConcurrentOpenHashMap.class);
-            Mockito.when(topicMap.get(Mockito.anyString())).thenReturn(
-                    CompletableFuture.completedFuture(Optional.of(mockTopic)));
-
-            BrokerService brokerService = Mockito.spy(new 
BrokerService(pulsarServices[i], eventLoopGroup));
-            doReturn(new 
MockBrokerInterceptor()).when(brokerService).getInterceptor();
-            doReturn(atomicLong.getAndIncrement() + 
"").when(brokerService).generateUniqueProducerName();
-            brokerServices[i] = brokerService;
-            Mockito.when(brokerService.getTopics()).thenReturn(topicMap);
-            
Mockito.when(pulsarServices[i].getBrokerService()).thenReturn(brokerService);
-        }
+        tbClient.close();
+        super.internalCleanup();
     }
 
     @Test
@@ -199,43 +151,6 @@ public class TransactionBufferClientTest extends 
TransactionMetaStoreTestBase {
     }
 
     @Test
-    public void testTransactionBufferOpFail() throws InterruptedException, 
ExecutionException {
-        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>[] 
originalMaps =
-                new ConcurrentOpenHashMap[brokerServices.length];
-        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topicMap = new ConcurrentOpenHashMap<>();
-        for (int i = 0; i < brokerServices.length; i++) {
-            originalMaps[i] = brokerServices[i].getTopics();
-            when(brokerServices[i].getTopics()).thenReturn(topicMap);
-        }
-
-        try {
-            tbClient.abortTxnOnSubscription(
-                    partitionedTopicName.getPartition(0).toString(), "test", 
1L, 1, -1L).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
PulsarClientException.LookupException);
-        }
-
-        try {
-            tbClient.abortTxnOnTopic(
-                    partitionedTopicName.getPartition(0).toString(), 1L, 1, 
-1L).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
PulsarClientException.LookupException);
-        }
-
-        for (int i = 0; i < brokerServices.length; i++) {
-            when(brokerServices[i].getTopics()).thenReturn(originalMaps[i]);
-        }
-
-        tbClient.abortTxnOnSubscription(
-                partitionedTopicName.getPartition(0).toString(), "test", 1L, 
1, -1L).get();
-
-        tbClient.abortTxnOnTopic(
-                partitionedTopicName.getPartition(0).toString(), 1L, 1, 
-1L).get();
-    }
-
-    @Test
     public void testTransactionBufferClientTimeout() throws Exception {
         PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
@@ -314,12 +229,27 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
     }
 
     @Test
-    public void testTransactionBufferLookUp() throws ExecutionException, InterruptedException {
+    public void testTransactionBufferLookUp() throws Exception {
         String topic = "persistent://" + namespace + "/testTransactionBufferLookUp";
-        tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get();
-        tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get();
-        tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
-        tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
+        String subName = "test";
+
+        String abortTopic = topic + "_abort_sub";
+        String commitTopic = topic + "_commit_sub";
+        admin.topics().createNonPartitionedTopic(abortTopic);
+        admin.topics().createSubscription(abortTopic, subName, MessageId.earliest);
+
+        admin.topics().createNonPartitionedTopic(commitTopic);
+        admin.topics().createSubscription(commitTopic, subName, MessageId.earliest);
+
+        waitPendingAckInit(abortTopic, subName);
+        waitPendingAckInit(commitTopic, subName);
+
+        tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get();
+
+        tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get();
+
+        tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get();
+        tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get();
     }
 
     @Test
@@ -333,10 +263,69 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
         field.setAccessible(true);
         field.set(transactionBufferHandler, new Semaphore(2));
 
-        String topic = "persistent://" + namespace + "/testTransactionBufferLookUp";
-        tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get();
+
+        String topic = "persistent://" + namespace + "/testTransactionBufferHandlerSemaphore";
+        String subName = "test";
+
+        String abortTopic = topic + "_abort_sub";
+        String commitTopic = topic + "_commit_sub";
+
+        admin.topics().createNonPartitionedTopic(abortTopic);
+        admin.topics().createSubscription(abortTopic, subName, MessageId.earliest);
+
+        admin.topics().createNonPartitionedTopic(commitTopic);
+        admin.topics().createSubscription(commitTopic, subName, MessageId.earliest);
+
+        waitPendingAckInit(abortTopic, subName);
+        waitPendingAckInit(commitTopic, subName);
+        tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get();
+
+        tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get();
+
+        tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get();
+        tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get();
+    }
+
+    @Test
+    public void testEndTopicNotExist() throws Exception {
+        String topic = "persistent://" + namespace + "/testEndTopicNotExist";
+        String sub = "test";
+
         tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
-        tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get();
         tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
+
+        tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get();
+        tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get();
+    }
+
+    @Test
+    public void testEndSubNotExist() throws Exception {
+
+        String topic = "persistent://" + namespace + "/testEndTopicNotExist";
+        String sub = "test";
+        admin.topics().createNonPartitionedTopic(topic + "_abort_sub");
+
+        admin.topics().createNonPartitionedTopic(topic + "_commit_sub");
+
+        tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get();
+        tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get();
+    }
+
+    private void waitPendingAckInit(String topic, String sub) throws Exception {
+
+        boolean exist = false;
+        for (int i = 0; i < getPulsarServiceList().size(); i++) {
+            CompletableFuture<Optional<Topic>> completableFuture = getPulsarServiceList().get(i)
+                    .getBrokerService().getTopics().get(topic);
+            if (completableFuture != null) {
+                PersistentSubscription persistentSubscription =
+                        (PersistentSubscription) completableFuture.get().get().getSubscription(sub);
+                Awaitility.await().untilAsserted(() ->
+                        assertEquals(persistentSubscription.getTransactionPendingAckStats().state, "Ready"));
+                exist = true;
+            }
+        }
+
+        assertTrue(exist);
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7c93ecc..16ffd96 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1314,10 +1314,13 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newEndTxnOnPartitionResponse(long requestId, ServerError error, String errorMsg) {
+    public static ByteBuf newEndTxnOnPartitionResponse(long requestId, ServerError error, String errorMsg,
+                                                       long txnIdLeastBits, long txnIdMostBits) {
         BaseCommand cmd = localCmd(Type.END_TXN_ON_PARTITION_RESPONSE);
         CommandEndTxnOnPartitionResponse response = cmd.setEndTxnOnPartitionResponse()
                 .setRequestId(requestId)
+                .setTxnidMostBits(txnIdMostBits)
+                .setTxnidLeastBits(txnIdLeastBits)
                 .setError(error);
         if (errorMsg != null) {
             response.setMessage(errorMsg);

Reply via email to