This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5095236e6 [metrics] Adding Server-Level Storage Aggregation Metrics 
(#1548)
5095236e6 is described below

commit 5095236e66afc0c0bc49e9d1f393dd82518fd30d
Author: yunhong <[email protected]>
AuthorDate: Wed Sep 17 00:36:15 2025 +0800

    [metrics] Adding Server-Level Storage Aggregation Metrics (#1548)
---
 .../java/org/apache/fluss/metrics/MetricNames.java |  17 +-
 .../coordinator/CompletedSnapshotStoreManager.java |  67 ++++++--
 .../coordinator/CoordinatorEventProcessor.java     |  53 ++++++-
 .../server/kv/snapshot/CompletedSnapshotStore.java |   8 +
 .../kv/snapshot/PeriodicSnapshotManager.java       |   9 +-
 .../server/kv/snapshot/SharedKvFileRegistry.java   |  10 ++
 .../org/apache/fluss/server/log/LogTablet.java     |  21 ++-
 .../fluss/server/log/remote/LogTieringTask.java    |   2 +
 .../fluss/server/log/remote/RemoteLogManager.java  |   5 +
 .../fluss/server/log/remote/RemoteLogManifest.java |   8 +
 .../fluss/server/log/remote/RemoteLogTablet.java   |   6 +-
 .../metrics/group/CoordinatorMetricGroup.java      | 172 +++++++++++++++++++++
 .../org/apache/fluss/server/replica/Replica.java   |  38 ++++-
 .../fluss/server/replica/ReplicaManager.java       |  39 +++++
 .../CompletedSnapshotStoreManagerTest.java         |  27 +++-
 .../coordinator/CoordinatorEventProcessorTest.java |   3 +-
 .../maintenance/observability/monitor-metrics.md   |  60 +++++--
 17 files changed, 491 insertions(+), 54 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 7bd21b552..9242b4daa 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -47,6 +47,11 @@ public class MetricNames {
     public static final String EVENT_QUEUE_TIME_MS = "eventQueueTimeMs";
     public static final String EVENT_PROCESSING_TIME_MS = 
"eventProcessingTimeMs";
 
+    // for kv tablet which reported by coordinator
+    public static final String KV_NUM_SNAPSHOTS = "numKvSnapshots";
+    public static final String KV_ALL_SNAPSHOT_SIZE = "allKvSnapshotSize";
+    public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = 
"remoteKvSize";
+
     // 
--------------------------------------------------------------------------------------------
     // metrics for tablet server
     // 
--------------------------------------------------------------------------------------------
@@ -63,6 +68,11 @@ public class MetricNames {
     public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE =
             "delayedFetchFromClientExpiresPerSecond";
 
+    public static final String SERVER_LOGICAL_STORAGE_LOG_SIZE = "logSize";
+    public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
+    public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE = 
"localSize";
+    public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE = 
"remoteLogSize";
+
     // 
--------------------------------------------------------------------------------------------
     // metrics for table
     // 
--------------------------------------------------------------------------------------------
@@ -121,10 +131,11 @@ public class MetricNames {
     // for log tablet
     public static final String LOG_NUM_SEGMENTS = "numSegments";
     public static final String LOG_END_OFFSET = "endOffset";
-    public static final String LOG_SIZE = "size";
+    public static final String REMOTE_LOG_SIZE = "size";
 
-    // for kv tablet
-    public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
+    // for logic storage
+    public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";
+    public static final String LOCAL_STORAGE_KV_SIZE = "kvSize";
 
     // 
--------------------------------------------------------------------------------------------
     // metrics for rpc client
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
index 0ea715930..b7c92289b 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
@@ -19,12 +19,16 @@ package org.apache.fluss.server.coordinator;
 
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandleStore;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
 import org.apache.fluss.server.kv.snapshot.SharedKvFileRegistry;
 import 
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.utils.MapUtils;
 
@@ -60,18 +64,19 @@ public class CompletedSnapshotStoreManager {
     private final Executor ioExecutor;
     private final Function<ZooKeeperClient, CompletedSnapshotHandleStore>
             makeZookeeperCompletedSnapshotHandleStore;
+    private final CoordinatorMetricGroup coordinatorMetricGroup;
 
     public CompletedSnapshotStoreManager(
             int maxNumberOfSnapshotsToRetain,
             Executor ioExecutor,
-            ZooKeeperClient zooKeeperClient) {
-        checkArgument(
-                maxNumberOfSnapshotsToRetain > 0, 
"maxNumberOfSnapshotsToRetain must be positive");
-        this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
-        this.zooKeeperClient = zooKeeperClient;
-        this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
-        this.ioExecutor = ioExecutor;
-        this.makeZookeeperCompletedSnapshotHandleStore = 
ZooKeeperCompletedSnapshotHandleStore::new;
+            ZooKeeperClient zooKeeperClient,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                maxNumberOfSnapshotsToRetain,
+                ioExecutor,
+                zooKeeperClient,
+                ZooKeeperCompletedSnapshotHandleStore::new,
+                coordinatorMetricGroup);
     }
 
     @VisibleForTesting
@@ -80,7 +85,8 @@ public class CompletedSnapshotStoreManager {
             Executor ioExecutor,
             ZooKeeperClient zooKeeperClient,
             Function<ZooKeeperClient, CompletedSnapshotHandleStore>
-                    makeZookeeperCompletedSnapshotHandleStore) {
+                    makeZookeeperCompletedSnapshotHandleStore,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
         checkArgument(
                 maxNumberOfSnapshotsToRetain > 0, 
"maxNumberOfSnapshotsToRetain must be positive");
         this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
@@ -88,9 +94,34 @@ public class CompletedSnapshotStoreManager {
         this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
         this.ioExecutor = ioExecutor;
         this.makeZookeeperCompletedSnapshotHandleStore = 
makeZookeeperCompletedSnapshotHandleStore;
+        this.coordinatorMetricGroup = coordinatorMetricGroup;
+
+        registerMetrics();
     }
 
-    public CompletedSnapshotStore 
getOrCreateCompletedSnapshotStore(TableBucket tableBucket) {
+    private void registerMetrics() {
+        MetricGroup physicalStorage = 
coordinatorMetricGroup.addGroup("physicalStorage");
+        physicalStorage.gauge(
+                MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE,
+                this::physicalStorageRemoteKvSize);
+    }
+
+    private long physicalStorageRemoteKvSize() {
+        return bucketCompletedSnapshotStores.values().stream()
+                .map(CompletedSnapshotStore::getPhysicalStorageRemoteKvSize)
+                .reduce(0L, Long::sum);
+    }
+
+    private long getNumSnapshots(TableBucket tableBucket) {
+        return 
bucketCompletedSnapshotStores.get(tableBucket).getNumSnapshots();
+    }
+
+    private long getAllSnapshotSize(TableBucket tableBucket) {
+        return 
bucketCompletedSnapshotStores.get(tableBucket).getPhysicalStorageRemoteKvSize();
+    }
+
+    public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(
+            TablePath tablePath, TableBucket tableBucket) {
         return bucketCompletedSnapshotStores.computeIfAbsent(
                 tableBucket,
                 (bucket) -> {
@@ -104,6 +135,22 @@ public class CompletedSnapshotStoreManager {
                                 "Created snapshot store for table bucket {} in 
{} ms.",
                                 bucket,
                                 end - start);
+
+                        MetricGroup bucketMetricGroup =
+                                
coordinatorMetricGroup.getTableBucketMetricGroup(
+                                        tablePath, tableBucket);
+                        if (bucketMetricGroup != null) {
+                            LOG.info("Add bucketMetricGroup for tableBucket 
{}.", bucket);
+                            bucketMetricGroup.gauge(
+                                    MetricNames.KV_NUM_SNAPSHOTS, () -> 
getNumSnapshots(bucket));
+                            bucketMetricGroup.gauge(
+                                    MetricNames.KV_ALL_SNAPSHOT_SIZE,
+                                    () -> getAllSnapshotSize(bucket));
+                        } else {
+                            LOG.warn(
+                                    "Failed to add bucketMetricGroup for 
tableBucket {} when creating completed snapshot.",
+                                    bucket);
+                        }
                         return snapshotStore;
                     } catch (Exception e) {
                         throw new RuntimeException(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index ba48a6240..66d52d9fc 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -188,7 +188,8 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 new CompletedSnapshotStoreManager(
                         conf.getInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS),
                         ioExecutor,
-                        zooKeeperClient);
+                        zooKeeperClient,
+                        coordinatorMetricGroup);
         this.autoPartitionManager = autoPartitionManager;
         this.lakeTableTieringManager = lakeTableTieringManager;
         this.coordinatorMetricGroup = coordinatorMetricGroup;
@@ -456,6 +457,20 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 coordinatorContext.putBucketLeaderAndIsr(tableBucket, 
leaderAndIsr);
             }
         }
+
+        // register table/bucket metrics when initialing context.
+        TablePath tablePath = coordinatorContext.getTablePathById(tableId);
+        if (tablePath != null) {
+            coordinatorMetricGroup.addTableBucketMetricGroup(
+                    PhysicalTablePath.of(
+                            tablePath,
+                            partitionId == null
+                                    ? null
+                                    : 
coordinatorContext.getPartitionName(partitionId)),
+                    tableId,
+                    partitionId,
+                    tableAssignment.getBucketAssignments().keySet());
+        }
     }
 
     private void onShutdown() {
@@ -529,10 +544,10 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
             return;
         }
         TableInfo tableInfo = createTableEvent.getTableInfo();
+        TablePath tablePath = tableInfo.getTablePath();
         coordinatorContext.putTableInfo(tableInfo);
         TableAssignment tableAssignment = 
createTableEvent.getTableAssignment();
-        tableManager.onCreateNewTable(
-                tableInfo.getTablePath(), tableInfo.getTableId(), 
tableAssignment);
+        tableManager.onCreateNewTable(tablePath, tableInfo.getTableId(), 
tableAssignment);
         if (createTableEvent.isAutoPartitionTable()) {
             autoPartitionManager.addAutoPartitionTable(tableInfo, true);
         }
@@ -551,6 +566,14 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                     null,
                     null,
                     tableBuckets);
+
+            // register table metrics.
+            coordinatorMetricGroup.addTableBucketMetricGroup(
+                    PhysicalTablePath.of(tablePath),
+                    tableId,
+                    null,
+                    tableAssignment.getBucketAssignments().keySet());
+
         } else {
             updateTabletServerMetadataCache(
                     new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -568,10 +591,11 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         }
 
         long tableId = createPartitionEvent.getTableId();
+        TablePath tablePath = createPartitionEvent.getTablePath();
         String partitionName = createPartitionEvent.getPartitionName();
         PartitionAssignment partitionAssignment = 
createPartitionEvent.getPartitionAssignment();
         tableManager.onCreateNewPartition(
-                createPartitionEvent.getTablePath(),
+                tablePath,
                 tableId,
                 createPartitionEvent.getPartitionId(),
                 partitionName,
@@ -585,6 +609,14 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 .forEach(
                         bucketId ->
                                 tableBuckets.add(new TableBucket(tableId, 
partitionId, bucketId)));
+
+        // register partition metrics.
+        coordinatorMetricGroup.addTableBucketMetricGroup(
+                PhysicalTablePath.of(tablePath, partitionName),
+                tableId,
+                partitionId,
+                partitionAssignment.getBucketAssignments().keySet());
+
         updateTabletServerMetadataCache(
                 new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
                 null,
@@ -617,6 +649,9 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 tableId,
                 null,
                 Collections.emptySet());
+
+        // remove table metrics.
+        
coordinatorMetricGroup.removeTableMetricGroup(dropTableInfo.getTablePath(), 
tableId);
     }
 
     private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
@@ -644,6 +679,10 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 tableId,
                 tablePartition.getPartitionId(),
                 Collections.emptySet());
+
+        // remove partition metrics.
+        coordinatorMetricGroup.removeTablePartitionMetricsGroup(
+                dropTableInfo.getTablePath(), tableId, 
tablePartition.getPartitionId());
     }
 
     private void processDeleteReplicaResponseReceived(
@@ -990,15 +1029,17 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
             return;
         }
         // commit the kv snapshot asynchronously
+        TableBucket tb = event.getTableBucket();
+        TablePath tablePath = 
coordinatorContext.getTablePathById(tb.getTableId());
         ioExecutor.execute(
                 () -> {
                     try {
-                        TableBucket tb = event.getTableBucket();
                         CompletedSnapshot completedSnapshot =
                                 
event.getAddCompletedSnapshotData().getCompletedSnapshot();
                         // add completed snapshot
                         CompletedSnapshotStore completedSnapshotStore =
-                                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
+                                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+                                        tablePath, tb);
                         // this involves IO operation (ZK), so we do it in 
ioExecutor
                         completedSnapshotStore.add(completedSnapshot);
                         coordinatorEventManager.put(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
index 309e3fcac..3e93cc46a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
@@ -87,6 +87,14 @@ public class CompletedSnapshotStore {
         addSnapshotAndSubsumeOldestOne(completedSnapshot, snapshotsCleaner, () 
-> {});
     }
 
+    public long getPhysicalStorageRemoteKvSize() {
+        return sharedKvFileRegistry.getFileSize();
+    }
+
+    public long getNumSnapshots() {
+        return completedSnapshots.size();
+    }
+
     /**
      * Synchronously writes the new snapshots to snapshot handle store and 
asynchronously removes
      * older ones.
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index f0ac036bf..0c4314469 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -21,8 +21,6 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.fs.FileSystemSafetyNet;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.server.metrics.group.BucketMetricGroup;
 import org.apache.fluss.utils.MathUtils;
 import org.apache.fluss.utils.concurrent.Executors;
@@ -124,8 +122,6 @@ public class PeriodicSnapshotManager implements Closeable {
                 periodicSnapshotDelay > 0
                         ? MathUtils.murmurHash(tableBucket.hashCode()) % 
periodicSnapshotDelay
                         : 0;
-
-        registerMetrics(bucketMetricGroup);
     }
 
     public static PeriodicSnapshotManager create(
@@ -156,9 +152,8 @@ public class PeriodicSnapshotManager implements Closeable {
         }
     }
 
-    private void registerMetrics(BucketMetricGroup bucketMetricGroup) {
-        MetricGroup metricGroup = 
bucketMetricGroup.addGroup("kv").addGroup("snapshot");
-        metricGroup.gauge(MetricNames.KV_LATEST_SNAPSHOT_SIZE, 
target::getSnapshotSize);
+    public long getSnapshotSize() {
+        return target.getSnapshotSize();
     }
 
     // schedule thread and asyncOperationsThreadPool can access this method
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
index e2699cb32..9d13d3e0e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
@@ -53,6 +53,9 @@ public class SharedKvFileRegistry implements AutoCloseable {
     /** This flag indicates whether or not the registry is open or if close() 
was called. */
     private boolean open;
 
+    /** The total size of all kv files registered in this registry. */
+    private volatile long fileSize;
+
     /** Executor for async kv deletion. */
     private final Executor asyncDisposalExecutor;
 
@@ -64,6 +67,11 @@ public class SharedKvFileRegistry implements AutoCloseable {
         this.registeredKvEntries = new HashMap<>();
         this.asyncDisposalExecutor = checkNotNull(asyncDisposalExecutor);
         this.open = true;
+        this.fileSize = 0L;
+    }
+
+    public long getFileSize() {
+        return fileSize;
     }
 
     public KvFileHandle registerReference(
@@ -87,6 +95,7 @@ public class SharedKvFileRegistry implements AutoCloseable {
                 LOG.trace("Registered new kv file {} under key {}.", 
newHandle, registrationKey);
                 entry = new SharedKvEntry(newHandle, snapshotID);
                 registeredKvEntries.put(registrationKey, entry);
+                fileSize += newHandle.getSize();
 
                 // no further handling
                 return entry.kvFileHandle;
@@ -134,6 +143,7 @@ public class SharedKvFileRegistry implements AutoCloseable {
                 if (entry.lastUsedSnapshotID < lowestSnapshotID) {
                     subsumed.add(entry.kvFileHandle);
                     it.remove();
+                    fileSize -= entry.kvFileHandle.getSize();
                 }
             }
         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index c23606850..1470081a8 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -117,6 +117,8 @@ public final class LogTablet {
     private volatile long remoteLogStartOffset = Long.MAX_VALUE;
     // tracking the log end offset in remote storage
     private volatile long remoteLogEndOffset = -1L;
+    // tracking the log size in remote storage
+    private volatile long remoteLogSize = 0;
 
     // tracking the log start/end offset in lakehouse storage
     private volatile long lakeTableSnapshotId = -1;
@@ -339,7 +341,20 @@ public final class LogTablet {
         metricGroup.gauge(
                 MetricNames.LOG_NUM_SEGMENTS, () -> 
localLog.getSegments().numberOfSegments());
         metricGroup.gauge(MetricNames.LOG_END_OFFSET, 
localLog::getLocalLogEndOffset);
-        metricGroup.gauge(MetricNames.LOG_SIZE, () -> 
localLog.getSegments().sizeInBytes());
+    }
+
+    public long logSize() {
+        return localLog.getSegments().sizeInBytes();
+    }
+
+    public long logicalStorageSize() {
+        if (remoteLogEndOffset <= 0L) {
+            return localLog.getSegments().sizeInBytes();
+        } else {
+            return 
localLog.getSegments().higherSegments(remoteLogEndOffset).stream()
+                    .mapToLong(LogSegment::getSizeInBytes)
+                    .reduce(remoteLogSize, Long::sum);
+        }
     }
 
     public void updateLeaderEndOffsetSnapshot() {
@@ -470,6 +485,10 @@ public final class LogTablet {
         }
     }
 
+    public void updateRemoteLogSize(long remoteLogSize) {
+        this.remoteLogSize = remoteLogSize;
+    }
+
     public void updateRemoteLogEndOffset(long remoteLogEndOffset) {
         if (remoteLogEndOffset > this.remoteLogEndOffset) {
             this.remoteLogEndOffset = remoteLogEndOffset;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 0e3666509..6df83faa4 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -322,6 +322,7 @@ public class LogTieringTask implements Runnable {
         // to try to commit this snapshot.
         long newRemoteLogStartOffset = 
newRemoteLogManifest.getRemoteLogStartOffset();
         long newRemoteLogEndOffset = 
newRemoteLogManifest.getRemoteLogEndOffset();
+        long newRemoteLogSize = newRemoteLogManifest.getRemoteLogSize();
         int retrySendCommitTimes = 1;
         while (retrySendCommitTimes <= 10) {
             try {
@@ -356,6 +357,7 @@ public class LogTieringTask implements Runnable {
                     
logTablet.updateRemoteLogStartOffset(newRemoteLogStartOffset);
                     // make the local log cleaner clean log segments that are 
committed to remote.
                     logTablet.updateRemoteLogEndOffset(newRemoteLogEndOffset);
+                    logTablet.updateRemoteLogSize(newRemoteLogSize);
                     return true;
                 }
             } catch (Exception e) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index cc23be0be..d963f4fcf 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -150,6 +150,7 @@ public class RemoteLogManager implements Closeable {
         }
         
remoteLog.getRemoteLogEndOffset().ifPresent(log::updateRemoteLogEndOffset);
         log.updateRemoteLogStartOffset(remoteLog.getRemoteLogStartOffset());
+        log.updateRemoteLogSize(remoteLog.getRemoteSizeInBytes());
         // leader needs to register the remote log metrics
         remoteLog.registerMetrics(replica.bucketMetrics());
         remoteLogs.put(tableBucket, remoteLog);
@@ -158,6 +159,10 @@ public class RemoteLogManager implements Closeable {
         LOG.debug("Added the remote log tiering task for replica {}", 
tableBucket);
     }
 
+    public long getRemoteLogSize() {
+        return 
remoteLogs.values().stream().mapToLong(RemoteLogTablet::getRemoteSizeInBytes).sum();
+    }
+
     /** Stop the log tiering task for the given replica. */
     public void stopLogTiering(Replica replica) {
         if (remoteDisabled()) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
index c6bc380e7..e1478dec4 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
@@ -96,6 +96,14 @@ public class RemoteLogManifest {
         return endOffset;
     }
 
+    public long getRemoteLogSize() {
+        long size = 0;
+        for (RemoteLogSegment remoteLogSegment : remoteLogSegmentList) {
+            size += remoteLogSegment.segmentSizeInBytes();
+        }
+        return size;
+    }
+
     public byte[] toJsonBytes() {
         return RemoteLogManifestJsonSerde.toJson(this);
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
index 017ef9383..75892ed55 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
@@ -122,11 +122,15 @@ public class RemoteLogTablet {
                     MetricGroup metricGroup = 
bucketMetricGroup.addGroup("remoteLog");
                     metricGroup.gauge(MetricNames.LOG_NUM_SEGMENTS, () -> 
numRemoteLogSegments);
                     metricGroup.gauge(MetricNames.LOG_END_OFFSET, () -> 
remoteLogEndOffset);
-                    metricGroup.gauge(MetricNames.LOG_SIZE, () -> 
remoteSizeInBytes);
+                    metricGroup.gauge(MetricNames.REMOTE_LOG_SIZE, 
this::getRemoteSizeInBytes);
                     remoteLogMetrics = metricGroup;
                 });
     }
 
+    public long getRemoteSizeInBytes() {
+        return remoteSizeInBytes;
+    }
+
     public void unregisterMetrics() {
         inWriteLock(
                 lock,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
index 251fb462b..4045fbae7 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
@@ -17,19 +17,34 @@
 
 package org.apache.fluss.server.metrics.group;
 
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
 import org.apache.fluss.utils.MapUtils;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
 
 /** The metric group for coordinator server. */
 public class CoordinatorMetricGroup extends AbstractMetricGroup {
 
     private static final String NAME = "coordinator";
 
+    private final Map<TablePath, SimpleTableMetricGroup> metricGroupByTable =
+            MapUtils.newConcurrentHashMap();
+
     protected final String clusterId;
     protected final String hostname;
     protected final String serverId;
@@ -62,4 +77,161 @@ public class CoordinatorMetricGroup extends 
AbstractMetricGroup {
         return eventMetricGroups.computeIfAbsent(
                 eventClass, e -> new CoordinatorEventMetricGroup(registry, 
eventClass, this));
     }
+
+    // ------------------------------------------------------------------------
+    //  table buckets groups
+    // ------------------------------------------------------------------------
+
+    public @Nullable MetricGroup getTableBucketMetricGroup(
+            TablePath tablePath, TableBucket tableBucket) {
+        SimpleTableMetricGroup tableMetricGroup = 
metricGroupByTable.get(tablePath);
+        if (tableMetricGroup == null) {
+            return null;
+        }
+        return tableMetricGroup.buckets.get(tableBucket);
+    }
+
+    public void addTableBucketMetricGroup(
+            PhysicalTablePath physicalTablePath,
+            long tableId,
+            @Nullable Long partitionId,
+            Set<Integer> buckets) {
+        TablePath tablePath = physicalTablePath.getTablePath();
+        SimpleTableMetricGroup tableMetricGroup =
+                metricGroupByTable.computeIfAbsent(
+                        tablePath, table -> new 
SimpleTableMetricGroup(registry, tablePath, this));
+        buckets.forEach(
+                bucket ->
+                        tableMetricGroup.addBucketMetricGroup(
+                                physicalTablePath.getPartitionName(),
+                                new TableBucket(tableId, partitionId, 
bucket)));
+    }
+
+    public void removeTableMetricGroup(TablePath tablePath, long tableId) {
+        SimpleTableMetricGroup tableMetricGroup = 
metricGroupByTable.remove(tablePath);
+        if (tableMetricGroup != null) {
+            tableMetricGroup.removeBucketMetricsGroupForTable(tableId);
+            tableMetricGroup.close();
+        }
+    }
+
+    public void removeTablePartitionMetricsGroup(
+            TablePath tablePath, long tableId, long partitionId) {
+        SimpleTableMetricGroup tableMetricGroup = 
metricGroupByTable.get(tablePath);
+        if (tableMetricGroup != null) {
+            tableMetricGroup.removeBucketMetricsGroupForPartition(tableId, 
partitionId);
+        }
+    }
+
+    /** The metric group for table. */
+    private static class SimpleTableMetricGroup extends AbstractMetricGroup {
+
+        private final Map<TableBucket, SimpleBucketMetricGroup> buckets = new 
HashMap<>();
+
+        private final TablePath tablePath;
+
+        private final MetricRegistry registry;
+
+        public SimpleTableMetricGroup(
+                MetricRegistry registry,
+                TablePath tablePath,
+                AbstractMetricGroup serverMetricGroup) {
+            super(
+                    registry,
+                    makeScope(
+                            serverMetricGroup,
+                            tablePath.getDatabaseName(),
+                            tablePath.getTableName()),
+                    serverMetricGroup);
+
+            this.tablePath = tablePath;
+            this.registry = registry;
+        }
+
+        @Override
+        protected void putVariables(Map<String, String> variables) {
+            variables.put("database", tablePath.getDatabaseName());
+            variables.put("table", tablePath.getTableName());
+        }
+
+        @Override
+        protected String getGroupName(CharacterFilter filter) {
+            // partition and table share same logic group name
+            return "table";
+        }
+
+        // 
------------------------------------------------------------------------
+        //  bucket groups
+        // 
------------------------------------------------------------------------
+        public void addBucketMetricGroup(@Nullable String partitionName, 
TableBucket tableBucket) {
+            buckets.computeIfAbsent(
+                    tableBucket,
+                    (bucket) ->
+                            new SimpleBucketMetricGroup(
+                                    registry, partitionName, 
tableBucket.getBucket(), this));
+        }
+
+        public void removeBucketMetricsGroupForTable(long tableId) {
+            List<TableBucket> tableBuckets = new ArrayList<>();
+            buckets.forEach(
+                    (tableBucket, bucketMetricGroup) -> {
+                        if (tableBucket.getTableId() == tableId) {
+                            tableBuckets.add(tableBucket);
+                        }
+                    });
+            tableBuckets.forEach(this::removeBucketMetricGroup);
+        }
+
+        public void removeBucketMetricsGroupForPartition(long tableId, long 
partitionId) {
+            List<TableBucket> tableBuckets = new ArrayList<>();
+            buckets.forEach(
+                    (tableBucket, bucketMetricGroup) -> {
+                        Long bucketPartitionId = tableBucket.getPartitionId();
+                        if (tableBucket.getTableId() == tableId
+                                && bucketPartitionId != null
+                                && bucketPartitionId == partitionId) {
+                            tableBuckets.add(tableBucket);
+                        }
+                    });
+            tableBuckets.forEach(this::removeBucketMetricGroup);
+        }
+
+        public void removeBucketMetricGroup(TableBucket tb) {
+            SimpleBucketMetricGroup metricGroup = buckets.remove(tb);
+            metricGroup.close();
+        }
+    }
+
+    /** The metric group for bucket. */
+    private static class SimpleBucketMetricGroup extends AbstractMetricGroup {
+        // will be null if the bucket doesn't belong to a partition
+        private final @Nullable String partitionName;
+        private final int bucket;
+
+        public SimpleBucketMetricGroup(
+                MetricRegistry registry,
+                @Nullable String partitionName,
+                int bucket,
+                SimpleTableMetricGroup parent) {
+            super(registry, makeScope(parent, String.valueOf(bucket)), parent);
+            this.partitionName = partitionName;
+            this.bucket = bucket;
+        }
+
+        @Override
+        protected void putVariables(Map<String, String> variables) {
+            if (partitionName != null) {
+                variables.put("partition", partitionName);
+            } else {
+                // value of empty string indicates non-partitioned tables
+                variables.put("partition", "");
+            }
+            variables.put("bucket", String.valueOf(bucket));
+        }
+
+        @Override
+        protected String getGroupName(CharacterFilter filter) {
+            return "bucket";
+        }
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 44e44bfe6..90c1630f8 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -38,6 +38,8 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.record.DefaultValueRecordBatch;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.LogRecords;
@@ -192,6 +194,7 @@ public final class Replica {
     // null if table without pk or haven't become leader
     private volatile @Nullable KvTablet kvTablet;
     private volatile @Nullable CloseableRegistry closeableRegistryForKv;
+    private @Nullable PeriodicSnapshotManager kvSnapshotManager;
 
     // ------- metrics
     private Counter isrShrinks;
@@ -250,6 +253,31 @@ public final class Replica {
         isrExpands = serverMetrics.isrExpands();
         isrShrinks = serverMetrics.isrShrinks();
         failedIsrUpdates = serverMetrics.failedIsrUpdates();
+
+        // logical storage metrics.
+        MetricGroup logicalStorageMetrics = 
bucketMetricGroup.addGroup("logicalStorage");
+        logicalStorageMetrics.gauge(
+                MetricNames.LOCAL_STORAGE_LOG_SIZE, 
this::logicalStorageLogSize);
+        logicalStorageMetrics.gauge(MetricNames.LOCAL_STORAGE_KV_SIZE, 
this::logicalStorageKvSize);
+    }
+
+    public long logicalStorageLogSize() {
+        if (isLeader()) {
+            return logTablet.logicalStorageSize();
+        } else {
+            // follower doesn't need to report the logical storage size.
+            return 0L;
+        }
+    }
+
+    public long logicalStorageKvSize() {
+        if (isLeader() && isKvTable()) {
+            checkNotNull(kvSnapshotManager, "kvSnapshotManager is null");
+            return kvSnapshotManager.getSnapshotSize();
+        } else {
+            // follower doesn't need to report the logical storage size.
+            return 0L;
+        }
     }
 
     public boolean isKvTable() {
@@ -792,7 +820,7 @@ public final class Replica {
                             coordinatorEpochSupplier,
                             lastCompletedSnapshotLogOffset,
                             snapshotSize);
-            PeriodicSnapshotManager kvSnapshotManager =
+            this.kvSnapshotManager =
                     PeriodicSnapshotManager.create(
                             tableBucket,
                             kvTabletSnapshotTarget,
@@ -806,6 +834,14 @@ public final class Replica {
         }
     }
 
+    public long getLatestKvSnapshotSize() {
+        if (kvSnapshotManager == null) {
+            return 0L;
+        } else {
+            return kvSnapshotManager.getSnapshotSize();
+        }
+    }
+
     public long getLeaderEndOffsetSnapshot() {
         return logTablet.getLeaderEndOffsetSnapshot();
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 692bd4c2a..876e8e984 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -34,6 +34,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.remote.RemoteLogFetchInfo;
@@ -306,6 +307,19 @@ public class ReplicaManager {
         serverMetricGroup.gauge(MetricNames.UNDER_REPLICATED, 
this::underReplicatedCount);
         serverMetricGroup.gauge(MetricNames.UNDER_MIN_ISR, 
this::underMinIsrCount);
         serverMetricGroup.gauge(MetricNames.AT_MIN_ISR, this::atMinIsrCount);
+
+        MetricGroup logicalStorage = 
serverMetricGroup.addGroup("logicalStorage");
+        logicalStorage.gauge(
+                MetricNames.SERVER_LOGICAL_STORAGE_LOG_SIZE, 
this::logicalStorageLogSize);
+        logicalStorage.gauge(
+                MetricNames.SERVER_LOGICAL_STORAGE_KV_SIZE, 
this::logicalStorageKvSize);
+
+        MetricGroup physicalStorage = 
serverMetricGroup.addGroup("physicalStorage");
+        physicalStorage.gauge(
+                MetricNames.SERVER_PHYSICAL_STORAGE_LOCAL_SIZE, 
this::physicalStorageLocalSize);
+        physicalStorage.gauge(
+                MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE,
+                this::physicalStorageRemoteLogSize);
     }
 
     private Stream<Replica> onlineReplicas() {
@@ -338,6 +352,31 @@ public class ReplicaManager {
         return onlineReplicas().map(Replica::writerIdCount).reduce(0, 
Integer::sum);
     }
 
+    private long logicalStorageLogSize() {
+        return onlineReplicas().map(Replica::logicalStorageLogSize).reduce(0L, 
Long::sum);
+    }
+
+    private long logicalStorageKvSize() {
+        return onlineReplicas().map(Replica::logicalStorageKvSize).reduce(0L, 
Long::sum);
+    }
+
+    private long physicalStorageLocalSize() {
+        return onlineReplicas()
+                .mapToLong(
+                        replica -> {
+                            long size = replica.getLogTablet().logSize();
+                            if (replica.isKvTable()) {
+                                size += replica.getLatestKvSnapshotSize();
+                            }
+                            return size;
+                        })
+                .reduce(0L, Long::sum);
+    }
+
+    private long physicalStorageRemoteLogSize() {
+        return remoteLogManager.getRemoteLogSize();
+    }
+
     /**
      * Receive a request to make these replicas to become leader or follower, 
if the replica doesn't
      * exit, we will create it.
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
index 4ed6617fc..b8fec59ca 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
@@ -24,6 +24,7 @@ import 
org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandleStore;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
 import org.apache.fluss.server.kv.snapshot.TestingCompletedSnapshotHandle;
 import 
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.server.testutils.KvTestUtils;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
@@ -53,6 +54,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CompletedSnapshotStoreManager}. */
@@ -117,7 +119,8 @@ class CompletedSnapshotStoreManagerTest {
             // check has retain number of snapshots
             assertThat(
                             completedSnapshotStoreManager
-                                    
.getOrCreateCompletedSnapshotStore(tableBucket)
+                                    .getOrCreateCompletedSnapshotStore(
+                                            DATA1_TABLE_PATH, tableBucket)
                                     .getAllSnapshots())
                     .hasSize(maxNumberOfSnapshotsToRetain);
         }
@@ -144,7 +147,8 @@ class CompletedSnapshotStoreManagerTest {
             // check has retain number of snapshots
             assertThat(
                             completedSnapshotStoreManager
-                                    
.getOrCreateCompletedSnapshotStore(tableBucket)
+                                    .getOrCreateCompletedSnapshotStore(
+                                            DATA1_TABLE_PATH, tableBucket)
                                     .getAllSnapshots())
                     .hasSize(maxNumberOfSnapshotsToRetain);
         }
@@ -153,7 +157,7 @@ class CompletedSnapshotStoreManagerTest {
         TableBucket nonExistBucket = new TableBucket(10, 100);
         assertThat(
                         completedSnapshotStoreManager
-                                
.getOrCreateCompletedSnapshotStore(nonExistBucket)
+                                
.getOrCreateCompletedSnapshotStore(DATA1_TABLE_PATH, nonExistBucket)
                                 .getAllSnapshots())
                 .hasSize(0);
     }
@@ -206,11 +210,13 @@ class CompletedSnapshotStoreManagerTest {
                         10,
                         ioExecutor,
                         zookeeperClient,
-                        zooKeeperClient -> completedSnapshotHandleStore);
+                        zooKeeperClient -> completedSnapshotHandleStore,
+                        TestingMetricGroups.COORDINATOR_METRICS);
 
         // Verify that only the valid snapshot remains
         CompletedSnapshotStore completedSnapshotStore =
-                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tableBucket);
+                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+                        DATA1_TABLE_PATH, tableBucket);
         assertThat(completedSnapshotStore.getAllSnapshots()).hasSize(1);
         
assertThat(completedSnapshotStore.getAllSnapshots().get(0).getSnapshotID()).isEqualTo(1L);
     }
@@ -218,13 +224,17 @@ class CompletedSnapshotStoreManagerTest {
     private CompletedSnapshotStoreManager createCompletedSnapshotStoreManager(
             int maxNumberOfSnapshotsToRetain) {
         return new CompletedSnapshotStoreManager(
-                maxNumberOfSnapshotsToRetain, ioExecutor, zookeeperClient);
+                maxNumberOfSnapshotsToRetain,
+                ioExecutor,
+                zookeeperClient,
+                TestingMetricGroups.COORDINATOR_METRICS);
     }
 
     private CompletedSnapshot getLatestCompletedSnapshot(
             CompletedSnapshotStoreManager completedSnapshotStoreManager, 
TableBucket tableBucket) {
         CompletedSnapshotStore completedSnapshotStore =
-                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tableBucket);
+                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+                        DATA1_TABLE_PATH, tableBucket);
         return completedSnapshotStore.getLatestSnapshot().get();
     }
 
@@ -234,7 +244,8 @@ class CompletedSnapshotStoreManagerTest {
             throws Exception {
         TableBucket tableBucket = completedSnapshot.getTableBucket();
         CompletedSnapshotStore completedSnapshotStore =
-                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tableBucket);
+                
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+                        DATA1_TABLE_PATH, tableBucket);
         completedSnapshotStore.add(completedSnapshot);
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 82cd8f1a3..09a833e41 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -221,7 +221,7 @@ class CoordinatorEventProcessorTest {
         // mock CompletedSnapshotStore
         for (TableBucket tableBucket : allTableBuckets(t1Id, nBuckets)) {
             completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
-                    new TableBucket(tableBucket.getTableId(), 
tableBucket.getBucket()));
+                    t1, new TableBucket(tableBucket.getTableId(), 
tableBucket.getBucket()));
         }
         
assertThat(completedSnapshotStoreManager.getBucketCompletedSnapshotStores()).isNotEmpty();
 
@@ -606,6 +606,7 @@ class CoordinatorEventProcessorTest {
         // mock CompletedSnapshotStore for partition1
         for (TableBucket tableBucket : allTableBuckets(tableId, partition1Id, 
nBuckets)) {
             completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+                    tablePath,
                     new TableBucket(
                             tableBucket.getTableId(),
                             tableBucket.getPartitionId(),
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index a01f9e67c..6b01cceda 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="9"><strong>coordinator</strong></th>
+      <th rowspan="12"><strong>coordinator</strong></th>
       <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="7">-</td>
       <td>activeCoordinatorCount</td>
       <td>The number of active CoordinatorServer in this cluster.</td>
@@ -331,7 +331,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>Histogram</td>
     </tr>
     <tr>
-      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="2">event</td>
+      <td rowspan="2">event</td>
       <td>eventQueueSize</td>
       <td>The number of events currently waiting to be processed in the 
coordinator event queue. This metric is labeled with <code>event_type</code> to 
distinguish between different types of coordinator events.</td>
       <td>Gauge</td>
@@ -341,6 +341,23 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The time that an event took to be processed by the coordinator event 
processor. This metric is labeled with <code>event_type</code> to distinguish 
between different types of coordinator events.</td>
       <td>Histogram</td>
     </tr>
+    <tr>
+      <td rowspan="1">physicalStorage</td>
+      <td>remoteKvSize</td>
+      <td>The physical storage size of remote KV store.</td>
+      <td>Gauge</td>
+    </tr>
+     <tr>
+      <td rowspan="2">table_bucket</td>
+      <td>numKvSnapshots</td>
+      <td>number of kv snapshots of each table bucket.</td>
+      <td>Gauge</td>
+    </tr>
+     <tr>
+      <td>allKvSnapshotSize</td>
+      <td>all kv snapshot size of each table bucket.</td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 
@@ -358,7 +375,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="25"><strong>tabletserver</strong></th>
+      <th rowspan="29"><strong>tabletserver</strong></th>
       <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="25">-</td>
       <td>messagesInPerSecond</td>
       <td>The number of messages written per second to this server.</td>
@@ -484,6 +501,28 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The number of kv pre-write buffer truncate due to the error happened 
when writing cdc to log per second.</td>
       <td>Meter</td>
     </tr>
+    <tr>
+      <td rowspan="2">logicalStorage</td>
+      <td>logSize</td>
+      <td>The logical storage size of log managed by this TabletServer.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>kvSize</td>
+      <td>The logical storage size of kv managed by this TabletServer.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td rowspan="2">physicalStorage</td>
+      <td>localSize</td>
+      <td>The physical local storage size of this TabletServer.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>remoteLogSize</td>
+      <td>The physical remote log size managed by this TabletServer.</td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 
@@ -607,7 +646,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="30"><strong>tabletserver</strong></th>
+      <th rowspan="28"><strong>tabletserver</strong></th>
       <td rowspan="20">table</td>
       <td>messagesInPerSecond</td>
       <td>The number of messages written per second to this table.</td>
@@ -709,7 +748,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>Meter</td>
     </tr>
      <tr>
-      <td rowspan="3">table_bucket_log</td>
+      <td rowspan="2">table_bucket_log</td>
       <td>numSegments</td>
       <td>The number of segments in local storage for this table bucket.</td>
       <td>Gauge</td>
@@ -718,11 +757,6 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>endOffset</td>
       <td>The end offset in local storage for this table bucket.</td>
       <td>Gauge</td>
-    </tr>
-     <tr>
-      <td>size</td>
-      <td>The total log sizes in local storage for this table bucket.</td>
-      <td>Gauge</td>
     </tr>
     <tr>
       <td rowspan="3">table_bucket_remoteLog</td>
@@ -740,12 +774,6 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The number of bytes written per second to this table.</td>
       <td>Gauge</td>
     </tr>
-    <tr>
-      <td rowspan="1">table_bucket_kv_snapshot</td>
-      <td>latestSnapshotSize</td>
-      <td>The latest kv snapshot size in bytes for this table bucket.</td>
-      <td>Gauge</td>
-    </tr>
   </tbody>
 </table>
 

Reply via email to