This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9f484fd0a [kv] Support to report rocksdb metrics (#2282)
9f484fd0a is described below
commit 9f484fd0a808f44ae6d41a0f0be262f95b772fd2
Author: Yang Wang <[email protected]>
AuthorDate: Mon Jan 5 23:48:49 2026 +0800
[kv] Support to report rocksdb metrics (#2282)
---
.../java/org/apache/fluss/metrics/MetricNames.java | 48 +++
.../java/org/apache/fluss/server/kv/KvManager.java | 1 +
.../java/org/apache/fluss/server/kv/KvTablet.java | 39 ++-
.../apache/fluss/server/kv/rocksdb/RocksDBKv.java | 23 +-
.../fluss/server/kv/rocksdb/RocksDBKvBuilder.java | 7 +-
.../kv/rocksdb/RocksDBResourceContainer.java | 29 +-
.../fluss/server/kv/rocksdb/RocksDBStatistics.java | 376 +++++++++++++++++++++
.../server/metrics/group/BucketMetricGroup.java | 71 +++-
.../server/metrics/group/TableMetricGroup.java | 117 ++++++-
.../metrics/group/TabletServerMetricGroup.java | 19 ++
.../org/apache/fluss/server/replica/Replica.java | 10 +
.../org/apache/fluss/server/kv/KvTabletTest.java | 184 ++++++++++
.../kv/rocksdb/RocksDBResourceContainerTest.java | 3 +-
.../maintenance/observability/monitor-metrics.md | 128 +++++++
14 files changed, 1042 insertions(+), 13 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index b1326d46b..60eb942e8 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -131,6 +131,54 @@ public class MetricNames {
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
"preWriteBufferTruncateAsErrorPerSecond";
+ //
--------------------------------------------------------------------------------------------
+ // RocksDB metrics
+ //
--------------------------------------------------------------------------------------------
+ // Table-level RocksDB metrics (aggregated from all buckets of a table,
Max aggregation)
+ /** Maximum write stall duration across all buckets of this table (Max
aggregation). */
+ public static final String ROCKSDB_WRITE_STALL_MICROS_MAX =
"rocksdbWriteStallMicrosMax";
+
+ /** Maximum get latency across all buckets of this table (Max
aggregation). */
+ public static final String ROCKSDB_GET_LATENCY_MICROS_MAX =
"rocksdbGetLatencyMicrosMax";
+
+ /** Maximum write latency across all buckets of this table (Max
aggregation). */
+ public static final String ROCKSDB_WRITE_LATENCY_MICROS_MAX =
"rocksdbWriteLatencyMicrosMax";
+
+ /** Maximum number of L0 files across all buckets of this table (Max
aggregation). */
+ public static final String ROCKSDB_NUM_FILES_AT_LEVEL0_MAX =
"rocksdbNumFilesAtLevel0Max";
+
+ /** Maximum flush pending indicator across all buckets of this table (Max
aggregation). */
+ public static final String ROCKSDB_FLUSH_PENDING_MAX =
"rocksdbFlushPendingMax";
+
+ /** Maximum compaction pending indicator across all buckets of this table
(Max aggregation). */
+ public static final String ROCKSDB_COMPACTION_PENDING_MAX =
"rocksdbCompactionPendingMax";
+
+ /** Maximum compaction time across all buckets of this table (Max
aggregation). */
+ public static final String ROCKSDB_COMPACTION_TIME_MICROS_MAX =
+ "rocksdbCompactionTimeMicrosMax";
+
+ // Table-level RocksDB metrics (aggregated from all buckets of a table,
Sum aggregation)
+ /** Total bytes read across all buckets of this table (Sum aggregation). */
+ public static final String ROCKSDB_BYTES_READ_TOTAL =
"rocksdbBytesReadTotal";
+
+ /** Total bytes written across all buckets of this table (Sum
aggregation). */
+ public static final String ROCKSDB_BYTES_WRITTEN_TOTAL =
"rocksdbBytesWrittenTotal";
+
+ /** Total flush bytes written across all buckets of this table (Sum
aggregation). */
+ public static final String ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL =
"rocksdbFlushBytesWrittenTotal";
+
+ /** Total compaction bytes read across all buckets of this table (Sum
aggregation). */
+ public static final String ROCKSDB_COMPACTION_BYTES_READ_TOTAL =
+ "rocksdbCompactionBytesReadTotal";
+
+ /** Total compaction bytes written across all buckets of this table (Sum
aggregation). */
+ public static final String ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL =
+ "rocksdbCompactionBytesWrittenTotal";
+
+ // Server-level RocksDB metrics (aggregated from all tables, Sum
aggregation)
+ /** Total memory usage across all RocksDB instances in this server (Sum
aggregation). */
+ public static final String ROCKSDB_MEMORY_USAGE_TOTAL =
"rocksdbMemoryUsageTotal";
+
//
--------------------------------------------------------------------------------------------
// metrics for table bucket
//
--------------------------------------------------------------------------------------------
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 0986c67f9..1637af39d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -387,6 +387,7 @@ public final class KvManager extends TabletManagerBase
implements ServerReconfig
currentKvs.get(tableBucket).getKvTabletDir().getAbsolutePath()));
}
this.currentKvs.put(tableBucket, kvTablet);
+
return kvTablet;
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index d6cc08666..0b43f0a31 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -50,6 +50,7 @@ import
org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.kv.snapshot.KvFileHandleAndLocalPath;
@@ -118,6 +119,9 @@ public final class KvTablet {
// the changelog image mode for this tablet
private final ChangelogImage changelogImage;
+ // RocksDB statistics accessor for this tablet
+ @Nullable private final RocksDBStatistics rocksDBStatistics;
+
/**
* The kv data in pre-write buffer whose log offset is less than the
flushedLogOffset has been
* flushed into kv.
@@ -142,7 +146,8 @@ public final class KvTablet {
RowMerger rowMerger,
ArrowCompressionInfo arrowCompressionInfo,
SchemaGetter schemaGetter,
- ChangelogImage changelogImage) {
+ ChangelogImage changelogImage,
+ @Nullable RocksDBStatistics rocksDBStatistics) {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
this.logTablet = logTablet;
@@ -158,6 +163,7 @@ public final class KvTablet {
this.arrowCompressionInfo = arrowCompressionInfo;
this.schemaGetter = schemaGetter;
this.changelogImage = changelogImage;
+ this.rocksDBStatistics = rocksDBStatistics;
}
public static KvTablet create(
@@ -177,6 +183,19 @@ public final class KvTablet {
RateLimiter sharedRateLimiter)
throws IOException {
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir,
sharedRateLimiter);
+
+ // Create RocksDB statistics accessor (will be registered to
TableMetricGroup by Replica)
+ // Pass ResourceGuard to ensure thread-safe access during concurrent
close operations
+ // Pass ColumnFamilyHandle for column family specific properties like
num-files-at-level0
+ // Pass Cache for accurate block cache memory tracking
+ RocksDBStatistics rocksDBStatistics =
+ new RocksDBStatistics(
+ kv.getDb(),
+ kv.getStatistics(),
+ kv.getResourceGuard(),
+ kv.getDefaultColumnFamilyHandle(),
+ kv.getBlockCache());
+
return new KvTablet(
tablePath,
tableBucket,
@@ -192,14 +211,16 @@ public final class KvTablet {
rowMerger,
arrowCompressionInfo,
schemaGetter,
- changelogImage);
+ changelogImage,
+ rocksDBStatistics);
}
private static RocksDBKv buildRocksDBKv(
Configuration configuration, File kvDir, RateLimiter
sharedRateLimiter)
throws IOException {
+ // Enable statistics to support RocksDB statistics collection
RocksDBResourceContainer rocksDBResourceContainer =
- new RocksDBResourceContainer(configuration, kvDir, false,
sharedRateLimiter);
+ new RocksDBResourceContainer(configuration, kvDir, true,
sharedRateLimiter);
RocksDBKvBuilder rocksDBKvBuilder =
new RocksDBKvBuilder(
kvDir,
@@ -225,6 +246,16 @@ public final class KvTablet {
return kvTabletDir;
}
+    /**
+     * Returns the RocksDB statistics accessor that was handed to this tablet at construction.
+     *
+     * @return the accessor for this tablet's RocksDB instance; may be null when the tablet was
+     *     constructed without one
+     */
+    @Nullable
+    public RocksDBStatistics getRocksDBStatistics() {
+        return this.rocksDBStatistics;
+    }
+
void setFlushedLogOffset(long flushedLogOffset) {
this.flushedLogOffset = flushedLogOffset;
}
@@ -621,6 +652,8 @@ public final class KvTablet {
if (isClosed) {
return;
}
+ // Note: the RocksDB statistics accessor is unregistered and closed by
+ // BucketMetricGroup, so there is no need to close it here
if (rocksDBKv != null) {
rocksDBKv.close();
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
index 602a4b591..6e45b2396 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java
@@ -23,12 +23,14 @@ import org.apache.fluss.server.utils.ResourceGuard;
import org.apache.fluss.utils.BytesUtils;
import org.apache.fluss.utils.IOUtils;
+import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
import org.rocksdb.WriteOptions;
import javax.annotation.Nullable;
@@ -63,6 +65,9 @@ public class RocksDBKv implements AutoCloseable {
/** Our RocksDB database. Currently, one kv tablet, one RocksDB instance.
*/
protected final RocksDB db;
+ /** RocksDB Statistics for metrics collection. */
+ private final @Nullable Statistics statistics;
+
// mark whether this kv is already closed and prevent duplicate closing
private volatile boolean closed = false;
@@ -70,12 +75,14 @@ public class RocksDBKv implements AutoCloseable {
RocksDBResourceContainer optionsContainer,
RocksDB db,
ResourceGuard rocksDBResourceGuard,
- ColumnFamilyHandle defaultColumnFamilyHandle) {
+ ColumnFamilyHandle defaultColumnFamilyHandle,
+ @Nullable Statistics statistics) {
this.optionsContainer = optionsContainer;
this.db = db;
this.rocksDBResourceGuard = rocksDBResourceGuard;
this.writeOptions = optionsContainer.getWriteOptions();
this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
+ this.statistics = statistics;
}
public ResourceGuard getResourceGuard() {
@@ -206,4 +213,18 @@ public class RocksDBKv implements AutoCloseable {
public RocksDB getDb() {
return db;
}
+
+    /**
+     * Returns the RocksDB {@link Statistics} object this kv was constructed with.
+     *
+     * <p>Reads the {@code statistics} field populated by the constructor, so the value handed in
+     * at build time is the one exposed to callers.
+     *
+     * @return the statistics object, or null if none was supplied at construction
+     */
+    @Nullable
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    /**
+     * Returns the block cache held by the options container of this kv.
+     *
+     * @return the block cache, or null if the container has not created one
+     */
+    @Nullable
+    public Cache getBlockCache() {
+        final Cache containerCache = optionsContainer.getBlockCache();
+        return containerCache;
+    }
+
+    /** Returns the handle of the default column family this kv was constructed with. */
+    public ColumnFamilyHandle getDefaultColumnFamilyHandle() {
+        return this.defaultColumnFamilyHandle;
+    }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
index 8fbc0cf95..92e86fb96 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java
@@ -107,7 +107,12 @@ public class RocksDBKvBuilder {
throw new KvBuildingException(errMsg, t);
}
LOG.info("Finished building RocksDB kv at {}.", instanceBasePath);
- return new RocksDBKv(optionsContainer, db, rocksDBResourceGuard,
defaultColumnFamilyHandle);
+ return new RocksDBKv(
+ optionsContainer,
+ db,
+ rocksDBResourceGuard,
+ defaultColumnFamilyHandle,
+ optionsContainer.getStatistics());
}
void prepareDirectories() throws IOException {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
index 4495613e6..a07ea1a91 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java
@@ -28,11 +28,13 @@ import org.apache.fluss.utils.IOUtils;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.RateLimiter;
import org.rocksdb.ReadOptions;
@@ -80,6 +82,12 @@ public class RocksDBResourceContainer implements
AutoCloseable {
/** The shared rate limiter for all RocksDB instances. */
private final RateLimiter sharedRateLimiter;
+ /** The statistics object for RocksDB, null if statistics is disabled. */
+ @Nullable private Statistics statistics;
+
+ /** The block cache for RocksDB, shared across column families. */
+ @Nullable private Cache blockCache;
+
/** The handles to be closed when the container is closed. */
private final ArrayList<AutoCloseable> handlesToClose;
@@ -138,7 +146,7 @@ public class RocksDBResourceContainer implements
AutoCloseable {
opt.setRateLimiter(sharedRateLimiter);
if (enableStatistics) {
- Statistics statistics = new Statistics();
+ statistics = new Statistics();
opt.setStatistics(statistics);
handlesToClose.add(statistics);
}
@@ -146,6 +154,18 @@ public class RocksDBResourceContainer implements
AutoCloseable {
return opt;
}
+    /**
+     * Returns the RocksDB {@link Statistics} object held by this container.
+     *
+     * @return the statistics object; null if statistics is disabled or none has been created yet
+     */
+    @Nullable
+    public Statistics getStatistics() {
+        return this.statistics;
+    }
+
+    /**
+     * Returns the block cache held by this container.
+     *
+     * @return the block cache, or null if it has not been created yet
+     */
+    @Nullable
+    public Cache getBlockCache() {
+        return this.blockCache;
+    }
+
/** Gets the RocksDB {@link ColumnFamilyOptions} to be used for all
RocksDB instances. */
public ColumnFamilyOptions getColumnOptions() {
// initial options from common profile
@@ -282,8 +302,11 @@ public class RocksDBResourceContainer implements
AutoCloseable {
blockBasedTableConfig.setMetadataBlockSize(
internalGetOption(ConfigOptions.KV_METADATA_BLOCK_SIZE).getBytes());
- blockBasedTableConfig.setBlockCacheSize(
-
internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes());
+ // Create explicit LRUCache for accurate memory tracking
+ long blockCacheSize =
internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes();
+ blockCache = new LRUCache(blockCacheSize);
+ handlesToClose.add(blockCache);
+ blockBasedTableConfig.setBlockCache(blockCache);
if (internalGetOption(ConfigOptions.KV_USE_BLOOM_FILTER)) {
final double bitsPerKey =
internalGetOption(ConfigOptions.KV_BLOOM_FILTER_BITS_PER_KEY);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java
new file mode 100644
index 000000000..fd7a20cd7
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBStatistics.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rocksdb;
+
+import org.apache.fluss.server.utils.ResourceGuard;
+
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.HistogramData;
+import org.rocksdb.HistogramType;
+import org.rocksdb.MemoryUsageType;
+import org.rocksdb.MemoryUtil;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Statistics;
+import org.rocksdb.TickerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Read-only accessor for the RocksDB statistics and properties of a single KvTablet.
+ *
+ * <p>This class wraps the low-level RocksDB APIs (ticker counters, histograms, DB properties and
+ * approximate memory usage) behind semantic getters. It does NOT register Fluss metrics itself;
+ * upper layers (e.g., TableMetricGroup) consume these values to compute and register the actual
+ * Fluss metrics.
+ *
+ * <p>Thread-safety: every read first acquires a lease from the {@link ResourceGuard} passed at
+ * construction. When the lease cannot be acquired or the underlying call fails, the getter logs
+ * at debug level and returns 0 instead of throwing.
+ */
+public class RocksDBStatistics implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBStatistics.class);
+
+    private final RocksDB db;
+    @Nullable private final Statistics statistics;
+    private final ResourceGuard resourceGuard;
+    private final ColumnFamilyHandle defaultColumnFamilyHandle;
+    @Nullable private final Cache blockCache;
+
+    /**
+     * Creates an accessor over an already opened RocksDB instance.
+     *
+     * @param db the RocksDB instance to read properties and memory usage from
+     * @param statistics the statistics object backing tickers and histograms, or null if
+     *     statistics collection is disabled (ticker/histogram getters then return 0)
+     * @param resourceGuard the guard used to obtain a lease around every read
+     * @param defaultColumnFamilyHandle the handle used for column family specific properties
+     * @param blockCache the block cache to include in memory usage, or null if unknown
+     */
+    public RocksDBStatistics(
+            RocksDB db,
+            @Nullable Statistics statistics,
+            ResourceGuard resourceGuard,
+            ColumnFamilyHandle defaultColumnFamilyHandle,
+            @Nullable Cache blockCache) {
+        this.db = db;
+        this.statistics = statistics;
+        this.resourceGuard = resourceGuard;
+        this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
+        this.blockCache = blockCache;
+    }
+
+    // ==================== Ticker-based Metrics ====================
+
+    /**
+     * Get write stall duration in microseconds.
+     *
+     * @return write stall duration, or 0 if not available
+     */
+    public long getWriteStallMicros() {
+        return getTickerValue(TickerType.STALL_MICROS);
+    }
+
+    /**
+     * Get total bytes read.
+     *
+     * @return bytes read, or 0 if not available
+     */
+    public long getBytesRead() {
+        return getTickerValue(TickerType.BYTES_READ);
+    }
+
+    /**
+     * Get total bytes written.
+     *
+     * @return bytes written, or 0 if not available
+     */
+    public long getBytesWritten() {
+        return getTickerValue(TickerType.BYTES_WRITTEN);
+    }
+
+    /**
+     * Get flush bytes written.
+     *
+     * @return flush bytes written, or 0 if not available
+     */
+    public long getFlushBytesWritten() {
+        return getTickerValue(TickerType.FLUSH_WRITE_BYTES);
+    }
+
+    /**
+     * Get compaction bytes read.
+     *
+     * @return compaction bytes read, or 0 if not available
+     */
+    public long getCompactionBytesRead() {
+        return getTickerValue(TickerType.COMPACT_READ_BYTES);
+    }
+
+    /**
+     * Get compaction bytes written.
+     *
+     * @return compaction bytes written, or 0 if not available
+     */
+    public long getCompactionBytesWritten() {
+        return getTickerValue(TickerType.COMPACT_WRITE_BYTES);
+    }
+
+    // ==================== Histogram-based Metrics ====================
+
+    /**
+     * Get the latency of get operations in microseconds (P99).
+     *
+     * <p>The value is the 99th percentile of the {@code DB_GET} histogram. P99 is used instead
+     * of the average because it better reflects tail latency.
+     *
+     * @return P99 get latency in microseconds, or 0 if not available
+     */
+    public long getGetLatencyMicros() {
+        return getHistogramValue(HistogramType.DB_GET);
+    }
+
+    /**
+     * Get the latency of write operations in microseconds (P99).
+     *
+     * <p>The value is the 99th percentile of the {@code DB_WRITE} histogram. P99 is used instead
+     * of the average because it better reflects tail latency.
+     *
+     * @return P99 write latency in microseconds, or 0 if not available
+     */
+    public long getWriteLatencyMicros() {
+        return getHistogramValue(HistogramType.DB_WRITE);
+    }
+
+    /**
+     * Get compaction time in microseconds (P99).
+     *
+     * <p>The value is the 99th percentile of the {@code COMPACTION_TIME} histogram.
+     *
+     * @return P99 compaction time in microseconds, or 0 if not available
+     */
+    public long getCompactionTimeMicros() {
+        return getHistogramValue(HistogramType.COMPACTION_TIME);
+    }
+
+    // ==================== Property-based Metrics ====================
+
+    /**
+     * Get number of files at level 0.
+     *
+     * <p>This property is column family specific and is read through the default column family
+     * handle.
+     *
+     * @return number of L0 files, or 0 if not available
+     */
+    public long getNumFilesAtLevel0() {
+        return getPropertyValue(defaultColumnFamilyHandle, "rocksdb.num-files-at-level0");
+    }
+
+    /**
+     * Get whether a memtable flush is pending.
+     *
+     * @return 1 if flush is pending, 0 otherwise (also 0 if not available)
+     */
+    public long getFlushPending() {
+        return getPropertyValue("rocksdb.mem-table-flush-pending");
+    }
+
+    /**
+     * Get whether a compaction is pending.
+     *
+     * @return 1 if compaction is pending, 0 otherwise (also 0 if not available)
+     */
+    public long getCompactionPending() {
+        return getPropertyValue("rocksdb.compaction-pending");
+    }
+
+    // ==================== Memory Usage ====================
+
+    /**
+     * Get the approximate total memory used by this RocksDB instance.
+     *
+     * <p>The value is derived from {@link MemoryUtil#getApproximateMemoryUsageByType} and is the
+     * sum of:
+     *
+     * <ul>
+     *   <li>all memtables ({@code kMemTableTotal})
+     *   <li>table readers ({@code kTableReadersTotal})
+     *   <li>block cache ({@code kCacheTotal}), only when an explicit cache was provided
+     * </ul>
+     *
+     * <p>{@code kMemTableUnFlushed} is deliberately left out of the sum: un-flushed memtables are
+     * already part of {@code kMemTableTotal}, so adding both would count them twice.
+     *
+     * @return total memory usage in bytes, or 0 if not available
+     */
+    public long getTotalMemoryUsage() {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (db == null) {
+                return 0L;
+            }
+
+            // Without an explicit cache, MemoryUtil receives null and only reports the
+            // non-cache usage types.
+            Set<Cache> caches = null;
+            if (blockCache != null) {
+                caches = new HashSet<>();
+                caches.add(blockCache);
+            }
+
+            Map<MemoryUsageType, Long> memoryUsage =
+                    MemoryUtil.getApproximateMemoryUsageByType(
+                            Collections.singletonList(db), caches);
+            return memoryUsage.getOrDefault(MemoryUsageType.kMemTableTotal, 0L)
+                    + memoryUsage.getOrDefault(MemoryUsageType.kTableReadersTotal, 0L)
+                    + memoryUsage.getOrDefault(MemoryUsageType.kCacheTotal, 0L);
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get total memory usage from RocksDB (possibly closed or unavailable)",
+                    e);
+            return 0L;
+        }
+    }
+
+    // ==================== Internal Helper Methods ====================
+
+    /**
+     * Get ticker value from RocksDB Statistics with resource guard protection.
+     *
+     * @param tickerType the ticker type to query
+     * @return the ticker value, or 0 if statistics is null or the read fails
+     */
+    private long getTickerValue(TickerType tickerType) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (statistics != null) {
+                return statistics.getTickerCount(tickerType);
+            }
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get ticker {} from RocksDB (possibly closed or unavailable)",
+                    tickerType,
+                    e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get histogram P99 value from RocksDB Statistics with resource guard protection.
+     *
+     * <p>The 99th percentile reported by the histogram is a double; it is rounded to the nearest
+     * long before being returned.
+     *
+     * @param histogramType the histogram type to query
+     * @return the P99 histogram value (rounded to nearest long), or 0 if statistics is null, no
+     *     histogram data is returned, or the read fails
+     */
+    private long getHistogramValue(HistogramType histogramType) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (statistics != null) {
+                HistogramData histogramData = statistics.getHistogramData(histogramType);
+                if (histogramData != null) {
+                    return Math.round(histogramData.getPercentile99());
+                }
+            }
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to get histogram {} from RocksDB Statistics (possibly closed or unavailable)",
+                    histogramType,
+                    e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get property value from RocksDB with resource guard protection.
+     *
+     * @param propertyName the property name to query
+     * @return the property value as long, or 0 if it is missing, unparsable, or the read fails
+     */
+    private long getPropertyValue(String propertyName) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            return parsePropertyValue(propertyName, db.getProperty(propertyName));
+        } catch (RocksDBException e) {
+            LOG.debug(
+                    "Failed to get property {} from RocksDB (possibly closed or unavailable)",
+                    propertyName,
+                    e);
+        } catch (Exception e) {
+            // Acquiring the lease may fail, e.g. when RocksDB is already closed
+            LOG.debug(
+                    "Failed to access RocksDB for property {} (possibly closed)", propertyName, e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Get property value from RocksDB for a specific column family with resource guard
+     * protection.
+     *
+     * @param columnFamilyHandle the column family handle; a null handle yields 0
+     * @param propertyName the property name to query
+     * @return the property value as long, or 0 if it is missing, unparsable, or the read fails
+     */
+    private long getPropertyValue(ColumnFamilyHandle columnFamilyHandle, String propertyName) {
+        try (ResourceGuard.Lease lease = resourceGuard.acquireResource()) {
+            if (columnFamilyHandle == null) {
+                return 0L;
+            }
+            return parsePropertyValue(
+                    propertyName, db.getProperty(columnFamilyHandle, propertyName));
+        } catch (RocksDBException e) {
+            LOG.debug(
+                    "Failed to get property {} from RocksDB column family (possibly closed or unavailable)",
+                    propertyName,
+                    e);
+        } catch (Exception e) {
+            // Acquiring the lease may fail, e.g. when RocksDB is already closed
+            LOG.debug(
+                    "Failed to access RocksDB for property {} (possibly closed)", propertyName, e);
+        }
+        return 0L;
+    }
+
+    /**
+     * Converts a raw RocksDB property string to a long, shared by both property readers.
+     *
+     * @param propertyName the property name, used only for logging
+     * @param value the raw property value returned by RocksDB
+     * @return the parsed value, or 0 if the value is null, empty, or not a valid long
+     */
+    private static long parsePropertyValue(String propertyName, @Nullable String value) {
+        if (value == null || value.isEmpty()) {
+            return 0L;
+        }
+        try {
+            return Long.parseLong(value);
+        } catch (NumberFormatException e) {
+            LOG.debug("Failed to parse property {} value as long", propertyName, e);
+            return 0L;
+        }
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release: this accessor never closes db, statistics or blockCache itself.
+        LOG.debug("RocksDB statistics accessor closed");
+    }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
index fc8e281a5..ed5b11101 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
@@ -20,6 +20,11 @@ package org.apache.fluss.server.metrics.group;
import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
+import org.apache.fluss.utils.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -27,12 +32,24 @@ import java.util.Map;
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
-/** Metrics for the table buckets with table as parent group. */
+/**
+ * Metrics for the table buckets with table as parent group.
+ *
+ * <p>For KV tables, this class also manages the RocksDB statistics lifecycle.
The statistics are
+ * registered when KvTablet is initialized and automatically cleaned up when
this metric group is
+ * closed.
+ */
public class BucketMetricGroup extends AbstractMetricGroup {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BucketMetricGroup.class);
+
// will be null if the bucket doesn't belong to a partition
private final @Nullable String partitionName;
private final int bucket;
+ // RocksDB statistics for this bucket (null for non-KV tables)
+ private volatile @Nullable RocksDBStatistics rocksDBStatistics;
+
public BucketMetricGroup(
MetricRegistry registry,
@Nullable String partitionName,
@@ -62,4 +79,56 @@ public class BucketMetricGroup extends AbstractMetricGroup {
public TableMetricGroup getTableMetricGroup() {
return (TableMetricGroup) parent;
}
+
+    /**
+     * Attaches the RocksDB statistics accessor of this bucket to the metric group.
+     *
+     * <p>If an accessor is already attached, a warning is logged and it is replaced; the previous
+     * accessor is not closed by this method. Callers are expected to pair this call with {@link
+     * #unregisterRocksDBStatistics()}.
+     *
+     * @param statistics the RocksDB statistics collector
+     */
+    public void registerRocksDBStatistics(RocksDBStatistics statistics) {
+        final boolean alreadyRegistered = this.rocksDBStatistics != null;
+        if (alreadyRegistered) {
+            LOG.warn(
+                    "RocksDB statistics already registered for bucket {}, this may indicate a resource leak",
+                    bucket);
+        }
+        this.rocksDBStatistics = statistics;
+        LOG.debug("Registered RocksDB statistics for bucket {}", bucket);
+    }
+
+    /**
+     * Detaches and closes the RocksDB statistics accessor of this bucket, if one is attached.
+     *
+     * <p>The volatile field is read once into a local so that the null check, the close call and
+     * the reset all operate on the same instance. The field is cleared before the accessor is
+     * closed, so {@link #getRocksDBStatistics()} stops handing it out first. Calling this method
+     * when nothing is registered is a no-op.
+     *
+     * <p>This is the counterpart of {@link #registerRocksDBStatistics(RocksDBStatistics)}.
+     */
+    public void unregisterRocksDBStatistics() {
+        RocksDBStatistics current = rocksDBStatistics;
+        if (current == null) {
+            return;
+        }
+        LOG.debug("Unregistering RocksDB statistics for bucket {}", bucket);
+        rocksDBStatistics = null;
+        IOUtils.closeQuietly(current);
+    }
+
+    /**
+     * Returns the RocksDB statistics accessor currently attached to this bucket.
+     *
+     * @return the attached accessor, or null when none has been registered (or it has already
+     *     been unregistered)
+     */
+    @Nullable
+    public RocksDBStatistics getRocksDBStatistics() {
+        return this.rocksDBStatistics;
+    }
+
+    /**
+     * Closes this metric group, first detaching any RocksDB statistics accessor that is still
+     * registered and then delegating to the parent implementation.
+     */
+    @Override
+    public void close() {
+        // Detach the statistics accessor first, then close the group itself.
+        this.unregisterRocksDBStatistics();
+        super.close();
+    }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
index 7620bcbfd..fc40463f2 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
@@ -27,11 +27,13 @@ import org.apache.fluss.metrics.NoOpCounter;
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
+import org.apache.fluss.utils.MapUtils;
import javax.annotation.Nullable;
-import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Stream;
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
@@ -41,7 +43,7 @@ import static
org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
*/
public class TableMetricGroup extends AbstractMetricGroup {
- private final Map<TableBucket, BucketMetricGroup> buckets = new
HashMap<>();
+ private final Map<TableBucket, BucketMetricGroup> buckets =
MapUtils.newConcurrentHashMap();
private final TablePath tablePath;
@@ -70,6 +72,8 @@ public class TableMetricGroup extends AbstractMetricGroup {
if (isKvTable) {
kvMetrics = new KvMetricGroup(this);
logMetrics = new LogMetricGroup(this, TabletType.CDC_LOG);
+ // Register RocksDB aggregated metrics for kv tables
+ registerRocksDBMetrics();
} else {
// otherwise, create log produce metrics
kvMetrics = null;
@@ -236,17 +240,124 @@ public class TableMetricGroup extends
AbstractMetricGroup {
public void removeBucketMetricGroup(TableBucket tableBucket) {
BucketMetricGroup metricGroup = buckets.remove(tableBucket);
- metricGroup.close();
+ if (metricGroup != null) {
+ // BucketMetricGroup.close() will automatically clean up RocksDB
statistics
+ metricGroup.close();
+ }
}
public int bucketGroupsCount() {
return buckets.size();
}
+    /**
+     * Returns the metric groups of all buckets currently registered under this table.
+     *
+     * <p>The result is a read-only view backed by the internal bucket map: it reflects later
+     * additions and removals of buckets, but cannot be used to modify the map. Buckets are
+     * removed through {@link #removeBucketMetricGroup(TableBucket)}.
+     *
+     * @return unmodifiable view of the bucket metric groups of this table
+     */
+    public java.util.Collection<BucketMetricGroup> getBucketMetricGroups() {
+        return java.util.Collections.unmodifiableCollection(buckets.values());
+    }
+
+    /**
+     * Streams the RocksDB statistics collectors of every bucket of this table; this is the
+     * input of the table-level and server-level aggregated gauges.
+     *
+     * <p>The collectors are looked up from the bucket metric groups on each call instead of
+     * being tracked in a separate map, so a removed bucket stops contributing automatically.
+     * Buckets without a registered collector are skipped.
+     *
+     * @return stream of RocksDB statistics from all buckets in this table
+     */
+    public Stream<RocksDBStatistics> allRocksDBStatistics() {
+        Stream<BucketMetricGroup> bucketGroups = buckets.values().stream();
+        return bucketGroups
+                .map(BucketMetricGroup::getRocksDBStatistics)
+                .filter(java.util.Objects::nonNull);
+    }
+
public TabletServerMetricGroup getServerMetricGroup() {
return (TabletServerMetricGroup) parent;
}
+    /**
+     * Registers the table-level RocksDB gauges. Every gauge is computed on read by aggregating
+     * one value of {@link RocksDBStatistics} over all buckets of this table.
+     *
+     * <p>Invoked once from the constructor, and only for KV tables.
+     */
+    private void registerRocksDBMetrics() {
+        // Max aggregation metrics - report the largest value found among the buckets.
+        registerMaxGauge(
+                MetricNames.ROCKSDB_WRITE_STALL_MICROS_MAX,
+                RocksDBStatistics::getWriteStallMicros);
+        registerMaxGauge(
+                MetricNames.ROCKSDB_GET_LATENCY_MICROS_MAX,
+                RocksDBStatistics::getGetLatencyMicros);
+        registerMaxGauge(
+                MetricNames.ROCKSDB_WRITE_LATENCY_MICROS_MAX,
+                RocksDBStatistics::getWriteLatencyMicros);
+        registerMaxGauge(
+                MetricNames.ROCKSDB_NUM_FILES_AT_LEVEL0_MAX,
+                RocksDBStatistics::getNumFilesAtLevel0);
+        registerMaxGauge(
+                MetricNames.ROCKSDB_FLUSH_PENDING_MAX, RocksDBStatistics::getFlushPending);
+        registerMaxGauge(
+                MetricNames.ROCKSDB_COMPACTION_PENDING_MAX,
+                RocksDBStatistics::getCompactionPending);
+        registerMaxGauge(
+                MetricNames.ROCKSDB_COMPACTION_TIME_MICROS_MAX,
+                RocksDBStatistics::getCompactionTimeMicros);
+
+        // Sum aggregation metrics - report the total over all buckets.
+        registerSumGauge(MetricNames.ROCKSDB_BYTES_READ_TOTAL, RocksDBStatistics::getBytesRead);
+        registerSumGauge(
+                MetricNames.ROCKSDB_BYTES_WRITTEN_TOTAL, RocksDBStatistics::getBytesWritten);
+        registerSumGauge(
+                MetricNames.ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL,
+                RocksDBStatistics::getFlushBytesWritten);
+        registerSumGauge(
+                MetricNames.ROCKSDB_COMPACTION_BYTES_READ_TOTAL,
+                RocksDBStatistics::getCompactionBytesRead);
+        registerSumGauge(
+                MetricNames.ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL,
+                RocksDBStatistics::getCompactionBytesWritten);
+    }
+
+    /** Registers a gauge reporting the maximum of {@code reader} over all buckets, or 0 if none. */
+    private void registerMaxGauge(
+            String metricName, java.util.function.ToLongFunction<RocksDBStatistics> reader) {
+        gauge(metricName, () -> allRocksDBStatistics().mapToLong(reader).max().orElse(0L));
+    }
+
+    /** Registers a gauge reporting the sum of {@code reader} over all buckets. */
+    private void registerSumGauge(
+            String metricName, java.util.function.ToLongFunction<RocksDBStatistics> reader) {
+        gauge(metricName, () -> allRocksDBStatistics().mapToLong(reader).sum());
+    }
+
+
/** Metric group for specific kind of tablet of a table. */
private static class TabletMetricGroup extends AbstractMetricGroup {
private final TabletType tabletType;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
index 59730b69f..f4b2c6b73 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
@@ -30,6 +30,7 @@ import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
import org.apache.fluss.utils.MapUtils;
import java.util.Map;
@@ -133,6 +134,24 @@ public class TabletServerMetricGroup extends
AbstractMetricGroup {
meter(MetricNames.ISR_SHRINKS_RATE, new MeterView(isrShrinks));
failedIsrUpdates = new SimpleCounter();
meter(MetricNames.FAILED_ISR_UPDATES_RATE, new
MeterView(failedIsrUpdates));
+
+ // Register server-level RocksDB aggregated metrics
+ registerServerRocksDBMetrics();
+ }
+
+    /**
+     * Registers the server-level RocksDB gauge, which adds up the memory usage reported by the
+     * RocksDB statistics of every table registered in this server.
+     */
+    private void registerServerRocksDBMetrics() {
+        // Total memory usage across all RocksDB instances in this server, recomputed on
+        // every read of the gauge.
+        gauge(
+                MetricNames.ROCKSDB_MEMORY_USAGE_TOTAL,
+                () -> {
+                    long totalBytes = 0L;
+                    for (TableMetricGroup tableGroup : metricGroupByTable.values()) {
+                        totalBytes +=
+                                tableGroup
+                                        .allRocksDBStatistics()
+                                        .mapToLong(RocksDBStatistics::getTotalMemoryUsage)
+                                        .sum();
+                    }
+                    return totalBytes;
+                });
     }
@Override
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 1669e004d..f86778ebc 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -596,6 +596,10 @@ public final class Replica {
IOUtils.closeQuietly(closeableRegistryForKv);
}
if (kvTablet != null) {
+ // Unregister RocksDB statistics before dropping KvTablet
+ // This ensures statistics are cleaned up when KvTablet is
destroyed
+ bucketMetricGroup.unregisterRocksDBStatistics();
+
// drop the kv tablet
checkNotNull(kvManager);
kvManager.dropKv(tableBucket);
@@ -689,6 +693,12 @@ public final class Replica {
physicalPath,
tableBucket,
endTime - startTime);
+
+ // Register RocksDB statistics to BucketMetricGroup
+ if (kvTablet != null && kvTablet.getRocksDBStatistics() != null) {
+
bucketMetricGroup.registerRocksDBStatistics(kvTablet.getRocksDBStatistics());
+ }
+
return optCompletedSnapshot;
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index ec2fab37c..52b1080ed 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -48,12 +48,14 @@ import org.apache.fluss.row.encode.ValueEncoder;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Key;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.KvEntry;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.Value;
+import org.apache.fluss.server.kv.rocksdb.RocksDBStatistics;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.log.FetchIsolation;
import org.apache.fluss.server.log.LogAppendInfo;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.log.LogTestUtils;
import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
@@ -1343,4 +1345,186 @@ class KvTabletTest {
private Value valueOf(BinaryRow row) {
return Value.of(ValueEncoder.encodeValue(schemaId, row));
}
+
+ @Test
+ void testRocksDBMetrics() throws Exception {
+ // Initialize tablet with schema
+ initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
+
+ // Get RocksDB statistics
+ RocksDBStatistics statistics = kvTablet.getRocksDBStatistics();
+ assertThat(statistics).as("RocksDB statistics should be
available").isNotNull();
+
+ // Verify statistics is properly initialized
+ org.rocksdb.Statistics stats = kvTablet.getRocksDBKv().getStatistics();
+ assertThat(stats).as("RocksDB Statistics should be
enabled").isNotNull();
+
+ // All metrics should start at 0 for a fresh database
+ assertThat(statistics.getBytesWritten()).isEqualTo(0);
+ assertThat(statistics.getBytesRead()).isEqualTo(0);
+ assertThat(statistics.getFlushBytesWritten()).isEqualTo(0);
+ assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0);
+ assertThat(statistics.getGetLatencyMicros()).isEqualTo(0);
+ assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0);
+ assertThat(statistics.getFlushPending()).isEqualTo(0);
+ assertThat(statistics.getCompactionPending()).isEqualTo(0);
+ assertThat(statistics.getTotalMemoryUsage())
+ .isGreaterThan(0); // Block cache is pre-allocated
+
+ // ========== Phase 1: Write and Flush ==========
+ int numRecords = 10000;
+ List<KvRecord> rows = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ rows.add(
+ kvRecordFactory.ofRecord(
+ String.valueOf(i).getBytes(), new Object[] {i,
"value-" + i}));
+ }
+ kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(rows), null);
+ kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+ // After write and flush: must have written data to RocksDB
+ long bytesWrittenAfterFlush = statistics.getBytesWritten();
+ assertThat(bytesWrittenAfterFlush)
+ .as("Must write data to RocksDB after flush")
+ .isGreaterThan(0);
+
+ // Flush must have written bytes (memtable to SST file)
+ long flushBytesWritten = statistics.getFlushBytesWritten();
+ // Note: FLUSH_WRITE_BYTES may not be tracked in all configurations
+ assertThat(flushBytesWritten)
+ .as("Flush bytes written is non-negative")
+ .isGreaterThanOrEqualTo(0);
+
+ // Write latency must be tracked for write operations
+ long writeLatency = statistics.getWriteLatencyMicros();
+ assertThat(writeLatency)
+ .as("Write latency must be > 0 after write operations")
+ .isGreaterThan(0);
+
+ // After flush, there should be at least 1 L0 file (unless immediate
compaction occurred)
+ long numL0Files = statistics.getNumFilesAtLevel0();
+ assertThat(numL0Files)
+ .as("Should have L0 files after flush (or 0 if compacted)")
+ .isGreaterThanOrEqualTo(0);
+
+ // Flush pending must be 0 after flush completes
+ assertThat(statistics.getFlushPending())
+ .as("No pending flush after completion")
+ .isEqualTo(0);
+
+ // ========== Phase 2: Read Operations ==========
+ List<byte[]> keysToRead = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ keysToRead.add(String.valueOf(i).getBytes());
+ }
+ List<byte[]> readValues = kvTablet.multiGet(keysToRead);
+ assertThat(readValues).hasSize(100);
+
+ // After reads: get latency must be tracked
+ long getLatency = statistics.getGetLatencyMicros();
+ assertThat(getLatency)
+ .as("Get latency must be tracked after read operations")
+ .isGreaterThan(0);
+
+ // Bytes read may increase (depending on cache hits)
+ long bytesRead = statistics.getBytesRead();
+ // Note: bytesRead could be 0 if all data was served from block cache
+ assertThat(bytesRead).as("Bytes read is
non-negative").isGreaterThanOrEqualTo(0);
+
+ // ========== Phase 3: Write More Data to Trigger Compaction ==========
+ List<KvRecord> moreRows = new ArrayList<>();
+ for (int i = numRecords; i < numRecords * 2; i++) {
+ moreRows.add(
+ kvRecordFactory.ofRecord(
+ String.valueOf(i).getBytes(), new Object[] {i,
"value-" + i}));
+ }
+ kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(moreRows), null);
+ kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+ // Bytes written must increase with more data
+ long bytesWrittenAfterSecondFlush = statistics.getBytesWritten();
+ assertThat(bytesWrittenAfterSecondFlush)
+ .as("Bytes written must increase with second batch")
+ .isGreaterThan(bytesWrittenAfterFlush);
+
+ // ========== Phase 4: Manual Compaction ==========
+ long compactionBytesReadBefore = statistics.getCompactionBytesRead();
+ long compactionBytesWrittenBefore =
statistics.getCompactionBytesWritten();
+ long compactionTimeBefore = statistics.getCompactionTimeMicros();
+
+ // Trigger manual compaction
+ try {
+ kvTablet.getRocksDBKv().getDb().compactRange();
+ } catch (Exception e) {
+ // Compaction failure is acceptable in test
+ }
+
+ // After compaction: verify compaction metrics increased
+ long compactionBytesReadAfter = statistics.getCompactionBytesRead();
+ long compactionBytesWrittenAfter =
statistics.getCompactionBytesWritten();
+ long compactionTimeAfter = statistics.getCompactionTimeMicros();
+
+ // If any compaction occurred, all three metrics should increase
+ boolean compactionOccurred =
+ compactionBytesReadAfter > compactionBytesReadBefore
+ || compactionBytesWrittenAfter >
compactionBytesWrittenBefore
+ || compactionTimeAfter > compactionTimeBefore;
+
+ if (compactionOccurred) {
+ assertThat(compactionBytesReadAfter)
+ .as("Compaction must read data")
+ .isGreaterThan(compactionBytesReadBefore);
+ assertThat(compactionBytesWrittenAfter)
+ .as("Compaction must write data")
+ .isGreaterThan(compactionBytesWrittenBefore);
+ assertThat(compactionTimeAfter)
+ .as("Compaction must take time")
+ .isGreaterThan(compactionTimeBefore);
+ }
+
+ // Compaction pending must be 0 after compaction completes
+ assertThat(statistics.getCompactionPending())
+ .as("No pending compaction after completion")
+ .isEqualTo(0);
+
+ // ========== Phase 5: Verify Final State Before Close ==========
+ // Bytes written must be positive after all operations
+ assertThat(statistics.getBytesWritten())
+ .as("Total bytes written must be positive")
+ .isGreaterThan(0);
+
+ // Write and get latency must be positive (operations occurred)
+ assertThat(statistics.getWriteLatencyMicros())
+ .as("Write latency must be positive after writes")
+ .isGreaterThan(0);
+ assertThat(statistics.getGetLatencyMicros())
+ .as("Get latency must be positive after reads")
+ .isGreaterThan(0);
+
+ // No pending operations
+ assertThat(statistics.getFlushPending()).isEqualTo(0);
+ assertThat(statistics.getCompactionPending()).isEqualTo(0);
+
+ // Memory usage should be reasonable (> 0 due to block cache +
memtables)
+ assertThat(statistics.getTotalMemoryUsage())
+ .as("Total memory usage must be positive")
+ .isGreaterThan(0);
+
+ // ========== Phase 6: Verify Metrics After Close ==========
+ kvTablet.close();
+
+ // After close: all metrics must return 0 (ResourceGuard protection)
+ assertThat(statistics.getBytesWritten()).isEqualTo(0);
+ assertThat(statistics.getBytesRead()).isEqualTo(0);
+ assertThat(statistics.getFlushBytesWritten()).isEqualTo(0);
+ assertThat(statistics.getWriteLatencyMicros()).isEqualTo(0);
+ assertThat(statistics.getGetLatencyMicros()).isEqualTo(0);
+ assertThat(statistics.getNumFilesAtLevel0()).isEqualTo(0);
+ assertThat(statistics.getFlushPending()).isEqualTo(0);
+ assertThat(statistics.getCompactionPending()).isEqualTo(0);
+ assertThat(statistics.getCompactionBytesRead()).isEqualTo(0);
+ assertThat(statistics.getCompactionBytesWritten()).isEqualTo(0);
+ assertThat(statistics.getCompactionTimeMicros()).isEqualTo(0);
+ assertThat(statistics.getTotalMemoryUsage()).isEqualTo(0);
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
index f36853cd4..676a17a6b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
@@ -189,7 +189,8 @@ class RocksDBResourceContainerTest {
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertThat(tableConfig.blockSize()).isEqualTo(4 * SizeUnit.KB);
assertThat(tableConfig.metadataBlockSize()).isEqualTo(8 *
SizeUnit.KB);
- assertThat(tableConfig.blockCacheSize()).isEqualTo(512 *
SizeUnit.MB);
+ // Verify block cache was created with explicit LRUCache for
memory tracking
+ assertThat(optionsContainer.getBlockCache()).isNotNull();
assertThat(tableConfig.filterPolicy() instanceof
BloomFilter).isTrue();
}
}
diff --git a/website/docs/maintenance/observability/monitor-metrics.md
b/website/docs/maintenance/observability/monitor-metrics.md
index 2d8ef8077..65109551c 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -825,6 +825,134 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</tbody>
</table>
+### RocksDB
+
+RocksDB metrics provide insights into the performance and health of the
underlying RocksDB storage engine used by Fluss. These metrics are categorized
into table-level metrics (aggregated from all buckets of a table) and
server-level metrics (aggregated from all tables in a server).
+
+#### Table-level RocksDB Metrics (Max Aggregation)
+
+These metrics use Max aggregation to show the maximum value across all buckets
of a table, which helps identify the worst-performing bucket.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style={{width: '30pt'}}>Scope</th>
+ <th class="text-left" style={{width: '150pt'}}>Infix</th>
+ <th class="text-left" style={{width: '80pt'}}>Metrics</th>
+ <th class="text-left" style={{width: '300pt'}}>Description</th>
+ <th class="text-left" style={{width: '40pt'}}>Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th rowspan="7"><strong>tabletserver</strong></th>
+ <td rowspan="7">table</td>
+ <td>rocksdbWriteStallMicrosMax</td>
+ <td>Maximum write stall duration across all buckets of this table (in
microseconds). Write stalls occur when RocksDB needs to slow down writes due to
compaction pressure or memory limits.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbGetLatencyMicrosMax</td>
+ <td>Maximum get operation latency across all buckets of this table (in
microseconds). This represents the slowest read operation among all
buckets.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbWriteLatencyMicrosMax</td>
+ <td>Maximum write operation latency across all buckets of this table (in
microseconds). This represents the slowest write operation among all
buckets.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbNumFilesAtLevel0Max</td>
+ <td>Maximum number of L0 files across all buckets of this table. A high
number of L0 files indicates compaction pressure and may impact read
performance.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbFlushPendingMax</td>
+ <td>Maximum flush pending indicator across all buckets of this table. A
value greater than 0 indicates that some buckets have pending flush
operations.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbCompactionPendingMax</td>
+ <td>Maximum compaction pending indicator across all buckets of this
table. A value greater than 0 indicates that some buckets have pending
compaction operations.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbCompactionTimeMicrosMax</td>
+ <td>Maximum compaction time across all buckets of this table (in
microseconds). This represents the longest compaction operation among all
buckets.</td>
+ <td>Gauge</td>
+ </tr>
+ </tbody>
+</table>
+
+#### Table-level RocksDB Metrics (Sum Aggregation)
+
+These metrics use Sum aggregation to show the total value across all buckets
of a table, providing an overall view of table-level I/O and storage operations.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style={{width: '30pt'}}>Scope</th>
+ <th class="text-left" style={{width: '150pt'}}>Infix</th>
+ <th class="text-left" style={{width: '80pt'}}>Metrics</th>
+ <th class="text-left" style={{width: '300pt'}}>Description</th>
+ <th class="text-left" style={{width: '40pt'}}>Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th rowspan="5"><strong>tabletserver</strong></th>
+ <td rowspan="5">table</td>
+ <td>rocksdbBytesReadTotal</td>
+ <td>Total bytes read across all buckets of this table. This includes
both user reads and internal reads (e.g., compaction reads).</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbBytesWrittenTotal</td>
+ <td>Total bytes written across all buckets of this table. This includes
both user writes and internal writes (e.g., compaction writes).</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbFlushBytesWrittenTotal</td>
+ <td>Total flush bytes written across all buckets of this table. This
represents the amount of data flushed from memtable to persistent storage.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbCompactionBytesReadTotal</td>
+ <td>Total compaction bytes read across all buckets of this table. This
represents the amount of data read during compaction operations.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>rocksdbCompactionBytesWrittenTotal</td>
+ <td>Total compaction bytes written across all buckets of this table.
This represents the amount of data written during compaction operations.</td>
+ <td>Gauge</td>
+ </tr>
+ </tbody>
+</table>
+
+#### Server-level RocksDB Metrics (Sum Aggregation)
+
+These metrics use Sum aggregation to show the total value across all tables in
a server, providing a server-wide view of RocksDB resource usage.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style={{width: '30pt'}}>Scope</th>
+ <th class="text-left" style={{width: '150pt'}}>Infix</th>
+ <th class="text-left" style={{width: '80pt'}}>Metrics</th>
+ <th class="text-left" style={{width: '300pt'}}>Description</th>
+ <th class="text-left" style={{width: '40pt'}}>Type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th rowspan="1"><strong>tabletserver</strong></th>
+ <td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="1">-</td>
+ <td>rocksdbMemoryUsageTotal</td>
+ <td>Total memory usage across all RocksDB instances in this server (in
bytes). This includes memory used by memtables, block cache, and other RocksDB
internal structures.</td>
+ <td>Gauge</td>
+ </tr>
+ </tbody>
+</table>
### Flink connector standard metrics