This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0bf576b15e4 [improve](cloud) cloud reduce get_tablet_stats rpc to meta_service (#60543)
0bf576b15e4 is described below
commit 0bf576b15e4d931a7dc5e334a22f03b7347bad13
Author: meiyi <[email protected]>
AuthorDate: Wed Mar 18 20:36:22 2026 +0800
[improve](cloud) cloud reduce get_tablet_stats rpc to meta_service (#60543)
---
be/src/cloud/cloud_meta_mgr.cpp | 25 +-
.../main/java/org/apache/doris/common/Config.java | 9 +-
.../org/apache/doris/alter/CloudRollupJobV2.java | 11 +
.../apache/doris/alter/CloudSchemaChangeJobV2.java | 10 +
.../apache/doris/catalog/CloudTabletStatMgr.java | 280 +++++++++++++++++++--
.../apache/doris/cloud/catalog/CloudReplica.java | 18 +-
.../transaction/CloudGlobalTransactionMgr.java | 31 ++-
.../java/org/apache/doris/common/ClientPool.java | 3 +
.../apache/doris/common/proc/TabletsProcDir.java | 17 ++
.../java/org/apache/doris/qe/SessionVariable.java | 3 +
.../apache/doris/service/FrontendServiceImpl.java | 56 ++++-
.../doris/transaction/GlobalTransactionMgr.java | 3 +-
.../transaction/GlobalTransactionMgrIface.java | 3 +-
gensrc/thrift/FrontendService.thrift | 8 +
14 files changed, 427 insertions(+), 50 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 04a50c75652..ef40c94048b 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1419,15 +1419,18 @@ Status CloudMetaMgr::update_tmp_rowset(const
RowsetMeta& rs_meta) {
// async send TableStats(in res) to FE coz we are in streamload ctx, response
to the user ASAP
static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
- const std::string& label,
CommitTxnResponse& res) {
+ const std::string& label,
CommitTxnResponse& res,
+ const std::vector<int64_t>& tablet_ids) {
std::string protobufBytes;
- res.SerializeToString(&protobufBytes);
+ if (txn_id != -1) {
+ res.SerializeToString(&protobufBytes);
+ }
auto st =
ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func(
- [db_id, txn_id, label, protobufBytes]() -> Status {
+ [db_id, txn_id, label, protobufBytes, tablet_ids]() -> Status {
TReportCommitTxnResultRequest request;
TStatus result;
- if (protobufBytes.length() <= 0) {
+ if (txn_id != -1 && protobufBytes.length() <= 0) {
LOG(WARNING) << "protobufBytes: " <<
protobufBytes.length();
return Status::OK(); // nobody cares the return status
}
@@ -1436,6 +1439,7 @@ static void send_stats_to_fe_async(const int64_t db_id,
const int64_t txn_id,
request.__set_txnId(txn_id);
request.__set_label(label);
request.__set_payload(protobufBytes);
+ request.__set_tabletIds(tablet_ids);
Status status;
int64_t duration_ns = 0;
@@ -1489,7 +1493,11 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext&
ctx, bool is_2pc) {
auto st = retry_rpc("commit txn", req, &res,
&MetaService_Stub::commit_txn);
if (st.ok()) {
- send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res);
+ std::vector<int64_t> tablet_ids;
+ for (auto& commit_info : ctx.commit_infos) {
+ tablet_ids.emplace_back(commit_info.tabletId);
+ }
+ send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res,
tablet_ids);
}
return st;
@@ -1648,6 +1656,13 @@ Status CloudMetaMgr::commit_tablet_job(const
TabletJobInfoPB& job, FinishTabletJ
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"txn conflict when commit tablet job {}",
job.ShortDebugString());
}
+
+ if (st.ok() && !job.compaction().empty() && job.has_idx()) {
+ CommitTxnResponse commit_txn_resp;
+ std::vector<int64_t> tablet_ids = {job.idx().tablet_id()};
+ send_stats_to_fe_async(-1, -1, "", commit_txn_resp, tablet_ids);
+ }
+
return st;
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ed5539d9388..f9be5d87559 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3276,7 +3276,6 @@ public class Config extends ConfigBase {
+ "to other BEs in cloud mode."})
public static int rehash_tablet_after_be_dead_seconds = 3600;
-
@ConfField(mutable = true, description = {
"Whether to enable the automatic start-stop feature in cloud
model, default is true."})
public static boolean enable_auto_start_for_cloud_cluster = true;
@@ -3286,6 +3285,14 @@ public class Config extends ConfigBase {
+ "model is set to 300 times, which is approximately 5
minutes by default."})
public static int auto_start_wait_to_resume_times = 300;
+ @ConfField(description = {
+ "Maximal concurrent num of master FE sync tablet stats task to
observers and followers in cloud mode."})
+ public static int cloud_sync_tablet_stats_task_threads_num = 4;
+
+ @ConfField(mutable = true, description = {"Version of getting tablet stats
in cloud mode. "
+ + "Version 1: get all tablets; Version 2: get active and interval
expired tablets"})
+ public static int cloud_get_tablet_stats_version = 2;
+
@ConfField(description = {"Maximum concurrent number of get tablet stat
jobs."})
public static int max_get_tablet_stat_task_threads_num = 4;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 8c6a306aedb..e3192f5e491 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -17,6 +17,7 @@
package org.apache.doris.alter;
+import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -54,6 +55,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class CloudRollupJobV2 extends RollupJobV2 {
private static final Logger LOG =
LogManager.getLogger(CloudRollupJobV2.class);
@@ -119,6 +121,15 @@ public class CloudRollupJobV2 extends RollupJobV2 {
LOG.info("onCreateRollupReplicaDone finished, dbId:{}, tableId:{},
jobId:{}, rollupIndexList:{}",
dbId, tableId, jobId, rollupIndexList);
+
+ List<Long> tabletIds = partitionIdToRollupIndex.values().stream()
+ .flatMap(rollupIndex ->
rollupIndex.getTablets().stream()).map(Tablet::getId)
+ .collect(Collectors.toList());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("force sync tablet stats for table: {}, index: {},
tabletNum: {}, tabletIds: {}", tableId,
+ rollupIndexId, tabletIds.size(), tabletIds);
+ }
+ CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 9387a93aaa9..cd1b90fb923 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -17,6 +17,7 @@
package org.apache.doris.alter;
+import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -91,6 +92,15 @@ public class CloudSchemaChangeJobV2 extends
SchemaChangeJobV2 {
}
LOG.info("commitShadowIndex finished, dbId:{}, tableId:{}, jobId:{},
shadowIdxList:{}",
dbId, tableId, jobId, shadowIdxList);
+
+ List<Long> tabletIds = partitionIndexMap.cellSet().stream()
+ .flatMap(cell ->
cell.getValue().getTablets().stream().map(Tablet::getId))
+ .collect(Collectors.toList());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("force sync tablet stats for table: {}, tabletNum: {},
tabletIds: {}", tableId,
+ tabletIds.size(), tabletIds);
+ }
+ CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
index fd500fac1a0..d6f0f9516a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -18,34 +18,56 @@
package org.apache.doris.catalog;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest;
import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
import org.apache.doris.cloud.proto.Cloud.TabletIndexPB;
import org.apache.doris.cloud.proto.Cloud.TabletStatsPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
-
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TSyncCloudTabletStatsRequest;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/*
* CloudTabletStatMgr is for collecting tablet(replica) statistics from
backends.
- * Each FE will collect by itself.
+ * Config.cloud_get_tablet_stats_version:
+ * 1: Each FE will collect by itself.
+ * 2: Master FE collects active tablet stats and pushes to followers and
observers,
+ * and each FE will collect tablet stats by interval ladder.
*/
public class CloudTabletStatMgr extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(CloudTabletStatMgr.class);
@@ -55,7 +77,30 @@ public class CloudTabletStatMgr extends MasterDaemon {
private volatile List<OlapTable.Statistics> cloudTableStatsList = new
ArrayList<>();
private static final ExecutorService GET_TABLET_STATS_THREAD_POOL =
Executors.newFixedThreadPool(
- Config.max_get_tablet_stat_task_threads_num);
+ Config.max_get_tablet_stat_task_threads_num,
+ new
ThreadFactoryBuilder().setNameFormat("get-tablet-stats-%d").setDaemon(true).build());
+ // Master: send tablet stats to followers and observers
+ // Follower and observer: receive tablet stats from master
+ private static final ExecutorService SYNC_TABLET_STATS_THREAD_POOL =
Executors.newFixedThreadPool(
+ Config.cloud_sync_tablet_stats_task_threads_num,
+ new
ThreadFactoryBuilder().setNameFormat("sync-tablet-stats-%d").setDaemon(true).build());
+ private Set<Long> activeTablets = ConcurrentHashMap.newKeySet();
+
+ /**
+ * Interval ladder in milliseconds: 1m, 5m, 10m, 30m, 2h, 6h, 12h, 3d,
infinite.
+ * Tablets with changing stats stay at lower intervals; stable tablets
move to higher intervals.
+ */
+ private static final long[] DEFAULT_INTERVAL_LADDER_MS = {
+ TimeUnit.MINUTES.toMillis(1), // 1 minute
+ TimeUnit.MINUTES.toMillis(5), // 5 minutes
+ TimeUnit.MINUTES.toMillis(10), // 10 minutes
+ TimeUnit.MINUTES.toMillis(30), // 30 minutes
+ TimeUnit.HOURS.toMillis(2), // 2 hours
+ TimeUnit.HOURS.toMillis(6), // 6 hours
+ TimeUnit.HOURS.toMillis(12), // 12 hours
+ TimeUnit.DAYS.toMillis(3), // 3 days
+ Long.MAX_VALUE // infinite (never auto-fetch)
+ };
public CloudTabletStatMgr() {
super("cloud tablet stat mgr",
Config.tablet_stat_update_interval_second * 1000);
@@ -63,12 +108,50 @@ public class CloudTabletStatMgr extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- LOG.info("cloud tablet stat begin");
- List<Long> dbIds = getAllTabletStats();
+ int version = Config.cloud_get_tablet_stats_version;
+ LOG.info("cloud tablet stat begin with version: {}", version);
+
+ // version1: get all tablet stats
+ if (version == 1) {
+ this.activeTablets.clear();
+ List<Long> dbIds = getAllTabletStats(null);
+ updateStatInfo(dbIds);
+ return;
+ }
+
+ // version2: get stats for active tablets
+ Set<Long> copiedTablets = new HashSet<>(activeTablets);
+ activeTablets.removeAll(copiedTablets);
+ getActiveTabletStats(copiedTablets);
+
+ // get stats by interval
+ List<Long> dbIds = getAllTabletStats(cloudTablet -> {
+ if (copiedTablets.contains(cloudTablet.getId())) {
+ return false;
+ }
+ List<Replica> replicas =
Env.getCurrentInvertedIndex().getReplicas(cloudTablet.getId());
+ if (replicas == null || replicas.isEmpty()) {
+ return false;
+ }
+ CloudReplica cloudReplica = (CloudReplica) replicas.get(0);
+ int index = cloudReplica.getStatsIntervalIndex();
+ if (index >= DEFAULT_INTERVAL_LADDER_MS.length) {
+ LOG.warn("get tablet stats interval index out of range,
tabletId: {}, index: {}",
+ cloudTablet.getId(), index);
+ index = DEFAULT_INTERVAL_LADDER_MS.length - 1;
+ }
+ long interval = DEFAULT_INTERVAL_LADDER_MS[index];
+ if (interval == Long.MAX_VALUE
+ || System.currentTimeMillis() -
cloudReplica.getLastGetTabletStatsTime() < interval) {
+ return false;
+ }
+ return true;
+ });
updateStatInfo(dbIds);
}
- private List<Long> getAllTabletStats() {
+ private List<Long> getAllTabletStats(Function<CloudTablet, Boolean>
filter) {
+ long getStatsTabletNum = 0;
long start = System.currentTimeMillis();
List<Future<Void>> futures = new ArrayList<>();
GetTabletStatsRequest.Builder builder =
@@ -91,17 +174,21 @@ public class CloudTabletStatMgr extends MasterDaemon {
OlapTable tbl = (OlapTable) table;
for (Partition partition : tbl.getAllPartitions()) {
for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
- for (Long tabletId : index.getTabletIdsInOrder()) {
+ for (Tablet tablet : index.getTablets()) {
+ if (filter != null &&
!filter.apply((CloudTablet) tablet)) {
+ continue;
+ }
+ getStatsTabletNum++;
TabletIndexPB.Builder tabletBuilder =
TabletIndexPB.newBuilder();
tabletBuilder.setDbId(dbId);
tabletBuilder.setTableId(table.getId());
tabletBuilder.setIndexId(index.getId());
tabletBuilder.setPartitionId(partition.getId());
- tabletBuilder.setTabletId(tabletId);
+ tabletBuilder.setTabletId(tablet.getId());
builder.addTabletIdx(tabletBuilder);
if (builder.getTabletIdxCount() >=
Config.get_tablet_stat_batch_size) {
-
futures.add(submitGetTabletStatsTask(builder.build()));
+
futures.add(submitGetTabletStatsTask(builder.build(), filter == null));
builder =
GetTabletStatsRequest.newBuilder()
.setRequestIp(FrontendOptions.getLocalHostAddressCached());
}
@@ -115,7 +202,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
} // end for dbs
if (builder.getTabletIdxCount() > 0) {
- futures.add(submitGetTabletStatsTask(builder.build()));
+ futures.add(submitGetTabletStatsTask(builder.build(), filter ==
null));
}
try {
@@ -126,16 +213,62 @@ public class CloudTabletStatMgr extends MasterDaemon {
LOG.error("Error waiting for get tablet stats tasks to complete",
e);
}
- LOG.info("finished to get tablet stat of all backends. cost: {} ms",
- (System.currentTimeMillis() - start));
+ LOG.info("finished to get tablet stats. getStatsTabletNum: {}, cost:
{} ms",
+ getStatsTabletNum, (System.currentTimeMillis() - start));
return dbIds;
}
- private Future<Void> submitGetTabletStatsTask(GetTabletStatsRequest req) {
+ private void getActiveTabletStats(Set<Long> tablets) {
+ List<Long> tabletIds = new ArrayList<>(tablets);
+ Collections.sort(tabletIds);
+ List<TabletMeta> tabletMetas =
Env.getCurrentInvertedIndex().getTabletMetaList(tabletIds);
+ long start = System.currentTimeMillis();
+ List<Future<Void>> futures = new ArrayList<>();
+ GetTabletStatsRequest.Builder builder =
+
GetTabletStatsRequest.newBuilder().setRequestIp(FrontendOptions.getLocalHostAddressCached());
+ long activeTabletNum = 0;
+ for (int i = 0; i < tabletIds.size(); i++) {
+ TabletIndexPB tabletIndexPB = getTabletIndexPB(tabletIds.get(i),
tabletMetas.get(i));
+ if (tabletIndexPB == null) {
+ continue;
+ }
+ activeTabletNum++;
+ builder.addTabletIdx(tabletIndexPB);
+ if (builder.getTabletIdxCount() >=
Config.get_tablet_stat_batch_size) {
+ futures.add(submitGetTabletStatsTask(builder.build(), true));
+ builder = GetTabletStatsRequest.newBuilder()
+
.setRequestIp(FrontendOptions.getLocalHostAddressCached());
+ }
+ }
+ if (builder.getTabletIdxCount() > 0) {
+ futures.add(submitGetTabletStatsTask(builder.build(), true));
+ }
+
+ try {
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error waiting for get tablet stats tasks to complete",
e);
+ }
+ LOG.info("finished to get {} active tablets stats, cost {}ms",
activeTabletNum,
+ System.currentTimeMillis() - start);
+ }
+
+ private TabletIndexPB getTabletIndexPB(long tabletId, TabletMeta
tabletMeta) {
+ if (tabletMeta == null || tabletMeta ==
TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+ return null;
+ }
+ return
TabletIndexPB.newBuilder().setDbId(tabletMeta.getDbId()).setTableId(tabletMeta.getTableId())
+
.setIndexId(tabletMeta.getIndexId()).setPartitionId(tabletMeta.getPartitionId()).setTabletId(tabletId)
+ .build();
+ }
+
+ private Future<Void> submitGetTabletStatsTask(GetTabletStatsRequest req,
boolean activeUpdate) {
return GET_TABLET_STATS_THREAD_POOL.submit(() -> {
GetTabletStatsResponse resp;
try {
- resp = getTabletStats(req);
+ resp = getTabletStatsFromMs(req);
} catch (RpcException e) {
LOG.warn("get tablet stats exception:", e);
return null;
@@ -144,15 +277,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
LOG.warn("get tablet stats return failed.");
return null;
}
- if (LOG.isDebugEnabled()) {
- int i = 0;
- for (TabletIndexPB idx : req.getTabletIdxList()) {
- LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id:
{} size: {}",
- idx.getDbId(), idx.getTableId(), idx.getIndexId(),
- idx.getTabletId(),
resp.getTabletStats(i++).getDataSize());
- }
- }
- updateTabletStat(resp);
+ updateTabletStat(resp, activeUpdate);
return null;
});
}
@@ -286,7 +411,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
tableTotalLocalIndexSize,
tableTotalLocalSegmentSize, 0L, 0L);
olapTable.setStatistics(tableStats);
LOG.debug("finished to set row num for table: {} in
database: {}",
- table.getName(), db.getFullName());
+ table.getName(), db.getFullName());
} finally {
table.readUnlock();
}
@@ -338,7 +463,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
(System.currentTimeMillis() - start));
}
- private void updateTabletStat(GetTabletStatsResponse response) {
+ private void updateTabletStat(GetTabletStatsResponse response, boolean
activeUpdate) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (TabletStatsPB stat : response.getTabletStatsList()) {
List<Replica> replicas =
invertedIndex.getReplicasByTabletId(stat.getIdx().getTabletId());
@@ -346,16 +471,45 @@ public class CloudTabletStatMgr extends MasterDaemon {
continue;
}
Replica replica = replicas.get(0);
+ boolean statsChanged = replica.getDataSize() != stat.getDataSize()
+ || replica.getRowsetCount() != stat.getNumRowsets()
+ || replica.getSegmentCount() != stat.getNumSegments()
+ || replica.getRowCount() != stat.getNumRows()
+ || replica.getLocalInvertedIndexSize() !=
stat.getIndexSize()
+ || replica.getLocalSegmentSize() != stat.getSegmentSize();
replica.setDataSize(stat.getDataSize());
replica.setRowsetCount(stat.getNumRowsets());
replica.setSegmentCount(stat.getNumSegments());
replica.setRowCount(stat.getNumRows());
replica.setLocalInvertedIndexSize(stat.getIndexSize());
replica.setLocalSegmentSize(stat.getSegmentSize());
+
+ CloudReplica cloudReplica = (CloudReplica) replica;
+ cloudReplica.setLastGetTabletStatsTime(System.currentTimeMillis());
+ int statsIntervalIndex = cloudReplica.getStatsIntervalIndex();
+ if (activeUpdate || statsChanged) {
+ statsIntervalIndex = 0;
+ if (!activeUpdate && statsChanged && LOG.isDebugEnabled()) {
+ LOG.debug("tablet stats changed, reset interval index to
0, dbId: {}, tableId: {}, "
+ + "indexId: {}, partitionId: {}, tabletId:
{}, dataSize: {}, rowCount: {}, "
+ + "rowsetCount: {}, segmentCount: {},
indexSize: {}, segmentSize: {}. lastIdx: {}",
+ stat.getIdx().getDbId(),
stat.getIdx().getTableId(), stat.getIdx().getIndexId(),
+ stat.getIdx().getPartitionId(),
stat.getIdx().getTabletId(), stat.getDataSize(),
+ stat.getNumRows(), stat.getNumRowsets(),
stat.getNumSegments(), stat.getIndexSize(),
+ stat.getSegmentSize(),
cloudReplica.getStatsIntervalIndex());
+ }
+ } else {
+ statsIntervalIndex = Math.min(statsIntervalIndex + 1,
DEFAULT_INTERVAL_LADDER_MS.length - 1);
+ }
+ cloudReplica.setStatsIntervalIndex(statsIntervalIndex);
+ }
+ // push tablet stats to other fes
+ if (Config.cloud_get_tablet_stats_version == 2 && activeUpdate &&
Env.getCurrentEnv().isMaster()) {
+ pushTabletStats(response);
}
}
- private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest
request)
+ private GetTabletStatsResponse getTabletStatsFromMs(GetTabletStatsRequest
request)
throws RpcException {
GetTabletStatsResponse response;
try {
@@ -394,4 +548,78 @@ public class CloudTabletStatMgr extends MasterDaemon {
}
this.cloudTableStatsList = new ArrayList<>(topStats);
}
+
+ public void addActiveTablets(List<Long> tabletIds) {
+ if (Config.cloud_get_tablet_stats_version == 1 || tabletIds == null ||
tabletIds.isEmpty()) {
+ return;
+ }
+ activeTablets.addAll(tabletIds);
+ }
+
+ // master FE send update tablet stats rpc to other FEs
+ private void pushTabletStats(GetTabletStatsResponse response) {
+ List<Frontend> frontends = getFrontends();
+ if (frontends == null || frontends.isEmpty()) {
+ return;
+ }
+ TSyncCloudTabletStatsRequest request = new
TSyncCloudTabletStatsRequest();
+ request.setTabletStatsPb(ByteBuffer.wrap(response.toByteArray()));
+ for (Frontend fe : frontends) {
+ SYNC_TABLET_STATS_THREAD_POOL.submit(() -> {
+ try {
+ pushTabletStatsToFe(request, fe);
+ } catch (Exception e) {
+ LOG.warn("push tablet stats to frontend {}:{} error",
fe.getHost(), fe.getRpcPort(), e);
+ }
+ });
+ }
+ }
+
+ private void pushTabletStatsToFe(TSyncCloudTabletStatsRequest request,
Frontend fe) {
+ FrontendService.Client client = null;
+ TNetworkAddress addr = new TNetworkAddress(fe.getHost(),
fe.getRpcPort());
+ boolean ok = false;
+ try {
+ client = ClientPool.frontendStatsPool.borrowObject(addr);
+ TStatus status = client.syncCloudTabletStats(request);
+ ok = true;
+ if (status.getStatusCode() != TStatusCode.OK) {
+ LOG.warn("failed to push cloud tablet stats to frontend {}:{},
err: {}", fe.getHost(),
+ fe.getRpcPort(), status.getErrorMsgs());
+ }
+ } catch (Exception e) {
+ LOG.warn("failed to push update cloud tablet stats to frontend
{}:{}", fe.getHost(), fe.getRpcPort(), e);
+ } finally {
+ if (ok) {
+ ClientPool.frontendStatsPool.returnObject(addr, client);
+ } else {
+ ClientPool.frontendStatsPool.invalidateObject(addr, client);
+ }
+ }
+ }
+
+ // follower and observer FE receive sync tablet stats rpc from master FE
+ public void syncTabletStats(GetTabletStatsResponse response) {
+ if (Config.cloud_get_tablet_stats_version == 1 ||
response.getTabletStatsList().isEmpty()) {
+ return;
+ }
+ SYNC_TABLET_STATS_THREAD_POOL.submit(() -> {
+ updateTabletStat(response, true);
+ });
+ }
+
+ private List<Frontend> getFrontends() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return Collections.emptyList();
+ }
+ HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+ return Env.getCurrentEnv().getFrontends(null).stream()
+ .filter(fe -> fe.isAlive() &&
!(fe.getHost().equals(selfNode.getHost())
+ && fe.getEditLogPort() == selfNode.getPort())).collect(
+ Collectors.toList());
+ }
+
+ public static CloudTabletStatMgr getInstance() {
+ return (CloudTabletStatMgr) Env.getCurrentEnv().getTabletStatMgr();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index afd2fd2501f..3a9699e8f84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -36,6 +36,8 @@ import com.google.common.base.Strings;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,9 +69,23 @@ public class CloudReplica extends Replica implements
GsonPostProcessable {
private long indexId = -1;
@SerializedName(value = "idx")
private long idx = -1;
+ // last time to get tablet stats
+ @Getter
+ @Setter
+ long lastGetTabletStatsTime = 0;
+ /**
+ * The index of {@link
org.apache.doris.catalog.CloudTabletStatMgr#DEFAULT_INTERVAL_LADDER_MS} array.
+ * Used to control the interval of getting tablet stats.
+ * When get tablet stats:
+ * if the stats is unchanged, will update this index to next value to get
stats less frequently;
+ * if the stats is changed, will update this index to 0 to get stats more
frequently.
+ */
+ @Getter
+ @Setter
+ int statsIntervalIndex = 0;
private long segmentCount = 0L;
- private long rowsetCount = 0L;
+ private long rowsetCount = 1L; // [0-1] rowset
private static final Random rand = new Random();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 043128d02fd..a8baff54700 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -17,6 +17,7 @@
package org.apache.doris.cloud.transaction;
+import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@@ -147,6 +148,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -501,7 +503,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
* @param commitTxnResponse commit txn call response from meta-service
* @param tabletCommitInfos tablet commit infos containing backend and
tablet mapping
*/
- public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos) {
+ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos,
+ List<Long> tabletIds) {
// ========================================
// notify BEs to make temporary rowsets visible
// ========================================
@@ -542,6 +545,12 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
if (sb.length() > 0) {
LOG.info("notify partition first load. {}", sb);
}
+ // 4. notify update tablet stats
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("force sync tablet stats for txnId: {}, tabletNum: {},
tabletIds: {}", txnId,
+ tabletIds.size(), tabletIds);
+ }
+ CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
// ========================================
// produce event
@@ -733,12 +742,14 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
final CommitTxnRequest commitTxnRequest = builder.build();
- executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC,
txnCommitAttachment, tabletCommitInfos);
+ executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC,
txnCommitAttachment, tabletCommitInfos,
+ tabletCommitInfos == null ? Collections.emptyList()
+ : tabletCommitInfos.stream().map(t ->
t.getTabletId()).collect(Collectors.toList()));
}
private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest,
long transactionId, boolean is2PC,
- TxnCommitAttachment txnCommitAttachment, List<TabletCommitInfo>
tabletCommitInfos)
- throws UserException {
+ TxnCommitAttachment txnCommitAttachment, List<TabletCommitInfo>
tabletCommitInfos, List<Long> tabletIds)
+ throws UserException {
if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
LOG.info("debug point FE.mow.commit.exception, throw e");
throw new UserException(InternalErrorCode.INTERNAL_ERR, "debug
point FE.mow.commit.exception");
@@ -761,7 +772,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
- txnState = commitTxn(commitTxnRequest, transactionId, is2PC,
tabletCommitInfos);
+ txnState = commitTxn(commitTxnRequest, transactionId, is2PC,
tabletCommitInfos, tabletIds);
txnOperated = true;
if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
{
throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -801,7 +812,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC,
- List<TabletCommitInfo> tabletCommitInfos) throws UserException {
+ List<TabletCommitInfo> tabletCommitInfos, List<Long> tabletIds)
throws UserException {
checkCommitInfo(commitTxnRequest);
CommitTxnResponse commitTxnResponse = null;
@@ -861,7 +872,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime()
- txnState.getPrepareTime());
}
- afterCommitTxnResp(commitTxnResponse, tabletCommitInfos);
+ afterCommitTxnResp(commitTxnResponse, tabletCommitInfos, tabletIds);
return txnState;
}
@@ -1638,6 +1649,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
builder.addMowTableIds(olapTable.getId());
}
// add sub txn infos
+ Set<Long> tabletIds = new HashSet<>();
for (SubTransactionState subTransactionState : subTransactionStates) {
builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId())
.setTableId(subTransactionState.getTable().getId())
@@ -1647,10 +1659,13 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
.map(c -> new
TabletCommitInfo(c.getTabletId(), c.getBackendId()))
.collect(Collectors.toList())))
.build());
+ for (TTabletCommitInfo tabletCommitInfo :
subTransactionState.getTabletCommitInfos()) {
+ tabletIds.add(tabletCommitInfo.getTabletId());
+ }
}
final CommitTxnRequest commitTxnRequest = builder.build();
- executeCommitTxnRequest(commitTxnRequest, transactionId, false, null,
null);
+ executeCommitTxnRequest(commitTxnRequest, transactionId, false, null,
null, new ArrayList<>(tabletIds));
}
private List<Table> getTablesNeedCommitLock(List<Table> tableList) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 2b769ae2e62..f10ad74f33b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -72,6 +72,9 @@ public class ClientPool {
public static GenericPool<FrontendService.Client> frontendPool =
new GenericPool("FrontendService", backendConfig,
Config.backend_rpc_timeout_ms,
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
+ public static GenericPool<FrontendService.Client> frontendStatsPool =
+ new GenericPool<>("FrontendService", heartbeatConfig,
heartbeatTimeoutMs,
+
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
public static GenericPool<BackendService.Client> backendPool =
new GenericPool("BackendService", backendConfig,
Config.backend_rpc_timeout_ms);
public static GenericPool<TPaloBrokerService.Client> brokerPool =
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index 50fb5e98610..34eff696fd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -17,6 +17,7 @@
package org.apache.doris.common.proc;
+import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
@@ -31,12 +32,15 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.query.QueryStatsUtil;
import org.apache.doris.system.Backend;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,6 +53,7 @@ import java.util.Map;
* show tablets' detail info within an index
*/
public class TabletsProcDir implements ProcDirInterface {
+ private static final Logger LOG =
LogManager.getLogger(TabletsProcDir.class);
public static final ImmutableList<String> TITLE_NAMES;
static {
@@ -90,6 +95,11 @@ public class TabletsProcDir implements ProcDirInterface {
pathHashToRoot.put(diskInfo.getPathHash(),
diskInfo.getRootPath());
}
}
+ List<Long> tabletIds = null;
+ if (Config.isCloudMode() && ConnectContext.get() != null &&
ConnectContext.get()
+ .getSessionVariable().cloudForceSyncTabletStats) {
+ tabletIds = new ArrayList<>();
+ }
table.readLock();
try {
Map<Long, Long> replicaIdToQueryHits = new HashMap<>();
@@ -105,6 +115,9 @@ public class TabletsProcDir implements ProcDirInterface {
// get infos
for (Tablet tablet : index.getTablets()) {
+ if (tabletIds != null) {
+ tabletIds.add(tablet.getId());
+ }
long tabletId = tablet.getId();
if (tablet.getReplicas().size() == 0) {
List<Comparable> tabletInfo = new ArrayList<Comparable>();
@@ -208,6 +221,10 @@ public class TabletsProcDir implements ProcDirInterface {
} finally {
table.readUnlock();
}
+ if (tabletIds != null && !tabletIds.isEmpty()) {
+ LOG.info("force sync tablet stats for table: {}, tabletNum: {}",
table, tabletIds.size());
+ CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
+ }
return tabletInfos;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 67e45b7d146..576b3a392be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -822,6 +822,7 @@ public class SessionVariable implements Serializable,
Writable {
"cloud_partition_version_cache_ttl_ms";
public static final String CLOUD_TABLE_VERSION_CACHE_TTL_MS =
"cloud_table_version_cache_ttl_ms";
+ public static final String CLOUD_FORCE_SYNC_TABLET_STATS =
"cloud_force_sync_tablet_stats";
// CLOUD_VARIABLES_BEGIN
public static final String ENABLE_MATCH_WITHOUT_INVERTED_INDEX =
"enable_match_without_inverted_index";
@@ -3119,6 +3120,8 @@ public class SessionVariable implements Serializable,
Writable {
public String cloudCluster = "";
@VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
public boolean disableEmptyPartitionPrune = false;
+ @VariableMgr.VarAttr(name = CLOUD_FORCE_SYNC_TABLET_STATS)
+ public boolean cloudForceSyncTabletStats = false;
@VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
public long cloudPartitionVersionCacheTtlMs = Long.MAX_VALUE;
@VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 56040809b5b..f074711854c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.doris.backup.BackupMeta;
import org.apache.doris.backup.Snapshot;
import org.apache.doris.binlog.BinlogLagInfo;
import org.apache.doris.catalog.AutoIncrementGenerator;
+import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
@@ -48,6 +49,7 @@ import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
@@ -285,6 +287,7 @@ import
org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TSubTxnInfo;
+import org.apache.doris.thrift.TSyncCloudTabletStatsRequest;
import org.apache.doris.thrift.TSyncQueryColumns;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableMetadataNameIds;
@@ -5089,16 +5092,28 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return new TStatus(TStatusCode.NOT_MASTER);
}
- LOG.info("receive load stats report request: {}, backend: {}, dbId:
{}, txnId: {}, label: {}",
- request, clientAddr, request.getDbId(), request.getTxnId(),
request.getLabel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive load stats report from backend: {}, dbId: {},
txnId: {}, label: {}, tabletIds: {}",
+ clientAddr, request.getDbId(), request.getTxnId(),
request.getLabel(), request.getTabletIds());
+ }
try {
- byte[] receivedProtobufBytes = request.getPayload();
- if (receivedProtobufBytes == null || receivedProtobufBytes.length
<= 0) {
- return new TStatus(TStatusCode.INVALID_ARGUMENT);
+ List<Long> tabletIds = request.isSetTabletIds() ?
request.getTabletIds() : Collections.emptyList();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("force sync tablet stats for txnId: {}, tabletNum:
{}, tabletIds: {}", request.txnId,
+ tabletIds.size(), tabletIds);
+ }
+ if (request.isSetTxnId() && request.getTxnId() != -1) {
+ byte[] receivedProtobufBytes = request.getPayload();
+ if (receivedProtobufBytes == null ||
receivedProtobufBytes.length <= 0) {
+ return new TStatus(TStatusCode.INVALID_ARGUMENT);
+ }
+ CommitTxnResponse commitTxnResponse =
CommitTxnResponse.parseFrom(receivedProtobufBytes);
+
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse,
null, tabletIds);
+ } else {
+ // compaction notification: update tablet stats
+ CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
}
- CommitTxnResponse commitTxnResponse =
CommitTxnResponse.parseFrom(receivedProtobufBytes);
-
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse,
null);
} catch (InvalidProtocolBufferException e) {
// Handle the exception, log it, or take appropriate action
e.printStackTrace();
@@ -5462,6 +5477,33 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
}
+ @Override
+ public TStatus syncCloudTabletStats(TSyncCloudTabletStatsRequest request)
+ throws TException {
+ TStatus status = new TStatus(TStatusCode.OK);
+ if (Env.getCurrentEnv().isMaster()) {
+ LOG.warn("syncCloudTabletStats called on master, ignoring");
+ return status;
+ }
+
+ byte[] receivedProtobufBytes = request.getTabletStatsPb();
+ if (receivedProtobufBytes == null || receivedProtobufBytes.length <=
0) {
+ status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
+ status.addToErrorMsgs("TabletStatsPb is null or empty");
+ return status;
+ }
+ GetTabletStatsResponse getTabletStatsResponse;
+ try {
+ getTabletStatsResponse =
GetTabletStatsResponse.parseFrom(receivedProtobufBytes);
+ } catch (Exception e) {
+ status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
+ status.addToErrorMsgs("parse GetTabletStatsResponse error: " +
e.getMessage());
+ return status;
+ }
+
CloudTabletStatMgr.getInstance().syncTabletStats(getTabletStatsResponse);
+ return status;
+ }
+
private TStatus checkMaster() {
TStatus status = new TStatus(TStatusCode.OK);
if (!Env.getCurrentEnv().isMaster()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index b5e30e9893d..2ce7717912f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -230,7 +230,8 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
@Override
- public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos) {
+ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos,
+ List<Long> tabletIds) {
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index b611ff4e588..d05291e93e2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -210,7 +210,8 @@ public interface GlobalTransactionMgrIface extends Writable
{
public void
replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation)
throws Exception;
- public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos);
+ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos,
+ List<Long> tabletIds);
public void addSubTransaction(long dbId, long transactionId, long
subTransactionId);
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index aeb5b06efb6..674ffc35a12 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1534,6 +1534,8 @@ struct TReportCommitTxnResultRequest {
2: optional i64 txnId
3: optional string label
4: optional binary payload
+ // tablets whose stats need to be updated
+ 5: optional list<i64> tabletIds
}
struct TQueryColumn {
@@ -1868,6 +1870,10 @@ struct TMasterAddressResult {
2: optional Types.TNetworkAddress master_address
}
+struct TSyncCloudTabletStatsRequest {
+ 1: optional binary tablet_stats_pb
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1990,4 +1996,6 @@ service FrontendService {
TInsertOverwriteRecordResult addOrDropInsertOverwriteRecord(1:
TInsertOverwriteRecordRequest request)
TRecordFinishedLoadJobResult recordFinishedLoadJobRequest(1:
TRecordFinishedLoadJobRequest request)
+
+ Status.TStatus syncCloudTabletStats(1: TSyncCloudTabletStatsRequest
request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]