This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 46baae6a5e8 [improve][transaction] Optimize topic lookup when TC end
tx. (#14991)
46baae6a5e8 is described below
commit 46baae6a5e89a5feed171d231b59042a2ce9e1f8
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Apr 12 18:53:03 2022 +0800
[improve][transaction] Optimize topic lookup when TC end tx. (#14991)
### Motivation
When the transaction coordinator (TC) ends a transaction, it has to look up
each topic involved. The original implementation uses the pulsar-client to
do this, which performs a lookup for every topic.
Using the bundle cache to find the topic's owner broker avoids looking up
every topic and therefore decreases the lookup time.
### Modifications
- Use the bundle cache to find the topic's owner broker. If an error occurs,
fall back to looking up the topic.
- Remove the topic cache: with the bundle cache in use, the original cache is
no longer needed.
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../buffer/impl/TransactionBufferClientImpl.java | 9 +-
.../buffer/impl/TransactionBufferHandlerImpl.java | 155 ++++++++++-----------
.../broker/transaction/TransactionProduceTest.java | 29 ++++
.../broker/transaction/TransactionTestBase.java | 2 +-
.../buffer/TransactionBufferClientTest.java | 37 +++--
.../buffer/TransactionBufferHandlerImplTest.java | 41 ++++--
.../pulsar/client/impl/PulsarClientImpl.java | 8 +-
8 files changed, 174 insertions(+), 109 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b060cbbcfcf..7369453c9d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -751,7 +751,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.transactionBufferSnapshotService = new
SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionTimer =
new HashedWheelTimer(new
DefaultThreadFactory("pulsar-transaction-timer"));
- transactionBufferClient =
TransactionBufferClientImpl.create(getClient(), transactionTimer,
+ transactionBufferClient =
TransactionBufferClientImpl.create(this, transactionTimer,
config.getTransactionBufferClientMaxConcurrentRequests(),
config.getTransactionBufferClientOperationTimeoutInMills());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 060476e573c..c531f9f1871 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
import io.netty.util.HashedWheelTimer;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
@@ -39,9 +40,9 @@ public class TransactionBufferClientImpl implements
TransactionBufferClient {
this.tbHandler = tbHandler;
}
- public static TransactionBufferClient create(PulsarClient pulsarClient,
HashedWheelTimer timer,
- int maxConcurrentRequests, long operationTimeoutInMills) {
- TransactionBufferHandler handler = new
TransactionBufferHandlerImpl(pulsarClient, timer,
+ public static TransactionBufferClient create(PulsarService pulsarService,
HashedWheelTimer timer,
+ int maxConcurrentRequests, long operationTimeoutInMills) throws
PulsarServerException {
+ TransactionBufferHandler handler = new
TransactionBufferHandlerImpl(pulsarService, timer,
maxConcurrentRequests, operationTimeoutInMills);
return new TransactionBufferClientImpl(handler);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index b80a273bc6f..3f9a083787b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -18,21 +18,23 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -42,7 +44,10 @@ import
org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@Slf4j
@@ -53,31 +58,17 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
private final AtomicLong requestIdGenerator = new AtomicLong();
private final long operationTimeoutInMills;
private final HashedWheelTimer timer;
- private final PulsarClient pulsarClient;
+ private final PulsarService pulsarService;
+ private final PulsarClientImpl pulsarClient;
private static final
AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER
=
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class,
"requestCredits");
private volatile int requestCredits;
- private final LoadingCache<String, CompletableFuture<ClientCnx>>
lookupCache = CacheBuilder.newBuilder()
- .maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES)
- .build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
- @Override
- public CompletableFuture<ClientCnx> load(String topic) {
- CompletableFuture<ClientCnx> siFuture =
getClientCnx(topic);
- siFuture.whenComplete((si, cause) -> {
- if (null != cause) {
- lookupCache.invalidate(topic);
- }
- });
- return siFuture;
- }
- });
-
- public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
HashedWheelTimer timer,
- int maxConcurrentRequests, long
operationTimeoutInMills) {
- this.pulsarClient = pulsarClient;
+ public TransactionBufferHandlerImpl(PulsarService pulsarService,
HashedWheelTimer timer,
+ int maxConcurrentRequests, long operationTimeoutInMills) throws
PulsarServerException {
+ this.pulsarService = pulsarService;
+ this.pulsarClient = (PulsarClientImpl) pulsarService.getClient();
this.outstandingRequests = new ConcurrentSkipListMap<>();
this.pendingRequests = new GrowableArrayBlockingQueue<>();
this.operationTimeoutInMills = operationTimeoutInMills;
@@ -97,15 +88,9 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits,
txnIdMostBits,
topic, action, lowWaterMark);
- try {
- OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb,
lookupCache.get(topic));
- if (checkRequestCredits(op)) {
- endTxn(op);
- }
- } catch (ExecutionException e) {
- log.error("[{}] failed to get client cnx from lookup cache",
topic, e);
- lookupCache.invalidate(topic);
- cb.completeExceptionally(new
PulsarClientException.LookupException(e.getCause().getMessage()));
+ OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb,
getClientCnx(topic));
+ if (checkRequestCredits(op)) {
+ endTxn(op);
}
return cb;
}
@@ -122,15 +107,9 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId,
txnIdLeastBits, txnIdMostBits,
topic, subscription, action, lowWaterMark);
- try {
- OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb,
lookupCache.get(topic));
- if (checkRequestCredits(op)) {
- endTxn(op);
- }
- } catch (ExecutionException e) {
- log.error("[{}] failed to get client cnx from lookup cache",
topic, e);
- lookupCache.invalidate(topic);
- cb.completeExceptionally(new
PulsarClientException.LookupException(e.getCause().getMessage()));
+ OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb,
getClientCnx(topic));
+ if (checkRequestCredits(op)) {
+ endTxn(op);
}
return cb;
}
@@ -150,8 +129,8 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
}
public void endTxn(OpRequestSend op) {
- op.cnx.whenComplete((clientCnx, throwable) -> {
- if (throwable == null) {
+ op.cnx.whenComplete((clientCnx, ex) -> {
+ if (ex == null) {
if (clientCnx.ctx().channel().isActive()) {
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
outstandingRequests.put(op.requestId, op);
@@ -166,16 +145,19 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
op.cmd.retain();
clientCnx.ctx().writeAndFlush(op.cmd,
clientCnx.ctx().voidPromise());
} else {
- invalidateLookupCache(op);
op.cb.completeExceptionally(
new PulsarClientException.LookupException(op.topic
+ " endTxn channel is not active"));
onResponse(op);
}
} else {
- log.error("endTxn error topic: [{}]", op.topic, throwable);
- invalidateLookupCache(op);
- op.cb.completeExceptionally(
- new
PulsarClientException.LookupException(throwable.getMessage()));
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.error("endTxn error topic: [{}]", op.topic, cause);
+ if (cause instanceof
PulsarClientException.BrokerMetadataException) {
+ op.cb.complete(null);
+ } else {
+ op.cb.completeExceptionally(
+ new
PulsarClientException.LookupException(cause.getMessage()));
+ }
onResponse(op);
}
});
@@ -202,12 +184,9 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
log.error("[{}] Got end txn on topic response for request {}
error {}", op.topic,
response.getRequestId(),
response.getError());
- invalidateLookupCache(op);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
}
- } catch (Exception e) {
- log.error("[{}] Got exception when complete EndTxnOnTopic op for
request {}", op.topic, e);
} finally {
onResponse(op);
}
@@ -235,12 +214,9 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
} else {
log.error("[{}] Got end txn on subscription response for
request {} error {}",
op.topic, response.getRequestId(),
response.getError());
- invalidateLookupCache(op);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
}
- } catch (Exception e) {
- log.error("[{}] Got exception when complete EndTxnOnSub op for
request {}", op.topic, e);
} finally {
onResponse(op);
}
@@ -262,21 +238,14 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits,
permits - 1)) {
OpRequestSend polled = pendingRequests.poll();
if (polled != null) {
- try {
- if (polled.cnx != lookupCache.get(polled.topic)) {
- OpRequestSend invalid = polled;
- polled =
OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
- lookupCache.get(invalid.topic));
- invalid.recycle();
- }
- endTxn(polled);
- } catch (ExecutionException e) {
- log.error("[{}] failed to get client cnx from
lookup cache", polled.topic, e);
- lookupCache.invalidate(polled.topic);
- polled.cb.completeExceptionally(new
PulsarClientException.LookupException(
- e.getCause().getMessage()));
- REQUEST_CREDITS_UPDATER.incrementAndGet(this);
+ CompletableFuture<ClientCnx> clientCnx =
getClientCnx(polled.topic);
+ if (polled.cnx != clientCnx) {
+ OpRequestSend invalid = polled;
+ polled = OpRequestSend.create(invalid.requestId,
invalid.topic, invalid.cmd, invalid.cb,
+ clientCnx);
+ invalid.recycle();
}
+ endTxn(polled);
} else {
REQUEST_CREDITS_UPDATER.incrementAndGet(this);
}
@@ -287,16 +256,6 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
}
}
- private void invalidateLookupCache(OpRequestSend op) {
- try {
- if (lookupCache.get(op.topic) == op.cnx) {
- lookupCache.invalidate(op.topic);
- }
- } catch (ExecutionException e) {
- lookupCache.invalidate(op.topic);
- }
- }
-
public static final class OpRequestSend {
long requestId;
@@ -336,8 +295,42 @@ public class TransactionBufferHandlerImpl implements
TransactionBufferHandler {
};
}
+ public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
+ return pulsarClient.getConnection(topic);
+ }
+
public CompletableFuture<ClientCnx> getClientCnx(String topic) {
- return ((PulsarClientImpl) pulsarClient).getConnection(topic);
+ NamespaceService namespaceService =
pulsarService.getNamespaceService();
+ CompletableFuture<NamespaceBundle> nsBundle =
namespaceService.getBundleAsync(TopicName.get(topic));
+ return nsBundle
+ .thenCompose(bundle -> namespaceService.getOwnerAsync(bundle))
+ .thenCompose(data -> {
+ if (data.isPresent()) {
+ NamespaceEphemeralData ephemeralData = data.get();
+ try {
+ if (!ephemeralData.isDisabled()) {
+ URI uri;
+ if
(pulsarClient.getConfiguration().isUseTls()) {
+ uri = new
URI(ephemeralData.getNativeUrlTls());
+ } else {
+ uri = new
URI(ephemeralData.getNativeUrl());
+ }
+ InetSocketAddress brokerAddress =
+
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
+ return
pulsarClient.getConnection(brokerAddress, brokerAddress);
+ } else {
+ // Bundle is unloading, lookup topic
+ return getClientCnxWithLookup(topic);
+ }
+ } catch (URISyntaxException e) {
+ // Should never go here
+ return getClientCnxWithLookup(topic);
+ }
+ } else {
+ // Bundle is not loaded yet, lookup topic
+ return getClientCnxWithLookup(topic);
+ }
+ });
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 0d1bbda4568..48015c50cc6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,6 +89,35 @@ public class TransactionProduceTest extends
TransactionTestBase {
produceTest(true);
}
+ @Test
+ public void testDeleteNamespaceBeforeCommit() throws Exception {
+ final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
+ PulsarClient pulsarClient = this.pulsarClient;
+ Transaction tnx = pulsarClient.newTransaction()
+ .withTransactionTimeout(60, TimeUnit.SECONDS)
+ .build().get();
+ long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
+ long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
+ Assert.assertTrue(txnIdMostBits > -1);
+ Assert.assertTrue(txnIdLeastBits > -1);
+
+ @Cleanup
+ Producer<byte[]> outProducer = pulsarClient
+ .newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .create();
+
+ String content = "Hello Txn";
+ outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
+
+ try {
+ admin.namespaces().deleteNamespace(NAMESPACE1, true);
+ } catch (Exception ignore) {}
+ tnx.commit().get();
+ }
+
@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 082ec451eeb..bdcde1be814 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -155,7 +155,7 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
conf.setConfigurationMetadataStoreUrl("zk:localhost:3181");
conf.setAllowAutoTopicCreationType("non-partitioned");
conf.setBookkeeperClientExposeStatsToPrometheus(true);
-
+ conf.setForceDeleteNamespaceAllowed(true);
conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index d85f6b42f23..66460778dc2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -30,10 +30,13 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
@@ -45,6 +48,8 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
@@ -80,7 +85,7 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(namespace, 10);
admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
partitions);
- tbClient = TransactionBufferClientImpl.create(pulsarClient,
+ tbClient = TransactionBufferClientImpl.create(pulsarServiceList.get(0),
new HashedWheelTimer(new
DefaultThreadFactory("transaction-buffer")), 1000, 3000);
}
@@ -145,22 +150,30 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
@Test
public void testTransactionBufferClientTimeout() throws Exception {
- PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ PulsarService pulsarService = pulsarServiceList.get(0);
+ PulsarClient mockClient = mock(PulsarClientImpl.class);
CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
-
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
when(cnx.channel()).thenReturn(channel);
+ when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>(){
+
+ @Override
+ public PulsarClient answer(InvocationOnMock invocation) throws
Throwable {
+ return mockClient;
+ }
+ });
when(channel.isActive()).thenReturn(true);
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
- new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer,
1000, 3000);
+ new TransactionBufferHandlerImpl(pulsarService,
hashedWheelTimer, 1000, 3000);
CompletableFuture<TxnID> endFuture =
transactionBufferHandler.endTxnOnTopic("test", 1, 1,
TxnAction.ABORT, 1);
@@ -187,23 +200,31 @@ public class TransactionBufferClientTest extends
TransactionTestBase {
}
@Test
- public void testTransactionBufferChannelUnActive() {
- PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+ public void testTransactionBufferChannelUnActive() throws
PulsarServerException {
+ PulsarService pulsarService = pulsarServiceList.get(0);
+ PulsarClient mockClient = mock(PulsarClientImpl.class);
CompletableFuture<ClientCnx> completableFuture = new
CompletableFuture<>();
ClientCnx clientCnx = mock(ClientCnx.class);
completableFuture.complete(clientCnx);
-
when(mockClient.getConnection(anyString())).thenReturn(completableFuture);
+
when(((PulsarClientImpl)mockClient).getConnection(anyString())).thenReturn(completableFuture);
ChannelHandlerContext cnx = mock(ChannelHandlerContext.class);
when(clientCnx.ctx()).thenReturn(cnx);
Channel channel = mock(Channel.class);
when(cnx.channel()).thenReturn(channel);
when(channel.isActive()).thenReturn(false);
+ when(pulsarService.getClient()).thenAnswer(new Answer<PulsarClient>(){
+
+ @Override
+ public PulsarClient answer(InvocationOnMock invocation) throws
Throwable {
+ return mockClient;
+ }
+ });
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
- new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer,
1000, 3000);
+ new TransactionBufferHandlerImpl(pulsarServiceList.get(0),
hashedWheelTimer, 1000, 3000);
try {
transactionBufferHandler.endTxnOnTopic("test", 1, 1,
TxnAction.ABORT, 1).get();
fail();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
index 5241342635b..af4e442f617 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java
@@ -18,42 +18,55 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
-
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.testng.annotations.Test;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@Test(groups = "broker")
public class TransactionBufferHandlerImplTest {
@Test
- public void testRequestCredits() {
- PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
-
when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
- TransactionBufferHandlerImpl handler = spy(
- new TransactionBufferHandlerImpl(pulsarClient, null, 1000,
3000));
+ public void testRequestCredits() throws PulsarServerException {
+ PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+ PulsarService pulsarService = mock(PulsarService.class);
+ NamespaceService namespaceService = mock(NamespaceService.class);
+ when(pulsarService.getNamespaceService()).thenReturn(namespaceService);
+ when(pulsarService.getClient()).thenReturn(pulsarClient);
+
when(namespaceService.getBundleAsync(any())).thenReturn(CompletableFuture.completedFuture(mock(NamespaceBundle.class)));
+ Optional<NamespaceEphemeralData> opData = Optional.empty();
+
when(namespaceService.getOwnerAsync(any())).thenReturn(CompletableFuture.completedFuture(opData));
+
when(((PulsarClientImpl)pulsarClient).getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
+ TransactionBufferHandlerImpl handler = spy(new
TransactionBufferHandlerImpl(pulsarService, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
+
doReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))).when(handler).getClientCnx(anyString());
for (int i = 0; i < 500; i++) {
- handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+ handler.endTxnOnTopic("public/default/t", 1L, 1L,
TxnAction.COMMIT, 1L);
}
assertEquals(handler.getAvailableRequestCredits(), 500);
for (int i = 0; i < 500; i++) {
- handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+ handler.endTxnOnTopic("public/default/t", 1L, 1L,
TxnAction.COMMIT, 1L);
}
assertEquals(handler.getAvailableRequestCredits(), 0);
- handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
+ handler.endTxnOnTopic("public/default/t", 1L, 1L, TxnAction.COMMIT,
1L);
assertEquals(handler.getPendingRequestsCount(), 1);
handler.onResponse(null);
assertEquals(handler.getAvailableRequestCredits(), 0);
@@ -61,9 +74,11 @@ public class TransactionBufferHandlerImplTest {
}
@Test
- public void testMinRequestCredits() {
- TransactionBufferHandlerImpl handler = spy(
- new TransactionBufferHandlerImpl(null, null, 50, 3000));
+ public void testMinRequestCredits() throws PulsarServerException {
+ PulsarClient pulsarClient = mock(PulsarClientImpl.class);
+ PulsarService pulsarService = mock(PulsarService.class);
+ when(pulsarService.getClient()).thenReturn(pulsarClient);
+ TransactionBufferHandlerImpl handler = spy(new
TransactionBufferHandlerImpl(pulsarService, null, 50, 3000));
assertEquals(handler.getAvailableRequestCredits(), 100);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6b448f9538b..890ea655059 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -28,6 +28,7 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
@@ -906,7 +907,12 @@ public class PulsarClientImpl implements PulsarClient {
public CompletableFuture<ClientCnx> getConnection(final String topic) {
TopicName topicName = TopicName.get(topic);
return lookup.getBroker(topicName)
- .thenCompose(pair -> cnxPool.getConnection(pair.getLeft(),
pair.getRight()));
+ .thenCompose(pair -> getConnection(pair.getLeft(),
pair.getRight()));
+ }
+
+ public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress
logicalAddress,
+ final InetSocketAddress
physicalAddress) {
+ return cnxPool.getConnection(logicalAddress, physicalAddress);
}
/** visible for pulsar-functions. **/