This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0994254 [Transaction] Fix transaction sequenceId generate error.
(#13209)
0994254 is described below
commit 0994254c24be4aec8fef810bcdbf62e06453fe53
Author: congbo <[email protected]>
AuthorDate: Fri Dec 10 16:41:49 2021 +0800
[Transaction] Fix transaction sequenceId generate error. (#13209)
---
.../pulsar/broker/transaction/TransactionTest.java | 11 ++--
.../coordinator/impl/MLTransactionLogImpl.java | 41 ------------
.../impl/MLTransactionLogInterceptor.java | 63 +++++++++++++++----
.../impl/MLTransactionMetadataStore.java | 24 ++++----
.../impl/MLTransactionMetadataStoreProvider.java | 6 +-
.../MLTransactionMetadataStoreTest.java | 72 +++++++++++++++-------
6 files changed, 125 insertions(+), 92 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e2f81bb..def6d71 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -53,7 +53,6 @@ import
org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.collections.map.SingletonMap;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -65,7 +64,6 @@ import
org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
-import
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -572,7 +570,8 @@ public class TransactionTest extends TransactionTestBase {
null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
-
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+
persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionLogInterceptor);
MLTransactionLogImpl mlTransactionLog =
new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
persistentTopic.getManagedLedger().getConfig());
@@ -591,7 +590,8 @@ public class TransactionTest extends TransactionTestBase {
doNothing().when(timeoutTracker).start();
MLTransactionMetadataStore metadataStore1 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
- mlTransactionLog, timeoutTracker,
transactionRecoverTracker);
+ mlTransactionLog, timeoutTracker,
transactionRecoverTracker,
+ mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore1.getCoordinatorStats().state,
"Ready"));
@@ -604,7 +604,8 @@ public class TransactionTest extends TransactionTestBase {
MLTransactionMetadataStore metadataStore2 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
- mlTransactionLog, timeoutTracker,
transactionRecoverTracker);
+ mlTransactionLog, timeoutTracker,
transactionRecoverTracker,
+ mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore2.getCoordinatorStats().state,
"Ready"));
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 2d11d98..e154bb8 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -68,15 +68,11 @@ public class MLTransactionLogImpl implements TransactionLog
{
private final TopicName topicName;
- private final MLTransactionLogInterceptor mlTransactionLogInterceptor;
-
public MLTransactionLogImpl(TransactionCoordinatorID tcID,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig) {
this.topicName = getMLTransactionLogName(tcID);
this.tcId = tcID.getId();
- this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-
managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
this.managedLedgerFactory = managedLedgerFactory;
this.managedLedgerConfig = managedLedgerConfig;
this.entryQueue = new SpscArrayQueue<>(2000);
@@ -161,7 +157,6 @@ public class MLTransactionLogImpl implements TransactionLog
{
@Override
public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
buf.release();
-
mlTransactionLogInterceptor.setMaxLocalTxnId(transactionMetadataEntry.getMaxLocalTxnId());
completableFuture.complete(position);
}
@@ -242,42 +237,6 @@ public class MLTransactionLogImpl implements
TransactionLog {
}
}
- public CompletableFuture<Long> getMaxLocalTxnId() {
-
- CompletableFuture<Long> completableFuture = new CompletableFuture<>();
- PositionImpl position = (PositionImpl)
managedLedger.getLastConfirmedEntry();
-
- if (position != null && position.getEntryId() != -1
- && ((ManagedLedgerImpl)
managedLedger).ledgerExists(position.getLedgerId())) {
- ((ManagedLedgerImpl) this.managedLedger).asyncReadEntry(position,
new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- TransactionMetadataEntry lastConfirmEntry = new
TransactionMetadataEntry();
- ByteBuf buffer = entry.getDataBuffer();
- lastConfirmEntry.parseFrom(buffer, buffer.readableBytes());
-
completableFuture.complete(lastConfirmEntry.getMaxLocalTxnId());
- }
-
- @Override
- public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
- log.error("[{}] MLTransactionLog recover MaxLocalTxnId
fail!", topicName, exception);
- completableFuture.completeExceptionally(exception);
- }
- }, null);
- } else if (managedLedger.getProperties()
- .get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID) != null) {
-
completableFuture.complete(Long.parseLong(managedLedger.getProperties()
- .get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID)));
- } else {
- log.error("[{}] MLTransactionLog recover MaxLocalTxnId fail! "
- + "not found MaxLocalTxnId in managedLedger and
properties", topicName);
- completableFuture.completeExceptionally(new
ManagedLedgerException(topicName
- + "MLTransactionLog recover MaxLocalTxnId fail! "
- + "not found MaxLocalTxnId in managedLedger and
properties"));
- }
- return completableFuture;
- }
-
class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback
{
private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
index e97b104..68add4a 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
@@ -18,14 +18,18 @@
*/
package org.apache.pulsar.transaction.coordinator.impl;
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import
org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Store max sequenceID in ManagedLedger properties, in order to recover
transaction log.
@@ -33,31 +37,68 @@ import java.util.concurrent.CompletableFuture;
public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
private static final Logger log =
LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+ private static final long TC_ID_NOT_USED = -1L;
public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
- private volatile long maxLocalTxnId = -1;
+ @Getter
+ private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
@Override
public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
- return null;
+ return op;
}
+ // When all of ledger have been deleted, we will generate sequenceId from
managedLedger properties
@Override
public void onManagedLedgerPropertiesInitialize(Map<String, String>
propertiesMap) {
+ if (propertiesMap == null || propertiesMap.size() == 0) {
+ return;
+ }
+ if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
+
sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
+ }
}
+ // When we don't roll over ledger, we can init sequenceId from the
getLastAddConfirmed transaction metadata entry
@Override
- public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String
name, LedgerHandle ledgerHandle) {
- return CompletableFuture.completedFuture(null);
+ public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String
name, LedgerHandle lh) {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ if (lh.getLastAddConfirmed() >= 0) {
+ lh.readAsync(lh.getLastAddConfirmed(),
lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Read last entry error.", name, ex);
+ promise.completeExceptionally(ex);
+ } else {
+ if (entries != null) {
+ try {
+ LedgerEntry ledgerEntry =
entries.getEntry(lh.getLastAddConfirmed());
+ if (ledgerEntry != null) {
+ TransactionMetadataEntry lastConfirmEntry =
new TransactionMetadataEntry();
+ ByteBuf buffer = ledgerEntry.getEntryBuffer();
+ lastConfirmEntry.parseFrom(buffer,
buffer.readableBytes());
+
this.sequenceId.set(lastConfirmEntry.getMaxLocalTxnId());
+ }
+ entries.close();
+ promise.complete(null);
+ } catch (Exception e) {
+ log.error("[{}] Failed to recover the tc
sequenceId from the last add confirmed entry.",
+ name, e);
+ promise.completeExceptionally(e);
+ }
+ } else {
+ promise.complete(null);
+ }
+ }
+ });
+ } else {
+ promise.complete(null);
+ }
+ return promise;
}
+ // roll over ledger will update sequenceId to managedLedger properties
@Override
public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
- propertiesMap.put(MAX_LOCAL_TXN_ID, maxLocalTxnId + "");
- }
-
- protected void setMaxLocalTxnId(long maxLocalTxnId) {
- this.maxLocalTxnId = maxLocalTxnId;
+ propertiesMap.put(MAX_LOCAL_TXN_ID, sequenceId.get() + "");
}
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 05faaad..6ef4f17 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -60,9 +60,8 @@ public class MLTransactionMetadataStore
private static final Logger log =
LoggerFactory.getLogger(MLTransactionMetadataStore.class);
private final TransactionCoordinatorID tcID;
- private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+ private final AtomicLong sequenceId;
private final MLTransactionLogImpl transactionLog;
- private static final long TC_ID_NOT_USED = -1L;
private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>
txnMetaMap = new ConcurrentSkipListMap<>();
private final TransactionTimeoutTracker timeoutTracker;
private final TransactionMetadataStoreStats transactionMetadataStoreStats;
@@ -75,8 +74,10 @@ public class MLTransactionMetadataStore
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
TransactionTimeoutTracker timeoutTracker,
- TransactionRecoverTracker
recoverTracker) {
+ TransactionRecoverTracker recoverTracker,
+ AtomicLong sequenceId) {
super(State.None);
+ this.sequenceId = sequenceId;
this.tcID = tcID;
this.transactionLog = mlTransactionLog;
this.timeoutTracker = timeoutTracker;
@@ -96,16 +97,13 @@ public class MLTransactionMetadataStore
@Override
public void replayComplete() {
- mlTransactionLog.getMaxLocalTxnId().thenAccept(id -> {
- recoverTracker.appendOpenTransactionToTimeoutTracker();
- sequenceId.set(id);
- if (!changeToReadyState()) {
- log.error("Managed ledger transaction metadata store
change state error when replay complete");
- } else {
-
recoverTracker.handleCommittingAndAbortingTransaction();
- timeoutTracker.start();
- }
- });
+ recoverTracker.appendOpenTransactionToTimeoutTracker();
+ if (!changeToReadyState()) {
+ log.error("Managed ledger transaction metadata store
change state error when replay complete");
+ } else {
+ recoverTracker.handleCommittingAndAbortingTransaction();
+ timeoutTracker.start();
+ }
}
@Override
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 8731e07..8018a21 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -42,10 +42,14 @@ public class MLTransactionMetadataStoreProvider implements
TransactionMetadataSt
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+ managedLedgerConfig.setManagedLedgerInterceptor(new
MLTransactionLogInterceptor());
MLTransactionLogImpl txnLog = new
MLTransactionLogImpl(transactionCoordinatorId,
managedLedgerFactory, managedLedgerConfig);
+ // MLTransactionLogInterceptor will init sequenceId and update the
sequenceId to managedLedger properties.
return txnLog.initialize().thenApply(__ ->
- new MLTransactionMetadataStore(transactionCoordinatorId,
txnLog, timeoutTracker, recoverTracker));
+ new MLTransactionMetadataStore(transactionCoordinatorId,
txnLog, timeoutTracker,
+ recoverTracker,
mlTransactionLogInterceptor.getSequenceId()));
}
}
\ No newline at end of file
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index e9b3e0c..03aa1be 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
@@ -41,6 +42,7 @@ import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -65,12 +67,16 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new
TransactionCoordinatorID(1);
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
- new ManagedLedgerConfig());
+ managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
int checkReplayRetryCount = 0;
while (true) {
checkReplayRetryCount++;
@@ -122,13 +128,13 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
}
}
- @DataProvider(name = "isUseManagedLedger")
+ @DataProvider(name = "isUseManagedLedgerProperties")
public Object[][] versions() {
return new Object[][] { { true }, { false } };
}
- @Test(dataProvider = "isUseManagedLedger")
- public void testRecoverSequenceId(boolean isUseManagedLedger) throws
Exception {
+ @Test(dataProvider = "isUseManagedLedgerProperties")
+ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties)
throws Exception {
ManagedLedgerFactoryConfig factoryConf = new
ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);
@@ -136,18 +142,21 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new
TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
managedLedgerConfig.setMaxEntriesPerLedger(3);
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().until(transactionMetadataStore::checkIfReady);
TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING,
TxnStatus.OPEN, false).get();
- if (isUseManagedLedger) {
+ if (isUseManagedLedgerProperties) {
transactionMetadataStore.updateTxnStatus(txnID,
TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
}
assertEquals(txnID.getLeastSigBits(), 0);
@@ -155,16 +164,20 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
field.setAccessible(true);
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
field.get(mlTransactionLog);
Position position = managedLedger.getLastConfirmedEntry();
-
- if (isUseManagedLedger) {
+ if (isUseManagedLedgerProperties) {
Awaitility.await().until(() -> {
managedLedger.rollCurrentLedgerIfFull();
return !managedLedger.ledgerExists(position.getLedgerId());
});
}
+ mlTransactionLog.closeAsync().get();
+ mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID,
factory,
+ managedLedgerConfig);
+ mlTransactionLog.initialize().join();
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().until(transactionMetadataStore::checkIfReady);
txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -181,12 +194,15 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
TransactionCoordinatorID transactionCoordinatorID = new
TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -224,11 +240,12 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
transactionMetadataStore.closeAsync();
MLTransactionLogImpl txnLog2 = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
- new ManagedLedgerConfig());
+ managedLedgerConfig);
txnLog2.initialize().join();
MLTransactionMetadataStore transactionMetadataStoreTest =
new
MLTransactionMetadataStore(transactionCoordinatorID,
- txnLog2, new TransactionTimeoutTrackerImpl(),
new TransactionRecoverTrackerImpl());
+ txnLog2, new TransactionTimeoutTrackerImpl(),
new TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
while (true) {
if (checkReplayRetryCount > 6) {
@@ -288,12 +305,16 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new
TransactionCoordinatorID(1);
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
- new ManagedLedgerConfig());
+ managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -351,12 +372,16 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new
TransactionCoordinatorID(1);
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
- new ManagedLedgerConfig());
+ managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -370,11 +395,12 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED,
TxnStatus.ABORTING, false).get();
mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID,
factory,
- new ManagedLedgerConfig());
+ managedLedgerConfig);
mlTransactionLog.initialize().join();
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().until(transactionMetadataStore::checkIfReady);
}
@@ -387,12 +413,16 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new
ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new
TransactionCoordinatorID(1);
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ MLTransactionLogInterceptor mlTransactionLogInterceptor = new
MLTransactionLogInterceptor();
+
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(transactionCoordinatorID, factory,
- new ManagedLedgerConfig());
+ managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl(),
+ mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().until(transactionMetadataStore::checkIfReady);
transactionMetadataStore.newTransaction(5000).get();