This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0994254  [Transaction] Fix transaction sequenceId generate error. 
(#13209)
0994254 is described below

commit 0994254c24be4aec8fef810bcdbf62e06453fe53
Author: congbo <[email protected]>
AuthorDate: Fri Dec 10 16:41:49 2021 +0800

    [Transaction] Fix transaction sequenceId generate error. (#13209)
---
 .../pulsar/broker/transaction/TransactionTest.java | 11 ++--
 .../coordinator/impl/MLTransactionLogImpl.java     | 41 ------------
 .../impl/MLTransactionLogInterceptor.java          | 63 +++++++++++++++----
 .../impl/MLTransactionMetadataStore.java           | 24 ++++----
 .../impl/MLTransactionMetadataStoreProvider.java   |  6 +-
 .../MLTransactionMetadataStoreTest.java            | 72 +++++++++++++++-------
 6 files changed, 125 insertions(+), 92 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e2f81bb..def6d71 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -53,7 +53,6 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.collections.map.SingletonMap;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -65,7 +64,6 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
-import 
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -572,7 +570,8 @@ public class TransactionTest extends TransactionTestBase {
                     null);
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
-
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        
persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog =
                 new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
                         persistentTopic.getManagedLedger().getConfig());
@@ -591,7 +590,8 @@ public class TransactionTest extends TransactionTestBase {
         doNothing().when(timeoutTracker).start();
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, 
transactionRecoverTracker);
+                        mlTransactionLog, timeoutTracker, 
transactionRecoverTracker,
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, 
"Ready"));
@@ -604,7 +604,8 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, 
transactionRecoverTracker);
+                        mlTransactionLog, timeoutTracker, 
transactionRecoverTracker,
+                        mlTransactionLogInterceptor.getSequenceId());
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, 
"Ready"));
     }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 2d11d98..e154bb8 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -68,15 +68,11 @@ public class MLTransactionLogImpl implements TransactionLog 
{
 
     private final TopicName topicName;
 
-    private final MLTransactionLogInterceptor mlTransactionLogInterceptor;
-
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
                                 ManagedLedgerConfig managedLedgerConfig) {
         this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
-        this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        
managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
         this.entryQueue = new SpscArrayQueue<>(2000);
@@ -161,7 +157,6 @@ public class MLTransactionLogImpl implements TransactionLog 
{
             @Override
             public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
                 buf.release();
-                
mlTransactionLogInterceptor.setMaxLocalTxnId(transactionMetadataEntry.getMaxLocalTxnId());
                 completableFuture.complete(position);
             }
 
@@ -242,42 +237,6 @@ public class MLTransactionLogImpl implements 
TransactionLog {
         }
     }
 
-    public CompletableFuture<Long> getMaxLocalTxnId() {
-
-        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
-        PositionImpl position = (PositionImpl) 
managedLedger.getLastConfirmedEntry();
-
-        if (position != null && position.getEntryId() != -1
-                && ((ManagedLedgerImpl) 
managedLedger).ledgerExists(position.getLedgerId())) {
-            ((ManagedLedgerImpl) this.managedLedger).asyncReadEntry(position, 
new AsyncCallbacks.ReadEntryCallback() {
-                @Override
-                public void readEntryComplete(Entry entry, Object ctx) {
-                    TransactionMetadataEntry lastConfirmEntry = new 
TransactionMetadataEntry();
-                    ByteBuf buffer = entry.getDataBuffer();
-                    lastConfirmEntry.parseFrom(buffer, buffer.readableBytes());
-                    
completableFuture.complete(lastConfirmEntry.getMaxLocalTxnId());
-                }
-
-                @Override
-                public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
-                    log.error("[{}] MLTransactionLog recover MaxLocalTxnId 
fail!", topicName, exception);
-                    completableFuture.completeExceptionally(exception);
-                }
-            }, null);
-        } else if (managedLedger.getProperties()
-                .get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID) != null) {
-            
completableFuture.complete(Long.parseLong(managedLedger.getProperties()
-                    .get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID)));
-        } else {
-            log.error("[{}] MLTransactionLog recover MaxLocalTxnId fail! "
-                    + "not found MaxLocalTxnId in managedLedger and 
properties", topicName);
-            completableFuture.completeExceptionally(new 
ManagedLedgerException(topicName
-                    + "MLTransactionLog recover MaxLocalTxnId fail! "
-                    + "not found MaxLocalTxnId in managedLedger and 
properties"));
-        }
-        return completableFuture;
-    }
-
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback 
{
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
index e97b104..68add4a 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
@@ -18,14 +18,18 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import 
org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover 
transaction log.
@@ -33,31 +37,68 @@ import java.util.concurrent.CompletableFuture;
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = 
LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
     public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
-        return null;
+        return op;
     }
 
+    // When all ledgers have been deleted, sequenceId is initialized from the managedLedger properties.
     @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> 
propertiesMap) {
+        if (propertiesMap == null || propertiesMap.size() == 0) {
+            return;
+        }
 
+        if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
+            
sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
+        }
     }
 
+    // When the ledger has not rolled over, sequenceId is initialized from the last add-confirmed metadata entry.
     @Override
-    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle ledgerHandle) {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = 
entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                TransactionMetadataEntry lastConfirmEntry = 
new TransactionMetadataEntry();
+                                ByteBuf buffer = ledgerEntry.getEntryBuffer();
+                                lastConfirmEntry.parseFrom(buffer, 
buffer.readableBytes());
+                                
this.sequenceId.set(lastConfirmEntry.getMaxLocalTxnId());
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the tc 
sequenceId from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
+                        }
+                    } else {
+                        promise.complete(null);
+                    }
+                }
+            });
+        } else {
+            promise.complete(null);
+        }
+        return promise;
     }
 
+    // Rolling over the ledger writes the current sequenceId into the managedLedger properties.
     @Override
     public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
-        propertiesMap.put(MAX_LOCAL_TXN_ID, maxLocalTxnId + "");
-    }
-
-    protected void setMaxLocalTxnId(long maxLocalTxnId) {
-        this.maxLocalTxnId = maxLocalTxnId;
+        propertiesMap.put(MAX_LOCAL_TXN_ID, sequenceId.get() + "");
     }
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 05faaad..6ef4f17 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -60,9 +60,8 @@ public class MLTransactionMetadataStore
     private static final Logger log = 
LoggerFactory.getLogger(MLTransactionMetadataStore.class);
 
     private final TransactionCoordinatorID tcID;
-    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+    private final AtomicLong sequenceId;
     private final MLTransactionLogImpl transactionLog;
-    private static final long TC_ID_NOT_USED = -1L;
     private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> 
txnMetaMap = new ConcurrentSkipListMap<>();
     private final TransactionTimeoutTracker timeoutTracker;
     private final TransactionMetadataStoreStats transactionMetadataStoreStats;
@@ -75,8 +74,10 @@ public class MLTransactionMetadataStore
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
-                                      TransactionRecoverTracker 
recoverTracker) {
+                                      TransactionRecoverTracker recoverTracker,
+                                      AtomicLong sequenceId) {
         super(State.None);
+        this.sequenceId = sequenceId;
         this.tcID = tcID;
         this.transactionLog = mlTransactionLog;
         this.timeoutTracker = timeoutTracker;
@@ -96,16 +97,13 @@ public class MLTransactionMetadataStore
 
             @Override
             public void replayComplete() {
-                mlTransactionLog.getMaxLocalTxnId().thenAccept(id -> {
-                    recoverTracker.appendOpenTransactionToTimeoutTracker();
-                    sequenceId.set(id);
-                    if (!changeToReadyState()) {
-                        log.error("Managed ledger transaction metadata store 
change state error when replay complete");
-                    } else {
-                        
recoverTracker.handleCommittingAndAbortingTransaction();
-                        timeoutTracker.start();
-                    }
-                });
+                recoverTracker.appendOpenTransactionToTimeoutTracker();
+                if (!changeToReadyState()) {
+                    log.error("Managed ledger transaction metadata store 
change state error when replay complete");
+                } else {
+                    recoverTracker.handleCommittingAndAbortingTransaction();
+                    timeoutTracker.start();
+                }
             }
 
             @Override
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 8731e07..8018a21 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -42,10 +42,14 @@ public class MLTransactionMetadataStoreProvider implements 
TransactionMetadataSt
                                                                  
ManagedLedgerConfig managedLedgerConfig,
                                                                  
TransactionTimeoutTracker timeoutTracker,
                                                                  
TransactionRecoverTracker recoverTracker) {
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
                 managedLedgerFactory, managedLedgerConfig);
 
+        // The registered interceptor inits and persists sequenceId; the store must read it from this same instance.
         return txnLog.initialize().thenApply(__ ->
-                new MLTransactionMetadataStore(transactionCoordinatorId, 
txnLog, timeoutTracker, recoverTracker));
+                new MLTransactionMetadataStore(transactionCoordinatorId, 
txnLog, timeoutTracker,
+                        recoverTracker, 
mlTransactionLogInterceptor.getSequenceId()));
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index e9b3e0c..03aa1be 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
@@ -41,6 +42,7 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -65,12 +67,16 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -122,13 +128,13 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         }
     }
 
-    @DataProvider(name = "isUseManagedLedger")
+    @DataProvider(name = "isUseManagedLedgerProperties")
     public Object[][] versions() {
         return new Object[][] { { true }, { false } };
     }
 
-    @Test(dataProvider = "isUseManagedLedger")
-    public void testRecoverSequenceId(boolean isUseManagedLedger) throws 
Exception {
+    @Test(dataProvider = "isUseManagedLedgerProperties")
+    public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) 
throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new 
ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
 
@@ -136,18 +142,21 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         managedLedgerConfig.setMaxEntriesPerLedger(3);
         MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
         transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, 
TxnStatus.OPEN, false).get();
-        if (isUseManagedLedger) {
+        if (isUseManagedLedgerProperties) {
             transactionMetadataStore.updateTxnStatus(txnID, 
TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
         }
         assertEquals(txnID.getLeastSigBits(), 0);
@@ -155,16 +164,20 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         field.setAccessible(true);
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
field.get(mlTransactionLog);
         Position position = managedLedger.getLastConfirmedEntry();
-
-        if (isUseManagedLedger) {
+        if (isUseManagedLedgerProperties) {
             Awaitility.await().until(() -> {
                 managedLedger.rollCurrentLedgerIfFull();
                 return !managedLedger.ledgerExists(position.getLedgerId());
             });
         }
+        mlTransactionLog.closeAsync().get();
+        mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, 
factory,
+                managedLedgerConfig);
+        mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -181,12 +194,15 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -224,11 +240,12 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 transactionMetadataStore.closeAsync();
 
                 MLTransactionLogImpl txnLog2 = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
-                        new ManagedLedgerConfig());
+                        managedLedgerConfig);
                 txnLog2.initialize().join();
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new 
MLTransactionMetadataStore(transactionCoordinatorID,
-                                txnLog2, new TransactionTimeoutTrackerImpl(), 
new TransactionRecoverTrackerImpl());
+                                txnLog2, new TransactionTimeoutTrackerImpl(), 
new TransactionRecoverTrackerImpl(),
+                                mlTransactionLogInterceptor.getSequenceId());
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -288,12 +305,16 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -351,12 +372,16 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -370,11 +395,12 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, 
TxnStatus.ABORTING, false).get();
 
         mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, 
factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
     }
@@ -387,12 +413,16 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new 
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new 
TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new 
MLTransactionLogInterceptor();
+        
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         transactionMetadataStore.newTransaction(5000).get();

Reply via email to