This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 970995b98a484a3e953617a10dc193301315e6ee Author: caiconghui <[email protected]> AuthorDate: Fri Mar 17 11:42:40 2023 +0800 [fix](metric) Fix bug for that register txn replica failed (#17855) --- .../doris/load/routineload/KafkaTaskInfo.java | 9 ------- .../java/org/apache/doris/metric/MetricRepo.java | 13 ---------- .../org/apache/doris/planner/OlapTableSink.java | 11 -------- .../doris/transaction/DatabaseTransactionMgr.java | 30 +--------------------- .../doris/transaction/GlobalTransactionMgr.java | 9 ++----- .../apache/doris/transaction/TransactionState.java | 8 ------ 6 files changed, 3 insertions(+), 77 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 9b9efbb5a4..fa4be7855a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -20,7 +20,6 @@ package org.apache.doris.load.routineload; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; -import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -32,10 +31,7 @@ import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Joiner; -import com.google.common.collect.Lists; import com.google.gson.Gson; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -43,16 +39,11 @@ import java.util.Map; import java.util.UUID; public class KafkaTaskInfo extends RoutineLoadTaskInfo { - private static final Logger LOG = LogManager.getLogger(KafkaTaskInfo.class); - private RoutineLoadManager routineLoadManager = 
Env.getCurrentEnv().getRoutineLoadManager(); // <partitionId, offset to be consumed> private Map<Integer, Long> partitionIdToOffset; - // Last fetched and cached latest partition offsets. - private List<Pair<Integer, Long>> cachedPartitionWithLatestOffsets = Lists.newArrayList(); - public KafkaTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs, Map<Integer, Long> partitionIdToOffset) { super(id, jobId, clusterName, timeoutMs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index 85c4479fbc..f38bc84e20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -106,7 +106,6 @@ public final class MetricRepo { public static Histogram HISTO_TXN_PUBLISH_LATENCY; public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_NUM; public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_PUBLISH_TXN_NUM; - public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_REPLICA_NUM; public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS; public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES; @@ -431,18 +430,6 @@ public final class MetricRepo { DORIS_METRIC_REGISTER.addMetrics(publishTxnNum); DB_GAUGE_PUBLISH_TXN_NUM = addLabeledMetrics("db", () -> new GaugeMetricImpl<>("publish_txn_num", MetricUnit.NOUNIT, "number of publish transactions")); - - GaugeMetric<Long> txnReplicaNum = new GaugeMetric<Long>("txn_replica_num", MetricUnit.NOUNIT, - "number of writing tablets in all running transactions") { - @Override - public Long getValue() { - return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnReplicaNum(); - } - }; - DORIS_METRIC_REGISTER.addMetrics(txnReplicaNum); - DB_GAUGE_TXN_REPLICA_NUM = addLabeledMetrics("db", () -> new GaugeMetricImpl<>("txn_replica_num", - MetricUnit.NOUNIT, "number of writing tablets in all running 
transactions")); - COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", MetricUnit.ROWS, "total rows of routine load"); DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_ROWS); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 422d7e2ddd..d9f7fd7158 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -62,7 +62,6 @@ import org.apache.doris.thrift.TPaloNodesInfo; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TUniqueId; -import org.apache.doris.transaction.DatabaseTransactionMgr; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; @@ -347,7 +346,6 @@ public class OlapTableSink extends DataSink { TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); // BE id -> path hash Multimap<Long, Long> allBePathsMap = HashMultimap.create(); - int replicaNum = 0; for (Long partitionId : partitionIds) { Partition partition = table.getPartition(partitionId); int quorum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1; @@ -377,7 +375,6 @@ public class OlapTableSink extends DataSink { Lists.newArrayList(bePathsMap.keySet()))); } allBePathsMap.putAll(bePathsMap); - replicaNum += bePathsMap.size(); } } } @@ -388,14 +385,6 @@ public class OlapTableSink extends DataSink { if (!st.ok()) { throw new DdlException(st.getErrorMsg()); } - long dbId = tDataSink.getOlapTableSink().getDbId(); - long txnId = tDataSink.getOlapTableSink().getTxnId(); - try { - DatabaseTransactionMgr mgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId); - mgr.registerTxnReplicas(txnId, replicaNum); - } catch (Exception e) { - LOG.error("register txn replica failed, txnId={}, 
dbId={}", txnId, dbId); - } return Arrays.asList(locationParam, slaveLocationParam); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 8a24e4361d..9c5c2d86a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -129,7 +129,6 @@ public class DatabaseTransactionMgr { // count the number of running txns of database, except for the routine load txn private volatile int runningTxnNums = 0; - private volatile int runningTxnReplicaNums = 0; // count only the number of running routine load txns of database private volatile int runningRoutineLoadTxnNums = 0; @@ -1115,20 +1114,6 @@ public class DatabaseTransactionMgr { updateTxnLabels(transactionState); } - public void registerTxnReplicas(long txnId, int replicaNum) throws UserException { - writeLock(); - try { - TransactionState transactionState = idToRunningTransactionState.get(txnId); - if (transactionState == null) { - throw new UserException("running transaction not found, txnId=" + txnId); - } - transactionState.setReplicaNum(replicaNum); - runningTxnReplicaNums += replicaNum; - } finally { - writeUnlock(); - } - } - public int getRunningTxnNum() { readLock(); try { @@ -1138,21 +1123,8 @@ public class DatabaseTransactionMgr { } } - public int getRunningTxnReplicaNum() { - readLock(); - try { - return runningTxnReplicaNums; - } finally { - readUnlock(); - } - } - private void updateTxnLabels(TransactionState transactionState) { - Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel()); - if (txnIds == null) { - txnIds = Sets.newHashSet(); - labelToTxnIds.put(transactionState.getLabel(), txnIds); - } + Set<Long> txnIds = labelToTxnIds.computeIfAbsent(transactionState.getLabel(), k -> Sets.newHashSet()); 
txnIds.add(transactionState.getTransactionId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index cb647ad1ef..f2bb98925b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -688,18 +688,13 @@ public class GlobalTransactionMgr implements Writable { } public long getAllRunningTxnNum() { - return updateTxnMetric(databaseTransactionMgr -> Long.valueOf(databaseTransactionMgr.getRunningTxnNum()), + return updateTxnMetric(databaseTransactionMgr -> (long) databaseTransactionMgr.getRunningTxnNum(), MetricRepo.DB_GAUGE_TXN_NUM); } - public long getAllRunningTxnReplicaNum() { - return updateTxnMetric(databaseTransactionMgr -> Long.valueOf(databaseTransactionMgr.getRunningTxnReplicaNum()), - MetricRepo.DB_GAUGE_TXN_REPLICA_NUM); - } - public long getAllPublishTxnNum() { return updateTxnMetric( - databaseTransactionMgr -> Long.valueOf(databaseTransactionMgr.getCommittedTxnList().size()), + databaseTransactionMgr -> (long) databaseTransactionMgr.getCommittedTxnList().size(), MetricRepo.DB_GAUGE_PUBLISH_TXN_NUM); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 7be1682f0a..b43d9cf1b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -498,14 +498,6 @@ public class TransactionState implements Writable { return tableIdList; } - public int getReplicaNum() { - return replicaNum; - } - - public void setReplicaNum(int replicaNum) { - this.replicaNum = replicaNum; - } - public Map<Long, TableCommitInfo> getIdToTableCommitInfos() { return idToTableCommitInfos; } 
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
