This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 46baae6a5e8 [improve][transaction] Optimize topic lookup when TC end 
tx. (#14991)
46baae6a5e8 is described below

commit 46baae6a5e89a5feed171d231b59042a2ce9e1f8
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Apr 12 18:53:03 2022 +0800

    [improve][transaction] Optimize topic lookup when TC end tx. (#14991)
    
    ### Motivation
    
    When the TC (transaction coordinator) ends a transaction, it has to look up
    each topic involved. The original approach uses pulsar-client to do this,
    which performs a lookup for every topic.
    Using the bundle cache to find the topic's owner broker avoids a lookup for
    every topic and therefore decreases the lookup time.
    
    ### Modifications
    
    - Use the bundle cache to find the topic's owner broker. If an error occurs,
      fall back to looking up the topic.
    - Remove the topic cache: with the bundle cache in use, the original cache
      is no longer needed.
---
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../buffer/impl/TransactionBufferClientImpl.java   |   9 +-
 .../buffer/impl/TransactionBufferHandlerImpl.java  | 155 ++++++++++-----------
 .../broker/transaction/TransactionProduceTest.java |  29 ++++
 .../broker/transaction/TransactionTestBase.java    |   2 +-
 .../buffer/TransactionBufferClientTest.java        |  37 +++--
 .../buffer/TransactionBufferHandlerImplTest.java   |  41 ++++--
 .../pulsar/client/impl/PulsarClientImpl.java       |   8 +-
 8 files changed, 174 insertions(+), 109 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b060cbbcfcf..7369453c9d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -751,7 +751,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 this.transactionBufferSnapshotService = new 
SystemTopicBaseTxnBufferSnapshotService(getClient());
                 this.transactionTimer =
                         new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-transaction-timer"));
-                transactionBufferClient = 
TransactionBufferClientImpl.create(getClient(), transactionTimer,
+                transactionBufferClient = 
TransactionBufferClientImpl.create(this, transactionTimer,
                         
config.getTransactionBufferClientMaxConcurrentRequests(),
                         
config.getTransactionBufferClientOperationTimeoutInMills());
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 060476e573c..c531f9f1871 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
 import io.netty.util.HashedWheelTimer;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
@@ -39,9 +40,9 @@ public class TransactionBufferClientImpl implements 
TransactionBufferClient {
         this.tbHandler = tbHandler;
     }
 
-    public static TransactionBufferClient create(PulsarClient pulsarClient, 
HashedWheelTimer timer,
-             int maxConcurrentRequests, long operationTimeoutInMills) {
-        TransactionBufferHandler handler = new 
TransactionBufferHandlerImpl(pulsarClient, timer,
+    public static TransactionBufferClient create(PulsarService pulsarService, 
HashedWheelTimer timer,
+        int maxConcurrentRequests, long operationTimeoutInMills) throws 
PulsarServerException {
+        TransactionBufferHandler handler = new 
TransactionBufferHandlerImpl(pulsarService, timer,
                 maxConcurrentRequests, operationTimeoutInMills);
         return new TransactionBufferClientImpl(handler);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index b80a273bc6f..3f9a083787b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -18,21 +18,23 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -42,7 +44,10 @@ import 
org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
 import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
 @Slf4j
@@ -53,31 +58,17 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
     private final AtomicLong requestIdGenerator = new AtomicLong();
     private final long operationTimeoutInMills;
     private final HashedWheelTimer timer;
-    private final PulsarClient pulsarClient;
+    private final PulsarService pulsarService;
+    private final PulsarClientImpl pulsarClient;
 
     private static final 
AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER 
=
             
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, 
"requestCredits");
     private volatile int requestCredits;
 
-    private final LoadingCache<String, CompletableFuture<ClientCnx>> 
lookupCache = CacheBuilder.newBuilder()
-            .maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES)
-            .build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
-                @Override
-                public CompletableFuture<ClientCnx> load(String topic) {
-                    CompletableFuture<ClientCnx> siFuture = 
getClientCnx(topic);
-                    siFuture.whenComplete((si, cause) -> {
-                        if (null != cause) {
-                            lookupCache.invalidate(topic);
-                        }
-                    });
-                    return siFuture;
-                }
-            });
-
-    public TransactionBufferHandlerImpl(PulsarClient pulsarClient, 
HashedWheelTimer timer,
-                                        int maxConcurrentRequests, long 
operationTimeoutInMills) {
-        this.pulsarClient = pulsarClient;
+    public TransactionBufferHandlerImpl(PulsarService pulsarService, 
HashedWheelTimer timer,
+        int maxConcurrentRequests, long operationTimeoutInMills) throws 
PulsarServerException {
+        this.pulsarService = pulsarService;
+        this.pulsarClient = (PulsarClientImpl) pulsarService.getClient();
         this.outstandingRequests = new ConcurrentSkipListMap<>();
         this.pendingRequests = new GrowableArrayBlockingQueue<>();
         this.operationTimeoutInMills = operationTimeoutInMills;
@@ -97,15 +88,9 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
         ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, 
txnIdMostBits,
                 topic, action, lowWaterMark);
 
-        try {
-            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, 
lookupCache.get(topic));
-            if (checkRequestCredits(op)) {
-                endTxn(op);
-            }
-        } catch (ExecutionException e) {
-            log.error("[{}] failed to get client cnx from lookup cache", 
topic, e);
-            lookupCache.invalidate(topic);
-            cb.completeExceptionally(new 
PulsarClientException.LookupException(e.getCause().getMessage()));
+        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, 
getClientCnx(topic));
+        if (checkRequestCredits(op)) {
+            endTxn(op);
         }
         return cb;
     }
@@ -122,15 +107,9 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, 
txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
-        try {
-            OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, 
lookupCache.get(topic));
-            if (checkRequestCredits(op)) {
-                endTxn(op);
-            }
-        } catch (ExecutionException e) {
-            log.error("[{}] failed to get client cnx from lookup cache", 
topic, e);
-            lookupCache.invalidate(topic);
-            cb.completeExceptionally(new 
PulsarClientException.LookupException(e.getCause().getMessage()));
+        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, 
getClientCnx(topic));
+        if (checkRequestCredits(op)) {
+            endTxn(op);
         }
         return cb;
     }
@@ -150,8 +129,8 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
     }
 
     public void endTxn(OpRequestSend op) {
-        op.cnx.whenComplete((clientCnx, throwable) -> {
-            if (throwable == null) {
+        op.cnx.whenComplete((clientCnx, ex) -> {
+            if (ex == null) {
                 if (clientCnx.ctx().channel().isActive()) {
                     
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
                     outstandingRequests.put(op.requestId, op);
@@ -166,16 +145,19 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
                     op.cmd.retain();
                     clientCnx.ctx().writeAndFlush(op.cmd, 
clientCnx.ctx().voidPromise());
                 } else {
-                    invalidateLookupCache(op);
                     op.cb.completeExceptionally(
                             new PulsarClientException.LookupException(op.topic 
+ " endTxn channel is not active"));
                     onResponse(op);
                 }
             } else {
-                log.error("endTxn error topic: [{}]", op.topic, throwable);
-                invalidateLookupCache(op);
-                op.cb.completeExceptionally(
-                        new 
PulsarClientException.LookupException(throwable.getMessage()));
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.error("endTxn error topic: [{}]", op.topic, cause);
+                if (cause instanceof 
PulsarClientException.BrokerMetadataException) {
+                    op.cb.complete(null);
+                } else {
+                    op.cb.completeExceptionally(
+                            new 
PulsarClientException.LookupException(cause.getMessage()));
+                }
                 onResponse(op);
             }
         });
@@ -202,12 +184,9 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
                 log.error("[{}] Got end txn on topic response for request {} 
error {}", op.topic,
                         response.getRequestId(),
                         response.getError());
-                invalidateLookupCache(op);
                 
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
                         response.getMessage()));
             }
-        } catch (Exception e) {
-            log.error("[{}] Got exception when complete EndTxnOnTopic op for 
request {}", op.topic, e);
         } finally {
             onResponse(op);
         }
@@ -235,12 +214,9 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
             } else {
                 log.error("[{}] Got end txn on subscription response for 
request {} error {}",
                         op.topic, response.getRequestId(), 
response.getError());
-                invalidateLookupCache(op);
                 
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
                         response.getMessage()));
             }
-        } catch (Exception e) {
-            log.error("[{}] Got exception when complete EndTxnOnSub op for 
request {}", op.topic, e);
         } finally {
             onResponse(op);
         }
@@ -262,21 +238,14 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
                 if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, 
permits - 1)) {
                     OpRequestSend polled = pendingRequests.poll();
                     if (polled != null) {
-                        try {
-                            if (polled.cnx != lookupCache.get(polled.topic)) {
-                                OpRequestSend invalid = polled;
-                                polled = 
OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
-                                        lookupCache.get(invalid.topic));
-                                invalid.recycle();
-                            }
-                            endTxn(polled);
-                        } catch (ExecutionException e) {
-                            log.error("[{}] failed to get client cnx from 
lookup cache", polled.topic, e);
-                            lookupCache.invalidate(polled.topic);
-                            polled.cb.completeExceptionally(new 
PulsarClientException.LookupException(
-                                    e.getCause().getMessage()));
-                            REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+                        CompletableFuture<ClientCnx> clientCnx = 
getClientCnx(polled.topic);
+                        if (polled.cnx != clientCnx) {
+                            OpRequestSend invalid = polled;
+                            polled = OpRequestSend.create(invalid.requestId, 
invalid.topic, invalid.cmd, invalid.cb,
+                                    clientCnx);
+                            invalid.recycle();
                         }
+                        endTxn(polled);
                     } else {
                         REQUEST_CREDITS_UPDATER.incrementAndGet(this);
                     }
@@ -287,16 +256,6 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
         }
     }
 
-    private void invalidateLookupCache(OpRequestSend op) {
-        try {
-            if (lookupCache.get(op.topic) == op.cnx) {
-                lookupCache.invalidate(op.topic);
-            }
-        } catch (ExecutionException e) {
-            lookupCache.invalidate(op.topic);
-        }
-    }
-
     public static final class OpRequestSend {
 
         long requestId;
@@ -336,8 +295,42 @@ public class TransactionBufferHandlerImpl implements 
TransactionBufferHandler {
         };
     }
 
+    public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
+        return pulsarClient.getConnection(topic);
+    }
+
     public CompletableFuture<ClientCnx> getClientCnx(String topic) {
-        return ((PulsarClientImpl) pulsarClient).getConnection(topic);
+        NamespaceService namespaceService = 
pulsarService.getNamespaceService();
+        CompletableFuture<NamespaceBundle> nsBundle = 
namespaceService.getBundleAsync(TopicName.get(topic));
+        return nsBundle
+                .thenCompose(bundle -> namespaceService.getOwnerAsync(bundle))
+                .thenCompose(data -> {
+                    if (data.isPresent()) {
+                        NamespaceEphemeralData ephemeralData = data.get();
+                        try {
+                            if (!ephemeralData.isDisabled()) {
+                                URI uri;
+                                if 
(pulsarClient.getConfiguration().isUseTls()) {
+                                    uri = new 
URI(ephemeralData.getNativeUrlTls());
+                                } else {
+                                    uri = new 
URI(ephemeralData.getNativeUrl());
+                                }
+                                InetSocketAddress brokerAddress =
+                                        
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+                                return 
pulsarClient.getConnection(brokerAddress, brokerAddress);
+                            } else {
+                                // Bundle is unloading, lookup topic
+                                return getClientCnxWithLookup(topic);
+                            }
+                        } catch (URISyntaxException e) {
+                            // Should never go here
+                            return getClientCnxWithLookup(topic);
+                        }
+                    } else {
+                        // Bundle is not loaded yet, lookup topic
+                        return getClientCnxWithLookup(topic);
+                    }
+                });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0d1bbda4568..48015c50cc6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,6 +89,35 @@ public class TransactionProduceTest extends 
TransactionTestBase {
         produceTest(true);
     }
 
+    @Test
+    public void testDeleteNamespaceBeforeCommit() throws Exception {
+        final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
+        PulsarClient pulsarClient = this.pulsarClient;
+        Transaction tnx = pulsarClient.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS)
+                .build().get();
+        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
+        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
+        Assert.assertTrue(txnIdMostBits > -1);
+        Assert.assertTrue(txnIdLeastBits > -1);
+
+        @Cleanup
+        Producer<byte[]> outProducer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        String content = "Hello Txn";
+        outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
+
+        try {
+            admin.namespaces().deleteNamespace(NAMESPACE1, true);
+        } catch (Exception ignore) {}
+        tnx.commit().get();
+    }
+
     @Test
     public void produceAndAbortTest() throws Exception {
         produceTest(false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 082ec451eeb..bdcde1be814 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -155,7 +155,7 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
             conf.setConfigurationMetadataStoreUrl("zk:localhost:3181");
             conf.setAllowAutoTopicCreationType("non-partitioned");
             conf.setBookkeeperClientExposeStatsToPrometheus(true);
-
+            conf.setForceDeleteNamespaceAllowed(true);
             conf.setBrokerShutdownTimeoutMs(0L);
             conf.setBrokerServicePort(Optional.of(0));
             conf.setBrokerServicePortTls(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index d85f6b42f23..66460778dc2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -30,10 +30,13 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
@@ -45,6 +48,8 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -80,7 +85,7 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
                 new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace(namespace, 10);
         
admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
 partitions);
-        tbClient = TransactionBufferClientImpl.create(pulsarClient,
+        tbClient = TransactionBufferClientImpl.create(pulsarServiceList.get(0),
                 new HashedWheelTimer(new 
DefaultThreadFactory("transaction-buffer")), 1000, 3000);
     }
 
@@ -145,22 +150,30 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
 
     @Test
     public void testTransactionBufferClientTimeout() throws Exception {
-        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+        PulsarService pulsarService = pulsarServiceList.get(0);
+        PulsarClient mockClient = mock(PulsarClientImpl.class);
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
-        
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+        
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
         when(cnx.channel()).thenReturn(channel);
+        when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>(){
+
+            @Override
+            public PulsarClient answer(InvocationOnMock invocation) throws 
Throwable {
+                return mockClient;
+            }
+        });
 
         when(channel.isActive()).thenReturn(true);
 
         @Cleanup("stop")
         HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
         TransactionBufferHandlerImpl transactionBufferHandler =
-                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 
1000, 3000);
+                new TransactionBufferHandlerImpl(pulsarService, 
hashedWheelTimer, 1000, 3000);
         CompletableFuture<TxnID> endFuture =
                 transactionBufferHandler.endTxnOnTopic("test", 1, 1, 
TxnAction.ABORT, 1);
 
@@ -187,23 +200,31 @@ public class TransactionBufferClientTest extends 
TransactionTestBase {
     }
 
     @Test
-    public void testTransactionBufferChannelUnActive() {
-        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+    public void testTransactionBufferChannelUnActive() throws 
PulsarServerException {
+        PulsarService pulsarService = pulsarServiceList.get(0);
+        PulsarClient mockClient = mock(PulsarClientImpl.class);
         CompletableFuture<ClientCnx> completableFuture = new 
CompletableFuture<>();
         ClientCnx clientCnx = mock(ClientCnx.class);
         completableFuture.complete(clientCnx);
-        
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+        
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
         ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
         when(clientCnx.ctx()).thenReturn(cnx);
         Channel channel = mock(Channel.class);
         when(cnx.channel()).thenReturn(channel);
 
         when(channel.isActive()).thenReturn(false);
+        when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>(){
+
+            @Override
+            public PulsarClient answer(InvocationOnMock invocation) throws 
Throwable {
+                return mockClient;
+            }
+        });
 
         @Cleanup("stop")
         HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
         TransactionBufferHandlerImpl transactionBufferHandler =
-                new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 
1000, 3000);
+                new TransactionBufferHandlerImpl(pulsarServiceList.get(0), 
hashedWheelTimer, 1000, 3000);
         try {
             transactionBufferHandler.endTxnOnTopic("test", 1, 1, 
TxnAction.ABORT, 1).get();
             fail();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index 5241342635b..af4e442f617 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -18,42 +18,55 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.testng.annotations.Test;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 @Test(groups = "broker")
 public class TransactionBufferHandlerImplTest {
 
     @Test
-    public void testRequestCredits() {
-        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
-        
when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
-        TransactionBufferHandlerImpl handler = spy(
-                new TransactionBufferHandlerImpl(pulsarClient, null, 1000, 
3000));
+    public void testRequestCredits() throws PulsarServerException {
+        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        PulsarService pulsarService = mock(PulsarService.class);
+        NamespaceService namespaceService = mock(NamespaceService.class);
+        when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
+        when(pulsarService.getClient()).thenReturn(pulsarClient);
+        
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
+        Optional<NamespaceEphemeralData> opData = Optional.empty();
+        
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
+        
when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+        TransactionBufferHandlerImpl handler = spy(new 
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
         doNothing().when(handler).endTxn(any());
+        
doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
         for (int i = 0; i < 500; i++) {
-            handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+            handler.endTxnOnTopic("public/default/t", 1L, 1L, 
TxnAction.COMMIT, 1L);
         }
         assertEquals(handler.getAvailableRequestCredits(), 500);
         for (int i = 0; i < 500; i++) {
-            handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+            handler.endTxnOnTopic("public/default/t", 1L, 1L, 
TxnAction.COMMIT, 1L);
         }
         assertEquals(handler.getAvailableRequestCredits(), 0);
-        handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+        handler.endTxnOnTopic("public/default/t", 1L, 1L, TxnAction.COMMIT, 
1L);
         assertEquals(handler.getPendingRequestsCount(), 1);
         handler.onResponse(null);
         assertEquals(handler.getAvailableRequestCredits(), 0);
@@ -61,9 +74,11 @@ public class TransactionBufferHandlerImplTest {
     }
 
     @Test
-    public void testMinRequestCredits() {
-        TransactionBufferHandlerImpl handler = spy(
-                new TransactionBufferHandlerImpl(null, null, 50, 3000));
+    public void testMinRequestCredits() throws PulsarServerException {
+        PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+        PulsarService pulsarService = mock(PulsarService.class);
+        when(pulsarService.getClient()).thenReturn(pulsarClient);
+        TransactionBufferHandlerImpl handler = spy(new 
TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
         assertEquals(handler.getAvailableRequestCredits(), 100);
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6b448f9538b..890ea655059 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -28,6 +28,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -906,7 +907,12 @@ public class PulsarClientImpl implements PulsarClient {
     public CompletableFuture<ClientCnx> getConnection(final String topic) {
         TopicName topicName = TopicName.get(topic);
         return lookup.getBroker(topicName)
-                .thenCompose(pair -> cnxPool.getConnection(pair.getLeft(), 
pair.getRight()));
+                .thenCompose(pair -> getConnection(pair.getLeft(), 
pair.getRight()));
+    }
+
+    public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress 
logicalAddress,
+                                                      final InetSocketAddress 
physicalAddress) {
+        return cnxPool.getConnection(logicalAddress, physicalAddress);
     }
 
     /** visible for pulsar-functions. **/

Reply via email to