This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a9d2dccbde KAFKA-20157 Add RetentionSizeInPercent metrics for regular 
topics (#21612)
0a9d2dccbde is described below

commit 0a9d2dccbde4bbd32452168b8d6818ff368620a0
Author: manangupta111 <[email protected]>
AuthorDate: Mon Mar 2 20:58:08 2026 +0530

    KAFKA-20157 Add RetentionSizeInPercent metrics for regular topics (#21612)
    
    Introduces a `RetentionSizeInPercent` JMX metric for regular (non-tiered)
    topics to express partition storage as a percentage of the configured
    retention limits. No existing metrics or behavior are changed. This PR
    is a follow-up to the tiered storage metrics implementation and
    completes [KIP-1257: Partition Size Percentage Metrics for Storage
    
    
Monitoring](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1257%3A+Partition+Size+Percentage+Metrics+for+Storage+Monitoring).
    
    JMX Metric
    ```
    
kafka.log:type=Log,name=RetentionSizeInPercent,topic=<topic>,partition=<partition>
    ```
    
    Testing
    - Parameterized tests covering tiered/non-tiered topic scenarios
    - Tests for unlimited retention bytes configuration
    - Tests verify metric values via Yammer/JMX gauges
    
    Reviewers: Kamal Chandraprakash <[email protected]>
    
    ---------
    
    Co-authored-by: manan.gupta <[email protected]>
---
 .../storage/internals/log/LogMetricNames.java      |   7 +-
 .../kafka/storage/internals/log/UnifiedLog.java    |  58 +++++--
 .../storage/internals/log/UnifiedLogTest.java      | 166 +++++++++++++++++++++
 3 files changed, 216 insertions(+), 15 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
index 47230096701..7529115baa6 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java
@@ -24,6 +24,11 @@ public class LogMetricNames {
     public static final String LOG_START_OFFSET = "LogStartOffset";
     public static final String LOG_END_OFFSET = "LogEndOffset";
     public static final String SIZE = "Size";
+    public static final String RETENTION_SIZE_IN_PERCENT = 
"RetentionSizeInPercent";
 
-    public static final List<String> ALL_METRIC_NAMES = 
List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE);
+    public static final List<String> ALL_METRIC_NAMES = List.of(
+                                                            NUM_LOG_SEGMENTS, 
+                                                            LOG_START_OFFSET, 
+                                                            LOG_END_OFFSET, 
SIZE, 
+                                                            
RETENTION_SIZE_IN_PERCENT);
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 3bac4c3b9d8..ef5cc755937 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -82,6 +82,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -121,6 +122,7 @@ public class UnifiedLog implements AutoCloseable {
     /* A lock that guards all modifications to the log */
     private final Object lock = new Object();
     private final Map<String, Map<String, String>> metricNames = new 
HashMap<>();
+    private final AtomicInteger retentionSizeInPercentValue = new 
AtomicInteger(0);
 
     // localLog The LocalLog instance containing non-empty log segments 
recovered from disk
     private final LocalLog localLog;
@@ -714,10 +716,29 @@ public class UnifiedLog implements AutoCloseable {
         metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, 
this::logStartOffset, tags);
         metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, 
this::logEndOffset, tags);
         metricsGroup.newGauge(LogMetricNames.SIZE, this::size, tags);
+        metricsGroup.newGauge(LogMetricNames.RETENTION_SIZE_IN_PERCENT, 
retentionSizeInPercentValue::get, tags);
         metricNames.put(LogMetricNames.NUM_LOG_SEGMENTS, tags);
         metricNames.put(LogMetricNames.LOG_START_OFFSET, tags);
         metricNames.put(LogMetricNames.LOG_END_OFFSET, tags);
         metricNames.put(LogMetricNames.SIZE, tags);
+        metricNames.put(LogMetricNames.RETENTION_SIZE_IN_PERCENT, tags);
+    }
+
+    /**
+     * Calculates the partition size as a percentage of the configured 
retention size.
+     * This metric is only meaningful for non-tiered topics with size-based 
retention configured.
+     *
+     * @return The partition size as a percentage of retention.bytes, or 0 if:
+     *         - Remote storage is enabled with remote copy enabled (metric 
handled by RemoteLogManager)
+     *         - Retention size is not configured (0 or negative)
+     */
+    // Visible for testing
+    int calculateRetentionSizeInPercent() {
+        long retentionSize = localRetentionSize(config(), 
remoteLogEnabledAndRemoteCopyEnabled());
+        if (!remoteLogEnabledAndRemoteCopyEnabled() && retentionSize > 0) {
+            return (int) ((size() * 100) / retentionSize);
+        }
+        return 0;
     }
 
     public void removeExpiredProducers(long currentTimeMs) {
@@ -1944,25 +1965,34 @@ public class UnifiedLog implements AutoCloseable {
      * not deletion is enabled, delete any local log segments that are before 
the log start offset
      */
     public int deleteOldSegments() throws IOException {
-        if (config().delete) {
-            return deleteLogStartOffsetBreachedSegments() +
-                    deleteRetentionSizeBreachedSegments() +
-                    deleteRetentionMsBreachedSegments();
-        } else if (config().compact) {
-            return deleteLogStartOffsetBreachedSegments();
-        } else {
-            // If cleanup.policy is empty and remote storage is enabled, the 
local log segments will 
-            // be cleaned based on the values of log.local.retention.bytes and 
log.local.retention.ms
-            if (remoteLogEnabledAndRemoteCopyEnabled()) {
-                return deleteLogStartOffsetBreachedSegments() +
+        int deletedSegments;
+        try {
+            if (config().delete) {
+                deletedSegments = deleteLogStartOffsetBreachedSegments() +
                         deleteRetentionSizeBreachedSegments() +
                         deleteRetentionMsBreachedSegments();
+            } else if (config().compact) {
+                deletedSegments = deleteLogStartOffsetBreachedSegments();
             } else {
-                // If cleanup.policy is empty and remote storage is disabled, 
we should not delete any local log segments 
-                // unless the log start offset advances through deleteRecords
-                return deleteLogStartOffsetBreachedSegments();
+                // If cleanup.policy is empty and remote storage is enabled, 
the local log segments will 
+                // be cleaned based on the values of log.local.retention.bytes 
and log.local.retention.ms
+                if (remoteLogEnabledAndRemoteCopyEnabled()) {
+                    deletedSegments = deleteLogStartOffsetBreachedSegments() +
+                            deleteRetentionSizeBreachedSegments() +
+                            deleteRetentionMsBreachedSegments();
+                } else {
+                    // If cleanup.policy is empty and remote storage is 
disabled, we should not delete any local log segments 
+                    // unless the log start offset advances through 
deleteRecords
+                    deletedSegments = deleteLogStartOffsetBreachedSegments();
+                }
             }
+        } finally {
+            // Calculate retentionSizeInPercent in finally block to ensure the 
metric is updated
+            // even when log deletion encounters errors. This also saves CPU 
cycles by only
+            // calculating when the log-cleaner thread runs.
+            retentionSizeInPercentValue.set(calculateRetentionSizeInPercent());
         }
+        return deletedSegments;
     }
 
     public interface DeletionCondition {
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index 6a580b494a8..efef8c8ee4c 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.server.common.TransactionVersion;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.Scheduler;
@@ -35,8 +36,12 @@ import 
org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
 
+import com.yammer.metrics.core.Gauge;
+
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,7 +56,10 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 
 public class UnifiedLogTest {
 
@@ -747,4 +755,162 @@ public class UnifiedLogTest {
         }
         return builder.build();
     }
+
+    /**
+     * Test RetentionSizeInPercent metric for regular (non-tiered) topics.
+     * The metric should only be reported for non-tiered topics with 
size-based retention configured.
+     *
+     * @param remoteLogStorageEnable whether remote log storage is enabled
+     * @param remoteLogCopyDisable whether remote log copy is disabled (only 
relevant when remote storage is enabled)
+     * @param expectedSizeInPercent expected percentage value after retention 
cleanup
+     */
+    @ParameterizedTest
+    @CsvSource({
+        // Remote storage enabled with copy enabled: metric handled by 
RemoteLogManager, returns 0 here
+        "true, false, 0",
+        // Remote storage enabled but copy disabled: metric should be 
calculated (100%)
+        "true, true, 100",
+        // Remote storage disabled: metric should be calculated (100%)
+        "false, false, 100",
+        // Remote storage disabled (remoteLogCopyDisable is ignored): metric 
should be calculated (100%)
+        "false, true, 100"
+    })
+    public void testRetentionSizeInPercentMetric(
+            boolean remoteLogStorageEnable,
+            boolean remoteLogCopyDisable,
+            int expectedSizeInPercent
+    ) throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        int recordSize = records.get().sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(recordSize * 5)
+                .retentionBytes(recordSize * 10L)
+                .remoteLogStorageEnable(remoteLogStorageEnable)
+                .remoteLogCopyDisable(remoteLogCopyDisable)
+                .build();
+        log = createLog(logDir, logConfig, true);
+
+        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() + 
+                ",partition=" + log.topicPartition().partition();
+
+        // Append some messages to create 3 segments (15 records / 5 records 
per segment = 3 segments)
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Before deletion, calculate what the percentage should be
+        // Total size = 15 * recordSize, retention = 10 * recordSize
+        // Percentage = (15 * 100) / 10 = 150% (for non-tiered topics)
+        if (!remoteLogStorageEnable || remoteLogCopyDisable) {
+            assertEquals(150, log.calculateRetentionSizeInPercent());
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        // For tiered storage tests, simulate remote storage having the data
+        if (remoteLogStorageEnable) {
+            log.updateHighestOffsetInRemoteStorage(9);
+        }
+        log.deleteOldSegments();
+
+        // After deletion: log size should be ~10 * recordSize (2 segments), 
retention = 10 * recordSize
+        // Percentage = (10 * 100) / 10 = 100% (for non-tiered topics)
+        // Verify via Yammer metric (JMX)
+        assertEquals(expectedSizeInPercent, yammerMetricValue(metricName));
+        assertEquals(2, log.numberOfSegments(), "should have 2 segments after 
deletion");
+    }
+
+    @Test
+    public void testRetentionSizeInPercentWithInfiniteRetention() throws 
IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        // Create log with no retention configured (retentionBytes = -1 means 
unlimited)
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.get().sizeInBytes() * 5)
+                .retentionBytes(-1L)
+                .build();
+        log = createLog(logDir, logConfig, false);
+
+        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() + 
+                ",partition=" + log.topicPartition().partition();
+
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // With unlimited retention, the metric should be 0
+        assertEquals(0, log.calculateRetentionSizeInPercent());
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+
+        // After deleteOldSegments, metric should still be 0
+        // Verify via Yammer metric (JMX)
+        assertEquals(0, yammerMetricValue(metricName));
+    }
+
+    /**
+     * Test that verifies the RetentionSizeInPercent metric is always updated 
in the finally block
+     * of deleteOldSegments(), even when an exception is thrown during 
deletion.
+     * This ensures the metric is calculated even when log deletion encounters 
errors.
+     */
+    @Test
+    public void testRetentionSizeInPercentMetricUpdatedOnDeletionError() 
throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        int recordSize = records.get().sizeInBytes();
+
+        // Create log with retention smaller than data to force deletion
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(recordSize * 5)
+                .retentionBytes(recordSize * 10L)
+                .build();
+        log = createLog(logDir, logConfig, false);
+
+        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() +
+                ",partition=" + log.topicPartition().partition();
+
+        // Append messages to create multiple segments (15 records / 5 per 
segment = 3 segments)
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+        assertEquals(3, log.numberOfSegments(), "Should have 3 segments");
+
+        log.updateHighWatermark(log.logEndOffset());
+
+        // First call to initialize the metric normally
+        log.deleteOldSegments();
+        assertEquals(100, yammerMetricValue(metricName), "Metric should be 
100% after initial deletion");
+
+        // Add more data to change the metric value
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+        log.updateHighWatermark(log.logEndOffset());
+
+        // Create a spy and make config() throw on first call, but work 
normally on subsequent calls
+        // This simulates an error in the try block while allowing the finally 
block to succeed
+        // The config() method is called in both the try block and 
calculateRetentionSizeInPercent()
+        UnifiedLog spyLog = spy(log);
+        doThrow(new RuntimeException("Simulated error during deletion"))
+                .doCallRealMethod()  // Allow subsequent calls to work (for 
finally block)
+                .when(spyLog).config();
+
+        // Call deleteOldSegments on the spy - it should throw due to config() 
error
+        // But the finally block should still execute and update the metric
+        assertThrows(RuntimeException.class, spyLog::deleteOldSegments);
+
+        // Verify the metric was still updated in the finally block despite 
the exception
+        // After adding 10 more records (2 more segments), total = 4 segments 
= 20 records
+        // Percentage = (20 * 100) / 10 = 200%
+        assertEquals(200, yammerMetricValue(metricName),
+                "Metric should be updated in finally block even when exception 
occurs");
+    }
+
+    @SuppressWarnings("unchecked")
+    private Object yammerMetricValue(String name) {
+        Gauge<Object> gauge = (Gauge<Object>) 
KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+                .filter(e -> e.getKey().getMBeanName().endsWith(name))
+                .findFirst()
+                .get()
+                .getValue();
+        return gauge.value();
+    }
 }

Reply via email to