This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 970995b98a484a3e953617a10dc193301315e6ee
Author: caiconghui <[email protected]>
AuthorDate: Fri Mar 17 11:42:40 2023 +0800

    [fix](metric) Fix bug for that register txn replica failed (#17855)
---
 .../doris/load/routineload/KafkaTaskInfo.java      |  9 -------
 .../java/org/apache/doris/metric/MetricRepo.java   | 13 ----------
 .../org/apache/doris/planner/OlapTableSink.java    | 11 --------
 .../doris/transaction/DatabaseTransactionMgr.java  | 30 +---------------------
 .../doris/transaction/GlobalTransactionMgr.java    |  9 ++-----
 .../apache/doris/transaction/TransactionState.java |  8 ------
 6 files changed, 3 insertions(+), 77 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index 9b9efbb5a4..fa4be7855a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -20,7 +20,6 @@ package org.apache.doris.load.routineload;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -32,10 +31,7 @@ import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
 import com.google.gson.Gson;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -43,16 +39,11 @@ import java.util.Map;
 import java.util.UUID;
 
 public class KafkaTaskInfo extends RoutineLoadTaskInfo {
-    private static final Logger LOG = 
LogManager.getLogger(KafkaTaskInfo.class);
-
     private RoutineLoadManager routineLoadManager = 
Env.getCurrentEnv().getRoutineLoadManager();
 
     // <partitionId, offset to be consumed>
     private Map<Integer, Long> partitionIdToOffset;
 
-    // Last fetched and cached latest partition offsets.
-    private List<Pair<Integer, Long>> cachedPartitionWithLatestOffsets = 
Lists.newArrayList();
-
     public KafkaTaskInfo(UUID id, long jobId, String clusterName,
             long timeoutMs, Map<Integer, Long> partitionIdToOffset) {
         super(id, jobId, clusterName, timeoutMs);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 85c4479fbc..f38bc84e20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -106,7 +106,6 @@ public final class MetricRepo {
     public static Histogram HISTO_TXN_PUBLISH_LATENCY;
     public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_NUM;
     public static AutoMappedMetric<GaugeMetricImpl<Long>> 
DB_GAUGE_PUBLISH_TXN_NUM;
-    public static AutoMappedMetric<GaugeMetricImpl<Long>> 
DB_GAUGE_TXN_REPLICA_NUM;
 
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
@@ -431,18 +430,6 @@ public final class MetricRepo {
         DORIS_METRIC_REGISTER.addMetrics(publishTxnNum);
         DB_GAUGE_PUBLISH_TXN_NUM = addLabeledMetrics("db",
                 () -> new GaugeMetricImpl<>("publish_txn_num", 
MetricUnit.NOUNIT, "number of publish transactions"));
-
-        GaugeMetric<Long> txnReplicaNum = new 
GaugeMetric<Long>("txn_replica_num", MetricUnit.NOUNIT,
-                "number of writing tablets in all running transactions") {
-            @Override
-            public Long getValue() {
-                return 
Env.getCurrentGlobalTransactionMgr().getAllRunningTxnReplicaNum();
-            }
-        };
-        DORIS_METRIC_REGISTER.addMetrics(txnReplicaNum);
-        DB_GAUGE_TXN_REPLICA_NUM = addLabeledMetrics("db", () -> new 
GaugeMetricImpl<>("txn_replica_num",
-                MetricUnit.NOUNIT, "number of writing tablets in all running 
transactions"));
-
         COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", 
MetricUnit.ROWS,
                 "total rows of routine load");
         DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ROWS);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 422d7e2ddd..d9f7fd7158 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -62,7 +62,6 @@ import org.apache.doris.thrift.TPaloNodesInfo;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TTabletLocation;
 import org.apache.doris.thrift.TUniqueId;
-import org.apache.doris.transaction.DatabaseTransactionMgr;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
@@ -347,7 +346,6 @@ public class OlapTableSink extends DataSink {
         TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
         // BE id -> path hash
         Multimap<Long, Long> allBePathsMap = HashMultimap.create();
-        int replicaNum = 0;
         for (Long partitionId : partitionIds) {
             Partition partition = table.getPartition(partitionId);
             int quorum = 
table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum()
 / 2 + 1;
@@ -377,7 +375,6 @@ public class OlapTableSink extends DataSink {
                                 Lists.newArrayList(bePathsMap.keySet())));
                     }
                     allBePathsMap.putAll(bePathsMap);
-                    replicaNum += bePathsMap.size();
                 }
             }
         }
@@ -388,14 +385,6 @@ public class OlapTableSink extends DataSink {
         if (!st.ok()) {
             throw new DdlException(st.getErrorMsg());
         }
-        long dbId = tDataSink.getOlapTableSink().getDbId();
-        long txnId = tDataSink.getOlapTableSink().getTxnId();
-        try {
-            DatabaseTransactionMgr mgr = 
Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
-            mgr.registerTxnReplicas(txnId, replicaNum);
-        } catch (Exception e) {
-            LOG.error("register txn replica failed, txnId={}, dbId={}", txnId, 
dbId);
-        }
         return Arrays.asList(locationParam, slaveLocationParam);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 8a24e4361d..9c5c2d86a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -129,7 +129,6 @@ public class DatabaseTransactionMgr {
 
     // count the number of running txns of database, except for the routine 
load txn
     private volatile int runningTxnNums = 0;
-    private volatile int runningTxnReplicaNums = 0;
 
     // count only the number of running routine load txns of database
     private volatile int runningRoutineLoadTxnNums = 0;
@@ -1115,20 +1114,6 @@ public class DatabaseTransactionMgr {
         updateTxnLabels(transactionState);
     }
 
-    public void registerTxnReplicas(long txnId, int replicaNum) throws 
UserException {
-        writeLock();
-        try {
-            TransactionState transactionState = 
idToRunningTransactionState.get(txnId);
-            if (transactionState == null) {
-                throw new UserException("running transaction not found, 
txnId=" + txnId);
-            }
-            transactionState.setReplicaNum(replicaNum);
-            runningTxnReplicaNums += replicaNum;
-        } finally {
-            writeUnlock();
-        }
-    }
-
     public int getRunningTxnNum() {
         readLock();
         try {
@@ -1138,21 +1123,8 @@ public class DatabaseTransactionMgr {
         }
     }
 
-    public int getRunningTxnReplicaNum() {
-        readLock();
-        try {
-            return runningTxnReplicaNums;
-        } finally {
-            readUnlock();
-        }
-    }
-
     private void updateTxnLabels(TransactionState transactionState) {
-        Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
-        if (txnIds == null) {
-            txnIds = Sets.newHashSet();
-            labelToTxnIds.put(transactionState.getLabel(), txnIds);
-        }
+        Set<Long> txnIds = 
labelToTxnIds.computeIfAbsent(transactionState.getLabel(), k -> 
Sets.newHashSet());
         txnIds.add(transactionState.getTransactionId());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index cb647ad1ef..f2bb98925b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -688,18 +688,13 @@ public class GlobalTransactionMgr implements Writable {
     }
 
     public long getAllRunningTxnNum() {
-        return updateTxnMetric(databaseTransactionMgr -> 
Long.valueOf(databaseTransactionMgr.getRunningTxnNum()),
+        return updateTxnMetric(databaseTransactionMgr -> (long) 
databaseTransactionMgr.getRunningTxnNum(),
                 MetricRepo.DB_GAUGE_TXN_NUM);
     }
 
-    public long getAllRunningTxnReplicaNum() {
-        return updateTxnMetric(databaseTransactionMgr -> 
Long.valueOf(databaseTransactionMgr.getRunningTxnReplicaNum()),
-                MetricRepo.DB_GAUGE_TXN_REPLICA_NUM);
-    }
-
     public long getAllPublishTxnNum() {
         return updateTxnMetric(
-                databaseTransactionMgr -> 
Long.valueOf(databaseTransactionMgr.getCommittedTxnList().size()),
+                databaseTransactionMgr -> (long) 
databaseTransactionMgr.getCommittedTxnList().size(),
                 MetricRepo.DB_GAUGE_PUBLISH_TXN_NUM);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 7be1682f0a..b43d9cf1b8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -498,14 +498,6 @@ public class TransactionState implements Writable {
         return tableIdList;
     }
 
-    public int getReplicaNum() {
-        return replicaNum;
-    }
-
-    public void setReplicaNum(int replicaNum) {
-        this.replicaNum = replicaNum;
-    }
-
     public Map<Long, TableCommitInfo> getIdToTableCommitInfos() {
         return idToTableCommitInfos;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to