This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36c3bc3 [Transaction] Tc recover handle transaction in committing and
aborting status . (#10179)
36c3bc3 is described below
commit 36c3bc3178a2e7b5d05b8eb0148242379896c63a
Author: congbo <[email protected]>
AuthorDate: Mon Apr 19 08:32:48 2021 +0800
[Transaction] Tc recover handle transaction in committing and aborting
status . (#10179)
## Motivation
Now recover don't handle transaction in committing or aborting status, it
only add to ```transactionTimeOutTracker```.
## implement
Add ```TransactionRecoverTracker``` to handle different status transaction.
```
/**
* Handle recover transaction update status.
* @param sequenceId {@link long} the sequenceId of this transaction.
* @param txnStatus {@link long} the txn status of this operation.
*/
void updateTransactionStatus(long sequenceId, TxnStatus txnStatus)
throws CoordinatorException.InvalidTxnStatusException;
/**
* Handle recover transaction in open status.
* @param sequenceId {@link Long} the sequenceId of this transaction.
* @param timeout {@link long} the timeout time of this transaction.
*/
void handleOpenStatusTransaction(long sequenceId, long timeout);
/**
* Handle the transaction in open status append to transaction timeout
tracker.
*/
void appendOpenTransactionToTimeoutTracker();
/**
* Handle the transaction in committing and aborting.
*/
void handleCommittingAndAbortingTransaction();
```
### Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
---
.../broker/TransactionMetadataStoreService.java | 9 +-
.../recover/TransactionRecoverTrackerImpl.java | 131 +++++++++++++++++++++
.../broker/transaction/recover/package-info.java | 22 ++++
.../recover/TransactionRecoverTrackerTest.java | 117 ++++++++++++++++++
.../TransactionMetadataStoreProvider.java | 4 +-
.../coordinator/TransactionRecoverTracker.java | 52 ++++++++
.../InMemTransactionMetadataStoreProvider.java | 5 +-
.../impl/MLTransactionMetadataStore.java | 29 +++--
.../impl/MLTransactionMetadataStoreProvider.java | 6 +-
.../MLTransactionMetadataStoreTest.java | 36 +++++-
.../TransactionMetadataStoreProviderTest.java | 3 +-
11 files changed, 390 insertions(+), 24 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 8b1d14e..d1a39c4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import
org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
+import
org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
@@ -47,7 +48,9 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import
org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
@@ -137,8 +140,12 @@ public class TransactionMetadataStoreService {
if (e != null) {
LOG.error("Add transaction metadata store with id {}
error", tcId.getId(), e);
} else {
+ TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
+ TransactionRecoverTracker recoverTracker =
+ new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+ timeoutTracker, tcId.getId());
transactionMetadataStoreProvider.openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
- timeoutTrackerFactory.newTracker(tcId))
+ timeoutTracker, recoverTracker)
.whenComplete((store, ex) -> {
if (ex != null) {
LOG.error("Add transaction metadata
store with id {} error", tcId.getId(), ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
new file mode 100644
index 0000000..dc10162
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+
+/**
+ * The transaction recover tracker implementation {@link
TransactionRecoverTracker}.
+ */
+@Slf4j
+public class TransactionRecoverTrackerImpl implements
TransactionRecoverTracker {
+
+ private final long tcId;
+ private final TransactionMetadataStoreService
transactionMetadataStoreService;
+ private final TransactionTimeoutTracker timeoutTracker;
+
+ /**
+ * This is for recover open status transaction. The key is this
transaction's sequenceId, the value is this
+ * transaction timeout time.
+ * <p>
+ * When transaction update status to committing or aborting, it will
be remove form this.
+ * <p>
+ * When transactionMetadataStore recover complete, the transaction
don't update status, it will send all
+ * transaction to transactionTimeoutTracker.
+ *
+ */
+ private final Map<Long, Long> openTransactions;
+
+ /**
+ * Update transaction to committing status.
+ * <p>
+ * When transaction update status to committing, it will be add in.
+ * <p>
+ * When transaction update status to committed status, the transaction
will remove from it.
+ * <p>
+ * When transactionMetadataStore recover complete, all transaction in
this will endTransaction by commit action.
+ */
+ private final Set<Long> committingTransactions;
+
+ /**
+ * Update transaction to aborting status.
+ * <p>
+ * When transaction update status to aborting, it will be add in.
+ * <p>
+ * When transaction update status to aborted status, the transaction
will remove from it.
+ * <p>
+ * When transactionMetadataStore recover complete, all transaction in
this will endTransaction by abort action.
+ */
+ private final Set<Long> abortingTransactions;
+
+ public TransactionRecoverTrackerImpl(TransactionMetadataStoreService
transactionMetadataStoreService,
+ TransactionTimeoutTracker timeoutTracker,
long tcId) {
+ this.tcId = tcId;
+ this.transactionMetadataStoreService = transactionMetadataStoreService;
+ this.openTransactions = new HashMap<>();
+ this.committingTransactions = new HashSet<>();
+ this.abortingTransactions = new HashSet<>();
+ this.timeoutTracker = timeoutTracker;
+ }
+
+ @Override
+ public void updateTransactionStatus(long sequenceId, TxnStatus txnStatus)
throws InvalidTxnStatusException {
+ switch (txnStatus) {
+ case COMMITTING:
+ openTransactions.remove(sequenceId);
+ committingTransactions.add(sequenceId);
+ break;
+ case ABORTING:
+ openTransactions.remove(sequenceId);
+ abortingTransactions.add(sequenceId);
+ break;
+ case ABORTED:
+ abortingTransactions.remove(sequenceId);
+ break;
+ case COMMITTED:
+ committingTransactions.remove(sequenceId);
+ break;
+ default:
+ throw new InvalidTxnStatusException("Transaction recover
tracker`"
+ + new TxnID(tcId, sequenceId) + "` load replay
metadata operation "
+ + "from transaction log with unknown operation");
+ }
+ }
+
+ @Override
+ public void handleOpenStatusTransaction(long sequenceId, long timeout) {
+ openTransactions.put(sequenceId, timeout);
+ }
+
+ @Override
+ public void appendOpenTransactionToTimeoutTracker() {
+ openTransactions.forEach(timeoutTracker::replayAddTransaction);
+ }
+
+ @Override
+ public void handleCommittingAndAbortingTransaction() {
+ committingTransactions.forEach(k ->
+ transactionMetadataStoreService.endTransaction(new TxnID(tcId,
k), TxnAction.COMMIT_VALUE));
+
+ abortingTransactions.forEach(k ->
+ transactionMetadataStoreService.endTransaction(new TxnID(tcId,
k), TxnAction.ABORT_VALUE));
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
new file mode 100644
index 0000000..9b99bb6
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of a transaction recover tracker.
+ */
+package org.apache.pulsar.broker.transaction.recover;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
new file mode 100644
index 0000000..dddb10c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import io.netty.util.HashedWheelTimer;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
+import
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TransactionRecoverTrackerTest {
+
+ @Test
+ public void openStatusRecoverTrackerTest() throws Exception {
+ TransactionMetadataStoreService transactionMetadataStoreService =
mock(TransactionMetadataStoreService.class);
+ TransactionTimeoutTracker timeoutTracker = new
TransactionTimeoutTrackerFactoryImpl(
+ transactionMetadataStoreService, new
HashedWheelTimer()).newTracker(TransactionCoordinatorID.get(1));
+ TransactionRecoverTrackerImpl recoverTracker =
+ new
TransactionRecoverTrackerImpl(transactionMetadataStoreService, timeoutTracker,
1);
+
+ recoverTracker.handleOpenStatusTransaction(1, 200);
+ recoverTracker.handleOpenStatusTransaction(2, 300);
+
+ Field field =
TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+ field.setAccessible(true);
+ Map<Long, Long> map = (Map<Long, Long>) field.get(recoverTracker);
+
+ assertEquals(map.size(), 2);
+ assertEquals(map.get(1L).longValue(), 200L);
+ assertEquals(map.get(2L).longValue(), 300L);
+
+ field =
TransactionTimeoutTrackerImpl.class.getDeclaredField("priorityQueue");
+ field.setAccessible(true);
+ TripleLongPriorityQueue priorityQueue = (TripleLongPriorityQueue)
field.get(timeoutTracker);
+ assertEquals(priorityQueue.size(), 0);
+
+ recoverTracker.appendOpenTransactionToTimeoutTracker();
+ assertEquals(priorityQueue.size(), 2);
+ }
+
+ @Test
+ public void updateStatusRecoverTest() throws Exception {
+ TransactionRecoverTrackerImpl recoverTracker =
+ new
TransactionRecoverTrackerImpl(mock(TransactionMetadataStoreService.class),
+ mock(TransactionTimeoutTrackerImpl.class), 1);
+ long committingSequenceId = 1L;
+ long committedSequenceId = 2L;
+ long abortingSequenceId = 3L;
+ long abortedSequenceId = 4L;
+ recoverTracker.handleOpenStatusTransaction(committingSequenceId, 100);
+ recoverTracker.handleOpenStatusTransaction(committedSequenceId, 100);
+ recoverTracker.handleOpenStatusTransaction(abortingSequenceId, 100);
+ recoverTracker.handleOpenStatusTransaction(abortedSequenceId, 100);
+
+ Field field =
TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+ field.setAccessible(true);
+ Map<Long, Long> openMap = (Map<Long, Long>) field.get(recoverTracker);
+ assertEquals(4, openMap.size());
+
+ recoverTracker.updateTransactionStatus(committingSequenceId,
TxnStatus.COMMITTING);
+ assertEquals(3, openMap.size());
+ recoverTracker.updateTransactionStatus(committedSequenceId,
TxnStatus.COMMITTING);
+ assertEquals(2, openMap.size());
+ recoverTracker.updateTransactionStatus(committedSequenceId,
TxnStatus.COMMITTED);
+
+ recoverTracker.updateTransactionStatus(abortingSequenceId,
TxnStatus.ABORTING);
+ assertEquals(1, openMap.size());
+ recoverTracker.updateTransactionStatus(abortedSequenceId,
TxnStatus.ABORTING);
+ assertEquals(0, openMap.size());
+ recoverTracker.updateTransactionStatus(abortedSequenceId,
TxnStatus.ABORTED);
+
+ field =
TransactionRecoverTrackerImpl.class.getDeclaredField("committingTransactions");
+ field.setAccessible(true);
+ Set<Long> commitSet = (Set<Long>) field.get(recoverTracker);
+
+ assertEquals(commitSet.size(), 1);
+ assertTrue(commitSet.contains(committingSequenceId));
+ assertFalse(commitSet.contains(committedSequenceId));
+
+ field =
TransactionRecoverTrackerImpl.class.getDeclaredField("abortingTransactions");
+ field.setAccessible(true);
+ Set<Long> abortSet = (Set<Long>) field.get(recoverTracker);
+
+ assertEquals(1, abortSet.size());
+ assertTrue(abortSet.contains(abortingSequenceId));
+ assertFalse(abortSet.contains(abortedSequenceId));
+ }
+}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index c723cf2..4a39824 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -60,11 +60,13 @@ public interface TransactionMetadataStoreProvider {
* @param managedLedgerFactory {@link ManagedLedgerFactory} the
managedLedgerFactory to create managedLedger.
* @param managedLedgerConfig {@link ManagedLedgerConfig} the
managedLedgerConfig to create managedLedger.
* @param timeoutTracker {@link TransactionTimeoutTracker} the
timeoutTracker to handle transaction time out.
+ * @param recoverTracker {@link TransactionRecoverTracker} the
recoverTracker to handle transaction recover.
* @return a future represents the result of the operation.
* an instance of {@link TransactionMetadataStore} is returned
* if the operation succeeds.
*/
CompletableFuture<TransactionMetadataStore> openStore(
TransactionCoordinatorID transactionCoordinatorId,
ManagedLedgerFactory managedLedgerFactory,
- ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker
timeoutTracker);
+ ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker
timeoutTracker,
+ TransactionRecoverTracker recoverTracker);
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
new file mode 100644
index 0000000..6598625
--- /dev/null
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+/**
+ * This tracker is for transaction metadata store recover handle the different
status transaction.
+ */
+public interface TransactionRecoverTracker {
+
+ /**
+ * Handle recover transaction update status.
+ * @param sequenceId {@link long} the sequenceId of this transaction.
+ * @param txnStatus {@link long} the txn status of this operation.
+ */
+ void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws
CoordinatorException.InvalidTxnStatusException;
+
+ /**
+ * Handle recover transaction in open status.
+ * @param sequenceId {@link Long} the sequenceId of this transaction.
+ * @param timeout {@link long} the timeout time of this transaction.
+ */
+ void handleOpenStatusTransaction(long sequenceId, long timeout);
+
+ /**
+ * Handle the transaction in open status append to transaction timeout
tracker.
+ */
+ void appendOpenTransactionToTimeoutTracker();
+
+ /**
+ * Handle the transaction in committing and aborting status.
+ */
+ void handleCommittingAndAbortingTransaction();
+}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index 152d8fd..4c4c04d 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -24,8 +24,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
-import
org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
/**
* The provider that offers in-memory implementation of {@link
TransactionMetadataStore}.
@@ -36,7 +36,8 @@ public class InMemTransactionMetadataStoreProvider implements
TransactionMetadat
public CompletableFuture<TransactionMetadataStore>
openStore(TransactionCoordinatorID transactionCoordinatorId,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig,
-
TransactionTimeoutTracker timeoutTracker) {
+
TransactionTimeoutTracker timeoutTracker,
+
TransactionRecoverTracker recoverTracker) {
return CompletableFuture.completedFuture(
new InMemTransactionMetadataStore(transactionCoordinatorId));
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f86e566..8d2e220 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -37,6 +37,7 @@ import
org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
@@ -68,7 +69,8 @@ public class MLTransactionMetadataStore
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
- TransactionTimeoutTracker
timeoutTracker) {
+ TransactionTimeoutTracker timeoutTracker,
+ TransactionRecoverTracker
recoverTracker) {
super(State.None);
this.tcID = tcID;
this.transactionLog = mlTransactionLog;
@@ -82,9 +84,11 @@ public class MLTransactionMetadataStore
@Override
public void replayComplete() {
+ recoverTracker.appendOpenTransactionToTimeoutTracker();
if (!changeToReadyState()) {
log.error("Managed ledger transaction metadata store
change state error when replay complete");
} else {
+ recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
}
}
@@ -98,8 +102,9 @@ public class MLTransactionMetadataStore
transactionMetadataEntry.getTxnidLeastBits());
switch (transactionMetadataEntry.getMetadataOp()) {
case NEW:
- if (sequenceId.get() <
transactionMetadataEntry.getTxnidLeastBits()) {
-
sequenceId.set(transactionMetadataEntry.getTxnidLeastBits());
+ long txnSequenceId =
transactionMetadataEntry.getTxnidLeastBits();
+ if (sequenceId.get() < txnSequenceId) {
+ sequenceId.set(txnSequenceId);
}
if (txnMetaMap.containsKey(txnID)) {
txnMetaMap.get(txnID).getRight().add(position);
@@ -108,8 +113,9 @@ public class MLTransactionMetadataStore
positions.add(position);
txnMetaMap.put(txnID, MutablePair.of(new
TxnMetaImpl(txnID), positions));
txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits());
-
timeoutTracker.replayAddTransaction(transactionMetadataEntry.getTxnidLeastBits(),
-
transactionMetadataEntry.getTimeoutMs());
+
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+ transactionMetadataEntry.getTimeoutMs()
+ +
transactionMetadataEntry.getStartTime());
}
break;
case ADD_PARTITION:
@@ -136,17 +142,17 @@ public class MLTransactionMetadataStore
transactionLog.deletePosition(Collections.singletonList(position));
} else {
TxnStatus newStatus =
transactionMetadataEntry.getNewStatus();
+ txnMetaMap.get(txnID).getLeft()
+
.updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+
transactionMetadataEntry.getExpectedStatus());
+ txnMetaMap.get(txnID).getRight().add(position);
+
recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
if (newStatus == TxnStatus.COMMITTED ||
newStatus == TxnStatus.ABORTED) {
transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v ->
{
- TxnMeta txnMeta =
txnMetaMap.remove(txnID).getLeft();
+ txnMetaMap.remove(txnID).getLeft();
txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits());
});
- } else {
- txnMetaMap.get(txnID).getLeft()
-
.updateTxnStatus(transactionMetadataEntry.getNewStatus(),
-
transactionMetadataEntry.getExpectedStatus());
}
- txnMetaMap.get(txnID).getRight().add(position);
}
break;
default:
@@ -155,6 +161,7 @@ public class MLTransactionMetadataStore
+ "from transaction log with unknown
operation");
}
} catch (InvalidTxnStatusException e) {
+
transactionLog.deletePosition(Collections.singletonList(position));
log.error(e.getMessage(), e);
}
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index b8a3055..bdf0d56 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import
org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.slf4j.Logger;
@@ -41,13 +42,14 @@ public class MLTransactionMetadataStoreProvider implements
TransactionMetadataSt
public CompletableFuture<TransactionMetadataStore>
openStore(TransactionCoordinatorID transactionCoordinatorId,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig,
-
TransactionTimeoutTracker timeoutTracker) {
+
TransactionTimeoutTracker timeoutTracker,
+
TransactionRecoverTracker recoverTracker) {
TransactionMetadataStore transactionMetadataStore;
try {
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorId,
new MLTransactionLogImpl(transactionCoordinatorId,
- managedLedgerFactory,
managedLedgerConfig), timeoutTracker);
+ managedLedgerFactory,
managedLedgerConfig), timeoutTracker, recoverTracker);
} catch (Exception e) {
log.error("MLTransactionMetadataStore init fail", e);
return FutureUtil.failedFuture(e);
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index b885e78..b6f2702 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
+import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
@@ -56,7 +57,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
new ManagedLedgerConfig());
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
int checkReplayRetryCount = 0;
while (true) {
checkReplayRetryCount++;
@@ -120,7 +121,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
managedLedgerConfig);
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -160,7 +161,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
MLTransactionMetadataStore transactionMetadataStoreTest =
new
MLTransactionMetadataStore(transactionCoordinatorID,
new
MLTransactionLogImpl(transactionCoordinatorID, factory,
- new ManagedLedgerConfig()), new
TransactionTimeoutTrackerImpl());
+ new ManagedLedgerConfig()), new
TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
while (true) {
if (checkReplayRetryCount > 6) {
@@ -222,7 +223,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
new ManagedLedgerConfig());
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -282,7 +283,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
new ManagedLedgerConfig());
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
Awaitility.await().atMost(3000,
TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
@@ -299,7 +300,7 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
new ManagedLedgerConfig());
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID,
mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new
TransactionRecoverTrackerImpl());
Awaitility.await().atMost(3000,
TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
}
@@ -326,4 +327,27 @@ public class MLTransactionMetadataStoreTest extends
MockedBookKeeperTestCase {
}
}
+
+ public static class TransactionRecoverTrackerImpl implements
TransactionRecoverTracker {
+
+ @Override
+ public void updateTransactionStatus(long sequenceId, TxnStatus
txnStatus) throws CoordinatorException.InvalidTxnStatusException {
+
+ }
+
+ @Override
+ public void handleOpenStatusTransaction(long sequenceId, long timeout)
{
+
+ }
+
+ @Override
+ public void appendOpenTransactionToTimeoutTracker() {
+
+ }
+
+ @Override
+ public void handleCommittingAndAbortingTransaction() {
+
+ }
+ }
}
\ No newline at end of file
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 349bcba..26ced4c 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -62,7 +62,8 @@ public class TransactionMetadataStoreProviderTest {
@BeforeMethod
public void setup() throws Exception {
this.tcId = new TransactionCoordinatorID(1L);
- this.store = this.provider.openStore(tcId, null, null, null).get();
+ this.store = this.provider.openStore(tcId, null, null,
+ null, new
MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl()).get();
}
@Test