This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f484fd0a [kv] Support to report rocksdb metrics (#2282)
9f484fd0a is described below

commit 9f484fd0a808f44ae6d41a0f0be262f95b772fd2
Author: Yang Wang <[email protected]>
AuthorDate: Mon Jan 5 23:48:49 2026 +0800

    [kv] Support to report rocksdb metrics (#2282)
---
 .../java/org/apache/fluss/metrics/MetricNames.java |  48 +++
 .../java/org/apache/fluss/server/kv/KvManager.java |   1 +
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  39 ++-
 .../apache/fluss/server/kv/rocksdb/RocksDBKv.java  |  23 +-
 .../fluss/server/kv/rocksdb/RocksDBKvBuilder.java  |   7 +-
 .../kv/rocksdb/RocksDBResourceContainer.java       |  29 +-
 .../fluss/server/kv/rocksdb/RocksDBStatistics.java | 376 +++++++++++++++++++++
 .../server/metrics/group/BucketMetricGroup.java    |  71 +++-
 .../server/metrics/group/TableMetricGroup.java     | 117 ++++++-
 .../metrics/group/TabletServerMetricGroup.java     |  19 ++
 .../org/apache/fluss/server/replica/Replica.java   |  10 +
 .../org/apache/fluss/server/kv/KvTabletTest.java   | 184 ++++++++++
 .../kv/rocksdb/RocksDBResourceContainerTest.java   |   3 +-
 .../maintenance/observability/monitor-metrics.md   | 128 +++++++
 14 files changed, 1042 insertions(+), 13 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index b1326d46b..60eb942e8 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -131,6 +131,54 @@ public class MetricNames {
     public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
             "preWriteBufferTruncateAsErrorPerSecond";
 
+    // 
--------------------------------------------------------------------------------------------
+    // RocksDB metrics
+    // 
--------------------------------------------------------------------------------------------
+    // Table-level RocksDB metrics (aggregated from all buckets of a table, 
Max aggregation)
+    /** Maximum write stall duration across all buckets of this table (Max 
aggregation). */
+    public static final String ROCKSDB_WRITE_STALL_MICROS_MAX = 
"rocksdbWriteStallMicrosMax";
+
+    /** Maximum get latency across all buckets of this table (Max 
aggregation). */
+    public static final String ROCKSDB_GET_LATENCY_MICROS_MAX = 
"rocksdbGetLatencyMicrosMax";
+
+    /** Maximum write latency across all buckets of this table (Max 
aggregation). */
+    public static final String ROCKSDB_WRITE_LATENCY_MICROS_MAX = 
"rocksdbWriteLatencyMicrosMax";
+
+    /** Maximum number of L0 files across all buckets of this table (Max 
aggregation). */
+    public static final String ROCKSDB_NUM_FILES_AT_LEVEL0_MAX = 
"rocksdbNumFilesAtLevel0Max";
+
+    /** Maximum flush pending indicator across all buckets of this table (Max 
aggregation). */
+    public static final String ROCKSDB_FLUSH_PENDING_MAX = 
"rocksdbFlushPendingMax";
+
+    /** Maximum compaction pending indicator across all buckets of this table 
(Max aggregation). */
+    public static final String ROCKSDB_COMPACTION_PENDING_MAX = 
"rocksdbCompactionPendingMax";
+
+    /** Maximum compaction time across all buckets of this table (Max 
aggregation). */
+    public static final String ROCKSDB_COMPACTION_TIME_MICROS_MAX =
+            "rocksdbCompactionTimeMicrosMax";
+
+    // Table-level RocksDB metrics (aggregated from all buckets of a table, 
Sum aggregation)
+    /** Total bytes read across all buckets of this table (Sum aggregation). */
+    public static final String ROCKSDB_BYTES_READ_TOTAL = 
"rocksdbBytesReadTotal";
+
+    /** Total bytes written across all buckets of this table (Sum 
aggregation). */
+    public static final String ROCKSDB_BYTES_WRITTEN_TOTAL = 
"rocksdbBytesWrittenTotal";
+
+    /** Total flush bytes written across all buckets of this table (Sum 
aggregation). */
+    public static final String ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL = 
"rocksdbFlushBytesWrittenTotal";
+
+    /** Total compaction bytes read across all buckets of this table (Sum 
aggregation). */
+    public static final String ROCKSDB_COMPACTION_BYTES_READ_TOTAL =
+            "rocksdbCompactionBytesReadTotal";
+
+    /** Total compaction bytes written across all buckets of this table (Sum 
aggregation). */
+    public static final String ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL =
+            "rocksdbCompactionBytesWrittenTotal";
+
+    // Server-level RocksDB metrics (aggregated from all tables, Sum 
aggregation)
+    /** Total memory usage across all RocksDB instances in this server (Sum 
aggregation). */
+    public static final String ROCKSDB_MEMORY_USAGE_TOTAL = 
"rocksdbMemoryUsageTotal";
+
     // 
--------------------------------------------------------------------------------------------
     // metrics for table bucket
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 0986c67f9..1637af39d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -387,6 +387,7 @@ public final class KvManager extends TabletManagerBase 
implements ServerReconfig
                             
currentKvs.get(tableBucket).getKvTabletDir().getAbsolutePath()));
         }
         this.currentKvs.put(tableBucket, kvTablet);
+
         return kvTablet;
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index d6cc08666..0b43f0a31 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -50,6 +50,7 @@ import 
org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
 import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
 import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
 import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
@@ -118,6 +119,9 @@ public final class KvTablet {
     // the changelog image mode for this tablet
     private final ChangelogImage changelogImage;
 
+    // RocksDB statistics accessor for this tablet
+    @Nullable private final RocksDBStatistics rocksDBStatistics;
+
     /**
      * The kv data in pre-write buffer whose log offset is less than the 
flushedLogOffset has been
      * flushed into kv.
@@ -142,7 +146,8 @@ public final class KvTablet {
             RowMerger rowMerger,
             ArrowCompressionInfo arrowCompressionInfo,
             SchemaGetter schemaGetter,
-            ChangelogImage changelogImage) {
+            ChangelogImage changelogImage,
+            @Nullable RocksDBStatistics rocksDBStatistics) {
         this.physicalPath = physicalPath;
         this.tableBucket = tableBucket;
         this.logTablet = logTablet;
@@ -158,6 +163,7 @@ public final class KvTablet {
         this.arrowCompressionInfo = arrowCompressionInfo;
         this.schemaGetter = schemaGetter;
         this.changelogImage = changelogImage;
+        this.rocksDBStatistics = rocksDBStatistics;
     }
 
     public static KvTablet create(
@@ -177,6 +183,19 @@ public final class KvTablet {
             RateLimiter sharedRateLimiter)
             throws IOException {
         RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, 
sharedRateLimiter);
+
+        // Create RocksDB statistics accessor (will be registered to 
TableMetricGroup by Replica)
+        // Pass ResourceGuard to ensure thread-safe access during concurrent 
close operations
+        // Pass ColumnFamilyHandle for column family specific properties like 
num-files-at-level0
+        // Pass Cache for accurate block cache memory tracking
+        RocksDBStatistics rocksDBStatistics =
+                new RocksDBStatistics(
+                        kv.getDb(),
+                        kv.getStatistics(),
+                        kv.getResourceGuard(),
+                        kv.getDefaultColumnFamilyHandle(),
+                        kv.getBlockCache());
+
         return new KvTablet(
                 tablePath,
                 tableBucket,
@@ -192,14 +211,16 @@ public final class KvTablet {
                 rowMerger,
                 arrowCompressionInfo,
                 schemaGetter,
-                changelogImage);
+                changelogImage,
+                rocksDBStatistics);
     }
 
     private static RocksDBKv buildRocksDBKv(
             Configuration configuration, File kvDir, RateLimiter 
sharedRateLimiter)
             throws IOException {
+        // Enable statistics to support RocksDB statistics collection
         RocksDBResourceContainer rocksDBResourceContainer =
-                new RocksDBResourceContainer(configuration, kvDir, false, 
sharedRateLimiter);
+                new RocksDBResourceContainer(configuration, kvDir, true, 
sharedRateLimiter);
         RocksDBKvBuilder rocksDBKvBuilder =
                 new RocksDBKvBuilder(
                         kvDir,
@@ -225,6 +246,16 @@ public final class KvTablet {
         return kvTabletDir;
     }
 
+    /**
+     * Get RocksDB statistics accessor for this tablet.
+     *
+     * @return the RocksDB statistics accessor, or null if not available
+     */
+    @Nullable
+    public RocksDBStatistics getRocksDBStatistics() {
+        return rocksDBStatistics;
+    }
+
     void setFlushedLogOffset(long flushedLogOffset) {
         this.flushedLogOffset = flushedLogOffset;
     }
@@ -621,6 +652,8 @@ public final class KvTablet {
                     if (isClosed) {
                         return;
                     }
+                    // Note: RocksDB metrics lifecycle is managed by 
TableMetricGroup
+                    // No need to close it here
                     if (rocksDBKv != null) {
                         rocksDBKv.close();
                     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
index 602a4b591..6e45b2396 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
@@ -23,12 +23,14 @@ import org.apache.fluss.server.utils.ResourceGuard;
 import org.apache.fluss.utils.BytesUtils;
 import org.apache.fluss.utils.IOUtils;
 
+import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
 import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;
@@ -63,6 +65,9 @@ public class RocksDBKv implements AutoCloseable {
     /** Our RocksDB database. Currently, one kv tablet, one RocksDB instance. 
*/
     protected final RocksDB db;
 
+    /** RocksDB Statistics for metrics collection. */
+    private final @Nullable Statistics statistics;
+
     // mark whether this kv is already closed and prevent duplicate closing
     private volatile boolean closed = false;
 
@@ -70,12 +75,14 @@ public class RocksDBKv implements AutoCloseable {
             RocksDBResourceContainer optionsContainer,
             RocksDB db,
             ResourceGuard rocksDBResourceGuard,
-            ColumnFamilyHandle defaultColumnFamilyHandle) {
+            ColumnFamilyHandle defaultColumnFamilyHandle,
+            @Nullable Statistics statistics) {
         this.optionsContainer = optionsContainer;
         this.db = db;
         this.rocksDBResourceGuard = rocksDBResourceGuard;
         this.writeOptions = optionsContainer.getWriteOptions();
         this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
+        this.statistics = statistics;
     }
 
     public ResourceGuard getResourceGuard() {
@@ -206,4 +213,18 @@ public class RocksDBKv implements AutoCloseable {
     public RocksDB getDb() {
         return db;
     }
+
+    @Nullable
+    public Statistics getStatistics() {
+        return optionsContainer.getStatistics();
+    }
+
+    @Nullable
+    public Cache getBlockCache() {
+        return optionsContainer.getBlockCache();
+    }
+
+    public ColumnFamilyHandle getDefaultColumnFamilyHandle() {
+        return defaultColumnFamilyHandle;
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
index 8fbc0cf95..92e86fb96 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
@@ -107,7 +107,12 @@ public class RocksDBKvBuilder {
             throw new KvBuildingException(errMsg, t);
         }
         LOG.info("Finished building RocksDB kv at {}.", instanceBasePath);
-        return new RocksDBKv(optionsContainer, db, rocksDBResourceGuard, 
defaultColumnFamilyHandle);
+        return new RocksDBKv(
+                optionsContainer,
+                db,
+                rocksDBResourceGuard,
+                defaultColumnFamilyHandle,
+                optionsContainer.getStatistics());
     }
 
     void prepareDirectories() throws IOException {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
index 4495613e6..a07ea1a91 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
@@ -28,11 +28,13 @@ import org.apache.fluss.utils.IOUtils;
 
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
 import org.rocksdb.PlainTableConfig;
 import org.rocksdb.RateLimiter;
 import org.rocksdb.ReadOptions;
@@ -80,6 +82,12 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
     /** The shared rate limiter for all RocksDB instances. */
     private final RateLimiter sharedRateLimiter;
 
+    /** The statistics object for RocksDB, null if statistics is disabled. */
+    @Nullable private Statistics statistics;
+
+    /** The block cache for RocksDB, shared across column families. */
+    @Nullable private Cache blockCache;
+
     /** The handles to be closed when the container is closed. */
     private final ArrayList<AutoCloseable> handlesToClose;
 
@@ -138,7 +146,7 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
         opt.setRateLimiter(sharedRateLimiter);
 
         if (enableStatistics) {
-            Statistics statistics = new Statistics();
+            statistics = new Statistics();
             opt.setStatistics(statistics);
             handlesToClose.add(statistics);
         }
@@ -146,6 +154,18 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
         return opt;
     }
 
+    /** Gets the Statistics object if statistics is enabled, null otherwise. */
+    @Nullable
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    /** Gets the block cache used by RocksDB, null if not yet initialized. */
+    @Nullable
+    public Cache getBlockCache() {
+        return blockCache;
+    }
+
     /** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all 
RocksDB instances. */
     public ColumnFamilyOptions getColumnOptions() {
         // initial options from common profile
@@ -282,8 +302,11 @@ public class RocksDBResourceContainer implements 
AutoCloseable {
         blockBasedTableConfig.setMetadataBlockSize(
                 
internalGetOption(ConfigOptions.KV_METADATA_BLOCK_SIZE).getBytes());
 
-        blockBasedTableConfig.setBlockCacheSize(
-                
internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes());
+        // Create explicit LRUCache for accurate memory tracking
+        long blockCacheSize = 
internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes();
+        blockCache = new LRUCache(blockCacheSize);
+        handlesToClose.add(blockCache);
+        blockBasedTableConfig.setBlockCache(blockCache);
 
         if (internalGetOption(ConfigOptions.KV_USE_BLOOM_FILTER)) {
             final double bitsPerKey = 
internalGetOption(ConfigOptions.KV_BLOOM_FILTER_BITS_PER_KEY);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java
new file mode 100644
index 000000000..fd7a20cd7
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rocksdb;
+
+import org.apache.fluss.server.utils.ResourceGuard;
+
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.HistogramData;
+import org.rocksdb.HistogramType;
+import org.rocksdb.MemoryUsageType;
+import org.rocksdb.MemoryUtil;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Statistics;
+import org.rocksdb.TickerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Collects and provides access to RocksDB statistics for a single KvTablet.
+ *
+ * <p>This class encapsulates low-level RocksDB statistics collection, 
providing semantic methods to
+ * access various RocksDB statistics and properties. It does NOT register 
Fluss metrics directly;
+ * instead, upper layers (e.g., TableMetricGroup) consume these statistics to 
compute and register
+ * actual Fluss Metrics.
+ *
+ * <p>Thread-safety: This class uses RocksDB's ResourceGuard to ensure safe 
concurrent access. All
+ * statistics read operations acquire the resource guard to prevent accessing 
closed RocksDB
+ * instances.
+ */
+public class RocksDBStatistics implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStatistics.class);
+
+    private final RocksDB db;
+    @Nullable private final Statistics statistics;
+    private final ResourceGuard resourceGuard;
+    private final ColumnFamilyHandle defaultColumnFamilyHandle;
+    @Nullable private final Cache blockCache;
+
+    public RocksDBStatistics(
+            RocksDB db,
+            @Nullable Statistics statistics,
+            ResourceGuard resourceGuard,
+            ColumnFamilyHandle defaultColumnFamilyHandle,
+            @Nullable Cache blockCache) {
+        this.db = db;
+        this.statistics = statistics;
+        this.resourceGuard = resourceGuard;
+        this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
+        this.blockCache = blockCache;
+    }
+
+    // ==================== Ticker-based Metrics ====================
+
+    /**
+     * Get write stall duration in microseconds.
+     *
+     * @return write stall duration, or 0 if not available
+     */
+    public long getWriteStallMicros() {
+        return getTickerValue(TickerType.STALL_MICROS);
+    }
+
+    /**
+     * Get total bytes read.
+     *
+     * @return bytes read, or 0 if not available
+     */
+    public long getBytesRead() {
+        return getTickerValue(TickerType.BYTES_READ);
+    }
+
+    /**
+     * Get total bytes written.
+     *
+     * @return bytes written, or 0 if not available
+     */
+    public long getBytesWritten() {
+        return getTickerValue(TickerType.BYTES_WRITTEN);
+    }
+
+    /**
+     * Get flush bytes written.
+     *
+     * @return flush bytes written, or 0 if not available
+     */
+    public long getFlushBytesWritten() {
+        return getTickerValue(TickerType.FLUSH_WRITE_BYTES);
+    }
+
+    /**
+     * Get compaction bytes read.
+     *
+     * @return compaction bytes read, or 0 if not available
+     */
+    public long getCompactionBytesRead() {
+        return getTickerValue(TickerType.COMPACT_READ_BYTES);
+    }
+
+    /**
+     * Get compaction bytes written.
+     *
+     * @return compaction bytes written, or 0 if not available
+     */
+    public long getCompactionBytesWritten() {
+        return getTickerValue(TickerType.COMPACT_WRITE_BYTES);
+    }
+
+    // ==================== Property-based Metrics ====================
+
+    /**
+     * Get get operation latency in microseconds (P99).
+     *
+     * <p>This uses RocksDB Statistics histogram data to get the P99 latency 
of get operations. P99
+     * is used instead of average because it better reflects tail latency 
issues, which are more
+     * critical for monitoring database performance.
+     *
+     * @return P99 get latency in microseconds, or 0 if not available
+     */
+    public long getGetLatencyMicros() {
+        return getHistogramValue(HistogramType.DB_GET);
+    }
+
+    /**
+     * Get write operation latency in microseconds (P99).
+     *
+     * <p>This uses RocksDB Statistics histogram data to get the P99 latency 
of write operations.
+     * P99 is used instead of average because it better reflects tail latency 
issues, which are more
+     * critical for monitoring database performance.
+     *
+     * @return P99 write latency in microseconds, or 0 if not available
+     */
+    public long getWriteLatencyMicros() {
+        return getHistogramValue(HistogramType.DB_WRITE);
+    }
+
+    /**
+     * Get number of files at level 0.
+     *
+     * <p>This property is column family specific and must be accessed through 
the column family
+     * handle.
+     *
+     * @return number of L0 files, or 0 if not available
+     */
+    public long getNumFilesAtLevel0() {
+        return getPropertyValue(defaultColumnFamilyHandle, 
"rocksdb.num-files-at-level0");
+    }
+
+    /**
+     * Get whether a memtable flush is pending.
+     *
+     * @return 1 if flush is pending, 0 otherwise
+     */
+    public long getFlushPending() {
+        return getPropertyValue("rocksdb.mem-table-flush-pending");
+    }
+
+    /**
+     * Get whether a compaction is pending.
+     *
+     * @return 1 if compaction is pending, 0 otherwise
+     */
+    public long getCompactionPending() {
+        return getPropertyValue("rocksdb.compaction-pending");
+    }
+
+    /**
+     * Get compaction time in microseconds (P99).
+     *
+     * <p>This uses RocksDB Statistics histogram data to get the P99 
compaction time. P99 is used
+     * instead of average because it better reflects tail latency issues in 
compaction operations.
+     *
+     * @return P99 compaction time in microseconds, or 0 if not available
+     */
+    public long getCompactionTimeMicros() {
+        return getHistogramValue(HistogramType.COMPACTION_TIME);
+    }
+
+    /**
+     * Get total memory usage across all RocksDB components including block 
cache, memtables,
+     * indexes, filters, etc.
+     *
+     * <p>This uses RocksDB MemoryUtil to get approximate memory usage by type 
and sums all types.
+     * This includes:
+     *
+     * <ul>
+     *   <li>Block cache usage (if explicit cache is provided)
+     *   <li>All memtables (active and immutable)
+     *   <li>Table readers (indexes and bloom filters)
+     *   <li>Pinned blocks
+     * </ul>
+     *
+     * <p>Note: To get accurate block cache memory usage, an explicit Cache 
object must be provided
+     * during construction. If no cache is provided (null), the block cache 
memory usage may not be
+     * fully accounted for.
+     *
+     * @return total memory usage in bytes, or 0 if not available
+     */
+    public long getTotalMemoryUsage() {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (db == null) {
+                return 0L;
+            }
+
+            // Create cache set for memory usage calculation.
+            // If blockCache is null, pass null to MemoryUtil (will only count 
memtables, etc.)
+            Set<Cache> caches = null;
+            if (blockCache != null) {
+                caches = new HashSet<>();
+                caches.add(blockCache);
+            }
+
+            Map<MemoryUsageType, Long> memoryUsage =
+                    MemoryUtil.getApproximateMemoryUsageByType(
+                            Collections.singletonList(db), caches);
+            return 
memoryUsage.values().stream().mapToLong(Long::longValue).sum();
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get total memory usage from RocksDB (possibly 
closed or unavailable)",
+                    e);
+            return 0L;
+        }
+    }
+
+    // ==================== Internal Helper Methods ====================
+
+    /**
+     * Get ticker value from RocksDB Statistics with resource guard protection.
+     *
+     * @param tickerType the ticker type to query
+     * @return the ticker value, or 0 if not available or RocksDB is closed
+     */
+    private long getTickerValue(TickerType tickerType) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (statistics != null) {
+                return statistics.getTickerCount(tickerType);
+            }
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get ticker {} from RocksDB (possibly closed or 
unavailable)",
+                    tickerType,
+                    e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get histogram P99 value from RocksDB Statistics with resource guard 
protection.
+     *
+     * <p>Histograms are used for latency metrics and provide average, median, 
percentile values.
+     * This method returns the P99 value (99th percentile) instead of average, 
which better reflects
+     * tail latency and is more useful for performance monitoring. For 
microsecond-level latencies,
+     * we round to the nearest long value to avoid precision loss where it 
matters.
+     *
+     * <p>Why P99 instead of average:
+     *
+     * <ul>
+     *   <li>P99 captures tail latency issues that average would hide
+     *   <li>More aligned with industry best practices for latency monitoring
+     *   <li>Better indicator of user-facing performance problems
+     * </ul>
+     *
+     * @param histogramType the histogram type to query
+     * @return the P99 histogram value (rounded to nearest long), or 0 if not 
available or RocksDB
+     *     is closed
+     */
+    private long getHistogramValue(HistogramType histogramType) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (statistics != null) {
+                HistogramData histogramData = 
statistics.getHistogramData(histogramType);
+                if (histogramData != null) {
+                    // Use P99 instead of average for better tail latency 
monitoring
+                    // Round to nearest long to preserve precision for 
microsecond-level values
+                    return Math.round(histogramData.getPercentile99());
+                }
+            }
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get histogram {} from RocksDB Statistics 
(possibly closed or unavailable)",
+                    histogramType,
+                    e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get property value from RocksDB with resource guard protection.
+     *
+     * @param propertyName the property name to query
+     * @return the property value as long, or 0 if not available or RocksDB is 
closed
+     */
+    private long getPropertyValue(String propertyName) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            String value = db.getProperty(propertyName);
+            if (value != null && !value.isEmpty()) {
+                return Long.parseLong(value);
+            }
+        } catch (RocksDBException e) {
+            LOG.debug(
+                    "Failed to get property {} from RocksDB (possibly closed 
or unavailable)",
+                    propertyName,
+                    e);
+        } catch (NumberFormatException e) {
+            LOG.debug("Failed to parse property {} value as long", 
propertyName, e);
+        } catch (Exception e) {
+            // ResourceGuard may throw exception if RocksDB is closed
+            LOG.debug(
+                    "Failed to access RocksDB for property {} (possibly 
closed)", propertyName, e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get property value from RocksDB for a specific column family with 
resource guard protection.
+     *
+     * <p>Some RocksDB properties are column family specific and must be 
accessed through the column
+     * family handle.
+     *
+     * @param columnFamilyHandle the column family handle
+     * @param propertyName the property name to query
+     * @return the property value as long, or 0 if not available or RocksDB is 
closed
+     */
+    private long getPropertyValue(ColumnFamilyHandle columnFamilyHandle, 
String propertyName) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (columnFamilyHandle == null) {
+                return 0L;
+            }
+            String value = db.getProperty(columnFamilyHandle, propertyName);
+            if (value != null && !value.isEmpty()) {
+                return Long.parseLong(value);
+            }
+        } catch (RocksDBException e) {
+            LOG.debug(
+                    "Failed to get property {} from RocksDB column family 
(possibly closed or unavailable)",
+                    propertyName,
+                    e);
+        } catch (NumberFormatException e) {
+            LOG.debug("Failed to parse property {} value as long", 
propertyName, e);
+        } catch (Exception e) {
+            // ResourceGuard may throw exception if RocksDB is closed
+            LOG.debug(
+                    "Failed to access RocksDB for property {} (possibly 
closed)", propertyName, e);
+        }
+        return 0L;
+    }
+
+    @Override
+    public void close() {
+        // No resources to clean up, statistics are managed by TableMetricGroup
+        LOG.debug("RocksDB statistics accessor closed");
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
index fc8e281a5..ed5b11101 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
@@ -20,6 +20,11 @@ package org.apache.fluss.server.metrics.group;
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
+import org.apache.fluss.utils.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -27,12 +32,24 @@ import java.util.Map;
 
 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
 
-/** Metrics for the table buckets with table as parent group. */
+/**
+ * Metrics for the table buckets with table as parent group.
+ *
+ * <p>For KV tables, this class also manages the RocksDB statistics lifecycle. 
The statistics are
+ * registered when KvTablet is initialized and automatically cleaned up when 
this metric group is
+ * closed.
+ */
 public class BucketMetricGroup extends AbstractMetricGroup {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketMetricGroup.class);
+
     // will be null if the bucket doesn't belong to a partition
     private final @Nullable String partitionName;
     private final int bucket;
 
+    // RocksDB statistics for this bucket (null for non-KV tables)
+    private volatile @Nullable RocksDBStatistics rocksDBStatistics;
+
     public BucketMetricGroup(
             MetricRegistry registry,
             @Nullable String partitionName,
@@ -62,4 +79,56 @@ public class BucketMetricGroup extends AbstractMetricGroup {
     public TableMetricGroup getTableMetricGroup() {
         return (TableMetricGroup) parent;
     }
+
+    /**
+     * Register RocksDB statistics for this bucket. This should be called when 
KvTablet is
+     * initialized.
+     *
+     * <p>This method must be paired with {@link 
#unregisterRocksDBStatistics()} to ensure proper
+     * resource cleanup.
+     *
+     * @param statistics the RocksDB statistics collector
+     */
+    public void registerRocksDBStatistics(RocksDBStatistics statistics) {
+        if (this.rocksDBStatistics != null) {
+            LOG.warn(
+                    "RocksDB statistics already registered for bucket {}, this 
may indicate a resource leak",
+                    bucket);
+        }
+        this.rocksDBStatistics = statistics;
+        LOG.debug("Registered RocksDB statistics for bucket {}", bucket);
+    }
+
+    /**
+     * Unregister and close RocksDB statistics for this bucket. This should be 
called when KvTablet
+     * is destroyed.
+     *
+     * <p>This method must be paired with {@link 
#registerRocksDBStatistics(RocksDBStatistics)} to
+     * ensure proper resource cleanup.
+     */
+    public void unregisterRocksDBStatistics() {
+        if (rocksDBStatistics != null) {
+            LOG.debug("Unregistering RocksDB statistics for bucket {}", 
bucket);
+            IOUtils.closeQuietly(rocksDBStatistics);
+            rocksDBStatistics = null;
+        }
+    }
+
+    /**
+     * Get the RocksDB statistics for this bucket.
+     *
+     * @return the RocksDB statistics, or null if not a KV table or not yet 
initialized
+     */
+    @Nullable
+    public RocksDBStatistics getRocksDBStatistics() {
+        return rocksDBStatistics;
+    }
+
+    @Override
+    public void close() {
+        // Clean up RocksDB statistics before closing the metric group
+        // This handles the case when the bucket is removed entirely
+        unregisterRocksDBStatistics();
+        super.close();
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
index 7620bcbfd..fc40463f2 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
@@ -27,11 +27,13 @@ import org.apache.fluss.metrics.NoOpCounter;
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
+import org.apache.fluss.utils.MapUtils;
 
 import javax.annotation.Nullable;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
 
@@ -41,7 +43,7 @@ import static 
org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
  */
 public class TableMetricGroup extends AbstractMetricGroup {
 
-    private final Map<TableBucket, BucketMetricGroup> buckets = new 
HashMap<>();
+    private final Map<TableBucket, BucketMetricGroup> buckets = 
MapUtils.newConcurrentHashMap();
 
     private final TablePath tablePath;
 
@@ -70,6 +72,8 @@ public class TableMetricGroup extends AbstractMetricGroup {
         if (isKvTable) {
             kvMetrics = new KvMetricGroup(this);
             logMetrics = new LogMetricGroup(this, TabletType.CDC_LOG);
+            // Register RocksDB aggregated metrics for kv tables
+            registerRocksDBMetrics();
         } else {
             // otherwise, create log produce metrics
             kvMetrics = null;
@@ -236,17 +240,124 @@ public class TableMetricGroup extends 
AbstractMetricGroup {
 
     public void removeBucketMetricGroup(TableBucket tableBucket) {
         BucketMetricGroup metricGroup = buckets.remove(tableBucket);
-        metricGroup.close();
+        if (metricGroup != null) {
+            // BucketMetricGroup.close() will automatically clean up RocksDB 
statistics
+            metricGroup.close();
+        }
     }
 
     public int bucketGroupsCount() {
         return buckets.size();
     }
 
+    public java.util.Collection<BucketMetricGroup> getBucketMetricGroups() {
+        return buckets.values();
+    }
+
+    /**
+     * Get all RocksDB statistics from bucket metric groups for table-level 
and server-level
+     * aggregation.
+     *
+     * <p>This method dynamically collects statistics from all buckets, 
allowing automatic cleanup
+     * when buckets are removed without maintaining a separate map.
+     *
+     * @return stream of RocksDB statistics from all buckets in this table
+     */
+    public Stream<RocksDBStatistics> allRocksDBStatistics() {
+        return buckets.values().stream()
+                .map(BucketMetricGroup::getRocksDBStatistics)
+                .filter(stats -> stats != null);
+    }
+
     public TabletServerMetricGroup getServerMetricGroup() {
         return (TabletServerMetricGroup) parent;
     }
 
+    /**
+     * Register RocksDB aggregated metrics at table level. These metrics 
aggregate values from all
+     * buckets of this table.
+     *
+     * <p>This method is called once during TableMetricGroup construction for 
KV tables.
+     */
+    private void registerRocksDBMetrics() {
+        // Max aggregation metrics - track the maximum value across all buckets
+        gauge(
+                MetricNames.ROCKSDB_WRITE_STALL_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getWriteStallMicros)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_GET_LATENCY_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getGetLatencyMicros)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_WRITE_LATENCY_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getWriteLatencyMicros)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_NUM_FILES_AT_LEVEL0_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getNumFilesAtLevel0)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_FLUSH_PENDING_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                .mapToLong(RocksDBStatistics::getFlushPending)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_PENDING_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getCompactionPending)
+                                .max()
+                                .orElse(0L));
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_TIME_MICROS_MAX,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getCompactionTimeMicros)
+                                .max()
+                                .orElse(0L));
+
+        // Sum aggregation metrics - track the total value across all buckets
+        gauge(
+                MetricNames.ROCKSDB_BYTES_READ_TOTAL,
+                () -> 
allRocksDBStatistics().mapToLong(RocksDBStatistics::getBytesRead).sum());
+        gauge(
+                MetricNames.ROCKSDB_BYTES_WRITTEN_TOTAL,
+                () -> 
allRocksDBStatistics().mapToLong(RocksDBStatistics::getBytesWritten).sum());
+        gauge(
+                MetricNames.ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getFlushBytesWritten)
+                                .sum());
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_BYTES_READ_TOTAL,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getCompactionBytesRead)
+                                .sum());
+        gauge(
+                MetricNames.ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL,
+                () ->
+                        allRocksDBStatistics()
+                                
.mapToLong(RocksDBStatistics::getCompactionBytesWritten)
+                                .sum());
+    }
+
     /** Metric group for specific kind of tablet of a table. */
     private static class TabletMetricGroup extends AbstractMetricGroup {
         private final TabletType tabletType;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
index 59730b69f..f4b2c6b73 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
@@ -30,6 +30,7 @@ import org.apache.fluss.metrics.SimpleCounter;
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
 import org.apache.fluss.utils.MapUtils;
 
 import java.util.Map;
@@ -133,6 +134,24 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
         meter(MetricNames.ISR_SHRINKS_RATE, new MeterView(isrShrinks));
         failedIsrUpdates = new SimpleCounter();
         meter(MetricNames.FAILED_ISR_UPDATES_RATE, new 
MeterView(failedIsrUpdates));
+
+        // Register server-level RocksDB aggregated metrics
+        registerServerRocksDBMetrics();
+    }
+
+    /**
+     * Register server-level RocksDB aggregated metrics. These metrics 
aggregate memory usage from
+     * all tables.
+     */
+    private void registerServerRocksDBMetrics() {
+        // Total memory usage across all RocksDB instances in this server.
+        gauge(
+                MetricNames.ROCKSDB_MEMORY_USAGE_TOTAL,
+                () ->
+                        metricGroupByTable.values().stream()
+                                
.flatMap(TableMetricGroup::allRocksDBStatistics)
+                                
.mapToLong(RocksDBStatistics::getTotalMemoryUsage)
+                                .sum());
     }
 
     @Override
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 1669e004d..f86778ebc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -596,6 +596,10 @@ public final class Replica {
             IOUtils.closeQuietly(closeableRegistryForKv);
         }
         if (kvTablet != null) {
+            // Unregister RocksDB statistics before dropping KvTablet
+            // This ensures statistics are cleaned up when KvTablet is 
destroyed
+            bucketMetricGroup.unregisterRocksDBStatistics();
+
             // drop the kv tablet
             checkNotNull(kvManager);
             kvManager.dropKv(tableBucket);
@@ -689,6 +693,12 @@ public final class Replica {
                 physicalPath,
                 tableBucket,
                 endTime - startTime);
+
+        // Register RocksDB statistics to BucketMetricGroup
+        if (kvTablet != null && kvTablet.getRocksDBStatistics() != null) {
+            
bucketMetricGroup.registerRocksDBStatistics(kvTablet.getRocksDBStatistics());
+        }
+
         return optCompletedSnapshot;
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index ec2fab37c..52b1080ed 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -48,12 +48,14 @@ import org.apache.fluss.row.encode.ValueEncoder;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import org.apache.fluss.server.log.FetchIsolation;
 import org.apache.fluss.server.log.LogAppendInfo;
 import org.apache.fluss.server.log.LogTablet;
 import org.apache.fluss.server.log.LogTestUtils;
 import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
@@ -1343,4 +1345,186 @@ class KvTabletTest {
     private Value valueOf(BinaryRow row) {
         return Value.of(ValueEncoder.encodeValue(schemaId, row));
     }
+
+    @Test
+    void testRocksDBMetrics() throws Exception {
+        // Initialize tablet with schema
+        initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
+
+        // Get RocksDB statistics
+        RocksDBStatistics statistics = kvTablet.getRocksDBStatistics();
+        assertThat(statistics).as("RocksDB statistics should be 
available").isNotNull();
+
+        // Verify statistics is properly initialized
+        org.rocksdb.Statistics stats = kvTablet.getRocksDBKv().getStatistics();
+        assertThat(stats).as("RocksDB Statistics should be 
enabled").isNotNull();
+
+        // All metrics should start at 0 for a fresh database
+        assertThat(statistics.getBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getBytesRead()).isEqualTo(0);
+        assertThat(statistics.getFlushBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getGetLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0);
+        assertThat(statistics.getFlushPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionPending()).isEqualTo(0);
+        assertThat(statistics.getTotalMemoryUsage())
+                .isGreaterThan(0); // Block cache is pre-allocated
+
+        // ========== Phase 1: Write and Flush ==========
+        int numRecords = 10000;
+        List<KvRecord> rows = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            rows.add(
+                    kvRecordFactory.ofRecord(
+                            String.valueOf(i).getBytes(), new Object[] {i, 
"value-" + i}));
+        }
+        kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        // After write and flush: must have written data to RocksDB
+        long bytesWrittenAfterFlush = statistics.getBytesWritten();
+        assertThat(bytesWrittenAfterFlush)
+                .as("Must write data to RocksDB after flush")
+                .isGreaterThan(0);
+
+        // Flush must have written bytes (memtable to SST file)
+        long flushBytesWritten = statistics.getFlushBytesWritten();
+        // Note: FLUSH_WRITE_BYTES may not be tracked in all configurations
+        assertThat(flushBytesWritten)
+                .as("Flush bytes written is non-negative")
+                .isGreaterThanOrEqualTo(0);
+
+        // Write latency must be tracked for write operations
+        long writeLatency = statistics.getWriteLatencyMicros();
+        assertThat(writeLatency)
+                .as("Write latency must be > 0 after write operations")
+                .isGreaterThan(0);
+
+        // After flush, there should be at least 1 L0 file (unless immediate 
compaction occurred)
+        long numL0Files = statistics.getNumFilesAtLevel0();
+        assertThat(numL0Files)
+                .as("Should have L0 files after flush (or 0 if compacted)")
+                .isGreaterThanOrEqualTo(0);
+
+        // Flush pending must be 0 after flush completes
+        assertThat(statistics.getFlushPending())
+                .as("No pending flush after completion")
+                .isEqualTo(0);
+
+        // ========== Phase 2: Read Operations ==========
+        List<byte[]> keysToRead = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            keysToRead.add(String.valueOf(i).getBytes());
+        }
+        List<byte[]> readValues = kvTablet.multiGet(keysToRead);
+        assertThat(readValues).hasSize(100);
+
+        // After reads: get latency must be tracked
+        long getLatency = statistics.getGetLatencyMicros();
+        assertThat(getLatency)
+                .as("Get latency must be tracked after read operations")
+                .isGreaterThan(0);
+
+        // Bytes read may increase (depending on cache hits)
+        long bytesRead = statistics.getBytesRead();
+        // Note: bytesRead could be 0 if all data was served from block cache
+        assertThat(bytesRead).as("Bytes read is 
non-negative").isGreaterThanOrEqualTo(0);
+
+        // ========== Phase 3: Write More Data to Trigger Compaction ==========
+        List<KvRecord> moreRows = new ArrayList<>();
+        for (int i = numRecords; i < numRecords * 2; i++) {
+            moreRows.add(
+                    kvRecordFactory.ofRecord(
+                            String.valueOf(i).getBytes(), new Object[] {i, 
"value-" + i}));
+        }
+        kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(moreRows), null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        // Bytes written must increase with more data
+        long bytesWrittenAfterSecondFlush = statistics.getBytesWritten();
+        assertThat(bytesWrittenAfterSecondFlush)
+                .as("Bytes written must increase with second batch")
+                .isGreaterThan(bytesWrittenAfterFlush);
+
+        // ========== Phase 4: Manual Compaction ==========
+        long compactionBytesReadBefore = statistics.getCompactionBytesRead();
+        long compactionBytesWrittenBefore = 
statistics.getCompactionBytesWritten();
+        long compactionTimeBefore = statistics.getCompactionTimeMicros();
+
+        // Trigger manual compaction
+        try {
+            kvTablet.getRocksDBKv().getDb().compactRange();
+        } catch (Exception e) {
+            // Compaction failure is acceptable in test
+        }
+
+        // After compaction: verify compaction metrics increased
+        long compactionBytesReadAfter = statistics.getCompactionBytesRead();
+        long compactionBytesWrittenAfter = 
statistics.getCompactionBytesWritten();
+        long compactionTimeAfter = statistics.getCompactionTimeMicros();
+
+        // If any compaction occurred, all three metrics should increase
+        boolean compactionOccurred =
+                compactionBytesReadAfter > compactionBytesReadBefore
+                        || compactionBytesWrittenAfter > 
compactionBytesWrittenBefore
+                        || compactionTimeAfter > compactionTimeBefore;
+
+        if (compactionOccurred) {
+            assertThat(compactionBytesReadAfter)
+                    .as("Compaction must read data")
+                    .isGreaterThan(compactionBytesReadBefore);
+            assertThat(compactionBytesWrittenAfter)
+                    .as("Compaction must write data")
+                    .isGreaterThan(compactionBytesWrittenBefore);
+            assertThat(compactionTimeAfter)
+                    .as("Compaction must take time")
+                    .isGreaterThan(compactionTimeBefore);
+        }
+
+        // Compaction pending must be 0 after compaction completes
+        assertThat(statistics.getCompactionPending())
+                .as("No pending compaction after completion")
+                .isEqualTo(0);
+
+        // ========== Phase 5: Verify Final State Before Close ==========
+        // Bytes written must be positive after all operations
+        assertThat(statistics.getBytesWritten())
+                .as("Total bytes written must be positive")
+                .isGreaterThan(0);
+
+        // Write and get latency must be positive (operations occurred)
+        assertThat(statistics.getWriteLatencyMicros())
+                .as("Write latency must be positive after writes")
+                .isGreaterThan(0);
+        assertThat(statistics.getGetLatencyMicros())
+                .as("Get latency must be positive after reads")
+                .isGreaterThan(0);
+
+        // No pending operations
+        assertThat(statistics.getFlushPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionPending()).isEqualTo(0);
+
+        // Memory usage should be reasonable (> 0 due to block cache + 
memtables)
+        assertThat(statistics.getTotalMemoryUsage())
+                .as("Total memory usage must be positive")
+                .isGreaterThan(0);
+
+        // ========== Phase 6: Verify Metrics After Close ==========
+        kvTablet.close();
+
+        // After close: all metrics must return 0 (ResourceGuard protection)
+        assertThat(statistics.getBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getBytesRead()).isEqualTo(0);
+        assertThat(statistics.getFlushBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getGetLatencyMicros()).isEqualTo(0);
+        assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0);
+        assertThat(statistics.getFlushPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionPending()).isEqualTo(0);
+        assertThat(statistics.getCompactionBytesRead()).isEqualTo(0);
+        assertThat(statistics.getCompactionBytesWritten()).isEqualTo(0);
+        assertThat(statistics.getCompactionTimeMicros()).isEqualTo(0);
+        assertThat(statistics.getTotalMemoryUsage()).isEqualTo(0);
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
index f36853cd4..676a17a6b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
@@ -189,7 +189,8 @@ class RocksDBResourceContainerTest {
                     (BlockBasedTableConfig) columnOptions.tableFormatConfig();
             assertThat(tableConfig.blockSize()).isEqualTo(4 * SizeUnit.KB);
             assertThat(tableConfig.metadataBlockSize()).isEqualTo(8 * 
SizeUnit.KB);
-            assertThat(tableConfig.blockCacheSize()).isEqualTo(512 * 
SizeUnit.MB);
+            // Verify block cache was created with explicit LRUCache for 
memory tracking
+            assertThat(optionsContainer.getBlockCache()).isNotNull();
             assertThat(tableConfig.filterPolicy() instanceof 
BloomFilter).isTrue();
         }
     }
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index 2d8ef8077..65109551c 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -825,6 +825,134 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </tbody>
 </table>
 
+### RocksDB
+
+RocksDB metrics provide insights into the performance and health of the 
underlying RocksDB storage engine used by Fluss. These metrics are categorized 
into table-level metrics (aggregated from all buckets of a table) and 
server-level metrics (aggregated from all tables in a server).
+
+#### Table-level RocksDB Metrics (Max Aggregation)
+
+These metrics use Max aggregation to show the maximum value across all buckets 
of a table, which helps identify the worst-performing bucket.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style={{width: '30pt'}}>Scope</th>
+      <th class="text-left" style={{width: '150pt'}}>Infix</th>
+      <th class="text-left" style={{width: '80pt'}}>Metrics</th>
+      <th class="text-left" style={{width: '300pt'}}>Description</th>
+      <th class="text-left" style={{width: '40pt'}}>Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="7"><strong>tabletserver</strong></th>
+      <td rowspan="7">table</td>
+      <td>rocksdbWriteStallMicrosMax</td>
+      <td>Maximum write stall duration across all buckets of this table (in 
microseconds). Write stalls occur when RocksDB needs to slow down writes due to 
compaction pressure or memory limits.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbGetLatencyMicrosMax</td>
+      <td>Maximum get operation latency across all buckets of this table (in 
microseconds). This represents the slowest read operation among all 
buckets.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbWriteLatencyMicrosMax</td>
+      <td>Maximum write operation latency across all buckets of this table (in 
microseconds). This represents the slowest write operation among all 
buckets.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbNumFilesAtLevel0Max</td>
+      <td>Maximum number of L0 files across all buckets of this table. A high 
number of L0 files indicates compaction pressure and may impact read 
performance.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbFlushPendingMax</td>
+      <td>Maximum flush pending indicator across all buckets of this table. A 
value greater than 0 indicates that some buckets have pending flush 
operations.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbCompactionPendingMax</td>
+      <td>Maximum compaction pending indicator across all buckets of this 
table. A value greater than 0 indicates that some buckets have pending 
compaction operations.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbCompactionTimeMicrosMax</td>
+      <td>Maximum compaction time across all buckets of this table (in 
microseconds). This represents the longest compaction operation among all 
buckets.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
+#### Table-level RocksDB Metrics (Sum Aggregation)
+
+These metrics use Sum aggregation to show the total value across all buckets 
of a table, providing an overall view of table-level I/O and storage operations.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style={{width: '30pt'}}>Scope</th>
+      <th class="text-left" style={{width: '150pt'}}>Infix</th>
+      <th class="text-left" style={{width: '80pt'}}>Metrics</th>
+      <th class="text-left" style={{width: '300pt'}}>Description</th>
+      <th class="text-left" style={{width: '40pt'}}>Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="5"><strong>tabletserver</strong></th>
+      <td rowspan="5">table</td>
+      <td>rocksdbBytesReadTotal</td>
+      <td>Total bytes read across all buckets of this table. This includes 
both user reads and internal reads (e.g., compaction reads).</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbBytesWrittenTotal</td>
+      <td>Total bytes written across all buckets of this table. This includes 
both user writes and internal writes (e.g., compaction writes).</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbFlushBytesWrittenTotal</td>
+      <td>Total flush bytes written across all buckets of this table. This 
represents the amount of data flushed from memtable to persistent storage.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbCompactionBytesReadTotal</td>
+      <td>Total compaction bytes read across all buckets of this table. This 
represents the amount of data read during compaction operations.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>rocksdbCompactionBytesWrittenTotal</td>
+      <td>Total compaction bytes written across all buckets of this table. 
This represents the amount of data written during compaction operations.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
+#### Server-level RocksDB Metrics (Sum Aggregation)
+
+These metrics use Sum aggregation to show the total value across all tables in 
a server, providing a server-wide view of RocksDB resource usage.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style={{width: '30pt'}}>Scope</th>
+      <th class="text-left" style={{width: '150pt'}}>Infix</th>
+      <th class="text-left" style={{width: '80pt'}}>Metrics</th>
+      <th class="text-left" style={{width: '300pt'}}>Description</th>
+      <th class="text-left" style={{width: '40pt'}}>Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="1"><strong>tabletserver</strong></th>
+      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="1">-</td>
+      <td>rocksdbMemoryUsageTotal</td>
+      <td>Total memory usage across all RocksDB instances in this server (in 
bytes). This includes memory used by memtables, block cache, and other RocksDB 
internal structures.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
 
 ### Flink connector standard metrics
 

Reply via email to