This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36c3bc3  [Transaction] Tc recover handle transaction in committing and 
aborting status . (#10179)
36c3bc3 is described below

commit 36c3bc3178a2e7b5d05b8eb0148242379896c63a
Author: congbo <[email protected]>
AuthorDate: Mon Apr 19 08:32:48 2021 +0800

    [Transaction] Tc recover handle transaction in committing and aborting 
status . (#10179)
    
    ## Motivation
    Currently, recovery does not handle transactions in committing or aborting
    status; it only adds them to ```transactionTimeOutTracker```.
    
    ## Implementation
    Add ```TransactionRecoverTracker``` to handle transactions in different statuses.
    
    ```
        /**
         * Handle recover transaction update status.
         * @param sequenceId {@link long} the sequenceId of this transaction.
         * @param txnStatus {@link long} the txn status of this operation.
         */
        void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) 
throws CoordinatorException.InvalidTxnStatusException;
    
        /**
         * Handle recover transaction in open status.
         * @param sequenceId {@link Long} the sequenceId of this transaction.
         * @param timeout {@link long} the timeout time of this transaction.
         */
        void handleOpenStatusTransaction(long sequenceId, long timeout);
    
        /**
         * Handle the transaction in open status append to transaction timeout 
tracker.
         */
        void appendOpenTransactionToTimeoutTracker();
    
        /**
         * Handle the transaction in committing and aborting.
         */
        void handleCommittingAndAbortingTransaction();
    ```
    ### Verifying this change
    Add the tests for it
    
    Does this pull request potentially affect one of the following parts:
    If yes was chosen, please highlight the changes
    
    Dependencies (does it add or upgrade a dependency): (no)
    The public API: (no)
    The schema: (no)
    The default values of configurations: (no)
    The wire protocol: (no)
    The rest endpoints: (no)
    The admin cli options: (no)
    Anything that affects deployment: (no)
---
 .../broker/TransactionMetadataStoreService.java    |   9 +-
 .../recover/TransactionRecoverTrackerImpl.java     | 131 +++++++++++++++++++++
 .../broker/transaction/recover/package-info.java   |  22 ++++
 .../recover/TransactionRecoverTrackerTest.java     | 117 ++++++++++++++++++
 .../TransactionMetadataStoreProvider.java          |   4 +-
 .../coordinator/TransactionRecoverTracker.java     |  52 ++++++++
 .../InMemTransactionMetadataStoreProvider.java     |   5 +-
 .../impl/MLTransactionMetadataStore.java           |  29 +++--
 .../impl/MLTransactionMetadataStoreProvider.java   |   6 +-
 .../MLTransactionMetadataStoreTest.java            |  36 +++++-
 .../TransactionMetadataStoreProviderTest.java      |   3 +-
 11 files changed, 390 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 8b1d14e..d1a39c4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
 import 
org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
+import 
org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
 import 
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
@@ -47,7 +48,9 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import 
org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
@@ -137,8 +140,12 @@ public class TransactionMetadataStoreService {
                     if (e != null) {
                         LOG.error("Add transaction metadata store with id {} 
error", tcId.getId(), e);
                     } else {
+                        TransactionTimeoutTracker timeoutTracker = 
timeoutTrackerFactory.newTracker(tcId);
+                        TransactionRecoverTracker recoverTracker =
+                                new 
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+                                        timeoutTracker, tcId.getId());
                         transactionMetadataStoreProvider.openStore(tcId, 
pulsarService.getManagedLedgerFactory(), v,
-                                timeoutTrackerFactory.newTracker(tcId))
+                                timeoutTracker, recoverTracker)
                                 .whenComplete((store, ex) -> {
                                     if (ex != null) {
                                         LOG.error("Add transaction metadata 
store with id {} error", tcId.getId(), ex);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
new file mode 100644
index 0000000..dc10162
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+
+/**
+ * The transaction recover tracker implementation {@link 
TransactionRecoverTracker}.
+ */
+@Slf4j
+public class TransactionRecoverTrackerImpl implements 
TransactionRecoverTracker {
+
+    private final long tcId;
+    private final TransactionMetadataStoreService 
transactionMetadataStoreService;
+    private final TransactionTimeoutTracker timeoutTracker;
+
+    /**
+     * This is for recover open status transaction. The key is this 
transaction's sequenceId, the value is this
+     * transaction timeout time.
+     * <p>
+     *     When transaction update status to committing or aborting, it will 
be remove form this.
+     * <p>
+     *     When transactionMetadataStore recover complete, the transaction 
don't update status, it will send all
+     *     transaction to transactionTimeoutTracker.
+     *
+     */
+    private final Map<Long, Long> openTransactions;
+
+    /**
+     * Update transaction to committing status.
+     * <p>
+     *     When transaction update status to committing, it will be add in.
+     * <p>
+     *     When transaction update status to committed status, the transaction 
will remove from it.
+     * <p>
+     *     When transactionMetadataStore recover complete, all transaction in 
this will endTransaction by commit action.
+     */
+    private final Set<Long> committingTransactions;
+
+    /**
+     * Update transaction to aborting status.
+     * <p>
+     *     When transaction update status to aborting, it will be add in.
+     * <p>
+     *     When transaction update status to aborted status, the transaction 
will remove from it.
+     * <p>
+     *     When transactionMetadataStore recover complete, all transaction in 
this will endTransaction by abort action.
+     */
+    private final Set<Long> abortingTransactions;
+
+    public TransactionRecoverTrackerImpl(TransactionMetadataStoreService 
transactionMetadataStoreService,
+                                  TransactionTimeoutTracker timeoutTracker, 
long tcId) {
+        this.tcId = tcId;
+        this.transactionMetadataStoreService = transactionMetadataStoreService;
+        this.openTransactions = new HashMap<>();
+        this.committingTransactions = new HashSet<>();
+        this.abortingTransactions = new HashSet<>();
+        this.timeoutTracker = timeoutTracker;
+    }
+
+    @Override
+    public void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) 
throws InvalidTxnStatusException {
+        switch (txnStatus) {
+            case COMMITTING:
+                openTransactions.remove(sequenceId);
+                committingTransactions.add(sequenceId);
+                break;
+            case ABORTING:
+                openTransactions.remove(sequenceId);
+                abortingTransactions.add(sequenceId);
+                break;
+            case ABORTED:
+                abortingTransactions.remove(sequenceId);
+                break;
+            case COMMITTED:
+                committingTransactions.remove(sequenceId);
+                break;
+            default:
+                throw new InvalidTxnStatusException("Transaction recover 
tracker`"
+                        + new TxnID(tcId, sequenceId) + "` load replay 
metadata operation "
+                        + "from transaction log with unknown operation");
+        }
+    }
+
+    @Override
+    public void handleOpenStatusTransaction(long sequenceId, long timeout) {
+        openTransactions.put(sequenceId, timeout);
+    }
+
+    @Override
+    public void appendOpenTransactionToTimeoutTracker() {
+        openTransactions.forEach(timeoutTracker::replayAddTransaction);
+    }
+
+    @Override
+    public void handleCommittingAndAbortingTransaction() {
+        committingTransactions.forEach(k ->
+                transactionMetadataStoreService.endTransaction(new TxnID(tcId, 
k), TxnAction.COMMIT_VALUE));
+
+        abortingTransactions.forEach(k ->
+                transactionMetadataStoreService.endTransaction(new TxnID(tcId, 
k), TxnAction.ABORT_VALUE));
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
new file mode 100644
index 0000000..9b99bb6
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of a transaction recover tracker.
+ */
+package org.apache.pulsar.broker.transaction.recover;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
new file mode 100644
index 0000000..dddb10c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import io.netty.util.HashedWheelTimer;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import 
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
+import 
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TransactionRecoverTrackerTest {
+
+    @Test
+    public void openStatusRecoverTrackerTest() throws Exception {
+        TransactionMetadataStoreService transactionMetadataStoreService = 
mock(TransactionMetadataStoreService.class);
+        TransactionTimeoutTracker timeoutTracker = new 
TransactionTimeoutTrackerFactoryImpl(
+                transactionMetadataStoreService, new 
HashedWheelTimer()).newTracker(TransactionCoordinatorID.get(1));
+        TransactionRecoverTrackerImpl recoverTracker =
+                new 
TransactionRecoverTrackerImpl(transactionMetadataStoreService, timeoutTracker, 
1);
+
+        recoverTracker.handleOpenStatusTransaction(1, 200);
+        recoverTracker.handleOpenStatusTransaction(2, 300);
+
+        Field field = 
TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+        field.setAccessible(true);
+        Map<Long, Long> map = (Map<Long, Long>) field.get(recoverTracker);
+
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1L).longValue(), 200L);
+        assertEquals(map.get(2L).longValue(), 300L);
+
+        field = 
TransactionTimeoutTrackerImpl.class.getDeclaredField("priorityQueue");
+        field.setAccessible(true);
+        TripleLongPriorityQueue priorityQueue = (TripleLongPriorityQueue) 
field.get(timeoutTracker);
+        assertEquals(priorityQueue.size(), 0);
+
+        recoverTracker.appendOpenTransactionToTimeoutTracker();
+        assertEquals(priorityQueue.size(), 2);
+    }
+
+    @Test
+    public void updateStatusRecoverTest() throws Exception {
+        TransactionRecoverTrackerImpl recoverTracker =
+                new 
TransactionRecoverTrackerImpl(mock(TransactionMetadataStoreService.class),
+                        mock(TransactionTimeoutTrackerImpl.class), 1);
+        long committingSequenceId = 1L;
+        long committedSequenceId = 2L;
+        long abortingSequenceId = 3L;
+        long abortedSequenceId = 4L;
+        recoverTracker.handleOpenStatusTransaction(committingSequenceId, 100);
+        recoverTracker.handleOpenStatusTransaction(committedSequenceId, 100);
+        recoverTracker.handleOpenStatusTransaction(abortingSequenceId, 100);
+        recoverTracker.handleOpenStatusTransaction(abortedSequenceId, 100);
+
+        Field field = 
TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+        field.setAccessible(true);
+        Map<Long, Long> openMap = (Map<Long, Long>) field.get(recoverTracker);
+        assertEquals(4, openMap.size());
+
+        recoverTracker.updateTransactionStatus(committingSequenceId, 
TxnStatus.COMMITTING);
+        assertEquals(3, openMap.size());
+        recoverTracker.updateTransactionStatus(committedSequenceId, 
TxnStatus.COMMITTING);
+        assertEquals(2, openMap.size());
+        recoverTracker.updateTransactionStatus(committedSequenceId, 
TxnStatus.COMMITTED);
+
+        recoverTracker.updateTransactionStatus(abortingSequenceId, 
TxnStatus.ABORTING);
+        assertEquals(1, openMap.size());
+        recoverTracker.updateTransactionStatus(abortedSequenceId, 
TxnStatus.ABORTING);
+        assertEquals(0, openMap.size());
+        recoverTracker.updateTransactionStatus(abortedSequenceId, 
TxnStatus.ABORTED);
+
+        field = 
TransactionRecoverTrackerImpl.class.getDeclaredField("committingTransactions");
+        field.setAccessible(true);
+        Set<Long> commitSet = (Set<Long>) field.get(recoverTracker);
+
+        assertEquals(commitSet.size(), 1);
+        assertTrue(commitSet.contains(committingSequenceId));
+        assertFalse(commitSet.contains(committedSequenceId));
+
+        field = 
TransactionRecoverTrackerImpl.class.getDeclaredField("abortingTransactions");
+        field.setAccessible(true);
+        Set<Long> abortSet = (Set<Long>) field.get(recoverTracker);
+
+        assertEquals(1, abortSet.size());
+        assertTrue(abortSet.contains(abortingSequenceId));
+        assertFalse(abortSet.contains(abortedSequenceId));
+    }
+}
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index c723cf2..4a39824 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -60,11 +60,13 @@ public interface TransactionMetadataStoreProvider {
      * @param managedLedgerFactory {@link ManagedLedgerFactory} the 
managedLedgerFactory to create managedLedger.
      * @param managedLedgerConfig {@link ManagedLedgerConfig} the 
managedLedgerConfig to create managedLedger.
      * @param timeoutTracker {@link TransactionTimeoutTracker} the 
timeoutTracker to handle transaction time out.
+     * @param recoverTracker {@link TransactionRecoverTracker} the 
recoverTracker to handle transaction recover.
      * @return a future represents the result of the operation.
      *         an instance of {@link TransactionMetadataStore} is returned
      *         if the operation succeeds.
      */
     CompletableFuture<TransactionMetadataStore> openStore(
             TransactionCoordinatorID transactionCoordinatorId, 
ManagedLedgerFactory managedLedgerFactory,
-            ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker 
timeoutTracker);
+            ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker 
timeoutTracker,
+            TransactionRecoverTracker recoverTracker);
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
new file mode 100644
index 0000000..6598625
--- /dev/null
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+/**
+ * This tracker is for transaction metadata store recover handle the different 
status transaction.
+ */
+public interface TransactionRecoverTracker {
+
+    /**
+     * Handle recover transaction update status.
+     * @param sequenceId {@link long} the sequenceId of this transaction.
+     * @param txnStatus {@link long} the txn status of this operation.
+     */
+    void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws 
CoordinatorException.InvalidTxnStatusException;
+
+    /**
+     * Handle recover transaction in open status.
+     * @param sequenceId {@link Long} the sequenceId of this transaction.
+     * @param timeout {@link long} the timeout time of this transaction.
+     */
+    void handleOpenStatusTransaction(long sequenceId, long timeout);
+
+    /**
+     * Handle the transaction in open status append to transaction timeout 
tracker.
+     */
+    void appendOpenTransactionToTimeoutTracker();
+
+    /**
+     * Handle the transaction in committing and aborting status.
+     */
+    void handleCommittingAndAbortingTransaction();
+}
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index 152d8fd..4c4c04d 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -24,8 +24,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
-import 
org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
 
 /**
  * The provider that offers in-memory implementation of {@link 
TransactionMetadataStore}.
@@ -36,7 +36,8 @@ public class InMemTransactionMetadataStoreProvider implements 
TransactionMetadat
     public CompletableFuture<TransactionMetadataStore> 
openStore(TransactionCoordinatorID transactionCoordinatorId,
                                                                  
ManagedLedgerFactory managedLedgerFactory,
                                                                  
ManagedLedgerConfig managedLedgerConfig,
-                                                                 
TransactionTimeoutTracker timeoutTracker) {
+                                                                 
TransactionTimeoutTracker timeoutTracker,
+                                                                 
TransactionRecoverTracker recoverTracker) {
         return CompletableFuture.completedFuture(
             new InMemTransactionMetadataStore(transactionCoordinatorId));
     }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f86e566..8d2e220 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -37,6 +37,7 @@ import 
org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
@@ -68,7 +69,8 @@ public class MLTransactionMetadataStore
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
-                                      TransactionTimeoutTracker 
timeoutTracker) {
+                                      TransactionTimeoutTracker timeoutTracker,
+                                      TransactionRecoverTracker 
recoverTracker) {
         super(State.None);
         this.tcID = tcID;
         this.transactionLog = mlTransactionLog;
@@ -82,9 +84,11 @@ public class MLTransactionMetadataStore
 
             @Override
             public void replayComplete() {
+                recoverTracker.appendOpenTransactionToTimeoutTracker();
                 if (!changeToReadyState()) {
                     log.error("Managed ledger transaction metadata store 
change state error when replay complete");
                 } else {
+                    recoverTracker.handleCommittingAndAbortingTransaction();
                     timeoutTracker.start();
                 }
             }
@@ -98,8 +102,9 @@ public class MLTransactionMetadataStore
                             transactionMetadataEntry.getTxnidLeastBits());
                     switch (transactionMetadataEntry.getMetadataOp()) {
                         case NEW:
-                            if (sequenceId.get() < 
transactionMetadataEntry.getTxnidLeastBits()) {
-                                
sequenceId.set(transactionMetadataEntry.getTxnidLeastBits());
+                            long txnSequenceId = 
transactionMetadataEntry.getTxnidLeastBits();
+                            if (sequenceId.get() < txnSequenceId) {
+                                sequenceId.set(txnSequenceId);
                             }
                             if (txnMetaMap.containsKey(txnID)) {
                                 txnMetaMap.get(txnID).getRight().add(position);
@@ -108,8 +113,9 @@ public class MLTransactionMetadataStore
                                 positions.add(position);
                                 txnMetaMap.put(txnID, MutablePair.of(new 
TxnMetaImpl(txnID), positions));
                                 
txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits());
-                                
timeoutTracker.replayAddTransaction(transactionMetadataEntry.getTxnidLeastBits(),
-                                        
transactionMetadataEntry.getTimeoutMs());
+                                
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+                                        transactionMetadataEntry.getTimeoutMs()
+                                                + 
transactionMetadataEntry.getStartTime());
                             }
                             break;
                         case ADD_PARTITION:
@@ -136,17 +142,17 @@ public class MLTransactionMetadataStore
                                 
transactionLog.deletePosition(Collections.singletonList(position));
                             } else {
                                 TxnStatus newStatus = 
transactionMetadataEntry.getNewStatus();
+                                txnMetaMap.get(txnID).getLeft()
+                                        
.updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+                                                
transactionMetadataEntry.getExpectedStatus());
+                                txnMetaMap.get(txnID).getRight().add(position);
+                                
recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
                                 if (newStatus == TxnStatus.COMMITTED || 
newStatus == TxnStatus.ABORTED) {
                                     
transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> 
{
-                                        TxnMeta txnMeta = 
txnMetaMap.remove(txnID).getLeft();
+                                        txnMetaMap.remove(txnID).getLeft();
                                         
txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits());
                                     });
-                                } else {
-                                    txnMetaMap.get(txnID).getLeft()
-                                            
.updateTxnStatus(transactionMetadataEntry.getNewStatus(),
-                                                    
transactionMetadataEntry.getExpectedStatus());
                                 }
-                                txnMetaMap.get(txnID).getRight().add(position);
                             }
                             break;
                         default:
@@ -155,6 +161,7 @@ public class MLTransactionMetadataStore
                                     + "from transaction log with unknown 
operation");
                     }
                 } catch (InvalidTxnStatusException  e) {
+                    
transactionLog.deletePosition(Collections.singletonList(position));
                     log.error(e.getMessage(), e);
                 }
             }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index b8a3055..bdf0d56 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import 
org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
 import org.slf4j.Logger;
@@ -41,13 +42,14 @@ public class MLTransactionMetadataStoreProvider implements 
TransactionMetadataSt
     public CompletableFuture<TransactionMetadataStore> 
openStore(TransactionCoordinatorID transactionCoordinatorId,
                                                                  
ManagedLedgerFactory managedLedgerFactory,
                                                                  
ManagedLedgerConfig managedLedgerConfig,
-                                                                 
TransactionTimeoutTracker timeoutTracker) {
+                                                                 
TransactionTimeoutTracker timeoutTracker,
+                                                                 
TransactionRecoverTracker recoverTracker) {
         TransactionMetadataStore transactionMetadataStore;
         try {
             transactionMetadataStore =
                     new MLTransactionMetadataStore(transactionCoordinatorId,
                             new MLTransactionLogImpl(transactionCoordinatorId,
-                                    managedLedgerFactory, 
managedLedgerConfig), timeoutTracker);
+                                    managedLedgerFactory, 
managedLedgerConfig), timeoutTracker, recoverTracker);
         } catch (Exception e) {
             log.error("MLTransactionMetadataStore init fail", e);
             return FutureUtil.failedFuture(e);
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index b885e78..b6f2702 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
@@ -56,7 +57,7 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -120,7 +121,7 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 managedLedgerConfig);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -160,7 +161,7 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new 
MLTransactionMetadataStore(transactionCoordinatorID,
                                 new 
MLTransactionLogImpl(transactionCoordinatorID, factory,
-                                        new ManagedLedgerConfig()), new 
TransactionTimeoutTrackerImpl());
+                                        new ManagedLedgerConfig()), new 
TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -222,7 +223,7 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -282,7 +283,7 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
 
 
         Awaitility.await().atMost(3000, 
TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
@@ -299,7 +300,7 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, 
mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new 
TransactionRecoverTrackerImpl());
 
         Awaitility.await().atMost(3000, 
TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
     }
@@ -326,4 +327,27 @@ public class MLTransactionMetadataStoreTest extends 
MockedBookKeeperTestCase {
 
         }
     }
+
+    public static class TransactionRecoverTrackerImpl implements 
TransactionRecoverTracker {
+
+        @Override
+        public void updateTransactionStatus(long sequenceId, TxnStatus 
txnStatus) throws CoordinatorException.InvalidTxnStatusException {
+
+        }
+
+        @Override
+        public void handleOpenStatusTransaction(long sequenceId, long timeout) 
{
+
+        }
+
+        @Override
+        public void appendOpenTransactionToTimeoutTracker() {
+
+        }
+
+        @Override
+        public void handleCommittingAndAbortingTransaction() {
+
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 349bcba..26ced4c 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -62,7 +62,8 @@ public class TransactionMetadataStoreProviderTest {
     @BeforeMethod
     public void setup() throws Exception {
         this.tcId = new TransactionCoordinatorID(1L);
-        this.store = this.provider.openStore(tcId, null, null, null).get();
+        this.store = this.provider.openStore(tcId, null, null,
+                null, new 
MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl()).get();
     }
 
     @Test

Reply via email to