This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 411bf71  [Transaction] Fix transaction log replay not handle right. 
(#8723)
411bf71 is described below

commit 411bf71006039af661be13c54dda4ce074f97679
Author: congbo <[email protected]>
AuthorDate: Tue Dec 1 12:56:27 2020 +0800

    [Transaction] Fix transaction log replay not handle right. (#8723)
    
    ## Motivation
    1. Transaction log replay don't handle right, because when replay compare 
lastConfirm.entryId, it is not right to replay.
    2. Transaction handle proto is async, but recycle the command will happen 
before send response, we use requestId or others will not right.
    3. We should afferent the managedLedgerConfig when we use MLTransactionLog, 
we can config the TransactionLog manageLedger.
    4. add the TransactionNotFound exception in proto, it will return when 
transaction commit or abort and others transaction operation repeat or error 
handle.
    ## implement
    1. we compare the markerDeletePosition and lastConfirmPosition to replay.
    2. use local variable replace command get method.
    3. afferent the managedLedgerConfig.
---
 conf/broker.conf                                   |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  2 +-
 .../broker/TransactionMetadataStoreService.java    | 27 +++++---
 .../broker/service/BrokerServiceException.java     |  2 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 73 ++++++++++++----------
 .../TransactionMetadataStoreServiceTest.java       | 17 +++--
 .../TransactionCoordinatorClientTest.java          |  2 +-
 .../client/impl/TransactionEndToEndTest.java       |  8 +--
 .../TransactionCoordinatorClientException.java     |  9 +++
 pulsar-client-cpp/include/pulsar/Result.h          |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |  3 +
 pulsar-client-cpp/lib/Result.cc                    |  3 +
 .../client/impl/TransactionMetaStoreHandler.java   |  2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  3 +
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 .../TransactionMetadataStoreProvider.java          |  8 ++-
 .../InMemTransactionMetadataStoreProvider.java     |  4 +-
 .../coordinator/impl/MLTransactionLogImpl.java     | 27 ++++----
 .../impl/MLTransactionMetadataStoreProvider.java   |  9 ++-
 .../MLTransactionMetadataStoreTest.java            | 14 +++--
 .../TransactionMetadataStoreProviderTest.java      |  2 +-
 21 files changed, 142 insertions(+), 77 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index f22fb3c..97d4ff0 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1175,4 +1175,4 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
 
 # Enable transaction coordinator in broker
 transactionCoordinatorEnabled=false
-transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
+transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f6e3d35..f4a8b75 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1860,7 +1860,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
             doc = "Class name for transaction metadata store provider"
     )
     private String transactionMetadataStoreProviderClassName =
-            
"org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider";
+            
"org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider";
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 3fbf2d9..3531d9d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -44,6 +44,7 @@ import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvide
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import 
org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,15 +112,23 @@ public class TransactionMetadataStoreService {
     }
 
     public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        transactionMetadataStoreProvider.openStore(tcId, 
pulsarService.getManagedLedgerFactory())
-            .whenComplete((store, ex) -> {
-                if (ex != null) {
-                    LOG.error("Add transaction metadata store with id {} 
error", tcId.getId(), ex);
-                } else {
-                    stores.put(tcId, store);
-                    LOG.info("Added new transaction meta store {}", tcId);
-                }
-            });
+        pulsarService.getBrokerService()
+                
.getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX
 + tcId))
+                .whenComplete((v ,e) -> {
+                    if (e != null) {
+                        LOG.error("Add transaction metadata store with id {} 
error", tcId.getId(), e);
+                    } else {
+                        transactionMetadataStoreProvider.openStore(tcId, 
pulsarService.getManagedLedgerFactory(), v)
+                                .whenComplete((store, ex) -> {
+                                    if (ex != null) {
+                                        LOG.error("Add transaction metadata 
store with id {} error", tcId.getId(), ex);
+                                    } else {
+                                        stores.put(tcId, store);
+                                        LOG.info("Added new transaction meta 
store {}", tcId);
+                                    }
+                                });
+                    }
+        });
     }
 
     public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 4564617..9c6b2d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -219,6 +219,8 @@ public class BrokerServiceException extends Exception {
             return ServerError.NotAllowedError;
         } else if (t instanceof TransactionConflictException) {
             return ServerError.TransactionConflict;
+        } else if (t instanceof 
CoordinatorException.TransactionNotFoundException) {
+            return ServerError.TransactionNotFound;
         } else {
             if (checkCauseIfUnknown) {
                 return getClientErrorCode(t.getCause(), false);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cb25e3b..b5bbb84 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -34,6 +34,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -1262,18 +1263,21 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     protected void handleAck(CommandAck ack) {
         checkArgument(state == State.Connected);
         CompletableFuture<Consumer> consumerFuture = 
consumers.get(ack.getConsumerId());
+        final long requestId = ack.getRequestId();
+        final boolean hasRequestId = ack.hasRequestId();
+        final long consumerId = ack.getConsumerId();
 
         if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
             consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> {
-                        if (ack.hasRequestId()) {
+                        if (hasRequestId) {
                             ctx.writeAndFlush(Commands.newAckResponse(
-                                    ack.getRequestId(), null, null, 
ack.getConsumerId()));
+                                    requestId, null, null, consumerId));
                         }
                     }).exceptionally(e -> {
-                        if (ack.hasRequestId()) {
-                            
ctx.writeAndFlush(Commands.newAckResponse(ack.getRequestId(),
+                        if (hasRequestId) {
+                            
ctx.writeAndFlush(Commands.newAckResponse(requestId,
                                     
BrokerServiceException.getClientErrorCode(e),
-                                    e.getMessage(), ack.getConsumerId()));
+                                    e.getMessage(), consumerId));
                         }
                         return null;
                     });
@@ -1660,45 +1664,47 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     protected void handleNewTxn(CommandNewTxn command) {
+        final long requestId = command.getRequestId();
+        final TransactionCoordinatorID tcId = 
TransactionCoordinatorID.get(command.getTcId());
         if (log.isDebugEnabled()) {
-            log.debug("Receive new txn request {} to transaction meta store {} 
from {}.", command.getRequestId(), command.getTcId(), remoteAddress);
+            log.debug("Receive new txn request {} to transaction meta store {} 
from {}.", requestId, tcId, remoteAddress);
         }
-        TransactionCoordinatorID tcId = 
TransactionCoordinatorID.get(command.getTcId());
         
service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, 
command.getTxnTtlSeconds())
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response {} for new txn request {}", 
tcId.getId(),  command.getRequestId());
+                        log.debug("Send response {} for new txn request {}", 
tcId.getId(), requestId);
                     }
-                    
ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), 
txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                    ctx.writeAndFlush(Commands.newTxnResponse(requestId, 
txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 } else {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response error for new txn request 
{}", command.getRequestId(), ex);
+                        log.debug("Send response error for new txn request 
{}", requestId, ex);
                     }
-                    
ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), tcId.getId(), 
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
+                    ctx.writeAndFlush(Commands.newTxnResponse(requestId, 
tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
                 }
             }));
     }
 
     @Override
     protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn 
command) {
-            TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
-            log.debug("Receive add published partition to txn request {} from 
{} with txnId {}", command.getRequestId(), remoteAddress, txnID);
+            log.debug("Receive add published partition to txn request {} from 
{} with txnId {}", requestId, remoteAddress, txnID);
         }
         
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
 command.getPartitionsList())
             .whenComplete(((v, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response success for add published 
partition to txn request {}",  command.getRequestId());
+                        log.debug("Send response success for add published 
partition to txn request {}",  requestId);
                     }
-                    
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(),
+                    
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 } else {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response error for add published 
partition to txn request {}",  command.getRequestId(), ex);
+                        log.debug("Send response error for add published 
partition to txn request {}",  requestId, ex);
                     }
-                    
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), 
txnID.getMostSigBits(),
+                    
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
                             BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage()));
                 }
             }));
@@ -1717,7 +1723,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 }).exceptionally(throwable -> {
                     log.error("Send response error for end txn request.", 
throwable);
-                    
ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), 
txnID.getMostSigBits(),
+                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, 
txnID.getMostSigBits(),
                             
BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
                     return null; });
     }
@@ -1725,20 +1731,22 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     @Override
     protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition 
command) {
         final long requestId = command.getRequestId();
+        final String topic = command.getTopic();
+        final List<MessageIdData> messageIdDataList = 
command.getMessageIdList();
         final int txnAction = command.getTxnAction().getNumber();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
 
-        
service.getTopics().get(TopicName.get(command.getTopic()).toString()).whenComplete((topic,
 t) -> {
-            if (!topic.isPresent()) {
+        
service.getTopics().get(TopicName.get(topic).toString()).whenComplete((optionalTopic,
 t) -> {
+            if (!optionalTopic.isPresent()) {
                 ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
-                        command.getRequestId(), ServerError.TopicNotFound,
-                        "Topic " + command.getTopic() + " is not found."));
+                        requestId, ServerError.TopicNotFound,
+                        "Topic " + topic + " is not found."));
                 return;
             }
-            topic.get().endTxn(txnID, txnAction, command.getMessageIdList())
+            optionalTopic.get().endTxn(txnID, txnAction, messageIdDataList)
                 .whenComplete((ignored, throwable) -> {
                     if (throwable != null) {
-                        log.error("Handle endTxnOnPartition {} failed.", 
command.getTopic(), throwable);
+                        log.error("Handle endTxnOnPartition {} failed.", 
topic, throwable);
                         
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                                 requestId, ServerError.UnknownError, 
throwable.getMessage()));
                         return;
@@ -1758,10 +1766,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final String subName = command.getSubscription().getSubscription();
         final int txnAction = command.getTxnAction().getNumber();
 
-        
service.getTopics().get(TopicName.get(command.getSubscription().getTopic()).toString())
+        service.getTopics().get(TopicName.get(topic).toString())
             .thenAccept(optionalTopic -> {
                 if (!optionalTopic.isPresent()) {
-                    log.error("The topic {} is not exist in broker.", 
command.getSubscription().getTopic());
+                    log.error("The topic {} is not exist in broker.", topic);
                     ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                             requestId, txnidLeastBits, txnidMostBits,
                             ServerError.UnknownError,
@@ -1817,10 +1825,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     @Override
     protected void 
handleAddSubscriptionToTxn(PulsarApi.CommandAddSubscriptionToTxn command) {
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
             log.debug("Receive add published partition to txn request {} from 
{} with txnId {}",
-                    command.getRequestId(), remoteAddress, txnID);
+                    requestId, remoteAddress, txnID);
         }
 
         
service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
@@ -1829,17 +1838,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published 
partition to txn request {}",
-                                    command.getRequestId());
+                                    requestId);
                         }
-                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                         log.info("handle add partition to txn finish.");
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response error for add published 
partition to txn request {}",
-                                    command.getRequestId(), ex);
+                                    requestId, ex);
                         }
-                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
                                 txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 8b91aea..99fb073 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -25,6 +25,7 @@ import 
org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import 
org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -33,6 +34,7 @@ import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.assertEquals;
 
@@ -58,10 +60,12 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         Assert.assertNotNull(transactionMetadataStoreService);
 
         
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
 
         
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 0);
     }
 
     @Test
@@ -70,7 +74,8 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
         
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
         
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
-        
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 3);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 3);
         TxnID txnID0 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
0).get();
         TxnID txnID1 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 
0).get();
         TxnID txnID2 = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 
0).get();
@@ -87,7 +92,8 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     public void testAddProducedPartitionToTxn() throws ExecutionException, 
InterruptedException {
         TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
         
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
         TxnID txnID = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
0).get();
         List<String> partitions = new ArrayList<>();
         partitions.add("ptn-0");
@@ -104,7 +110,8 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     public void testAddAckedPartitionToTxn() throws ExecutionException, 
InterruptedException {
         TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
         
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
         TxnID txnID = 
transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 
0).get();
         List<TransactionSubscription> partitions = new ArrayList<>();
         
partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index c63db84..d37a5dc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -94,7 +94,7 @@ public class TransactionCoordinatorClientTest extends 
TransactionMetaStoreTestBa
         try {
             transactionCoordinatorClient.abort(txnID, Collections.emptyList());
             Assert.fail("Should be fail, because the txn is in committing 
state, can't abort now.");
-        } catch 
(TransactionCoordinatorClientException.InvalidTxnStatusException ignore) {
+        } catch (TransactionCoordinatorClientException ignore) {
            // Ok here
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index fc8532d..db3d9fe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -54,7 +54,7 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -328,8 +328,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 // recommit one transaction should be failed
                 log.info("expected exception for recommit one transaction.");
                 Assert.assertNotNull(reCommitError);
-                Assert.assertTrue(reCommitError.getCause() instanceof
-                        
TransactionCoordinatorClientException.InvalidTxnStatusException);
+                Assert.assertTrue(reCommitError.getCause() instanceof 
TransactionNotFoundException);
             }
         }
     }
@@ -533,8 +532,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 // recommit one transaction should be failed
                 log.info("expected exception for recommit one transaction.");
                 Assert.assertNotNull(reCommitError);
-                Assert.assertTrue(reCommitError.getCause() instanceof
-                        
TransactionCoordinatorClientException.InvalidTxnStatusException);
+                Assert.assertTrue(reCommitError.getCause() instanceof 
TransactionNotFoundException);
             }
 
             message = consumer.receive(1, TimeUnit.SECONDS);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
index d1ad443..0e1f6c7 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
@@ -71,6 +71,15 @@ public class TransactionCoordinatorClientException extends 
IOException {
     }
 
     /**
+     * Thrown when transaction not found in transaction coordinator.
+     */
+    public static class TransactionNotFoundException extends 
TransactionCoordinatorClientException {
+        public TransactionNotFoundException(String message) {
+            super(message);
+        }
+    }
+
+    /**
      * Thrown when transaction meta store handler not exists.
      */
     public static class MetaStoreHandlerNotExistsException extends 
TransactionCoordinatorClientException {
diff --git a/pulsar-client-cpp/include/pulsar/Result.h 
b/pulsar-client-cpp/include/pulsar/Result.h
index 01a2474..1106aae 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -84,6 +84,7 @@ enum Result
     ResultInvalidTxnStatusError,                     /// Invalid txn status 
error
     ResultNotAllowedError,                           /// Not allowed
     ResultTransactionConflict,                       /// Transaction ack 
conflict
+    ResultTransactionNotFound,                       /// Transaction not found
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 824d54a..d17a9b6 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -123,6 +123,9 @@ static Result getResult(ServerError serverError) {
 
         case TransactionConflict:
             return ResultTransactionConflict;
+
+        case TransactionNotFound:
+            return ResultTransactionNotFound;
     }
     // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
     // ServerError and miss them in the switch above we would like to get 
notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 5f074a4..89dfe8e 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -150,6 +150,9 @@ const char* strResult(Result result) {
 
         case ResultTransactionConflict:
             return "ResultTransactionConflict";
+
+        case ResultTransactionNotFound:
+            return "ResultTransactionNotFound";
     };
     // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
     // ServerError and miss them in the switch above we would like to get 
notified. Adding
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 2b4fa48..64688f6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -379,6 +379,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                 return new 
TransactionCoordinatorClientException.CoordinatorNotFoundException(msg);
             case InvalidTxnStatus:
                 return new 
TransactionCoordinatorClientException.InvalidTxnStatusException(msg);
+            case TransactionNotFound:
+                return new 
TransactionCoordinatorClientException.TransactionNotFoundException(msg);
             default:
                 return new TransactionCoordinatorClientException(msg);
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index b30c405..9e1e9d3 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -84,6 +84,7 @@ public final class PulsarApi {
     InvalidTxnStatus(21, 21),
     NotAllowedError(22, 22),
     TransactionConflict(23, 23),
+    TransactionNotFound(24, 24),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -110,6 +111,7 @@ public final class PulsarApi {
     public static final int InvalidTxnStatus_VALUE = 21;
     public static final int NotAllowedError_VALUE = 22;
     public static final int TransactionConflict_VALUE = 23;
+    public static final int TransactionNotFound_VALUE = 24;
     
     
     public final int getNumber() { return value; }
@@ -140,6 +142,7 @@ public final class PulsarApi {
         case 21: return InvalidTxnStatus;
         case 22: return NotAllowedError;
         case 23: return TransactionConflict;
+        case 24: return TransactionNotFound;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index e722708..5c12ca3 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -205,6 +205,7 @@ enum ServerError {
     NotAllowedError = 22; // Not allowed error
 
     TransactionConflict = 23; // Ack with transaction conflict
+    TransactionNotFound = 24; // Transaction not found
 }
 
 enum AuthMethod {
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index 8a3b6d5..a18912f 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.Beta;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 
 /**
@@ -55,10 +56,15 @@ public interface TransactionMetadataStoreProvider {
      * Open the transaction metadata store for transaction coordinator
      * identified by <tt>transactionCoordinatorId</tt>.
      *
+     * @param transactionCoordinatorId {@link TransactionCoordinatorID} the 
coordinator id.
+     * @param managedLedgerFactory {@link ManagedLedgerFactory} the 
managedLedgerFactory to create managedLedger.
+     * @param managedLedgerConfig {@link ManagedLedgerConfig} the 
managedLedgerConfig to create managedLedger.
+     *
      * @return a future represents the result of the operation.
      *         an instance of {@link TransactionMetadataStore} is returned
      *         if the operation succeeds.
      */
     CompletableFuture<TransactionMetadataStore> openStore(
-        TransactionCoordinatorID transactionCoordinatorId, 
ManagedLedgerFactory managedLedgerFactory);
+            TransactionCoordinatorID transactionCoordinatorId, 
ManagedLedgerFactory managedLedgerFactory,
+            ManagedLedgerConfig managedLedgerConfig);
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index f237d6b..8174b3a 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -31,7 +32,8 @@ public class InMemTransactionMetadataStoreProvider implements 
TransactionMetadat
 
     @Override
     public CompletableFuture<TransactionMetadataStore> 
openStore(TransactionCoordinatorID transactionCoordinatorId,
-         ManagedLedgerFactory managedLedgerFactory) {
+                                                                 
ManagedLedgerFactory managedLedgerFactory,
+                                                                 
ManagedLedgerConfig managedLedgerConfig) {
         return CompletableFuture.completedFuture(
             new InMemTransactionMetadataStore(transactionCoordinatorId));
     }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 3036f63..e60563a 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
@@ -51,7 +52,7 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final ManagedLedger managedLedger;
 
-    private final static String TRANSACTION_LOG_PREFIX = 
NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
+    public final static String TRANSACTION_LOG_PREFIX = 
NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
 
     private final ManagedCursor cursor;
 
@@ -62,17 +63,21 @@ public class MLTransactionLogImpl implements TransactionLog 
{
     //this is for replay
     private final PositionImpl lastConfirmedEntry;
 
+    private PositionImpl currentLoadPosition;
+
     private final long tcId;
 
     private final String topicName;
 
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
-                                ManagedLedgerFactory managedLedgerFactory) 
throws Exception {
+                                ManagedLedgerFactory managedLedgerFactory,
+                                ManagedLedgerConfig managedLedgerConfig) 
throws Exception {
         this.topicName = TRANSACTION_LOG_PREFIX + tcID;
         this.tcId = tcID.getId();
-        this.managedLedger = managedLedgerFactory.open(topicName);
+        this.managedLedger = managedLedgerFactory.open(topicName, 
managedLedgerConfig);
         this.cursor =  managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
                 CommandSubscribe.InitialPosition.Earliest);
+        this.currentLoadPosition = (PositionImpl) 
this.cursor.getMarkDeletedPosition();
         this.entryQueue = new SpscArrayQueue<>(2000);
         this.lastConfirmedEntry = (PositionImpl) 
managedLedger.getLastConfirmedEntry();
     }
@@ -163,9 +168,8 @@ public class MLTransactionLogImpl implements TransactionLog 
{
 
     class TransactionLogReplayer {
 
-        private FillEntryQueueCallback fillEntryQueueCallback;
-        private long currentLoadEntryId;
-        private TransactionLogReplayCallback transactionLogReplayCallback;
+        private final FillEntryQueueCallback fillEntryQueueCallback;
+        private final TransactionLogReplayCallback 
transactionLogReplayCallback;
 
         TransactionLogReplayer(TransactionLogReplayCallback 
transactionLogReplayCallback) {
             this.fillEntryQueueCallback = new FillEntryQueueCallback();
@@ -173,16 +177,13 @@ public class MLTransactionLogImpl implements 
TransactionLog {
         }
 
         public void start() {
-            if (((PositionImpl) 
cursor.getMarkDeletedPosition()).compareTo(lastConfirmedEntry) == 0) {
-                this.transactionLogReplayCallback.replayComplete();
-                return;
-            }
-            while (currentLoadEntryId < lastConfirmedEntry.getEntryId()) {
+
+            while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
                 fillEntryQueueCallback.fillQueue();
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     ByteBuf buffer = entry.getDataBuffer();
-                    currentLoadEntryId = entry.getEntryId();
+                    currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                     ByteBufCodedInputStream stream = 
ByteBufCodedInputStream.get(buffer);
                     TransactionMetadataEntry.Builder 
transactionMetadataEntryBuilder =
                             TransactionMetadataEntry.newBuilder();
@@ -213,7 +214,7 @@ public class MLTransactionLogImpl implements TransactionLog 
{
 
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback 
{
 
-        private AtomicLong outstandingReadsRequests = new AtomicLong(0);
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
 
         void fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 0f6c1c8..ef63089 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -35,13 +36,15 @@ public class MLTransactionMetadataStoreProvider implements 
TransactionMetadataSt
     private static final Logger log = 
LoggerFactory.getLogger(MLTransactionMetadataStoreProvider.class);
 
     @Override
-    public CompletableFuture<TransactionMetadataStore>
-    openStore(TransactionCoordinatorID transactionCoordinatorId, 
ManagedLedgerFactory managedLedgerFactory) {
+    public CompletableFuture<TransactionMetadataStore> 
openStore(TransactionCoordinatorID transactionCoordinatorId,
+                                                                 
ManagedLedgerFactory managedLedgerFactory,
+                                                                 
ManagedLedgerConfig managedLedgerConfig) {
         TransactionMetadataStore transactionMetadataStore;
         try {
             transactionMetadataStore =
                     new MLTransactionMetadataStore(transactionCoordinatorId,
-                            new MLTransactionLogImpl(transactionCoordinatorId, 
managedLedgerFactory));
+                            new MLTransactionLogImpl(transactionCoordinatorId,
+                                    managedLedgerFactory, 
managedLedgerConfig));
         } catch (Exception e) {
             log.error("MLTransactionMetadataStore init fail", e);
             return FutureUtil.failedFuture(e);
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 84e68c7..aff0d5a 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -48,7 +49,8 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, 
factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog);
         int checkReplayRetryCount = 0;
@@ -108,7 +110,10 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, 
factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
+                managedLedgerConfig);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog);
         int checkReplayRetryCount = 0;
@@ -150,7 +155,7 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new 
MLTransactionMetadataStore(transactionCoordinatorID,
 
-                                new 
MLTransactionLogImpl(transactionCoordinatorID, factory));
+                                new 
MLTransactionLogImpl(transactionCoordinatorID, factory, new 
ManagedLedgerConfig()));
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -208,7 +213,8 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, 
factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog);
         int checkReplayRetryCount = 0;
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 9a79b37..84f9a47 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -62,7 +62,7 @@ public class TransactionMetadataStoreProviderTest {
     @BeforeMethod
     public void setup() throws Exception {
         this.tcId = new TransactionCoordinatorID(1L);
-        this.store = this.provider.openStore(tcId, null).get();
+        this.store = this.provider.openStore(tcId, null, null).get();
     }
 
     @Test

Reply via email to