This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d5514fa07e6 [enhancement](cloud) add CloudTabletStatMgr to capture stats in cloud mode (#31818)
d5514fa07e6 is described below
commit d5514fa07e6a240e8e611569412bf7c690e16c67
Author: zhengyu <[email protected]>
AuthorDate: Wed Mar 6 21:57:51 2024 +0800
[enhancement](cloud) add CloudTabletStatMgr to capture stats in cloud mode (#31818)
Signed-off-by: freemandealer <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 3 +
.../apache/doris/catalog/CloudTabletStatMgr.java | 273 +++++++++++++++++++++
.../java/org/apache/doris/catalog/Database.java | 5 +
.../main/java/org/apache/doris/catalog/Env.java | 9 +-
.../java/org/apache/doris/catalog/Replica.java | 17 ++
5 files changed, 306 insertions(+), 1 deletion(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 522b8ae51bc..aa935200e25 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2579,6 +2579,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int cloud_cold_read_percent = 10; // 10%
+ // Maximum number of tablets packed into one GetTabletStatsRequest that
+ // CloudTabletStatMgr sends to the meta service. A batch is flushed as soon as
+ // it reaches this size, so a value <= 0 degrades to one tablet per request.
+ @ConfField(mutable = true)
+ public static int get_tablet_stat_batch_size = 1000;
+
// The original meta read lock is not enough to keep a snapshot of
partition versions,
// so the execution of `createScanRangeLocations` are delayed to
`Coordinator::exec`,
// to help to acquire a snapshot of partition versions.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
new file mode 100644
index 00000000000..136c9264a4e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest;
+import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
+import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
+import org.apache.doris.cloud.proto.Cloud.TabletIndexPB;
+import org.apache.doris.cloud.proto.Cloud.TabletStatsPB;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.rpc.RpcException;
+
+import lombok.Getter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
+
+/*
+ * CloudTabletStatMgr periodically fetches tablet (replica) statistics from the
+ * meta service in cloud mode, stores them on the local replicas, recomputes the
+ * row count of every visible index and publishes a per-table stats snapshot.
+ * Each FE collects the statistics by itself.
+ */
+public class CloudTabletStatMgr extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(CloudTabletStatMgr.class);
+
+    // NOTE(review): not referenced by any code in this class; candidate either for
+    // parallelizing the meta-service RPCs below or for removal.
+    private final ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
+    // <(dbId, tableId) -> CloudTableStats>. Rebuilt from scratch every round and then
+    // swapped in; volatile so threads calling getCloudTableStatsMap() see the newest
+    // snapshot published by the daemon thread.
+    private volatile ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> cloudTableStatsMap =
+            new ConcurrentHashMap<>();
+
+    public CloudTabletStatMgr() {
+        super("cloud tablet stat mgr", Config.tablet_stat_update_interval_second * 1000);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.info("cloud tablet stat begin");
+        long start = System.currentTimeMillis();
+
+        // Take the db id list once so both passes walk the same set of databases.
+        List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds();
+
+        for (GetTabletStatsRequest req : buildTabletStatsRequests(dbIds)) {
+            fetchAndUpdateTabletStats(req);
+        }
+
+        LOG.info("finished to get tablet stat of all backends. cost: {} ms",
+                (System.currentTimeMillis() - start));
+
+        // after update replica in all backends, update index row num
+        start = System.currentTimeMillis();
+        this.cloudTableStatsMap = aggregateTableStats(dbIds);
+        LOG.info("finished to update index row num of all databases. cost: {} ms",
+                (System.currentTimeMillis() - start));
+    }
+
+    /**
+     * Builds the GetTabletStatsRequests for every tablet of every visible index of
+     * every OLAP table in the given databases, at most
+     * {@code Config.get_tablet_stat_batch_size} tablets per request.
+     *
+     * @param dbIds ids of the databases to scan; ids that no longer resolve are skipped
+     * @return the batched requests, possibly empty
+     */
+    private List<GetTabletStatsRequest> buildTabletStatsRequests(List<Long> dbIds) {
+        List<GetTabletStatsRequest> reqList = new ArrayList<>();
+        GetTabletStatsRequest.Builder builder = GetTabletStatsRequest.newBuilder();
+        for (Long dbId : dbIds) {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+
+            for (Table table : db.getTables()) {
+                if (table.getType() != TableType.OLAP) {
+                    continue;
+                }
+                OlapTable tbl = (OlapTable) table;
+
+                table.readLock();
+                try {
+                    for (Partition partition : tbl.getAllPartitions()) {
+                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                            for (Long tabletId : index.getTabletIdsInOrder()) {
+                                // The id list comes from the index itself, so no tablet lookup is needed.
+                                TabletIndexPB.Builder tabletBuilder = TabletIndexPB.newBuilder()
+                                        .setDbId(dbId)
+                                        .setTableId(table.getId())
+                                        .setIndexId(index.getId())
+                                        .setPartitionId(partition.getId())
+                                        .setTabletId(tabletId);
+                                builder.addTabletIdx(tabletBuilder);
+
+                                // Flush the current batch once it reaches the configured size.
+                                if (builder.getTabletIdxCount() >= Config.get_tablet_stat_batch_size) {
+                                    reqList.add(builder.build());
+                                    builder = GetTabletStatsRequest.newBuilder();
+                                }
+                            }
+                        }
+                    }
+                } finally {
+                    table.readUnlock();
+                }
+            }
+        }
+
+        if (builder.getTabletIdxCount() > 0) {
+            reqList.add(builder.build());
+        }
+        return reqList;
+    }
+
+    /**
+     * Sends one request to the meta service and applies the returned stats to the
+     * local replicas. RPC failures and non-OK responses are logged and the batch is
+     * skipped, so one failing batch does not abort the whole round.
+     *
+     * @param req the batch of tablets to query
+     */
+    private void fetchAndUpdateTabletStats(GetTabletStatsRequest req) {
+        GetTabletStatsResponse resp;
+        try {
+            resp = getTabletStats(req);
+        } catch (RpcException e) {
+            LOG.warn("get tablet stats exception:", e);
+            return;
+        }
+
+        if (resp.getStatus().getCode() != MetaServiceCode.OK) {
+            LOG.warn("get tablet stats failed, code: {}", resp.getStatus().getCode());
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            // Use the index carried by each returned stat instead of pairing stats with
+            // request entries by position, which breaks if fewer stats come back.
+            for (TabletStatsPB stat : resp.getTabletStatsList()) {
+                LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id: {} size: {}",
+                        stat.getIdx().getDbId(), stat.getIdx().getTableId(), stat.getIdx().getIndexId(),
+                        stat.getIdx().getTabletId(), stat.getDataSize());
+            }
+        }
+        updateTabletStat(resp);
+    }
+
+    /**
+     * Recomputes the row count of every visible index from its replicas and builds a
+     * fresh per-table stats map. For each tablet, the maximum value across its
+     * replicas is taken for every metric.
+     *
+     * @param dbIds ids of the databases to aggregate; ids that no longer resolve are skipped
+     * @return a new map keyed by (dbId, tableId)
+     */
+    private ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> aggregateTableStats(List<Long> dbIds) {
+        ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> newCloudTableStatsMap = new ConcurrentHashMap<>();
+        for (Long dbId : dbIds) {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+            String dbName = db.getName();
+
+            for (Table table : db.getTables()) {
+                if (table.getType() != TableType.OLAP) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) table;
+                long tableId = table.getId();
+                String tableName = table.getName();
+
+                // Primitive accumulators: no boxing per tablet.
+                long tableDataSize = 0L;
+                long tableRowsetCount = 0L;
+                long tableSegmentCount = 0L;
+                long tableRowCount = 0L;
+
+                // Write lock because index.setRowCount() mutates the index below; a false
+                // return means the lock was not taken, so the table is skipped.
+                if (!table.writeLockIfExist()) {
+                    continue;
+                }
+
+                try {
+                    for (Partition partition : olapTable.getAllPartitions()) {
+                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                            long indexRowCount = 0L;
+                            for (Tablet tablet : index.getTablets()) {
+                                long tabletDataSize = 0L;
+                                long tabletRowsetCount = 0L;
+                                long tabletSegmentCount = 0L;
+                                long tabletRowCount = 0L;
+                                for (Replica replica : tablet.getReplicas()) {
+                                    tabletDataSize = Math.max(tabletDataSize, replica.getDataSize());
+                                    tabletRowsetCount = Math.max(tabletRowsetCount, replica.getRowsetCount());
+                                    tabletSegmentCount = Math.max(tabletSegmentCount, replica.getSegmentCount());
+                                    tabletRowCount = Math.max(tabletRowCount, replica.getRowCount());
+                                }
+
+                                tableDataSize += tabletDataSize;
+                                tableRowsetCount += tabletRowsetCount;
+                                tableSegmentCount += tabletSegmentCount;
+                                tableRowCount += tabletRowCount;
+
+                                indexRowCount += tabletRowCount;
+                            } // end for tablets
+                            index.setRowCount(indexRowCount);
+                        } // end for indices
+                    } // end for partitions
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("finished to set row num for table: {} in database: {}",
+                                table.getName(), db.getFullName());
+                    }
+                } finally {
+                    table.writeUnlock();
+                }
+
+                newCloudTableStatsMap.put(Pair.of(dbId, tableId), new CloudTableStats(dbName, tableName,
+                        tableDataSize, tableRowsetCount, tableSegmentCount, tableRowCount));
+            }
+        }
+        return newCloudTableStatsMap;
+    }
+
+    /**
+     * Writes the fetched stats onto the local replica of each tablet known to the
+     * inverted index; tablets without tablet meta are ignored.
+     */
+    private void updateTabletStat(GetTabletStatsResponse response) {
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        for (TabletStatsPB stat : response.getTabletStatsList()) {
+            long tabletId = stat.getIdx().getTabletId();
+            if (invertedIndex.getTabletMeta(tabletId) == null) {
+                continue;
+            }
+            List<Replica> replicas = invertedIndex.getReplicasByTabletId(tabletId);
+            // Only the first replica is updated; presumably cloud mode keeps a single
+            // replica per tablet -- confirm.
+            if (replicas != null && !replicas.isEmpty() && replicas.get(0) != null) {
+                replicas.get(0).updateCloudStat(stat.getDataSize(), stat.getNumRowsets(),
+                        stat.getNumSegments(), stat.getNumRows());
+            }
+        }
+    }
+
+    /**
+     * Issues a GetTabletStats RPC to the meta service.
+     *
+     * @throws RpcException if the RPC fails; logging is left to the caller
+     */
+    private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest request)
+            throws RpcException {
+        return MetaServiceProxy.getInstance().getTabletStats(request);
+    }
+
+    /** Returns the per-table stats snapshot published by the most recent round. */
+    public ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> getCloudTableStatsMap() {
+        return this.cloudTableStatsMap;
+    }
+
+    /** Immutable aggregate of the stats of one table, as computed in one round. */
+    public static class CloudTableStats {
+        @Getter
+        private final String dbName;
+        @Getter
+        private final String tableName;
+
+        @Getter
+        private final Long tableDataSize;
+        @Getter
+        private final Long tableRowsetCount;
+        @Getter
+        private final Long tableSegmentCount;
+        @Getter
+        private final Long tableRowCount;
+
+        public CloudTableStats(String dbName, String tableName, Long tableDataSize, Long tableRowsetCount,
+                Long tableSegmentCount, Long tableRowCount) {
+            this.dbName = dbName;
+            this.tableName = tableName;
+            this.tableDataSize = tableDataSize;
+            this.tableRowsetCount = tableRowsetCount;
+            this.tableSegmentCount = tableSegmentCount;
+            this.tableRowCount = tableRowCount;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index fd709963a7b..d1f33b2b10c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -213,6 +213,11 @@ public class Database extends MetaObject implements
Writable, DatabaseIf<Table>
return fullQualifiedName;
}
+    /**
+     * Returns the database name without its cluster prefix: for a qualified name that
+     * splits on ':' into exactly two parts the second part is returned, otherwise the
+     * first part.
+     */
+    public String getName() {
+        final String[] parts = fullQualifiedName.split(":");
+        if (parts.length == 2) {
+            return parts[1];
+        }
+        return parts[0];
+    }
+
public void setNameWithLock(String newName) {
writeLock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 0e55139bc51..28abfefd2e3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -455,6 +455,8 @@ public class Env {
private TabletStatMgr tabletStatMgr;
+ // Started instead of tabletStatMgr when running in cloud mode (see startNonMasterDaemonThreads).
+ private CloudTabletStatMgr cloudTabletStatMgr;
+
private Auth auth;
private AccessControllerManager accessManager;
@@ -704,6 +706,7 @@ public class Env {
this.globalTransactionMgr =
EnvFactory.getInstance().createGlobalTransactionMgr(this);
this.tabletStatMgr = new TabletStatMgr();
+ this.cloudTabletStatMgr = new CloudTabletStatMgr();
this.auth = new Auth();
this.accessManager = new AccessControllerManager(auth);
@@ -1720,7 +1723,11 @@ public class Env {
private void startNonMasterDaemonThreads() {
// start load manager thread
loadManager.start();
- tabletStatMgr.start();
+ if (Config.isNotCloudMode()) {
+ tabletStatMgr.start();
+ } else {
+ cloudTabletStatMgr.start();
+ }
// load and export job label cleaner thread
labelCleaner.start();
// es repository
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index 762ce95839c..f39b38a83c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -172,6 +172,8 @@ public class Replica implements Writable {
*/
private long preWatermarkTxnId = -1;
private long postWatermarkTxnId = -1;
+    // Cloud-mode stats written by the synchronized updateCloudStat() but read through
+    // unsynchronized getters from other threads; volatile makes new values visible.
+    private volatile long segmentCount = 0L;
+    private volatile long rowsetCount = 0L;
private long userDropTime = -1;
@@ -258,6 +260,14 @@ public class Replica implements Writable {
return rowCount;
}
+    /** Returns the segment count stored on this replica (set by updateCloudStat, initially 0). */
+    public long getSegmentCount() {
+        return this.segmentCount;
+    }
+
+    /** Returns the rowset count stored on this replica (set by updateCloudStat, initially 0). */
+    public long getRowsetCount() {
+        return this.rowsetCount;
+    }
+
public long getLastFailedVersion() {
return lastFailedVersion;
}
@@ -351,6 +361,13 @@ public class Replica implements Writable {
this.versionCount = versionCount;
}
+    /**
+     * Overwrites this replica's size and count statistics in one synchronized step.
+     *
+     * @param dataSize   data size to store
+     * @param rowsetNum  rowset count to store
+     * @param segmentNum segment count to store
+     * @param rowNum     row count to store
+     */
+    public synchronized void updateCloudStat(long dataSize, long rowsetNum, long segmentNum, long rowNum) {
+        this.dataSize = dataSize;
+        this.rowsetCount = rowsetNum;
+        this.segmentCount = segmentNum;
+        this.rowCount = rowNum;
+    }
+
public synchronized void updateVersionInfo(long newVersion, long
newDataSize, long newRemoteDataSize,
long newRowCount) {
updateReplicaInfo(newVersion, this.lastFailedVersion,
this.lastSuccessVersion, newDataSize, newRemoteDataSize,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]