This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2439eda30fe08978f4254d3698972374155852b0 Author: congbo <[email protected]> AuthorDate: Mon Aug 9 23:44:47 2021 +0800 [Transaction] Fix transaction buffer client handle endTxn op when topic or sub have been deleted. (#11304) Currently, when a topic has been deleted, the broker does not handle `endTxnOnTopic` and `endTxnOnSub`. Likewise, when a subscription has been deleted, the broker does not handle `endTxnOnSub`. When the topic or subscription has been deleted, the broker should return success to the TC (transaction coordinator) for the `endTxnOnTopic` and `endTxnOnSub` operations. When the topic does not exist in this broker, the broker should check whether the topic has ever been created; if it has not been created, the broker should return success. (cherry picked from commit a6e66dd38bbb9edc552fe6877d0d600bd6d5506b) --- .../bookkeeper/mledger/ManagedLedgerFactory.java | 11 ++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 5 + .../apache/bookkeeper/mledger/impl/MetaStore.java | 12 ++ .../bookkeeper/mledger/impl/MetaStoreImpl.java | 6 + .../apache/pulsar/broker/service/ServerCnx.java | 167 ++++++++++------ .../broker/transaction/TransactionTestBase.java | 5 +- .../buffer/TransactionBufferClientTest.java | 219 ++++++++++----------- .../apache/pulsar/common/protocol/Commands.java | 5 +- 8 files changed, 247 insertions(+), 183 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index 4e103d0..cfc1d3d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.bookkeeper.common.annotation.InterfaceAudience; @@ -160,4 +161,14 @@ public interface ManagedLedgerFactory { */ void shutdown() throws InterruptedException, ManagedLedgerException; + /** + * Check managed 
ledger has been initialized before. + * + * @param ledgerName {@link String} + * @return a future represents the result of the operation. + * an instance of {@link Boolean} is returned + * if the operation succeeds. + */ + CompletableFuture<Boolean> asyncExists(String ledgerName); + } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 16d577a..699dc68 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -525,6 +525,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { } @Override + public CompletableFuture<Boolean> asyncExists(String ledgerName) { + return store.asyncExists(ledgerName); + } + + @Override public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException { class Result { ManagedLedgerInfo info = null; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index 8b99203..9f1563b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.mledger.impl; import java.util.List; +import java.util.concurrent.CompletableFuture; + import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; @@ -129,4 +131,14 @@ public interface MetaStore { * @throws MetaStoreException */ Iterable<String> getManagedLedgers() throws MetaStoreException; + + 
/** + * Check ledger exists. + * + * @param ledgerName {@link String} + * @return a future represents the result of the operation. + * an instance of {@link Boolean} is returned + * if the operation succeeds. + */ + CompletableFuture<Boolean> asyncExists(String ledgerName); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 9558d11..a295114 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -23,6 +23,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import io.netty.buffer.ByteBuf; @@ -257,6 +258,11 @@ public class MetaStoreImpl implements MetaStore { } } + @Override + public CompletableFuture<Boolean> asyncExists(String path) { + return store.exists(PREFIX + path); + } + // // update timestamp if missing or 0 // 3 cases - timestamp does not exist for ledgers serialized before diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index e0a056a..ac50c53 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1981,43 +1981,68 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { final String topic = command.getTopic(); final int txnAction = command.getTxnAction().getValue(); TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark(); if (log.isDebugEnabled()) { 
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic, txnID, txnAction); } - CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString()); - if (topicFuture != null) { - topicFuture.whenComplete((optionalTopic, t) -> { - if (!optionalTopic.isPresent()) { - log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, " - + "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction)); - ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( - requestId, ServerError.ServiceNotReady, - "Topic " + topic + " is not found.")); - return; - } - optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark()) + CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); + topicFuture.thenAccept(optionalTopic -> { + if (optionalTopic.isPresent()) { + optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark) .whenComplete((ignored, throwable) -> { if (throwable != null) { - log.error("Handle endTxnOnPartition {} failed.", topic, throwable); + log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], " + + "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable); ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( requestId, BrokerServiceException.getClientErrorCode(throwable), - throwable.getMessage())); + throwable.getMessage(), + txnID.getLeastSigBits(), txnID.getMostSigBits())); return; } ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); }); - }); - } else { - log.error("handleEndTxnOnPartition faile ! 
The topic {} does not exist in broker, " - + "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction)); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), - ServerError.ServiceNotReady, - "The topic " + topic + " is not exist in broker.")); - } + + } else { + getBrokerService().getManagedLedgerFactory() + .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()) + .thenAccept((b) -> { + if (b) { + log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, " + + "txnId: [{}], txnAction: [{}]", topic, + txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, + ServerError.ServiceNotReady, + "The topic " + topic + " does not exist in broker.", + txnID.getMostSigBits(), txnID.getLeastSigBits())); + } else { + log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, " + + "txnId: [{}], txnAction: [{}]", + topic, txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, + txnID.getLeastSigBits(), txnID.getMostSigBits())); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnPartition fail ! topic {} , " + + "txnId: [{}], txnAction: [{}]", topic, txnID, + TxnAction.valueOf(txnAction), e.getCause()); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( + requestId, ServerError.ServiceNotReady, + e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits())); + return null; + }); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnPartition fail ! 
topic {} , " + + "txnId: [{}], txnAction: [{}]", topic, txnID, + TxnAction.valueOf(txnAction), e.getCause()); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( + requestId, ServerError.ServiceNotReady, + e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits())); + return null; + }); } @Override @@ -2028,70 +2053,82 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { final String topic = command.getSubscription().getTopic(); final String subName = command.getSubscription().getSubscription(); final int txnAction = command.getTxnAction().getValue(); + final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits); + final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark(); if (log.isDebugEnabled()) { - log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, + log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, subName, new TxnID(txnidMostBits, txnidLeastBits), txnAction); } - CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString()); - if (topicFuture != null) { - topicFuture.thenAccept(optionalTopic -> { - - if (!optionalTopic.isPresent()) { - log.error("handleEndTxnOnSubscription fail! 
The topic {} does not exist in broker, txnId: " - + "[{}], txnAction: [{}]", topic, - new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnidLeastBits, txnidMostBits, - ServerError.ServiceNotReady, - "The topic " + topic + " is not exist in broker.")); - return; - } - + CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); + topicFuture.thenAccept(optionalTopic -> { + if (optionalTopic.isPresent()) { Subscription subscription = optionalTopic.get().getSubscription(subName); if (subscription == null) { - log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnidLeastBits, txnidMostBits, - ServerError.ServiceNotReady, - "Topic " + optionalTopic.get().getName() - + " subscription " + subName + " is not exist.")); + log.warn("handleEndTxnOnSubscription fail! " + + "topic {} subscription {} does not exist. txnId: [{}], txnAction: [{}]", + optionalTopic.get().getName(), subName, txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush( + Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); return; } CompletableFuture<Void> completableFuture = - subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, - command.getTxnidLeastBitsOfLowWatermark()); - completableFuture.whenComplete((ignored, throwable) -> { - if (throwable != null) { - log.error("Handle end txn on subscription failed for request {}", requestId, throwable); + subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark); + completableFuture.whenComplete((ignored, e) -> { + if (e != null) { + log.error("handleEndTxnOnSubscription fail ! 
topic: {} , subscription: {}" + + "txnId: [{}], txnAction: [{}]", topic, subName, + txnID, TxnAction.valueOf(txnAction), e.getCause()); ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( requestId, txnidLeastBits, txnidMostBits, - BrokerServiceException.getClientErrorCode(throwable), + BrokerServiceException.getClientErrorCode(e), "Handle end txn on subscription failed.")); return; } ctx.writeAndFlush( Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); }); - }).exceptionally(e -> { - log.error("Handle end txn on subscription failed for request {}", requestId, e); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnidLeastBits, txnidMostBits, - ServerError.ServiceNotReady, - "Handle end txn on subscription failed.")); - return null; - }); - } else { - log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: " - + "[{}], txnAction: [{}]", topic, - new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); + } else { + getBrokerService().getManagedLedgerFactory() + .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()) + .thenAccept((b) -> { + if (b) { + log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, " + + "subscription: {} ,txnId: [{}], txnAction: [{}]", topic, subName, + new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( + requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), + ServerError.ServiceNotReady, + "The topic " + topic + " does not exist in broker.")); + } else { + log.warn("handleEndTxnOnSubscription fail ! 
The topic {} has not been created, " + + "subscription: {} txnId: [{}], txnAction: [{}]", + topic, subName, txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, + txnID.getLeastSigBits(), txnID.getMostSigBits())); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}" + + "txnId: [{}], txnAction: [{}]", topic, subName, + txnID, TxnAction.valueOf(txnAction), e.getCause()); + ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( + requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), + ServerError.ServiceNotReady, e.getMessage())); + return null; + }); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}" + + "txnId: [{}], txnAction: [{}]", topic, subName, + txnID, TxnAction.valueOf(txnAction), e.getCause()); ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( requestId, txnidLeastBits, txnidMostBits, ServerError.ServiceNotReady, - "The topic " + topic + " is not exist in broker.")); - } + "Handle end txn on subscription failed.")); + return null; + }); } private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 2c3cb4d..67ecd87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -21,8 +21,9 @@ package org.apache.pulsar.broker.transaction; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.Assert.assertEquals; 
+import static org.testng.Assert.assertNotNull; + import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; import java.util.ArrayList; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index 6081580..08de910 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -18,16 +18,10 @@ */ package org.apache.pulsar.broker.transaction.buffer; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doReturn; - import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; import lombok.Cleanup; @@ -37,15 +31,12 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.pulsar.broker.intercept.MockBrokerInterceptor; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl; -import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; @@ -56,13 +47,12 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.lang.reflect.Field; import java.util.concurrent.ConcurrentSkipListMap; @@ -74,76 +64,38 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @Test(groups = "broker") -public class TransactionBufferClientTest extends TransactionMetaStoreTestBase { +public class TransactionBufferClientTest extends TransactionTestBase { private static final Logger log = LoggerFactory.getLogger(TransactionBufferClientTest.class); private TransactionBufferClient tbClient; TopicName partitionedTopicName = TopicName.get("persistent", "public", "test", "tb-client"); int partitions = 10; - BrokerService[] brokerServices; - private final static String namespace = "public/test"; - - private EventLoopGroup eventLoopGroup; + private static final String namespace = "public/test"; @Override - protected void afterSetup() throws Exception { - pulsarAdmins[0].clusters().createCluster("my-cluster", 
ClusterData.builder().serviceUrl(pulsarServices[0].getWebServiceAddress()).build()); - pulsarAdmins[0].tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster"))); - pulsarAdmins[0].namespaces().createNamespace(namespace, 10); - pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions); - String subName = "test"; - pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(), subName, MessageId.latest); + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + setBrokerCount(3); + internalSetup(); + String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); + String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(namespace, 10); + admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions); pulsarClient.newConsumer() .topic(partitionedTopicName.getPartitionedTopicName()) - .subscriptionName(subName).subscribe(); - tbClient = TransactionBufferClientImpl.create( - ((PulsarClientImpl) pulsarClient), + .subscriptionName("test").subscribe(); + tbClient = TransactionBufferClientImpl.create(pulsarClient, new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer"))); } @Override + @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { - if (tbClient != null) { - tbClient.close(); - } - if (brokerServices != null) { - for (BrokerService bs : brokerServices) { - bs.close(); - } - brokerServices = null; - } - super.cleanup(); - eventLoopGroup.shutdownGracefully().get(); - } - - @Override - protected void afterPulsarStart() throws 
Exception { - eventLoopGroup = new NioEventLoopGroup(); - brokerServices = new BrokerService[pulsarServices.length]; - AtomicLong atomicLong = new AtomicLong(0); - for (int i = 0; i < pulsarServices.length; i++) { - Subscription mockSubscription = mock(Subscription.class); - Mockito.when(mockSubscription.endTxn(Mockito.anyLong(), - Mockito.anyLong(), Mockito.anyInt(), Mockito.anyLong())) - .thenReturn(CompletableFuture.completedFuture(null)); - - Topic mockTopic = mock(Topic.class); - Mockito.when(mockTopic.endTxn(any(), Mockito.anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(null)); - Mockito.when(mockTopic.getSubscription(any())).thenReturn(mockSubscription); - - ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap = - mock(ConcurrentOpenHashMap.class); - Mockito.when(topicMap.get(Mockito.anyString())).thenReturn( - CompletableFuture.completedFuture(Optional.of(mockTopic))); - - BrokerService brokerService = Mockito.spy(new BrokerService(pulsarServices[i], eventLoopGroup)); - doReturn(new MockBrokerInterceptor()).when(brokerService).getInterceptor(); - doReturn(atomicLong.getAndIncrement() + "").when(brokerService).generateUniqueProducerName(); - brokerServices[i] = brokerService; - Mockito.when(brokerService.getTopics()).thenReturn(topicMap); - Mockito.when(pulsarServices[i].getBrokerService()).thenReturn(brokerService); - } + tbClient.close(); + super.internalCleanup(); } @Test @@ -199,43 +151,6 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase { } @Test - public void testTransactionBufferOpFail() throws InterruptedException, ExecutionException { - ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>[] originalMaps = - new ConcurrentOpenHashMap[brokerServices.length]; - ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap = new ConcurrentOpenHashMap<>(); - for (int i = 0; i < brokerServices.length; i++) { - originalMaps[i] = 
brokerServices[i].getTopics(); - when(brokerServices[i].getTopics()).thenReturn(topicMap); - } - - try { - tbClient.abortTxnOnSubscription( - partitionedTopicName.getPartition(0).toString(), "test", 1L, 1, -1L).get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof PulsarClientException.LookupException); - } - - try { - tbClient.abortTxnOnTopic( - partitionedTopicName.getPartition(0).toString(), 1L, 1, -1L).get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof PulsarClientException.LookupException); - } - - for (int i = 0; i < brokerServices.length; i++) { - when(brokerServices[i].getTopics()).thenReturn(originalMaps[i]); - } - - tbClient.abortTxnOnSubscription( - partitionedTopicName.getPartition(0).toString(), "test", 1L, 1, -1L).get(); - - tbClient.abortTxnOnTopic( - partitionedTopicName.getPartition(0).toString(), 1L, 1, -1L).get(); - } - - @Test public void testTransactionBufferClientTimeout() throws Exception { PulsarClientImpl mockClient = mock(PulsarClientImpl.class); CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>(); @@ -314,12 +229,27 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase { } @Test - public void testTransactionBufferLookUp() throws ExecutionException, InterruptedException { + public void testTransactionBufferLookUp() throws Exception { String topic = "persistent://" + namespace + "/testTransactionBufferLookUp"; - tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get(); - tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get(); - tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get(); - tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get(); + String subName = "test"; + + String abortTopic = topic + "_abort_sub"; + String commitTopic = topic + "_commit_sub"; + admin.topics().createNonPartitionedTopic(abortTopic); + 
admin.topics().createSubscription(abortTopic, subName, MessageId.earliest); + + admin.topics().createNonPartitionedTopic(commitTopic); + admin.topics().createSubscription(commitTopic, subName, MessageId.earliest); + + waitPendingAckInit(abortTopic, subName); + waitPendingAckInit(commitTopic, subName); + + tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get(); + + tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get(); + + tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get(); + tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get(); } @Test @@ -333,10 +263,69 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase { field.setAccessible(true); field.set(transactionBufferHandler, new Semaphore(2)); - String topic = "persistent://" + namespace + "/testTransactionBufferLookUp"; - tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get(); + + String topic = "persistent://" + namespace + "/testTransactionBufferHandlerSemaphore"; + String subName = "test"; + + String abortTopic = topic + "_abort_sub"; + String commitTopic = topic + "_commit_sub"; + + admin.topics().createNonPartitionedTopic(abortTopic); + admin.topics().createSubscription(abortTopic, subName, MessageId.earliest); + + admin.topics().createNonPartitionedTopic(commitTopic); + admin.topics().createSubscription(commitTopic, subName, MessageId.earliest); + + waitPendingAckInit(abortTopic, subName); + waitPendingAckInit(commitTopic, subName); + tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get(); + + tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get(); + + tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get(); + tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get(); + } + + @Test + public void testEndTopicNotExist() throws Exception { + String topic = "persistent://" + namespace + "/testEndTopicNotExist"; + String sub = "test"; + tbClient.abortTxnOnTopic(topic + "_abort_topic", 
1L, 1L, -1L).get(); - tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get(); tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get(); + + tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get(); + tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get(); + } + + @Test + public void testEndSubNotExist() throws Exception { + + String topic = "persistent://" + namespace + "/testEndTopicNotExist"; + String sub = "test"; + admin.topics().createNonPartitionedTopic(topic + "_abort_sub"); + + admin.topics().createNonPartitionedTopic(topic + "_commit_sub"); + + tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get(); + tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get(); + } + + private void waitPendingAckInit(String topic, String sub) throws Exception { + + boolean exist = false; + for (int i = 0; i < getPulsarServiceList().size(); i++) { + CompletableFuture<Optional<Topic>> completableFuture = getPulsarServiceList().get(i) + .getBrokerService().getTopics().get(topic); + if (completableFuture != null) { + PersistentSubscription persistentSubscription = + (PersistentSubscription) completableFuture.get().get().getSubscription(sub); + Awaitility.await().untilAsserted(() -> + assertEquals(persistentSubscription.getTransactionPendingAckStats().state, "Ready")); + exist = true; + } + } + + assertTrue(exist); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7c93ecc..16ffd96 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1314,10 +1314,13 @@ public class Commands { return serializeWithSize(cmd); } - public static ByteBuf newEndTxnOnPartitionResponse(long requestId, ServerError error, 
String errorMsg) { + public static ByteBuf newEndTxnOnPartitionResponse(long requestId, ServerError error, String errorMsg, + long txnIdLeastBits, long txnIdMostBits) { BaseCommand cmd = localCmd(Type.END_TXN_ON_PARTITION_RESPONSE); CommandEndTxnOnPartitionResponse response = cmd.setEndTxnOnPartitionResponse() .setRequestId(requestId) + .setTxnidMostBits(txnIdMostBits) + .setTxnidLeastBits(txnIdLeastBits) .setError(error); if (errorMsg != null) { response.setMessage(errorMsg);
