This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0a9d2dccbde KAFKA-20157 Add RetentionSizeInPercent metrics for regular
topics (#21612)
0a9d2dccbde is described below
commit 0a9d2dccbde4bbd32452168b8d6818ff368620a0
Author: manangupta111 <[email protected]>
AuthorDate: Mon Mar 2 20:58:08 2026 +0530
KAFKA-20157 Add RetentionSizeInPercent metrics for regular topics (#21612)
Introduces `RetentionSizeInPercent` JMX metric for regular (non-tiered)
topics to express partition storage as a percentage of configured
retention limits. No existing metrics or behavior are changed. This PR
is a follow-up to the tiered storage metrics implementation and
completes [KIP-1257: Partition Size Percentage Metrics for Storage
Monitoring](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1257%3A+Partition+Size+Percentage+Metrics+for+Storage+Monitoring).
JMX Metric
```
kafka.log:type=Log,name=RetentionSizeInPercent,topic=<topic>,partition=<partition>
```
Testing
- Parameterized tests covering tiered/non-tiered topic scenarios
- Tests for unlimited retention bytes configuration
- Tests verify metric values via Yammer/JMX gauges
Reviewers: Kamal Chandraprakash <[email protected]>
---------
Co-authored-by: manan.gupta <[email protected]>
---
.../storage/internals/log/LogMetricNames.java | 7 +-
.../kafka/storage/internals/log/UnifiedLog.java | 58 +++++--
.../storage/internals/log/UnifiedLogTest.java | 166 +++++++++++++++++++++
3 files changed, 216 insertions(+), 15 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
index 47230096701..7529115baa6 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
@@ -24,6 +24,11 @@ public class LogMetricNames {
     public static final String LOG_START_OFFSET = "LogStartOffset";
     public static final String LOG_END_OFFSET = "LogEndOffset";
     public static final String SIZE = "Size";
+    public static final String RETENTION_SIZE_IN_PERCENT = "RetentionSizeInPercent";
 
-    public static final List<String> ALL_METRIC_NAMES = List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE);
+    public static final List<String> ALL_METRIC_NAMES = List.of(
+        NUM_LOG_SEGMENTS,
+        LOG_START_OFFSET,
+        LOG_END_OFFSET,
+        SIZE,
+        RETENTION_SIZE_IN_PERCENT);
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 3bac4c3b9d8..ef5cc755937 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -82,6 +82,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -121,6 +122,7 @@ public class UnifiedLog implements AutoCloseable {
     /* A lock that guards all modifications to the log */
     private final Object lock = new Object();
     private final Map<String, Map<String, String>> metricNames = new HashMap<>();
+    private final AtomicInteger retentionSizeInPercentValue = new AtomicInteger(0);
 
     // localLog The LocalLog instance containing non-empty log segments recovered from disk
     private final LocalLog localLog;
@@ -714,10 +716,29 @@ public class UnifiedLog implements AutoCloseable {
         metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, this::logStartOffset, tags);
         metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, this::logEndOffset, tags);
         metricsGroup.newGauge(LogMetricNames.SIZE, this::size, tags);
+        metricsGroup.newGauge(LogMetricNames.RETENTION_SIZE_IN_PERCENT, retentionSizeInPercentValue::get, tags);
         metricNames.put(LogMetricNames.NUM_LOG_SEGMENTS, tags);
         metricNames.put(LogMetricNames.LOG_START_OFFSET, tags);
         metricNames.put(LogMetricNames.LOG_END_OFFSET, tags);
         metricNames.put(LogMetricNames.SIZE, tags);
+        metricNames.put(LogMetricNames.RETENTION_SIZE_IN_PERCENT, tags);
+    }
+
+    /**
+     * Calculates the partition size as a percentage of the configured retention size.
+     * This metric is only meaningful for non-tiered topics with size-based retention configured.
+     *
+     * @return The partition size as a percentage of retention.bytes, or 0 if:
+     *         - Remote storage is enabled with remote copy enabled (metric handled by RemoteLogManager)
+     *         - Retention size is not configured (0 or negative)
+     */
+    // Visible for testing
+    int calculateRetentionSizeInPercent() {
+        long retentionSize = localRetentionSize(config(), remoteLogEnabledAndRemoteCopyEnabled());
+        if (!remoteLogEnabledAndRemoteCopyEnabled() && retentionSize > 0) {
+            return (int) ((size() * 100) / retentionSize);
+        }
+        return 0;
     }
 
     public void removeExpiredProducers(long currentTimeMs) {
@@ -1944,25 +1965,34 @@ public class UnifiedLog implements AutoCloseable {
      * not deletion is enabled, delete any local log segments that are before the log start offset
      */
     public int deleteOldSegments() throws IOException {
-        if (config().delete) {
-            return deleteLogStartOffsetBreachedSegments() +
-                deleteRetentionSizeBreachedSegments() +
-                deleteRetentionMsBreachedSegments();
-        } else if (config().compact) {
-            return deleteLogStartOffsetBreachedSegments();
-        } else {
-            // If cleanup.policy is empty and remote storage is enabled, the local log segments will
-            // be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms
-            if (remoteLogEnabledAndRemoteCopyEnabled()) {
-                return deleteLogStartOffsetBreachedSegments() +
+        int deletedSegments;
+        try {
+            if (config().delete) {
+                deletedSegments = deleteLogStartOffsetBreachedSegments() +
                     deleteRetentionSizeBreachedSegments() +
                     deleteRetentionMsBreachedSegments();
+            } else if (config().compact) {
+                deletedSegments = deleteLogStartOffsetBreachedSegments();
             } else {
-                // If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments
-                // unless the log start offset advances through deleteRecords
-                return deleteLogStartOffsetBreachedSegments();
+                // If cleanup.policy is empty and remote storage is enabled, the local log segments will
+                // be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms
+                if (remoteLogEnabledAndRemoteCopyEnabled()) {
+                    deletedSegments = deleteLogStartOffsetBreachedSegments() +
+                        deleteRetentionSizeBreachedSegments() +
+                        deleteRetentionMsBreachedSegments();
+                } else {
+                    // If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments
+                    // unless the log start offset advances through deleteRecords
+                    deletedSegments = deleteLogStartOffsetBreachedSegments();
+                }
             }
+        } finally {
+            // Calculate retentionSizeInPercent in finally block to ensure the metric is updated
+            // even when log deletion encounters errors. This also saves CPU cycles by only
+            // calculating when the log-cleaner thread runs.
+            retentionSizeInPercentValue.set(calculateRetentionSizeInPercent());
         }
+        return deletedSegments;
     }
 
     public interface DeletionCondition {
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index 6a580b494a8..efef8c8ee4c 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.server.common.TransactionVersion;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.Scheduler;
@@ -35,8 +36,12 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
+
+import com.yammer.metrics.core.Gauge;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import java.io.File;
import java.io.IOException;
@@ -51,7 +56,10 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 
 public class UnifiedLogTest {
@@ -747,4 +755,162 @@ public class UnifiedLogTest {
         }
         return builder.build();
     }
+
+    /**
+     * Test RetentionSizeInPercent metric for regular (non-tiered) topics.
+     * The metric should only be reported for non-tiered topics with size-based retention configured.
+     *
+     * @param remoteLogStorageEnable whether remote log storage is enabled
+     * @param remoteLogCopyDisable whether remote log copy is disabled (only relevant when remote storage is enabled)
+     * @param expectedSizeInPercent expected percentage value after retention cleanup
+     */
+    @ParameterizedTest
+    @CsvSource({
+        // Remote storage enabled with copy enabled: metric handled by RemoteLogManager, returns 0 here
+        "true, false, 0",
+        // Remote storage enabled but copy disabled: metric should be calculated (100%)
+        "true, true, 100",
+        // Remote storage disabled: metric should be calculated (100%)
+        "false, false, 100",
+        // Remote storage disabled (remoteLogCopyDisable is ignored): metric should be calculated (100%)
+        "false, true, 100"
+    })
+    public void testRetentionSizeInPercentMetric(
+        boolean remoteLogStorageEnable,
+        boolean remoteLogCopyDisable,
+        int expectedSizeInPercent
+    ) throws IOException {
+        Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes());
+        int recordSize = records.get().sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(recordSize * 5)
+            .retentionBytes(recordSize * 10L)
+            .remoteLogStorageEnable(remoteLogStorageEnable)
+            .remoteLogCopyDisable(remoteLogCopyDisable)
+            .build();
+        log = createLog(logDir, logConfig, true);
+
+        String metricName = "name=RetentionSizeInPercent,topic=" + log.topicPartition().topic() +
+            ",partition=" + log.topicPartition().partition();
+
+        // Append some messages to create 3 segments (15 records / 5 records per segment = 3 segments)
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Before deletion, calculate what the percentage should be
+        // Total size = 15 * recordSize, retention = 10 * recordSize
+        // Percentage = (15 * 100) / 10 = 150% (for non-tiered topics)
+        if (!remoteLogStorageEnable || remoteLogCopyDisable) {
+            assertEquals(150, log.calculateRetentionSizeInPercent());
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        // For tiered storage tests, simulate remote storage having the data
+        if (remoteLogStorageEnable) {
+            log.updateHighestOffsetInRemoteStorage(9);
+        }
+        log.deleteOldSegments();
+
+        // After deletion: log size should be ~10 * recordSize (2 segments), retention = 10 * recordSize
+        // Percentage = (10 * 100) / 10 = 100% (for non-tiered topics)
+        // Verify via Yammer metric (JMX)
+        assertEquals(expectedSizeInPercent, yammerMetricValue(metricName));
+        assertEquals(2, log.numberOfSegments(), "should have 2 segments after deletion");
+    }
+
+    @Test
+    public void testRetentionSizeInPercentWithInfiniteRetention() throws IOException {
+        Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes());
+        // Create log with no retention configured (retentionBytes = -1 means unlimited)
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(records.get().sizeInBytes() * 5)
+            .retentionBytes(-1L)
+            .build();
+        log = createLog(logDir, logConfig, false);
+
+        String metricName = "name=RetentionSizeInPercent,topic=" + log.topicPartition().topic() +
+            ",partition=" + log.topicPartition().partition();
+
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // With unlimited retention, the metric should be 0
+        assertEquals(0, log.calculateRetentionSizeInPercent());
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+
+        // After deleteOldSegments, metric should still be 0
+        // Verify via Yammer metric (JMX)
+        assertEquals(0, yammerMetricValue(metricName));
+    }
+
+    /**
+     * Test that verifies the RetentionSizeInPercent metric is always updated in the finally block
+     * of deleteOldSegments(), even when an exception is thrown during deletion.
+     * This ensures the metric is calculated even when log deletion encounters errors.
+     */
+    @Test
+    public void testRetentionSizeInPercentMetricUpdatedOnDeletionError() throws IOException {
+        Supplier<MemoryRecords> records = () -> singletonRecords("test".getBytes());
+        int recordSize = records.get().sizeInBytes();
+
+        // Create log with retention smaller than data to force deletion
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .segmentBytes(recordSize * 5)
+            .retentionBytes(recordSize * 10L)
+            .build();
+        log = createLog(logDir, logConfig, false);
+
+        String metricName = "name=RetentionSizeInPercent,topic=" + log.topicPartition().topic() +
+            ",partition=" + log.topicPartition().partition();
+
+        // Append messages to create multiple segments (15 records / 5 per segment = 3 segments)
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+        assertEquals(3, log.numberOfSegments(), "Should have 3 segments");
+
+        log.updateHighWatermark(log.logEndOffset());
+
+        // First call to initialize the metric normally
+        log.deleteOldSegments();
+        assertEquals(100, yammerMetricValue(metricName), "Metric should be 100% after initial deletion");
+
+        // Add more data to change the metric value
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+        log.updateHighWatermark(log.logEndOffset());
+
+        // Create a spy and make config() throw on first call, but work normally on subsequent calls
+        // This simulates an error in the try block while allowing the finally block to succeed
+        // The config() method is called in both the try block and calculateRetentionSizeInPercent()
+        UnifiedLog spyLog = spy(log);
+        doThrow(new RuntimeException("Simulated error during deletion"))
+            .doCallRealMethod() // Allow subsequent calls to work (for finally block)
+            .when(spyLog).config();
+
+        // Call deleteOldSegments on the spy - it should throw due to config() error
+        // But the finally block should still execute and update the metric
+        assertThrows(RuntimeException.class, spyLog::deleteOldSegments);
+
+        // Verify the metric was still updated in the finally block despite the exception
+        // After adding 10 more records (2 more segments), total = 4 segments = 20 records
+        // Percentage = (20 * 100) / 10 = 200%
+        assertEquals(200, yammerMetricValue(metricName),
+            "Metric should be updated in finally block even when exception occurs");
+    }
+
+    @SuppressWarnings("unchecked")
+    private Object yammerMetricValue(String name) {
+        Gauge<Object> gauge = (Gauge<Object>) KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+            .filter(e -> e.getKey().getMBeanName().endsWith(name))
+            .findFirst()
+            .get()
+            .getValue();
+        return gauge.value();
+    }
 }