This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ad638f9fe [enhancement](transaction) Reduce hold writeLock time for 
DatabaseTransactionMgr to clear transaction (#17414)
0ad638f9fe is described below

commit 0ad638f9fe5217182c9c10cfb118a1ac46191208
Author: caiconghui <[email protected]>
AuthorDate: Mon Mar 6 11:32:21 2023 +0800

    [enhancement](transaction) Reduce hold writeLock time for 
DatabaseTransactionMgr to clear transaction (#17414)
    
    * [enhancement](transaction) Reduce hold writeLock time for 
DatabaseTransactionMgr to clear transaction
    
    * fix ut
    
    * remove unnecessary field for remove txn bdbje log
    
    ---------
    
    Co-authored-by: caiconghui1 <[email protected]>
---
 .../org/apache/doris/journal/JournalEntity.java    |  6 ++
 .../BatchRemoveTransactionsOperationV2.java        | 70 +++++++++++++++++++++
 .../java/org/apache/doris/persist/EditLog.java     | 10 ++-
 .../org/apache/doris/persist/OperationType.java    |  5 +-
 .../doris/transaction/DatabaseTransactionMgr.java  | 71 +++++++++++++++-------
 .../doris/transaction/GlobalTransactionMgr.java    | 10 +++
 .../java/org/apache/doris/catalog/FakeEditLog.java |  4 +-
 7 files changed, 148 insertions(+), 28 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 839071e241..1d9cba3b6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -71,6 +71,7 @@ import org.apache.doris.persist.BackendTabletsInfo;
 import org.apache.doris.persist.BatchDropInfo;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.CleanLabelOperationLog;
 import org.apache.doris.persist.ClusterInfo;
 import org.apache.doris.persist.ColocatePersistInfo;
@@ -453,6 +454,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_BATCH_REMOVE_TXNS_V2: {
+                data = BatchRemoveTransactionsOperationV2.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_CREATE_REPOSITORY: {
                 data = Repository.read(in);
                 isRead = true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java
new file mode 100644
index 0000000000..0371a61bc0
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchRemoveTransactionsOperationV2.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// Persist the info when removing batch of expired txns
+public class BatchRemoveTransactionsOperationV2 implements Writable {
+
+    @SerializedName(value = "dbId")
+    private long dbId;
+
+    @SerializedName(value = "latestTxnIdForShort")
+    private long latestTxnIdForShort;
+
+    @SerializedName(value = "latestTxnIdForLong")
+    private long latestTxnIdForLong;
+
+    public BatchRemoveTransactionsOperationV2(long dbId, long 
latestTxnIdForShort, long latestTxnIdForLong) {
+        this.dbId = dbId;
+        this.latestTxnIdForShort = latestTxnIdForShort;
+        this.latestTxnIdForLong = latestTxnIdForLong;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public long getLatestTxnIdForShort() {
+        return latestTxnIdForShort;
+    }
+
+    public long getLatestTxnIdForLong() {
+        return latestTxnIdForLong;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static BatchRemoveTransactionsOperationV2 read(DataInput in) throws 
IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, 
BatchRemoveTransactionsOperationV2.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 7c27274530..e8cd29235a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -562,6 +562,12 @@ public class EditLog {
                     
Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation);
                     break;
                 }
+                case OperationType.OP_BATCH_REMOVE_TXNS_V2: {
+                    final BatchRemoveTransactionsOperationV2 operation =
+                            (BatchRemoveTransactionsOperationV2) 
journal.getData();
+                    
Env.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactionV2(operation);
+                    break;
+                }
                 case OperationType.OP_CREATE_REPOSITORY: {
                     Repository repository = (Repository) journal.getData();
                     
env.getBackupHandler().getRepoMgr().addAndInitRepoIfNotExist(repository, true);
@@ -1604,8 +1610,8 @@ public class EditLog {
         logEdit(OperationType.OP_REPLACE_TABLE, log);
     }
 
-    public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation 
op) {
-        logEdit(OperationType.OP_BATCH_REMOVE_TXNS, op);
+    public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 
op) {
+        logEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op);
     }
 
     public void logModifyComment(ModifyCommentOperationLog op) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 102270a007..32434d08c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -163,12 +163,15 @@ public class OperationType {
     //real time load 100 -108
     public static final short OP_UPSERT_TRANSACTION_STATE = 100;
     @Deprecated
-    // use OP_BATCH_REMOVE_TXNS instead
+    // use OP_BATCH_REMOVE_TXNS_V2 instead
     public static final short OP_DELETE_TRANSACTION_STATE = 101;
     public static final short OP_FINISHING_ROLLUP = 102;
     public static final short OP_FINISHING_SCHEMA_CHANGE = 103;
     public static final short OP_SAVE_TRANSACTION_ID = 104;
+    @Deprecated
+    // use OP_BATCH_REMOVE_TXNS_V2 instead
     public static final short OP_BATCH_REMOVE_TXNS = 105;
+    public static final short OP_BATCH_REMOVE_TXNS_V2 = 106;
 
     // routine load 110~120
     public static final short OP_ROUTINE_LOAD_JOB = 110;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index ed5c0cff7f..29fcf2b296 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -46,7 +46,7 @@ import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.AgentBatchTask;
@@ -742,6 +742,33 @@ public class DatabaseTransactionMgr {
         }
     }
 
+    public void 
replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 operation) {
+        writeLock();
+        try {
+            if (operation.getLatestTxnIdForShort() != -1) {
+                while (!finalStatusTransactionStateDequeShort.isEmpty()) {
+                    TransactionState transactionState = 
finalStatusTransactionStateDequeShort.pop();
+                    clearTransactionState(transactionState.getTransactionId());
+                    if (operation.getLatestTxnIdForShort() == 
transactionState.getTransactionId()) {
+                        break;
+                    }
+                }
+            }
+
+            if (operation.getLatestTxnIdForLong() != -1) {
+                while (!finalStatusTransactionStateDequeLong.isEmpty()) {
+                    TransactionState transactionState = 
finalStatusTransactionStateDequeLong.pop();
+                    clearTransactionState(transactionState.getTransactionId());
+                    if (operation.getLatestTxnIdForLong() == 
transactionState.getTransactionId()) {
+                        break;
+                    }
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     public TransactionStatus getLabelState(String label) {
         readLock();
         try {
@@ -1368,23 +1395,21 @@ public class DatabaseTransactionMgr {
     }
 
     public void removeExpiredTxns(long currentMillis) {
-        List<Long> expiredTxnIds = Lists.newArrayList();
         // delete expired txns
-        int leftNum = MAX_REMOVE_TXN_PER_ROUND;
         writeLock();
         try {
-            leftNum = unprotectedRemoveExpiredTxns(currentMillis, 
expiredTxnIds,
-                    finalStatusTransactionStateDequeShort, leftNum);
-            leftNum = unprotectedRemoveExpiredTxns(currentMillis, 
expiredTxnIds,
-                    finalStatusTransactionStateDequeLong, leftNum);
-
-            if (!expiredTxnIds.isEmpty()) {
-                Map<Long, List<Long>> dbExpiredTxnIds = Maps.newHashMap();
-                dbExpiredTxnIds.put(dbId, expiredTxnIds);
-                BatchRemoveTransactionsOperation op = new 
BatchRemoveTransactionsOperation(dbExpiredTxnIds);
+            Pair<Long, Integer> expiredTxnsInfoForShort = 
unprotectedRemoveExpiredTxns(currentMillis,
+                    finalStatusTransactionStateDequeShort, 
MAX_REMOVE_TXN_PER_ROUND);
+            Pair<Long, Integer> expiredTxnsInfoForLong = 
unprotectedRemoveExpiredTxns(currentMillis,
+                    finalStatusTransactionStateDequeLong,
+                    MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second);
+            int numOfClearedTransaction = expiredTxnsInfoForShort.second + 
expiredTxnsInfoForLong.second;
+            if (numOfClearedTransaction > 0) {
+                BatchRemoveTransactionsOperationV2 op = new 
BatchRemoveTransactionsOperationV2(dbId,
+                        expiredTxnsInfoForShort.first, 
expiredTxnsInfoForLong.first);
                 editLog.logBatchRemoveTransactions(op);
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Remove {} expired transactions", 
MAX_REMOVE_TXN_PER_ROUND - leftNum);
+                    LOG.debug("Remove {} expired transactions", 
numOfClearedTransaction);
                 }
             }
         } finally {
@@ -1392,22 +1417,22 @@ public class DatabaseTransactionMgr {
         }
     }
 
-    private int unprotectedRemoveExpiredTxns(long currentMillis, List<Long> 
expiredTxnIds,
-                                             ArrayDeque<TransactionState> 
finalStatusTransactionStateDequeShort,
-                                             int maxNumber) {
-        int left = maxNumber;
-        while (!finalStatusTransactionStateDequeShort.isEmpty() && left > 0) {
-            TransactionState transactionState = 
finalStatusTransactionStateDequeShort.getFirst();
+    private Pair<Long, Integer> unprotectedRemoveExpiredTxns(long 
currentMillis,
+            ArrayDeque<TransactionState> finalStatusTransactionStateDeque, int 
left) {
+        long latestTxnId = -1;
+        int numOfClearedTransaction = 0;
+        while (!finalStatusTransactionStateDeque.isEmpty() && 
numOfClearedTransaction < left) {
+            TransactionState transactionState = 
finalStatusTransactionStateDeque.getFirst();
             if (transactionState.isExpired(currentMillis)) {
-                finalStatusTransactionStateDequeShort.pop();
+                finalStatusTransactionStateDeque.pop();
                 clearTransactionState(transactionState.getTransactionId());
-                expiredTxnIds.add(transactionState.getTransactionId());
-                left--;
+                latestTxnId = transactionState.getTransactionId();
+                numOfClearedTransaction++;
             } else {
                 break;
             }
         }
-        return left;
+        return Pair.of(latestTxnId, numOfClearedTransaction);
     }
 
     private void clearTransactionState(long txnId) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 3c84056789..18bf57bb19 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -35,6 +35,7 @@ import org.apache.doris.metric.AutoMappedMetric;
 import org.apache.doris.metric.GaugeMetricImpl;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TUniqueId;
@@ -441,6 +442,15 @@ public class GlobalTransactionMgr implements Writable {
         }
     }
 
+    public void 
replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) {
+        try {
+            DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(operation.getDbId());
+            dbTransactionMgr.replayBatchRemoveTransaction(operation);
+        } catch (AnalysisException e) {
+            LOG.warn("replay batch remove transactions failed. db " + 
operation.getDbId(), e);
+        }
+    }
+
     public List<List<Comparable>> getDbInfo() {
         List<List<Comparable>> infos = new ArrayList<List<Comparable>>();
         List<Long> dbIds = 
Lists.newArrayList(dbIdToDatabaseTransactionMgrs.keySet());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
index bffcc9c4f5..08800510a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java
@@ -20,7 +20,7 @@ package org.apache.doris.catalog;
 import org.apache.doris.alter.AlterJobV2;
 import org.apache.doris.alter.BatchAlterJobPersistInfo;
 import org.apache.doris.cluster.Cluster;
-import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
@@ -91,7 +91,7 @@ public class FakeEditLog extends MockUp<EditLog> {
     }
 
     @Mock
-    public void logBatchRemoveTransactions(BatchRemoveTransactionsOperation 
info) {
+    public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 
info) {
 
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to