This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 57b6c2ef98d KAFKA-17360 local log retention ms/bytes "-2" is not 
treated correctly (#16996)
57b6c2ef98d is described below

commit 57b6c2ef98d8177ab0e43f7653c8079d4daa8789
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Mon Aug 26 06:22:35 2024 +0800

    KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly 
(#16996)
    
    1) When local.retention.ms/bytes is set to -2, we didn't replace it 
with the server-side retention.ms/bytes config, so the -2 local retention didn't 
take effect.
    2) When local.retention.ms/bytes is set to -2, the following log message appears:
    
    ```
    Deleting segment LogSegment(baseOffset=10045, size=1037087, 
lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to 
local log retention size -2 breach. Local log size after deletion will be 
13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]
    ```
    This is not helpful for users. We should replace -2 with the real retention 
value when logging.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  4 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 48 ++++++++++++++++++++++
 .../kafka/storage/internals/log/LogConfig.java     | 10 ++---
 .../integration/BaseDeleteSegmentsTest.java        |  5 ++-
 4 files changed, 58 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index c8de1325496..0b5d6440c74 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -2308,11 +2308,11 @@ object UnifiedLog extends Logging {
   }
 
   private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled: 
Boolean): Long = {
-    if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else 
config.retentionMs
+    if (remoteLogEnabled) config.localRetentionMs else config.retentionMs
   }
 
   private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled: 
Boolean): Long = {
-    if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else 
config.retentionSize
+    if (remoteLogEnabled) config.localRetentionBytes else config.retentionSize
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 4976ff80eeb..4e85935088f 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4097,6 +4097,54 @@ class UnifiedLogTest {
     assertEquals(1, log.logSegments.size)
   }
 
+  @Test
+  def 
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes():
 Unit = {
+    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, retentionBytes = 1,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, retentionBytesConfig, 
remoteStorageSystemEnable = true)
+
+    // Given 6 segments of 1 message each
+    for (_ <- 0 until 6) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(6, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+    log.deleteOldSegments()
+    assertEquals(4, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+  }
+
+  @Test
+  def 
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs():
 Unit = {
+    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, retentionMs = 1000,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, retentionBytesConfig, 
remoteStorageSystemEnable = true)
+
+    // Given 6 segments of 1 message each
+    for (_ <- 0 until 6) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(6, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    mockTime.sleep(1001)
+    log.deleteOldSegments()
+    assertEquals(4, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+  }
+
   @Test
   def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index bfe7fc3aa34..ffae6c30e7e 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -100,11 +100,11 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
-    public static class RemoteLogConfig {
+    private static class RemoteLogConfig {
 
-        public final boolean remoteStorageEnable;
-        public final long localRetentionMs;
-        public final long localRetentionBytes;
+        private final boolean remoteStorageEnable;
+        private final long localRetentionMs;
+        private final long localRetentionBytes;
 
         private RemoteLogConfig(LogConfig config) {
             this.remoteStorageEnable = 
config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
@@ -334,8 +334,8 @@ public class LogConfig extends AbstractConfig {
     public final List<String> leaderReplicationThrottledReplicas;
     public final List<String> followerReplicationThrottledReplicas;
     public final boolean messageDownConversionEnable;
-    public final RemoteLogConfig remoteLogConfig;
 
+    private final RemoteLogConfig remoteLogConfig;
     private final int maxMessageSize;
     private final Map<?, ?> props;
 
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
index 0147c9f7c54..11c3ca7ad09 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
 
@@ -54,8 +55,8 @@ public abstract class BaseDeleteSegmentsTest extends 
TieredStorageTestHarness {
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
                 .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
-                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
-                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"))
+                .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0", 
"v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
                 // update the topic config such that it triggers the deletion 
of segments
                 .updateTopicConfig(topicA, configsToBeAdded(), 
Collections.emptyList())
                 // expect that the three offloaded remote log segments are 
deleted

Reply via email to