This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4b59b38b06d2285f894340116cfe4356d9d49d8b Author: congbo <[email protected]> AuthorDate: Tue Dec 1 12:56:27 2020 +0800 [Transaction] Fix transaction log replay not handle right. (#8723) 1. Transaction log replay don't handle right, because when replay compare lastConfirm.entryId, it is not right to replay. 2. Transaction handle proto is async, but recycle the command will happen before send response, we use requestId or others will not right. 3. We should afferent the managedLedgerConfig when we use MLTransactionLog, we can config the TransactionLog manageLedger. 4. add the TransactionNotFound exception in proto, it will return when transaction commit or abort and others transaction operation repeat or error handle. 1. we compare the markerDeletePosition and lastConfirmPosition to replay. 2. use local variable replace command get method. 3. afferent the managedLedgerConfig. (cherry picked from commit 411bf71006039af661be13c54dda4ce074f97679) --- conf/broker.conf | 2 +- .../apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../broker/TransactionMetadataStoreService.java | 27 +++++--- .../broker/service/BrokerServiceException.java | 2 + .../apache/pulsar/broker/service/ServerCnx.java | 73 ++++++++++++---------- .../TransactionMetadataStoreServiceTest.java | 17 +++-- .../TransactionCoordinatorClientTest.java | 2 +- .../client/impl/TransactionEndToEndTest.java | 8 +-- .../TransactionCoordinatorClientException.java | 9 +++ pulsar-client-cpp/include/pulsar/Result.h | 1 + pulsar-client-cpp/lib/ClientConnection.cc | 3 + pulsar-client-cpp/lib/Result.cc | 3 + .../client/impl/TransactionMetaStoreHandler.java | 2 + .../apache/pulsar/common/api/proto/PulsarApi.java | 3 + pulsar-common/src/main/proto/PulsarApi.proto | 1 + .../TransactionMetadataStoreProvider.java | 9 ++- .../InMemTransactionMetadataStoreProvider.java | 5 +- .../coordinator/impl/MLTransactionLogImpl.java | 27 ++++---- .../impl/MLTransactionMetadataStoreProvider.java | 10 +-- .../MLTransactionMetadataStoreTest.java | 14 +++-- .../TransactionMetadataStoreProviderTest.java | 2 +- 21 files changed, 142 insertions(+), 80 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index f22fb3c..97d4ff0 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1175,4 +1175,4 @@ brokerServicePurgeInactiveFrequencyInSeconds=60 # Enable transaction coordinator in broker transactionCoordinatorEnabled=false -transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider +transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f6e3d35..f4a8b75 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1860,7 +1860,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Class name for transaction metadata store provider" ) private String transactionMetadataStoreProviderClassName = - "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider"; + "org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider"; @FieldContext( category = CATEGORY_TRANSACTION, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 2a65b14..301b393 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -41,6 +41,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvide import org.apache.pulsar.transaction.coordinator.TransactionSubscription; import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,15 +117,23 @@ public class TransactionMetadataStoreService { } public void addTransactionMetadataStore(TransactionCoordinatorID tcId) { - transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory()) - .whenComplete((store, ex) -> { - if (ex != null) { - LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex); - } else { - stores.put(tcId, store); - LOG.info("Added new transaction meta store {}", tcId); - } - }); + pulsarService.getBrokerService() + .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId)) + .whenComplete((v ,e) -> { + if (e != null) { + LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e); + } else { + transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v) + .whenComplete((store, ex) -> { + if (ex != null) { + LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex); + } else { + stores.put(tcId, store); + LOG.info("Added new transaction meta store {}", tcId); + } + }); + } + }); } public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 4564617..9c6b2d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -219,6 +219,8 @@ public class BrokerServiceException extends Exception { return ServerError.NotAllowedError; } else if (t instanceof TransactionConflictException) { return ServerError.TransactionConflict; + } else if (t instanceof CoordinatorException.TransactionNotFoundException) { + return ServerError.TransactionNotFound; } else { if (checkCauseIfUnknown) { return getClientErrorCode(t.getCause(), false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a99ee3e..06d5b75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -39,6 +39,7 @@ import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Promise; import java.net.SocketAddress; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; @@ -1268,18 +1269,21 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { protected void handleAck(CommandAck ack) { checkArgument(state == State.Connected); CompletableFuture<Consumer> consumerFuture = consumers.get(ack.getConsumerId()); + final long requestId = ack.getRequestId(); + final boolean hasRequestId = ack.hasRequestId(); + final long consumerId = ack.getConsumerId(); if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> { - if (ack.hasRequestId()) { + if (hasRequestId) { ctx.writeAndFlush(Commands.newAckResponse( - ack.getRequestId(), null, null, ack.getConsumerId())); + requestId, null, null, consumerId)); } }).exceptionally(e -> { - if (ack.hasRequestId()) { - ctx.writeAndFlush(Commands.newAckResponse(ack.getRequestId(), + if (hasRequestId) { + ctx.writeAndFlush(Commands.newAckResponse(requestId, BrokerServiceException.getClientErrorCode(e), - e.getMessage(), ack.getConsumerId())); + e.getMessage(), consumerId)); } return null; }); @@ -1666,45 +1670,47 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { @Override protected void handleNewTxn(CommandNewTxn command) { + final long requestId = command.getRequestId(); + final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId()); if (log.isDebugEnabled()) { - log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress); + log.debug("Receive new txn request {} to transaction meta store {} from {}.", requestId, tcId, remoteAddress); } - TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId()); service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, command.getTxnTtlSeconds()) .whenComplete(((txnID, ex) -> { if (ex == null) { if (log.isDebugEnabled()) { - log.debug("Send response {} for new txn request {}", tcId.getId(), command.getRequestId()); + log.debug("Send response {} for new txn request {}", tcId.getId(), requestId); } - ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), txnID.getLeastSigBits(), txnID.getMostSigBits())); + ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); } else { if (log.isDebugEnabled()) { - log.debug("Send response error for new txn request {}", command.getRequestId(), ex); + log.debug("Send response error for new txn request {}", requestId, ex); } - ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); + ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); } })); } @Override protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn command) { - TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + final long requestId = command.getRequestId(); if (log.isDebugEnabled()) { - log.debug("Receive add published partition to txn request {} from {} with txnId {}", command.getRequestId(), remoteAddress, txnID); + log.debug("Receive add published partition to txn request {} from {} with txnId {}", requestId, remoteAddress, txnID); } service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID, command.getPartitionsList()) .whenComplete(((v, ex) -> { if (ex == null) { if (log.isDebugEnabled()) { - log.debug("Send response success for add published partition to txn request {}", command.getRequestId()); + log.debug("Send response success for add published partition to txn request {}", requestId); } - ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), + ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); } else { if (log.isDebugEnabled()) { - log.debug("Send response error for add published partition to txn request {}", command.getRequestId(), ex); + log.debug("Send response error for add published partition to txn request {}", requestId, ex); } - ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), txnID.getMostSigBits(), + ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); } })); @@ -1723,7 +1729,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { txnID.getLeastSigBits(), txnID.getMostSigBits())); }).exceptionally(throwable -> { log.error("Send response error for end txn request.", throwable); - ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(), + ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage())); return null; }); } @@ -1731,20 +1737,22 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { @Override protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition command) { final long requestId = command.getRequestId(); + final String topic = command.getTopic(); + final List<MessageIdData> messageIdDataList = command.getMessageIdList(); final int txnAction = command.getTxnAction().getNumber(); TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); - service.getTopics().get(TopicName.get(command.getTopic()).toString()).whenComplete((topic, t) -> { - if (!topic.isPresent()) { + service.getTopics().get(TopicName.get(topic).toString()).whenComplete((optionalTopic, t) -> { + if (!optionalTopic.isPresent()) { ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( - command.getRequestId(), ServerError.TopicNotFound, - "Topic " + command.getTopic() + " is not found.")); + requestId, ServerError.TopicNotFound, + "Topic " + topic + " is not found.")); return; } - topic.get().endTxn(txnID, txnAction, command.getMessageIdList()) + optionalTopic.get().endTxn(txnID, txnAction, messageIdDataList) .whenComplete((ignored, throwable) -> { if (throwable != null) { - log.error("Handle endTxnOnPartition {} failed.", command.getTopic(), throwable); + log.error("Handle endTxnOnPartition {} failed.", topic, throwable); ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( requestId, ServerError.UnknownError, throwable.getMessage())); return; @@ -1764,10 +1772,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { final String subName = command.getSubscription().getSubscription(); final int txnAction = command.getTxnAction().getNumber(); - service.getTopics().get(TopicName.get(command.getSubscription().getTopic()).toString()) + service.getTopics().get(TopicName.get(topic).toString()) .thenAccept(optionalTopic -> { if (!optionalTopic.isPresent()) { - log.error("The topic {} is not exist in broker.", command.getSubscription().getTopic()); + log.error("The topic {} is not exist in broker.", topic); ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( requestId, txnidLeastBits, txnidMostBits, ServerError.UnknownError, @@ -1823,10 +1831,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { @Override protected void handleAddSubscriptionToTxn(PulsarApi.CommandAddSubscriptionToTxn command) { - TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + final long requestId = command.getRequestId(); if (log.isDebugEnabled()) { log.debug("Receive add published partition to txn request {} from {} with txnId {}", - command.getRequestId(), remoteAddress, txnID); + requestId, remoteAddress, txnID); } service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, @@ -1835,17 +1844,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (ex == null) { if (log.isDebugEnabled()) { log.debug("Send response success for add published partition to txn request {}", - command.getRequestId()); + requestId); } - ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(), + ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); log.info("handle add partition to txn finish."); } else { if (log.isDebugEnabled()) { log.debug("Send response error for add published partition to txn request {}", - command.getRequestId(), ex); + requestId, ex); } - ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(), + ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage())); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index 8b91aea..99fb073 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus; +import org.awaitility.Awaitility; import org.junit.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -33,6 +34,7 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.testng.Assert.assertEquals; @@ -58,10 +60,12 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { Assert.assertNotNull(transactionMetadataStoreService); transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0)); - Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1); + Awaitility.await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> + transactionMetadataStoreService.getStores().size() == 1); transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)); - Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0); + Awaitility.await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> + transactionMetadataStoreService.getStores().size() == 0); } @Test @@ -70,7 +74,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0)); transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1)); transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2)); - Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 3); + Awaitility.await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> + transactionMetadataStoreService.getStores().size() == 3); TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get(); TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 0).get(); TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 0).get(); @@ -87,7 +92,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { public void testAddProducedPartitionToTxn() throws ExecutionException, InterruptedException { TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0)); - Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1); + Awaitility.await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> + transactionMetadataStoreService.getStores().size() == 1); TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get(); List<String> partitions = new ArrayList<>(); partitions.add("ptn-0"); @@ -104,7 +110,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase { public void testAddAckedPartitionToTxn() throws ExecutionException, InterruptedException { TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0)); - Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1); + Awaitility.await().atMost(1000, TimeUnit.MILLISECONDS).until(() -> + transactionMetadataStoreService.getStores().size() == 1); TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get(); List<TransactionSubscription> partitions = new ArrayList<>(); partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java index c63db84..d37a5dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java @@ -94,7 +94,7 @@ public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBa try { transactionCoordinatorClient.abort(txnID, Collections.emptyList()); Assert.fail("Should be fail, because the txn is in committing state, can't abort now."); - } catch (TransactionCoordinatorClientException.InvalidTxnStatusException ignore) { + } catch (TransactionCoordinatorClientException ignore) { // Ok here } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index fc8532d..db3d9fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -54,7 +54,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; -import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.internal.DefaultImplementation; @@ -328,8 +328,7 @@ public class TransactionEndToEndTest extends TransactionTestBase { // recommit one transaction should be failed log.info("expected exception for recommit one transaction."); Assert.assertNotNull(reCommitError); - Assert.assertTrue(reCommitError.getCause() instanceof - TransactionCoordinatorClientException.InvalidTxnStatusException); + Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException); } } } @@ -533,8 +532,7 @@ public class TransactionEndToEndTest extends TransactionTestBase { // recommit one transaction should be failed log.info("expected exception for recommit one transaction."); Assert.assertNotNull(reCommitError); - Assert.assertTrue(reCommitError.getCause() instanceof - TransactionCoordinatorClientException.InvalidTxnStatusException); + Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException); } message = consumer.receive(1, TimeUnit.SECONDS); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java index d1ad443..0e1f6c7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java @@ -71,6 +71,15 @@ public class TransactionCoordinatorClientException extends IOException { } /** + * Thrown when transaction not found in transaction coordinator. + */ + public static class TransactionNotFoundException extends TransactionCoordinatorClientException { + public TransactionNotFoundException(String message) { + super(message); + } + } + + /** * Thrown when transaction meta store handler not exists. */ public static class MetaStoreHandlerNotExistsException extends TransactionCoordinatorClientException { diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h index 01a2474..1106aae 100644 --- a/pulsar-client-cpp/include/pulsar/Result.h +++ b/pulsar-client-cpp/include/pulsar/Result.h @@ -84,6 +84,7 @@ enum Result ResultInvalidTxnStatusError, /// Invalid txn status error ResultNotAllowedError, /// Not allowed ResultTransactionConflict, /// Transaction ack conflict + ResultTransactionNotFound, /// Transaction not found }; // Return string representation of result code diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 824d54a..d17a9b6 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -123,6 +123,9 @@ static Result getResult(ServerError serverError) { case TransactionConflict: return ResultTransactionConflict; + + case TransactionNotFound: + return ResultTransactionNotFound; } // NOTE : Do not add default case in the switch above. In future if we get new cases for // ServerError and miss them in the switch above we would like to get notified. Adding diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc index 5f074a4..89dfe8e 100644 --- a/pulsar-client-cpp/lib/Result.cc +++ b/pulsar-client-cpp/lib/Result.cc @@ -150,6 +150,9 @@ const char* strResult(Result result) { case ResultTransactionConflict: return "ResultTransactionConflict"; + + case ResultTransactionNotFound: + return "ResultTransactionNotFound"; }; // NOTE : Do not add default case in the switch above. In future if we get new cases for // ServerError and miss them in the switch above we would like to get notified. Adding diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 2b4fa48..64688f6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -379,6 +379,8 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect return new TransactionCoordinatorClientException.CoordinatorNotFoundException(msg); case InvalidTxnStatus: return new TransactionCoordinatorClientException.InvalidTxnStatusException(msg); + case TransactionNotFound: + return new TransactionCoordinatorClientException.TransactionNotFoundException(msg); default: return new TransactionCoordinatorClientException(msg); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index b30c405..9e1e9d3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -84,6 +84,7 @@ public final class PulsarApi { InvalidTxnStatus(21, 21), NotAllowedError(22, 22), TransactionConflict(23, 23), + TransactionNotFound(24, 24), ; public static final int UnknownError_VALUE = 0; @@ -110,6 +111,7 @@ public final class PulsarApi { public static final int InvalidTxnStatus_VALUE = 21; public static final int NotAllowedError_VALUE = 22; public static final int TransactionConflict_VALUE = 23; + public static final int TransactionNotFound_VALUE = 24; public final int getNumber() { return value; } @@ -140,6 +142,7 @@ public final class PulsarApi { case 21: return InvalidTxnStatus; case 22: return NotAllowedError; case 23: return TransactionConflict; + case 24: return TransactionNotFound; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index e722708..5c12ca3 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -205,6 +205,7 @@ enum ServerError { NotAllowedError = 22; // Not allowed error TransactionConflict = 23; // Ack with transaction conflict + TransactionNotFound = 24; // Transaction not found } enum AuthMethod { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java index 4030558..bc55f77 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java @@ -24,7 +24,7 @@ import com.google.common.annotations.Beta; import java.io.IOException; import java.util.concurrent.CompletableFuture; - +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; /** @@ -58,10 +58,15 @@ public interface TransactionMetadataStoreProvider { * Open the transaction metadata store for transaction coordinator * identified by <tt>transactionCoordinatorId</tt>. * + * @param transactionCoordinatorId {@link TransactionCoordinatorID} the coordinator id. + * @param managedLedgerFactory {@link ManagedLedgerFactory} the managedLedgerFactory to create managedLedger. + * @param managedLedgerConfig {@link ManagedLedgerConfig} the managedLedgerConfig to create managedLedger. + * * @return a future represents the result of the operation. * an instance of {@link TransactionMetadataStore} is returned * if the operation succeeds. */ CompletableFuture<TransactionMetadataStore> openStore( - TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory); + TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory, + ManagedLedgerConfig managedLedgerConfig); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java index 614ef1a..8174b3a 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java @@ -19,7 +19,7 @@ package org.apache.pulsar.transaction.coordinator.impl; import java.util.concurrent.CompletableFuture; - +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -32,7 +32,8 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat @Override public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId, - ManagedLedgerFactory managedLedgerFactory) { + ManagedLedgerFactory managedLedgerFactory, + ManagedLedgerConfig managedLedgerConfig) { return CompletableFuture.completedFuture( new InMemTransactionMetadataStore(transactionCoordinatorId)); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index 761169c..89c84c1 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.Position; @@ -53,7 +54,7 @@ public class MLTransactionLogImpl implements TransactionLog { private final ManagedLedger managedLedger; - private final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-"; + public final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-"; private final ManagedCursor cursor; @@ -64,17 +65,21 @@ public class MLTransactionLogImpl implements TransactionLog { //this is for replay private final PositionImpl lastConfirmedEntry; + private PositionImpl currentLoadPosition; + private final long tcId; private final String topicName; public MLTransactionLogImpl(TransactionCoordinatorID tcID, - ManagedLedgerFactory managedLedgerFactory) throws Exception { + ManagedLedgerFactory managedLedgerFactory, + ManagedLedgerConfig managedLedgerConfig) throws Exception { this.topicName = TRANSACTION_LOG_PREFIX + tcID; this.tcId = tcID.getId(); - this.managedLedger = managedLedgerFactory.open(topicName); + this.managedLedger = managedLedgerFactory.open(topicName, managedLedgerConfig); this.cursor = managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME, CommandSubscribe.InitialPosition.Earliest); + this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition(); this.entryQueue = new SpscArrayQueue<>(2000); this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry(); } @@ -165,9 +170,8 @@ public class MLTransactionLogImpl implements TransactionLog { class TransactionLogReplayer { - private FillEntryQueueCallback fillEntryQueueCallback; - private long currentLoadEntryId; - private TransactionLogReplayCallback transactionLogReplayCallback; + private final FillEntryQueueCallback fillEntryQueueCallback; + private final TransactionLogReplayCallback transactionLogReplayCallback; TransactionLogReplayer(TransactionLogReplayCallback transactionLogReplayCallback) { this.fillEntryQueueCallback = new FillEntryQueueCallback(); @@ -175,16 +179,13 @@ public class MLTransactionLogImpl implements TransactionLog { } public void start() { - if (((PositionImpl) cursor.getMarkDeletedPosition()).compareTo(lastConfirmedEntry) == 0) { - this.transactionLogReplayCallback.replayComplete(); - return; - } - while (currentLoadEntryId < lastConfirmedEntry.getEntryId()) { + + while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) { fillEntryQueueCallback.fillQueue(); Entry entry = entryQueue.poll(); if (entry != null) { ByteBuf buffer = entry.getDataBuffer(); - currentLoadEntryId = entry.getEntryId(); + currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(buffer); TransactionMetadataEntry.Builder transactionMetadataEntryBuilder = TransactionMetadataEntry.newBuilder(); @@ -215,7 +216,7 @@ public class MLTransactionLogImpl implements TransactionLog { class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback { - private AtomicLong outstandingReadsRequests = new AtomicLong(0); + private final AtomicLong outstandingReadsRequests = new AtomicLong(0); void fillQueue() { if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index 5e1061d..ef63089 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -19,7 +19,7 @@ package org.apache.pulsar.transaction.coordinator.impl; import java.util.concurrent.CompletableFuture; - +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; @@ -36,13 +36,15 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStoreProvider.class); @Override - public CompletableFuture<TransactionMetadataStore> - openStore(TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory) { + public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId, + ManagedLedgerFactory managedLedgerFactory, + ManagedLedgerConfig managedLedgerConfig) { TransactionMetadataStore transactionMetadataStore; try { transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorId, - new MLTransactionLogImpl(transactionCoordinatorId, managedLedgerFactory)); + new MLTransactionLogImpl(transactionCoordinatorId, + managedLedgerFactory, managedLedgerConfig)); } catch (Exception e) { log.error("MLTransactionMetadataStore init fail", e); return FutureUtil.failedFuture(e); diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index 84e68c7..aff0d5a 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.transaction.coordinator; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; @@ -48,7 +49,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { factoryConf.setMaxCacheSize(0); ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf); TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); - MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory); + MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, + new ManagedLedgerConfig()); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog); int checkReplayRetryCount = 0; @@ -108,7 +110,10 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { factoryConf.setMaxCacheSize(0); ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf); TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); - MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(2); + MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, + managedLedgerConfig); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog); int checkReplayRetryCount = 0; @@ -150,7 +155,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { MLTransactionMetadataStore transactionMetadataStoreTest = new MLTransactionMetadataStore(transactionCoordinatorID, - new MLTransactionLogImpl(transactionCoordinatorID, factory)); + new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig())); while (true) { if (checkReplayRetryCount > 6) { @@ -208,7 +213,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase { factoryConf.setMaxCacheSize(0); ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf); TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); - MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory); + MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, + new ManagedLedgerConfig()); MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog); int checkReplayRetryCount = 0; diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java index 9a79b37..84f9a47 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java @@ -62,7 +62,7 @@ public class TransactionMetadataStoreProviderTest { @BeforeMethod public void setup() throws Exception { this.tcId = new TransactionCoordinatorID(1L); - this.store = this.provider.openStore(tcId, null).get(); + this.store = this.provider.openStore(tcId, null, null).get(); } @Test
