This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new 57b6c2ef98d KAFKA-17360 local log retention ms/bytes "-2" is not
treated correctly (#16996)
57b6c2ef98d is described below
commit 57b6c2ef98d8177ab0e43f7653c8079d4daa8789
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Mon Aug 26 06:22:35 2024 +0800
KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly
(#16996)
1) When the local.retention.ms/bytes is set to -2, we didn't replace it
with the server-side retention.ms/bytes config, so the -2 local retention won't
take effect.
2) When setting local.retention.ms/bytes to -2, we can notice this log message:
```
Deleting segment LogSegment(baseOffset=10045, size=1037087,
lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to
local log retention size -2 breach. Local log size after deletion will be
13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]
```
This is not helpful for users. We should replace -2 with the real retention
value when logging.
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 4 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 48 ++++++++++++++++++++++
.../kafka/storage/internals/log/LogConfig.java | 10 ++---
.../integration/BaseDeleteSegmentsTest.java | 5 ++-
4 files changed, 58 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index c8de1325496..0b5d6440c74 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -2308,11 +2308,11 @@ object UnifiedLog extends Logging {
}
private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled:
Boolean): Long = {
- if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else
config.retentionMs
+ if (remoteLogEnabled) config.localRetentionMs else config.retentionMs
}
private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled:
Boolean): Long = {
- if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else
config.retentionSize
+ if (remoteLogEnabled) config.localRetentionBytes else config.retentionSize
}
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 4976ff80eeb..4e85935088f 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4097,6 +4097,54 @@ class UnifiedLogTest {
assertEquals(1, log.logSegments.size)
}
+ @Test
+ def
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionBytes():
Unit = {
+ def createRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+ val segmentBytes = createRecords.sizeInBytes()
+ val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, retentionBytes = 1,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+ val log = createLog(logDir, retentionBytesConfig,
remoteStorageSystemEnable = true)
+
+ // Given 6 segments of 1 message each
+ for (_ <- 0 until 6) {
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+ }
+ assertEquals(6, log.logSegments.size)
+
+ log.updateHighWatermark(log.logEndOffset)
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1)
+ log.deleteOldSegments()
+ assertEquals(4, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+ }
+
+ @Test
+ def
testRetentionOnLocalLogDeletionWhenRemoteLogCopyEnabledAndDefaultLocalRetentionMs():
Unit = {
+ def createRecords = TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+ val segmentBytes = createRecords.sizeInBytes()
+ val retentionBytesConfig = LogTestUtils.createLogConfig(segmentBytes =
segmentBytes, retentionMs = 1000,
+ fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+ val log = createLog(logDir, retentionBytesConfig,
remoteStorageSystemEnable = true)
+
+ // Given 6 segments of 1 message each
+ for (_ <- 0 until 6) {
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+ }
+ assertEquals(6, log.logSegments.size)
+
+ log.updateHighWatermark(log.logEndOffset)
+ // simulate calls to upload 2 segments to remote storage
+ log.updateHighestOffsetInRemoteStorage(1)
+
+ mockTime.sleep(1001)
+ log.deleteOldSegments()
+ assertEquals(4, log.logSegments.size())
+ assertEquals(0, log.logStartOffset)
+ assertEquals(2, log.localLogStartOffset())
+ }
+
@Test
def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1,
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index bfe7fc3aa34..ffae6c30e7e 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -100,11 +100,11 @@ public class LogConfig extends AbstractConfig {
}
}
- public static class RemoteLogConfig {
+ private static class RemoteLogConfig {
- public final boolean remoteStorageEnable;
- public final long localRetentionMs;
- public final long localRetentionBytes;
+ private final boolean remoteStorageEnable;
+ private final long localRetentionMs;
+ private final long localRetentionBytes;
private RemoteLogConfig(LogConfig config) {
this.remoteStorageEnable =
config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
@@ -334,8 +334,8 @@ public class LogConfig extends AbstractConfig {
public final List<String> leaderReplicationThrottledReplicas;
public final List<String> followerReplicationThrottledReplicas;
public final boolean messageDownConversionEnable;
- public final RemoteLogConfig remoteLogConfig;
+ private final RemoteLogConfig remoteLogConfig;
private final int maxMessageSize;
private final Map<?, ?> props;
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
index 0147c9f7c54..11c3ca7ad09 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
@@ -54,8 +55,8 @@ public abstract class BaseDeleteSegmentsTest extends
TieredStorageTestHarness {
.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
- .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
- new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3"))
+ .produceWithTimestamp(topicA, p0, new KeyValueSpec("k0",
"v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3", System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1)))
// update the topic config such that it triggers the deletion
of segments
.updateTopicConfig(topicA, configsToBeAdded(),
Collections.emptyList())
// expect that the three offloaded remote log segments are
deleted