This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 7a331a460b0 branch-4.1: pick #61318, #60543, #60705 (#61874)
7a331a460b0 is described below

commit 7a331a460b0bc0b329ec72d5dfb07bb48fa53710
Author: meiyi <[email protected]>
AuthorDate: Tue Mar 31 11:28:29 2026 +0800

    branch-4.1: pick #61318, #60543, #60705 (#61874)
    
    pick:
    1. modify CloudTabletRebalancer and CloudTabletStatMgr to reduce memory
    (https://github.com/apache/doris/pull/61318)
    2. cloud reduce get_tablet_stats rpc to meta_service
    (https://github.com/apache/doris/pull/60543)
    3. checkpoint save cloud tablet stats to image
    (https://github.com/apache/doris/pull/60705)
---
 be/src/cloud/cloud_meta_mgr.cpp                    |  25 +-
 .../main/java/org/apache/doris/common/Config.java  |  17 +-
 .../org/apache/doris/alter/CloudRollupJobV2.java   |  11 +
 .../apache/doris/alter/CloudSchemaChangeJobV2.java |  10 +
 .../apache/doris/catalog/CloudTabletStatMgr.java   | 318 +++++++++++++++++++--
 .../apache/doris/cloud/catalog/CloudReplica.java   |  22 +-
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  12 +-
 .../transaction/CloudGlobalTransactionMgr.java     |  30 +-
 .../java/org/apache/doris/common/ClientPool.java   |   3 +
 .../apache/doris/common/proc/TabletsProcDir.java   |  17 ++
 .../java/org/apache/doris/master/Checkpoint.java   | 124 +++++++-
 .../doris/metric/PrometheusMetricVisitor.java      |  26 +-
 .../java/org/apache/doris/persist/Storage.java     |  19 ++
 .../java/org/apache/doris/qe/SessionVariable.java  |   3 +
 .../apache/doris/service/FrontendServiceImpl.java  |  56 +++-
 .../doris/transaction/GlobalTransactionMgr.java    |   2 +-
 .../transaction/GlobalTransactionMgrIface.java     |   2 +-
 gensrc/thrift/FrontendService.thrift               |   8 +
 18 files changed, 619 insertions(+), 86 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 1cf71f74632..ca6c6ff3e79 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1383,15 +1383,18 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
 
 // async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP
 static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
-                                   const std::string& label, CommitTxnResponse& res) {
+                                   const std::string& label, CommitTxnResponse& res,
+                                   const std::vector<int64_t>& tablet_ids) {
     std::string protobufBytes;
-    res.SerializeToString(&protobufBytes);
+    if (txn_id != -1) {
+        res.SerializeToString(&protobufBytes);
+    }
     auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func(
-            [db_id, txn_id, label, protobufBytes]() -> Status {
+            [db_id, txn_id, label, protobufBytes, tablet_ids]() -> Status {
                 TReportCommitTxnResultRequest request;
                 TStatus result;
 
-                if (protobufBytes.length() <= 0) {
+                if (txn_id != -1 && protobufBytes.length() <= 0) {
                     LOG(WARNING) << "protobufBytes: " << protobufBytes.length();
                     return Status::OK(); // nobody cares the return status
                 }
@@ -1400,6 +1403,7 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
                 request.__set_txnId(txn_id);
                 request.__set_label(label);
                 request.__set_payload(protobufBytes);
+                request.__set_tabletIds(tablet_ids);
 
                 Status status;
                 int64_t duration_ns = 0;
@@ -1453,7 +1457,11 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
     auto st = retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn);
 
     if (st.ok()) {
-        send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res);
+        std::vector<int64_t> tablet_ids;
+        for (auto& commit_info : ctx.commit_infos) {
+            tablet_ids.emplace_back(commit_info.tabletId);
+        }
+        send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res, tablet_ids);
     }
 
     return st;
@@ -1612,6 +1620,13 @@ Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJ
         return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                 "txn conflict when commit tablet job {}", job.ShortDebugString());
     }
+
+    if (st.ok() && !job.compaction().empty() && job.has_idx()) {
+        CommitTxnResponse commit_txn_resp;
+        std::vector<int64_t> tablet_ids = {job.idx().tablet_id()};
+        send_stats_to_fe_async(-1, -1, "", commit_txn_resp, tablet_ids);
+    }
+
     return st;
 }
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0bd485c68bb..53b73dd2b04 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -659,6 +659,14 @@ public class Config extends ConfigBase {
                     + "insert into and stream load use group commit by 
default."})
     public static boolean wait_internal_group_commit_finish = false;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "The stale threshold of checkpoint image file in cloud mode (in 
seconds). "
+                    + "If the image file is older than this threshold, a new 
checkpoint will be triggered "
+                    + "even if there are no new journals. This helps keep 
table version, partition version, "
+                    + "and tablet stats in the image up-to-date. If the value 
is less than or equal to 0, "
+                    + "this feature is disabled."})
+    public static long cloud_checkpoint_image_stale_threshold_seconds = 3600;
+
     @ConfField(mutable = false, masterOnly = true, description = 
{"攒批的默认提交时间,单位是毫秒",
             "Default commit interval in ms for group commit"})
     public static int group_commit_interval_ms_default_value = 10000;
@@ -3540,7 +3548,6 @@ public class Config extends ConfigBase {
             description = { "存算分离模式下,一个 BE 挂掉多长时间后,它的 tablet 彻底转移到其他 BE 上" })
     public static int rehash_tablet_after_be_dead_seconds = 3600;
 
-
     @ConfField(mutable = true, description = {"存算分离模式下是否启用自动启停功能,默认 true",
         "Whether to enable the automatic start-stop feature in cloud model, 
default is true."})
     public static boolean enable_auto_start_for_cloud_cluster = true;
@@ -3554,6 +3561,14 @@ public class Config extends ConfigBase {
         "Maximal concurrent num of get tablet stat job."})
     public static int max_get_tablet_stat_task_threads_num = 4;
 
+    @ConfField(description = {
+            "Maximal concurrent num of master FE sync tablet stats task to 
observers and followers in cloud mode."})
+    public static int cloud_sync_tablet_stats_task_threads_num = 4;
+
+    @ConfField(mutable = true, description = {"Version of getting tablet stats 
in cloud mode. "
+            + "Version 1: get all tablets; Version 2: get active and interval 
expired tablets"})
+    public static int cloud_get_tablet_stats_version = 2;
+
     @ConfField(description = {"存算分离模式下同步 table 和 partition version 的间隔. 所有 
frontend 都会检查",
             "Cloud table and partition version syncer interval. All frontends 
will perform the checking"})
     public static int cloud_version_syncer_interval_second = 20;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 37d6a5e1bca..ea11f2a175a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -54,6 +55,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class CloudRollupJobV2 extends RollupJobV2 {
     private static final Logger LOG = 
LogManager.getLogger(CloudRollupJobV2.class);
@@ -119,6 +121,15 @@ public class CloudRollupJobV2 extends RollupJobV2 {
 
         LOG.info("onCreateRollupReplicaDone finished, dbId:{}, tableId:{}, 
jobId:{}, rollupIndexList:{}",
                 dbId, tableId, jobId, rollupIndexList);
+
+        List<Long> tabletIds = partitionIdToRollupIndex.values().stream()
+                .flatMap(rollupIndex -> 
rollupIndex.getTablets().stream()).map(Tablet::getId)
+                .collect(Collectors.toList());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("force sync tablet stats for table: {}, index: {}, 
tabletNum: {}, tabletIds: {}", tableId,
+                    rollupIndexId, tabletIds.size(), tabletIds);
+        }
+        CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 63e2575abb6..8b7a7dba1cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -91,6 +92,15 @@ public class CloudSchemaChangeJobV2 extends SchemaChangeJobV2 {
         }
         LOG.info("commitShadowIndex finished, dbId:{}, tableId:{}, jobId:{}, 
shadowIdxList:{}",
                 dbId, tableId, jobId, shadowIdxList);
+
+        List<Long> tabletIds = partitionIndexMap.cellSet().stream()
+                .flatMap(cell -> 
cell.getValue().getTablets().stream().map(Tablet::getId))
+                .collect(Collectors.toList());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("force sync tablet stats for table: {}, tabletNum: {}, 
tabletIds: {}", tableId,
+                    tabletIds.size(), tabletIds);
+        }
+        CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
index 5b14786c1e7..18da6784acf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -18,40 +18,89 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest;
 import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
 import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
 import org.apache.doris.cloud.proto.Cloud.TabletIndexPB;
 import org.apache.doris.cloud.proto.Cloud.TabletStatsPB;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
-
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TSyncCloudTabletStatsRequest;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /*
  * CloudTabletStatMgr is for collecting tablet(replica) statistics from 
backends.
- * Each FE will collect by itself.
+ * Config.cloud_get_tablet_stats_version:
+ * 1: Each FE will collect by itself.
+ * 2: Master FE collects active tablet stats and pushes to followers and 
observers,
+ *    and each FE will collect tablet stats by interval ladder.
  */
 public class CloudTabletStatMgr extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(CloudTabletStatMgr.class);
 
+    private volatile long totalTableSize = 0;
+    // keep Config.prom_output_table_metrics_limit tables with the largest 
data size, used for prometheus output
     private volatile List<OlapTable.Statistics> cloudTableStatsList = new 
ArrayList<>();
 
     private static final ExecutorService GET_TABLET_STATS_THREAD_POOL = 
Executors.newFixedThreadPool(
-            Config.max_get_tablet_stat_task_threads_num);
+            Config.max_get_tablet_stat_task_threads_num,
+            new 
ThreadFactoryBuilder().setNameFormat("get-tablet-stats-%d").setDaemon(true).build());
+    // Master: send tablet stats to followers and observers
+    // Follower and observer: receive tablet stats from master
+    private static final ExecutorService SYNC_TABLET_STATS_THREAD_POOL = 
Executors.newFixedThreadPool(
+            Config.cloud_sync_tablet_stats_task_threads_num,
+            new 
ThreadFactoryBuilder().setNameFormat("sync-tablet-stats-%d").setDaemon(true).build());
+    private Set<Long> activeTablets = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Interval ladder in milliseconds: 1m, 5m, 10m, 30m, 2h, 6h, 12h, 3d, 
infinite.
+     * Tablets with changing stats stay at lower intervals; stable tablets 
move to higher intervals.
+     */
+    private static final long[] DEFAULT_INTERVAL_LADDER_MS = {
+            TimeUnit.MINUTES.toMillis(1),    // 1 minute
+            TimeUnit.MINUTES.toMillis(5),    // 5 minutes
+            TimeUnit.MINUTES.toMillis(10),   // 10 minutes
+            TimeUnit.MINUTES.toMillis(30),   // 30 minutes
+            TimeUnit.HOURS.toMillis(2),      // 2 hours
+            TimeUnit.HOURS.toMillis(6),      // 6 hours
+            TimeUnit.HOURS.toMillis(12),     // 12 hours
+            TimeUnit.DAYS.toMillis(3),       // 3 days
+            Long.MAX_VALUE                   // infinite (never auto-fetch)
+    };
 
     public CloudTabletStatMgr() {
         super("cloud tablet stat mgr", 
Config.tablet_stat_update_interval_second * 1000);
@@ -59,12 +108,57 @@ public class CloudTabletStatMgr extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        LOG.info("cloud tablet stat begin");
-        List<Long> dbIds = getAllTabletStats();
+        if (cloudTableStatsList.isEmpty()) {
+            // use tablet stats loaded from image to update table stats when 
fe start
+            // avoid that the table stats is empty for a long time since 
getAllTabletStats may consume a long time
+            LOG.info("cloud tablet stat is empty, will update stat info of all 
tables");
+            updateStatInfo(Env.getCurrentInternalCatalog().getDbIds());
+        }
+
+        int version = Config.cloud_get_tablet_stats_version;
+        LOG.info("cloud tablet stat begin with version: {}", version);
+
+        // version1: get all tablet stats
+        if (version == 1) {
+            this.activeTablets.clear();
+            List<Long> dbIds = getAllTabletStats(null);
+            updateStatInfo(dbIds);
+            return;
+        }
+
+        // version2: get stats for active tablets
+        Set<Long> copiedTablets = new HashSet<>(activeTablets);
+        activeTablets.removeAll(copiedTablets);
+        getActiveTabletStats(copiedTablets);
+
+        // get stats by interval
+        List<Long> dbIds = getAllTabletStats(cloudTablet -> {
+            if (copiedTablets.contains(cloudTablet.getId())) {
+                return false;
+            }
+            List<Replica> replicas = 
Env.getCurrentInvertedIndex().getReplicas(cloudTablet.getId());
+            if (replicas == null || replicas.isEmpty()) {
+                return false;
+            }
+            CloudReplica cloudReplica = (CloudReplica) replicas.get(0);
+            int index = cloudReplica.getStatsIntervalIndex();
+            if (index >= DEFAULT_INTERVAL_LADDER_MS.length) {
+                LOG.warn("get tablet stats interval index out of range, 
tabletId: {}, index: {}",
+                        cloudTablet.getId(), index);
+                index = DEFAULT_INTERVAL_LADDER_MS.length - 1;
+            }
+            long interval = DEFAULT_INTERVAL_LADDER_MS[index];
+            if (interval == Long.MAX_VALUE
+                    || System.currentTimeMillis() - 
cloudReplica.getLastGetTabletStatsTime() < interval) {
+                return false;
+            }
+            return true;
+        });
         updateStatInfo(dbIds);
     }
 
-    private List<Long> getAllTabletStats() {
+    private List<Long> getAllTabletStats(Function<CloudTablet, Boolean> 
filter) {
+        long getStatsTabletNum = 0;
         long start = System.currentTimeMillis();
         List<Future<Void>> futures = new ArrayList<>();
         GetTabletStatsRequest.Builder builder =
@@ -87,17 +181,21 @@ public class CloudTabletStatMgr extends MasterDaemon {
                     OlapTable tbl = (OlapTable) table;
                     for (Partition partition : tbl.getAllPartitions()) {
                         for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                            for (Long tabletId : index.getTabletIdsInOrder()) {
+                            for (Tablet tablet : index.getTablets()) {
+                                if (filter != null && 
!filter.apply((CloudTablet) tablet)) {
+                                    continue;
+                                }
+                                getStatsTabletNum++;
                                 TabletIndexPB.Builder tabletBuilder = 
TabletIndexPB.newBuilder();
                                 tabletBuilder.setDbId(dbId);
                                 tabletBuilder.setTableId(table.getId());
                                 tabletBuilder.setIndexId(index.getId());
                                 
tabletBuilder.setPartitionId(partition.getId());
-                                tabletBuilder.setTabletId(tabletId);
+                                tabletBuilder.setTabletId(tablet.getId());
                                 builder.addTabletIdx(tabletBuilder);
 
                                 if (builder.getTabletIdxCount() >= 
Config.get_tablet_stat_batch_size) {
-                                    
futures.add(submitGetTabletStatsTask(builder.build()));
+                                    
futures.add(submitGetTabletStatsTask(builder.build(), filter == null));
                                     builder = 
GetTabletStatsRequest.newBuilder()
                                             
.setRequestIp(FrontendOptions.getLocalHostAddressCached());
                                 }
@@ -111,7 +209,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
         } // end for dbs
 
         if (builder.getTabletIdxCount() > 0) {
-            futures.add(submitGetTabletStatsTask(builder.build()));
+            futures.add(submitGetTabletStatsTask(builder.build(), filter == 
null));
         }
 
         try {
@@ -122,16 +220,62 @@ public class CloudTabletStatMgr extends MasterDaemon {
             LOG.error("Error waiting for get tablet stats tasks to complete", 
e);
         }
 
-        LOG.info("finished to get tablet stat of all backends. cost: {} ms",
-                (System.currentTimeMillis() - start));
+        LOG.info("finished to get tablet stats. getStatsTabletNum: {}, cost: 
{} ms",
+                getStatsTabletNum, (System.currentTimeMillis() - start));
         return dbIds;
     }
 
-    private Future<Void> submitGetTabletStatsTask(GetTabletStatsRequest req) {
+    private void getActiveTabletStats(Set<Long> tablets) {
+        List<Long> tabletIds = new ArrayList<>(tablets);
+        Collections.sort(tabletIds);
+        List<TabletMeta> tabletMetas = 
Env.getCurrentInvertedIndex().getTabletMetaList(tabletIds);
+        long start = System.currentTimeMillis();
+        List<Future<Void>> futures = new ArrayList<>();
+        GetTabletStatsRequest.Builder builder =
+                
GetTabletStatsRequest.newBuilder().setRequestIp(FrontendOptions.getLocalHostAddressCached());
+        long activeTabletNum = 0;
+        for (int i = 0; i < tabletIds.size(); i++) {
+            TabletIndexPB tabletIndexPB = getTabletIndexPB(tabletIds.get(i), 
tabletMetas.get(i));
+            if (tabletIndexPB == null) {
+                continue;
+            }
+            activeTabletNum++;
+            builder.addTabletIdx(tabletIndexPB);
+            if (builder.getTabletIdxCount() >= 
Config.get_tablet_stat_batch_size) {
+                futures.add(submitGetTabletStatsTask(builder.build(), true));
+                builder = GetTabletStatsRequest.newBuilder()
+                        
.setRequestIp(FrontendOptions.getLocalHostAddressCached());
+            }
+        }
+        if (builder.getTabletIdxCount() > 0) {
+            futures.add(submitGetTabletStatsTask(builder.build(), true));
+        }
+
+        try {
+            for (Future<Void> future : futures) {
+                future.get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error waiting for get tablet stats tasks to complete", 
e);
+        }
+        LOG.info("finished to get {} active tablets stats, cost {}ms", 
activeTabletNum,
+                System.currentTimeMillis() - start);
+    }
+
+    private TabletIndexPB getTabletIndexPB(long tabletId, TabletMeta 
tabletMeta) {
+        if (tabletMeta == null || tabletMeta == 
TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+            return null;
+        }
+        return 
TabletIndexPB.newBuilder().setDbId(tabletMeta.getDbId()).setTableId(tabletMeta.getTableId())
+                
.setIndexId(tabletMeta.getIndexId()).setPartitionId(tabletMeta.getPartitionId()).setTabletId(tabletId)
+                .build();
+    }
+
+    private Future<Void> submitGetTabletStatsTask(GetTabletStatsRequest req, 
boolean activeUpdate) {
         return GET_TABLET_STATS_THREAD_POOL.submit(() -> {
             GetTabletStatsResponse resp;
             try {
-                resp = getTabletStats(req);
+                resp = getTabletStatsFromMs(req);
             } catch (RpcException e) {
                 LOG.warn("get tablet stats exception:", e);
                 return null;
@@ -140,15 +284,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
                 LOG.warn("get tablet stats return failed.");
                 return null;
             }
-            if (LOG.isDebugEnabled()) {
-                int i = 0;
-                for (TabletIndexPB idx : req.getTabletIdxList()) {
-                    LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id: 
{} size: {}",
-                            idx.getDbId(), idx.getTableId(), idx.getIndexId(),
-                            idx.getTabletId(), 
resp.getTabletStats(i++).getDataSize());
-                }
-            }
-            updateTabletStat(resp);
+            updateTabletStat(resp, activeUpdate);
             return null;
         });
     }
@@ -282,7 +418,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
                             tableTotalLocalIndexSize, 
tableTotalLocalSegmentSize, 0L, 0L);
                     olapTable.setStatistics(tableStats);
                     LOG.debug("finished to set row num for table: {} in 
database: {}",
-                             table.getName(), db.getFullName());
+                            table.getName(), db.getFullName());
                 } finally {
                     table.readUnlock();
                 }
@@ -290,7 +426,8 @@ public class CloudTabletStatMgr extends MasterDaemon {
                 newCloudTableStatsList.add(tableStats);
             }
         }
-        this.cloudTableStatsList = newCloudTableStatsList;
+        filterTopTableStatsByDataSize(newCloudTableStatsList);
+        this.totalTableSize = totalTableSize;
 
         if (MetricRepo.isInit) {
             
MetricRepo.GAUGE_MAX_TABLE_SIZE_BYTES.setValue(maxTableSize.second);
@@ -333,7 +470,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
                 (System.currentTimeMillis() - start));
     }
 
-    private void updateTabletStat(GetTabletStatsResponse response) {
+    private void updateTabletStat(GetTabletStatsResponse response, boolean 
activeUpdate) {
         TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
         for (TabletStatsPB stat : response.getTabletStatsList()) {
             List<Replica> replicas = 
invertedIndex.getReplicasByTabletId(stat.getIdx().getTabletId());
@@ -341,16 +478,45 @@ public class CloudTabletStatMgr extends MasterDaemon {
                 continue;
             }
             Replica replica = replicas.get(0);
+            boolean statsChanged = replica.getDataSize() != stat.getDataSize()
+                    || replica.getRowsetCount() != stat.getNumRowsets()
+                    || replica.getSegmentCount() != stat.getNumSegments()
+                    || replica.getRowCount() != stat.getNumRows()
+                    || replica.getLocalInvertedIndexSize() != 
stat.getIndexSize()
+                    || replica.getLocalSegmentSize() != stat.getSegmentSize();
             replica.setDataSize(stat.getDataSize());
             replica.setRowsetCount(stat.getNumRowsets());
             replica.setSegmentCount(stat.getNumSegments());
             replica.setRowCount(stat.getNumRows());
             replica.setLocalInvertedIndexSize(stat.getIndexSize());
             replica.setLocalSegmentSize(stat.getSegmentSize());
+
+            CloudReplica cloudReplica = (CloudReplica) replica;
+            cloudReplica.setLastGetTabletStatsTime(System.currentTimeMillis());
+            int statsIntervalIndex = cloudReplica.getStatsIntervalIndex();
+            if (activeUpdate || statsChanged) {
+                statsIntervalIndex = 0;
+                if (!activeUpdate && statsChanged && LOG.isDebugEnabled()) {
+                    LOG.debug("tablet stats changed, reset interval index to 
0, dbId: {}, tableId: {}, "
+                                    + "indexId: {}, partitionId: {}, tabletId: 
{}, dataSize: {}, rowCount: {}, "
+                                    + "rowsetCount: {}, segmentCount: {}, 
indexSize: {}, segmentSize: {}. lastIdx: {}",
+                            stat.getIdx().getDbId(), 
stat.getIdx().getTableId(), stat.getIdx().getIndexId(),
+                            stat.getIdx().getPartitionId(), 
stat.getIdx().getTabletId(), stat.getDataSize(),
+                            stat.getNumRows(), stat.getNumRowsets(), 
stat.getNumSegments(), stat.getIndexSize(),
+                            stat.getSegmentSize(), 
cloudReplica.getStatsIntervalIndex());
+                }
+            } else {
+                statsIntervalIndex = Math.min(statsIntervalIndex + 1, 
DEFAULT_INTERVAL_LADDER_MS.length - 1);
+            }
+            cloudReplica.setStatsIntervalIndex(statsIntervalIndex);
+        }
+        // push tablet stats to other fes
+        if (Config.cloud_get_tablet_stats_version == 2 && activeUpdate && 
Env.getCurrentEnv().isMaster()) {
+            pushTabletStats(response);
         }
     }
 
-    private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest 
request)
+    private GetTabletStatsResponse getTabletStatsFromMs(GetTabletStatsRequest 
request)
             throws RpcException {
         GetTabletStatsResponse response;
         try {
@@ -365,4 +531,102 @@ public class CloudTabletStatMgr extends MasterDaemon {
     public List<OlapTable.Statistics> getCloudTableStats() {
         return this.cloudTableStatsList;
     }
+
+    public long getTotalTableSize() {
+        return this.totalTableSize;
+    }
+
+    private void filterTopTableStatsByDataSize(List<OlapTable.Statistics> 
newCloudTableStatsList) {
+        int limit = Config.prom_output_table_metrics_limit;
+        if (limit <= 0 || newCloudTableStatsList.size() <= limit) {
+            this.cloudTableStatsList = newCloudTableStatsList;
+            return;
+        }
+        // only copy elements if number of tables > 
prom_output_table_metrics_limit
+        PriorityQueue<OlapTable.Statistics> topStats = new 
PriorityQueue<>(limit,
+                Comparator.comparingLong(OlapTable.Statistics::getDataSize));
+        for (OlapTable.Statistics stats : newCloudTableStatsList) {
+            if (topStats.size() < limit) {
+                topStats.offer(stats);
+            } else if (!topStats.isEmpty() && stats.getDataSize() > 
topStats.peek().getDataSize()) {
+                topStats.poll();
+                topStats.offer(stats);
+            }
+        }
+        this.cloudTableStatsList = new ArrayList<>(topStats);
+    }
+
+    public void addActiveTablets(List<Long> tabletIds) {
+        if (Config.cloud_get_tablet_stats_version == 1 || tabletIds == null || 
tabletIds.isEmpty()) {
+            return;
+        }
+        activeTablets.addAll(tabletIds);
+    }
+
+    // master FE send update tablet stats rpc to other FEs
+    private void pushTabletStats(GetTabletStatsResponse response) {
+        List<Frontend> frontends = getFrontends();
+        if (frontends == null || frontends.isEmpty()) {
+            return;
+        }
+        TSyncCloudTabletStatsRequest request = new 
TSyncCloudTabletStatsRequest();
+        request.setTabletStatsPb(ByteBuffer.wrap(response.toByteArray()));
+        for (Frontend fe : frontends) {
+            SYNC_TABLET_STATS_THREAD_POOL.submit(() -> {
+                try {
+                    pushTabletStatsToFe(request, fe);
+                } catch (Exception e) {
+                    LOG.warn("push tablet stats to frontend {}:{} error", 
fe.getHost(), fe.getRpcPort(), e);
+                }
+            });
+        }
+    }
+
+    private void pushTabletStatsToFe(TSyncCloudTabletStatsRequest request, 
Frontend fe) {
+        FrontendService.Client client = null;
+        TNetworkAddress addr = new TNetworkAddress(fe.getHost(), 
fe.getRpcPort());
+        boolean ok = false;
+        try {
+            client = ClientPool.frontendStatsPool.borrowObject(addr);
+            TStatus status = client.syncCloudTabletStats(request);
+            ok = true;
+            if (status.getStatusCode() != TStatusCode.OK) {
+                LOG.warn("failed to push cloud tablet stats to frontend {}:{}, 
err: {}", fe.getHost(),
+                        fe.getRpcPort(), status.getErrorMsgs());
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to push update cloud tablet stats to frontend 
{}:{}", fe.getHost(), fe.getRpcPort(), e);
+        } finally {
+            if (ok) {
+                ClientPool.frontendStatsPool.returnObject(addr, client);
+            } else {
+                ClientPool.frontendStatsPool.invalidateObject(addr, client);
+            }
+        }
+    }
+
+    // follower and observer FE receive sync tablet stats rpc from master FE
+    public void syncTabletStats(GetTabletStatsResponse response) {
+        if (Config.cloud_get_tablet_stats_version == 1 || 
response.getTabletStatsList().isEmpty()) {
+            return;
+        }
+        SYNC_TABLET_STATS_THREAD_POOL.submit(() -> {
+            updateTabletStat(response, true);
+        });
+    }
+
+    private List<Frontend> getFrontends() {
+        if (!Env.getCurrentEnv().isMaster()) {
+            return Collections.emptyList();
+        }
+        HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+        return Env.getCurrentEnv().getFrontends(null).stream()
+                .filter(fe -> fe.isAlive() && 
!(fe.getHost().equals(selfNode.getHost())
+                        && fe.getEditLogPort() == selfNode.getPort())).collect(
+                        Collectors.toList());
+    }
+
+    public static CloudTabletStatMgr getInstance() {
+        return (CloudTabletStatMgr) Env.getCurrentEnv().getTabletStatMgr();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index afd2fd2501f..c50b5d80923 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -36,6 +36,8 @@ import com.google.common.base.Strings;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,9 +69,27 @@ public class CloudReplica extends Replica implements 
GsonPostProcessable {
     private long indexId = -1;
     @SerializedName(value = "idx")
     private long idx = -1;
+    // last time to get tablet stats
+    @Getter
+    @Setter
+    @SerializedName(value = "gst")
+    long lastGetTabletStatsTime = 0;
+    /**
+     * The index of {@link 
org.apache.doris.catalog.CloudTabletStatMgr#DEFAULT_INTERVAL_LADDER_MS} array.
+     * Used to control the interval of getting tablet stats.
+     * When get tablet stats:
+     * if the stats is unchanged, will update this index to next value to get 
stats less frequently;
+     * if the stats is changed, will update this index to 0 to get stats more 
frequently.
+     */
+    @Getter
+    @Setter
+    @SerializedName(value = "sii")
+    int statsIntervalIndex = 0;
 
+    @SerializedName(value = "sc")
     private long segmentCount = 0L;
-    private long rowsetCount = 0L;
+    @SerializedName(value = "rsc")
+    private long rowsetCount = 1L; // [0-1] rowset
 
     private static final Random rand = new Random();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index b3f307dfd99..735adb0061b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -240,10 +240,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
 
     @Getter
     private class InfightTablet {
-        private final Long tabletId;
+        private final long tabletId;
         private final String clusterId;
 
-        public InfightTablet(Long tabletId, String clusterId) {
+        public InfightTablet(long tabletId, String clusterId) {
             this.tabletId = tabletId;
             this.clusterId = clusterId;
         }
@@ -257,7 +257,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                 return false;
             }
             InfightTablet that = (InfightTablet) o;
-            return tabletId.equals(that.tabletId) && 
clusterId.equals(that.clusterId);
+            return tabletId == that.tabletId && 
clusterId.equals(that.clusterId);
         }
 
         @Override
@@ -270,7 +270,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
         public Tablet pickedTablet;
         public long srcBe;
         public long destBe;
-        public Map<Long, Set<Tablet>> beToTablets;
         public long startTimestamp;
         BalanceType balanceType;
     }
@@ -1511,7 +1510,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
                         futurePartitionToTablets, futureBeToTabletsInTable)) {
                     continue;
                 }
-                preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType, beToTablets);
+                preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId, 
balanceType);
             }
         }
     }
@@ -1552,7 +1551,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
     }
 
     private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long 
destBe, String clusterId,
-                                     BalanceType balanceType, Map<Long, 
Set<Tablet>> beToTablets) {
+            BalanceType balanceType) {
         Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
         Backend destBackend = cloudSystemInfoService.getBackend(destBe);
         if (srcBackend == null || destBackend == null) {
@@ -1566,7 +1565,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
         task.srcBe = srcBe;
         task.destBe = destBe;
         task.balanceType = balanceType;
-        task.beToTablets = beToTablets;
         task.startTimestamp = System.currentTimeMillis() / 1000;
         InfightTablet key = new InfightTablet(pickedTablet.getId(), clusterId);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index bc61868f83f..bf7e82072c3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.cloud.transaction;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -146,6 +147,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -482,7 +484,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
      * 2. produce event for further processes like async MV
      * @param commitTxnResponse commit txn call response from meta-service
      */
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<Long> tabletIds) {
         // ========================================
         // update some table stats
         // ========================================
@@ -516,6 +518,12 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         if (sb.length() > 0) {
             LOG.info("notify partition first load. {}", sb);
         }
+        // 4. notify update tablet stats
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("force sync tablet stats for txnId: {}, tabletNum: {}, 
tabletIds: {}", txnId,
+                    tabletIds.size(), tabletIds);
+        }
+        CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
 
         // ========================================
         // produce event
@@ -706,11 +714,13 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC, 
txnCommitAttachment);
+        executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC, 
txnCommitAttachment,
+                tabletCommitInfos == null ? Collections.emptyList()
+                        : tabletCommitInfos.stream().map(t -> 
t.getTabletId()).collect(Collectors.toList()));
     }
 
     private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, 
long transactionId, boolean is2PC,
-            TxnCommitAttachment txnCommitAttachment) throws UserException {
+            TxnCommitAttachment txnCommitAttachment, List<Long> tabletIds) 
throws UserException {
         if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
             LOG.info("debug point FE.mow.commit.exception, throw e");
             throw new UserException(InternalErrorCode.INTERNAL_ERR, "debug 
point FE.mow.commit.exception");
@@ -733,7 +743,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         try {
-            txnState = commitTxn(commitTxnRequest, transactionId, is2PC);
+            txnState = commitTxn(commitTxnRequest, transactionId, is2PC, 
tabletIds);
             txnOperated = true;
             if 
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
 {
                 throw new 
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -772,8 +782,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
-    private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC)
-            throws UserException {
+    private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC,
+            List<Long> tabletIds) throws UserException {
         checkCommitInfo(commitTxnRequest);
 
         CommitTxnResponse commitTxnResponse = null;
@@ -833,7 +843,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
             MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() 
- txnState.getPrepareTime());
         }
-        afterCommitTxnResp(commitTxnResponse);
+        afterCommitTxnResp(commitTxnResponse, tabletIds);
         return txnState;
     }
 
@@ -1610,6 +1620,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             builder.addMowTableIds(olapTable.getId());
         }
         // add sub txn infos
+        Set<Long> tabletIds = new HashSet<>();
         for (SubTransactionState subTransactionState : subTransactionStates) {
             
builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId())
                     .setTableId(subTransactionState.getTable().getId())
@@ -1619,10 +1630,13 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                                             .map(c -> new 
TabletCommitInfo(c.getTabletId(), c.getBackendId()))
                                             .collect(Collectors.toList())))
                     .build());
+            for (TTabletCommitInfo tabletCommitInfo : 
subTransactionState.getTabletCommitInfos()) {
+                tabletIds.add(tabletCommitInfo.getTabletId());
+            }
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        executeCommitTxnRequest(commitTxnRequest, transactionId, false, null);
+        executeCommitTxnRequest(commitTxnRequest, transactionId, false, null, 
new ArrayList<>(tabletIds));
     }
 
     private List<Table> getTablesNeedCommitLock(List<Table> tableList) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 2b769ae2e62..f10ad74f33b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -72,6 +72,9 @@ public class ClientPool {
     public static GenericPool<FrontendService.Client> frontendPool =
             new GenericPool("FrontendService", backendConfig, 
Config.backend_rpc_timeout_ms,
                     
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
+    public static GenericPool<FrontendService.Client> frontendStatsPool =
+            new GenericPool<>("FrontendService", heartbeatConfig, 
heartbeatTimeoutMs,
+                    
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
     public static GenericPool<BackendService.Client> backendPool =
             new GenericPool("BackendService", backendConfig, 
Config.backend_rpc_timeout_ms);
     public static GenericPool<TPaloBrokerService.Client> brokerPool =
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index 9a273e150ba..77a2a6565d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.proc;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -30,12 +31,15 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.query.QueryStatsUtil;
 import org.apache.doris.system.Backend;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,6 +52,7 @@ import java.util.Map;
  * show tablets' detail info within an index
  */
 public class TabletsProcDir implements ProcDirInterface {
+    private static final Logger LOG = 
LogManager.getLogger(TabletsProcDir.class);
     public static final ImmutableList<String> TITLE_NAMES;
 
     static {
@@ -88,6 +93,11 @@ public class TabletsProcDir implements ProcDirInterface {
                 pathHashToRoot.put(diskInfo.getPathHash(), 
diskInfo.getRootPath());
             }
         }
+        List<Long> tabletIds = null;
+        if (Config.isCloudMode() && ConnectContext.get() != null && 
ConnectContext.get()
+                .getSessionVariable().cloudForceSyncTabletStats) {
+            tabletIds = new ArrayList<>();
+        }
         table.readLock();
         try {
             Map<Long, Long> replicaIdToQueryHits = new HashMap<>();
@@ -103,6 +113,9 @@ public class TabletsProcDir implements ProcDirInterface {
 
             // get infos
             for (Tablet tablet : index.getTablets()) {
+                if (tabletIds != null) {
+                    tabletIds.add(tablet.getId());
+                }
                 long tabletId = tablet.getId();
                 if (tablet.getReplicas().size() == 0) {
                     List<Comparable> tabletInfo = new ArrayList<Comparable>();
@@ -194,6 +207,10 @@ public class TabletsProcDir implements ProcDirInterface {
         } finally {
             table.readUnlock();
         }
+        if (tabletIds != null && !tabletIds.isEmpty()) {
+            LOG.info("force sync tablet stats for table: {}, tabletNum: {}", 
table, tabletIds.size());
+            CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
+        }
         return tabletInfos;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
index ec4126aa86d..0d21fa8094c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
@@ -17,7 +17,16 @@
 
 package org.apache.doris.master;
 
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.cloud.catalog.CloudReplica;
 import org.apache.doris.common.CheckpointException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
@@ -104,11 +113,23 @@ public class Checkpoint extends MasterDaemon {
             storage = new Storage(imageDir);
             // get max image version
             imageVersion = storage.getLatestImageSeq();
+            long latestImageCreateTime = storage.getLatestImageCreateTime();
             // get max finalized journal id
             checkPointVersion = editLog.getFinalizedJournalId();
-            LOG.info("last checkpoint journal id: {}, current finalized 
journal id: {}",
-                    imageVersion, checkPointVersion);
-            if (imageVersion >= checkPointVersion) {
+            LOG.info("last checkpoint journal id: {}, create timestamp: {}. 
current finalized journal id: {}",
+                    imageVersion, latestImageCreateTime, checkPointVersion);
+            if (imageVersion < checkPointVersion) {
+                LOG.info("Trigger checkpoint since last checkpoint journal id: 
{} is less than "
+                        + "current finalized journal id: {}", imageVersion, 
checkPointVersion);
+            } else if (Config.isCloudMode() && 
Config.cloud_checkpoint_image_stale_threshold_seconds > 0
+                    && latestImageCreateTime > 0 && 
((System.currentTimeMillis() - latestImageCreateTime)
+                    >= Config.cloud_checkpoint_image_stale_threshold_seconds * 
1000L)) {
+                // No new finalized journals beyond the latest image.
+                // But in cloud mode, we may still want to force a checkpoint 
if the latest image file is expired.
+                // This helps that image can keep the newer table version, 
partition version, tablet stats.
+                LOG.info("Trigger checkpoint in cloud mode because latest 
image is expired. "
+                        + "latestImageSeq: {}, latestImageCreateTime: {}", 
imageVersion, latestImageCreateTime);
+            } else {
                 return;
             }
         } catch (Throwable e) {
@@ -146,6 +167,7 @@ public class Checkpoint extends MasterDaemon {
                                 checkPointVersion, 
env.getReplayedJournalId()));
             }
             env.postProcessAfterMetadataReplayed(false);
+            postProcessCloudMetadata();
             latestImageFilePath = env.saveImage();
             replayedJournalId = env.getReplayedJournalId();
 
@@ -395,4 +417,100 @@ public class Checkpoint extends MasterDaemon {
     public ReentrantReadWriteLock getLock() {
         return lock;
     }
+
+    private void postProcessCloudMetadata() {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        Env servingEnv = Env.getServingEnv();
+        if (servingEnv == null) {
+            LOG.warn("serving env is null, skip process cloud metadata for 
checkpoint");
+            return;
+        }
+        long start = System.currentTimeMillis();
+        for (Database db : env.getInternalCatalog().getDbs()) {
+            Database servingDb = 
servingEnv.getInternalCatalog().getDbNullable(db.getId());
+            if (servingDb == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("serving db is null. dbId: {}, dbName: {}", 
db.getId(), db.getFullName());
+                }
+                continue;
+            }
+
+            for (Table table : db.getTables()) {
+                Table servingTable = servingDb.getTableNullable(table.getId());
+                if (servingTable == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("serving table is null. dbId: {}, table: 
{}", db.getId(), table);
+                    }
+                    continue;
+                }
+                if (!(table instanceof OlapTable) || !(servingTable instanceof 
OlapTable)) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) table;
+                OlapTable servingOlapTable = (OlapTable) servingTable;
+
+                List<Partition> partitions = olapTable.getAllPartitions();
+                for (Partition partition : partitions) {
+                    Partition servingPartition = 
servingOlapTable.getPartition(partition.getId());
+                    if (servingPartition == null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("serving partition is null. tableId: {}, 
partitionId: {}", table.getId(),
+                                    partition.getId());
+                        }
+                        continue;
+                    }
+                    // set tablet stats
+                    setTabletStats(table.getId(), partition, servingPartition);
+                }
+            }
+        }
+        LOG.info("post process cloud metadata for checkpoint finished. cost {} 
ms", System.currentTimeMillis() - start);
+    }
+
+    private void setTabletStats(long tableId, Partition partition, Partition 
servingPartition) {
+        for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+            MaterializedIndex servingIndex = 
servingPartition.getIndex(index.getId());
+            if (servingIndex == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("serving index is null. tableId: {}, 
partitionId: {}, indexId: {}", tableId,
+                            partition.getId(), index.getId());
+                }
+                continue;
+            }
+            for (Tablet tablet : index.getTablets()) {
+                Tablet servingTablet = servingIndex.getTablet(tablet.getId());
+                if (servingTablet == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("serving tablet is null. tableId: {}, 
partitionId: {}, indexId: {}, tabletId: {}",
+                                tableId, partition.getId(), index.getId(), 
tablet.getId());
+                    }
+                    continue;
+                }
+                for (Replica replica : tablet.getReplicas()) {
+                    Replica servingReplica = 
servingTablet.getReplicaById(replica.getId());
+                    if (servingReplica == null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("serving replica is null. tableId: {}, 
partitionId: {}, indexId: {}, "
+                                            + "tabletId: {}, replicaId: {}", 
tableId, partition.getId(), index.getId(),
+                                    tablet.getId(), replica.getId());
+                        }
+                        continue;
+                    }
+                    replica.setDataSize(servingReplica.getDataSize());
+                    replica.setRowsetCount(servingReplica.getRowsetCount());
+                    replica.setSegmentCount(servingReplica.getSegmentCount());
+                    replica.setRowCount(servingReplica.getRowCount());
+                    
replica.setLocalInvertedIndexSize(servingReplica.getLocalInvertedIndexSize());
+                    
replica.setLocalSegmentSize(servingReplica.getLocalSegmentSize());
+                    // set last get stats time and stats interval index
+                    CloudReplica cloudReplica = (CloudReplica) replica;
+                    CloudReplica servingCloudReplica = (CloudReplica) 
servingReplica;
+                    
cloudReplica.setStatsIntervalIndex(servingCloudReplica.getStatsIntervalIndex());
+                    
cloudReplica.setLastGetTabletStatsTime(servingCloudReplica.getLastGetTabletStatsTime());
+                }
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
index 764002aaed0..2103dcb6c21 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/metric/PrometheusMetricVisitor.java
@@ -35,12 +35,10 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -252,30 +250,8 @@ public class PrometheusMetricVisitor extends MetricVisitor 
{
         StringBuilder tableRowCountBuilder = new StringBuilder();
 
         Collection<OlapTable.Statistics> values = 
tabletStatMgr.getCloudTableStats();
-        // calc totalTableSize
-        long totalTableSize = 0;
+        long totalTableSize = tabletStatMgr.getTotalTableSize();
         for (OlapTable.Statistics stats : values) {
-            totalTableSize += stats.getDataSize();
-        }
-        // output top N metrics
-        if (values.size() > Config.prom_output_table_metrics_limit) {
-            // only copy elements if number of tables > 
prom_output_table_metrics_limit
-            PriorityQueue<OlapTable.Statistics> topStats = new PriorityQueue<>(
-                    Config.prom_output_table_metrics_limit,
-                    
Comparator.comparingLong(OlapTable.Statistics::getDataSize));
-            for (OlapTable.Statistics stats : values) {
-                if (topStats.size() < Config.prom_output_table_metrics_limit) {
-                    topStats.offer(stats);
-                } else if (!topStats.isEmpty()
-                        && stats.getDataSize() > 
topStats.peek().getDataSize()) {
-                    topStats.poll();
-                    topStats.offer(stats);
-                }
-            }
-            values = topStats;
-        }
-        for (OlapTable.Statistics stats : values) {
-
             dataSizeBuilder.append("doris_fe_table_data_size{db_name=\"");
             dataSizeBuilder.append(stats.getDbName());
             dataSizeBuilder.append("\", table_name=\"");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java
index 9f8cd558a57..e826b7e03c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/Storage.java
@@ -68,6 +68,7 @@ public class Storage {
     private long editsSeq;
     private long latestImageSeq = 0;
     private long latestValidatedImageSeq = 0;
+    private long latestImageCreateTime = 0;
     private String metaDir;
     private List<Long> editsFileSequenceNumbers;
 
@@ -83,6 +84,15 @@ public class Storage {
         this.editsSeq = editsSeq;
         this.latestImageSeq = latestImageSeq;
         this.metaDir = metaDir;
+        // try to set latestImageCreateTime from the image file if it exists
+        try {
+            File img = getImageFile(latestImageSeq);
+            if (img != null && img.exists()) {
+                latestImageCreateTime = img.lastModified();
+            }
+        } catch (Exception e) {
+            // ignore; best-effort only
+        }
     }
 
     public Storage(String metaDir) throws IOException {
@@ -146,6 +156,7 @@ public class Storage {
                         imageIds.add(fileSeq);
                         if (latestImageSeq < fileSeq) {
                             latestImageSeq = fileSeq;
+                            latestImageCreateTime = child.lastModified();
                         }
                     } else if (name.startsWith(EDITS)) {
                         // Just record the sequence part of the file name
@@ -162,6 +173,14 @@ public class Storage {
         latestValidatedImageSeq = imageIds.size() < 2 ? 0 : 
imageIds.get(imageIds.size() - 2);
     }
 
+    /**
+     * Return latest image creation time in milliseconds since epoch.
+     * 0 means unknown.
+     */
+    public long getLatestImageCreateTime() {
+        return latestImageCreateTime;
+    }
+
     public int getClusterID() {
         return clusterID;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index cf6e8b44356..0a6f09978dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -814,6 +814,7 @@ public class SessionVariable implements Serializable, 
Writable {
             "cloud_partition_version_cache_ttl_ms";
     public static final String CLOUD_TABLE_VERSION_CACHE_TTL_MS =
             "cloud_table_version_cache_ttl_ms";
+    public static final String CLOUD_FORCE_SYNC_TABLET_STATS = 
"cloud_force_sync_tablet_stats";
     // CLOUD_VARIABLES_BEGIN
 
     public static final String ENABLE_MATCH_WITHOUT_INVERTED_INDEX = 
"enable_match_without_inverted_index";
@@ -3046,6 +3047,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public String cloudCluster = "";
     @VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
     public boolean disableEmptyPartitionPrune = false;
+    @VariableMgr.VarAttr(name = CLOUD_FORCE_SYNC_TABLET_STATS)
+    public boolean cloudForceSyncTabletStats = false;
     @VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
     public long cloudPartitionVersionCacheTtlMs = Long.MAX_VALUE;
     @VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1af06e77b9f..23f7730f1f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.doris.backup.BackupMeta;
 import org.apache.doris.backup.Snapshot;
 import org.apache.doris.binlog.BinlogLagInfo;
 import org.apache.doris.catalog.AutoIncrementGenerator;
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
@@ -50,6 +51,7 @@ import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.cloud.catalog.CloudReplica;
 import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
@@ -273,6 +275,7 @@ import 
org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TStreamLoadPutResult;
 import org.apache.doris.thrift.TSubTxnInfo;
+import org.apache.doris.thrift.TSyncCloudTabletStatsRequest;
 import org.apache.doris.thrift.TSyncQueryColumns;
 import org.apache.doris.thrift.TTableIndexQueryStats;
 import org.apache.doris.thrift.TTableMetadataNameIds;
@@ -4506,16 +4509,28 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             return new TStatus(TStatusCode.NOT_MASTER);
         }
 
-        LOG.info("receive load stats report request: {}, backend: {}, dbId: 
{}, txnId: {}, label: {}",
-                request, clientAddr, request.getDbId(), request.getTxnId(), 
request.getLabel());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive load stats report from backend: {}, dbId: {}, 
txnId: {}, label: {}, tabletIds: {}",
+                    clientAddr, request.getDbId(), request.getTxnId(), 
request.getLabel(), request.getTabletIds());
+        }
 
         try {
-            byte[] receivedProtobufBytes = request.getPayload();
-            if (receivedProtobufBytes == null || receivedProtobufBytes.length 
<= 0) {
-                return new TStatus(TStatusCode.INVALID_ARGUMENT);
+            List<Long> tabletIds = request.isSetTabletIds() ? 
request.getTabletIds() : Collections.emptyList();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("force sync tablet stats for txnId: {}, tabletNum: 
{}, tabletIds: {}", request.txnId,
+                        tabletIds.size(), tabletIds);
+            }
+            if (request.isSetTxnId() && request.getTxnId() != -1) {
+                byte[] receivedProtobufBytes = request.getPayload();
+                if (receivedProtobufBytes == null || 
receivedProtobufBytes.length <= 0) {
+                    return new TStatus(TStatusCode.INVALID_ARGUMENT);
+                }
+                CommitTxnResponse commitTxnResponse = 
CommitTxnResponse.parseFrom(receivedProtobufBytes);
+                
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse, 
tabletIds);
+            } else {
+                // compaction notify update tablet stats
+                CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
             }
-            CommitTxnResponse commitTxnResponse = 
CommitTxnResponse.parseFrom(receivedProtobufBytes);
-            
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse);
         } catch (InvalidProtocolBufferException e) {
             // Handle the exception, log it, or take appropriate action
             e.printStackTrace();
@@ -4846,6 +4861,33 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         }
     }
 
+    @Override
+    public TStatus syncCloudTabletStats(TSyncCloudTabletStatsRequest request)
+            throws TException {
+        TStatus status = new TStatus(TStatusCode.OK);
+        if (Env.getCurrentEnv().isMaster()) {
+            LOG.warn("syncCloudTabletStats called on master, ignoring");
+            return status;
+        }
+
+        byte[] receivedProtobufBytes = request.getTabletStatsPb();
+        if (receivedProtobufBytes == null || receivedProtobufBytes.length <= 
0) {
+            status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
+            status.addToErrorMsgs("TabletStatsPb is null or empty");
+            return status;
+        }
+        GetTabletStatsResponse getTabletStatsResponse;
+        try {
+            getTabletStatsResponse = 
GetTabletStatsResponse.parseFrom(receivedProtobufBytes);
+        } catch (Exception e) {
+            status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
+            status.addToErrorMsgs("parse GetTabletStatsResponse error: " + 
e.getMessage());
+            return status;
+        }
+        
CloudTabletStatMgr.getInstance().syncTabletStats(getTabletStatsResponse);
+        return status;
+    }
+
     private TStatus checkMaster() {
         TStatus status = new TStatus(TStatusCode.OK);
         if (!Env.getCurrentEnv().isMaster()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 614448891a2..2ceb14776b9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -230,7 +230,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     @Override
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<Long> tabletIds) {
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index d56193fe683..ae837dc064a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -210,7 +210,7 @@ public interface GlobalTransactionMgrIface extends Writable 
{
 
     public void 
replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) 
throws Exception;
 
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse);
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<Long> tabletIds);
 
     public void addSubTransaction(long dbId, long transactionId, long 
subTransactionId);
 
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 641624f4d12..9d46246d901 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1584,6 +1584,8 @@ struct TReportCommitTxnResultRequest {
     2: optional i64 txnId
     3: optional string label
     4: optional binary payload
+    // tablets which need to update stats
+    5: optional list<i64> tabletIds
 }
 
 struct TQueryColumn {
@@ -1744,6 +1746,10 @@ struct TGetOlapTableMetaResult {
     4: optional list<i64> removed_partitions
 }
 
+struct TSyncCloudTabletStatsRequest {
+    1: optional binary tablet_stats_pb
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1854,4 +1860,6 @@ service FrontendService {
     TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request)
 
     TGetOlapTableMetaResult getOlapTableMeta(1: TGetOlapTableMetaRequest 
request)
+
+    Status.TStatus syncCloudTabletStats(1: TSyncCloudTabletStatsRequest 
request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to