This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 41611b4bd2e MINOR: Followup KAFKA-19112 document updated (#20492)
41611b4bd2e is described below
commit 41611b4bd2e708a3a30c7b8f5d89aa274a64676c
Author: Ken Huang <[email protected]>
AuthorDate: Sun Sep 28 19:06:06 2025 +0800
MINOR: Followup KAFKA-19112 document updated (#20492)
Some sections are not very clear, and we need to update the
documentation.
Reviewers: TengYao Chi <[email protected]>, Jun Rao
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 2 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 46 +++++++++++++++++++++-
docs/upgrade.html | 10 ++++-
.../kafka/storage/internals/log/UnifiedLog.java | 4 +-
4 files changed, 56 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b45a08b067..44b28a1f07e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1658,7 +1658,7 @@ class Partition(val topicPartition: TopicPartition,
def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult =
inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
- if (!leaderLog.config.delete)
+ if (!leaderLog.config.delete && leaderLog.config.compact)
throw new PolicyViolationException(s"Records of partition
$topicPartition can not be deleted due to the configured policy")
val convertedOffset = if (offset ==
DeleteRecordsRequest.HIGH_WATERMARK)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 8e512ad4d01..5662c2d2276 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric
import kafka.log.LogManager
import kafka.server._
import kafka.utils._
-import org.apache.kafka.common.errors.{ApiException,
FencedLeaderEpochException, InconsistentTopicIdException,
InvalidTxnStateException, NotLeaderOrFollowerException,
OffsetNotAvailableException, OffsetOutOfRangeException,
UnknownLeaderEpochException}
+import org.apache.kafka.common.errors.{ApiException,
FencedLeaderEpochException, InconsistentTopicIdException,
InvalidTxnStateException, NotLeaderOrFollowerException,
OffsetNotAvailableException, OffsetOutOfRangeException,
PolicyViolationException, UnknownLeaderEpochException}
import org.apache.kafka.common.message.{AlterPartitionResponseData,
FetchResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -61,7 +61,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation,
FetchParams, Unexpec
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig,
EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader,
LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments,
LogStartOffsetIncrementReason, ProducerStateManager,
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig,
EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel,
LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments,
LogStartOffsetIncrementReason, ProducerStateManager,
ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -4030,4 +4030,46 @@ class PartitionTest extends AbstractPartitionTest {
alterPartitionManager)
partition.tryCompleteDelayedRequests()
}
+
+ @Test
+ def testDeleteRecordsOnLeaderWithEmptyPolicy(): Unit = {
+ val leaderEpoch = 5
+ val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+
+ val emptyPolicyConfig = new LogConfig(util.Map.of(
+ TopicConfig.CLEANUP_POLICY_CONFIG, ""
+ ))
+
+ val mockLog = mock(classOf[UnifiedLog])
+ when(mockLog.config).thenReturn(emptyPolicyConfig)
+ when(mockLog.logEndOffset).thenReturn(2L)
+ when(mockLog.logStartOffset).thenReturn(0L)
+ when(mockLog.highWatermark).thenReturn(2L)
+ when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
+
+ partition.setLog(mockLog, false)
+
+ val result = partition.deleteRecordsOnLeader(1L)
+ assertEquals(1L, result.requestedOffset)
+ }
+
+ @Test
+ def testDeleteRecordsOnLeaderWithCompactPolicy(): Unit = {
+ val leaderEpoch = 5
+ val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+
+ val emptyPolicyConfig = new LogConfig(util.Map.of(
+ TopicConfig.CLEANUP_POLICY_CONFIG, "compact"
+ ))
+
+ val mockLog = mock(classOf[UnifiedLog])
+ when(mockLog.config).thenReturn(emptyPolicyConfig)
+ when(mockLog.logEndOffset).thenReturn(2L)
+ when(mockLog.logStartOffset).thenReturn(0L)
+ when(mockLog.highWatermark).thenReturn(2L)
+ when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
+
+ partition.setLog(mockLog, false)
+ assertThrows(classOf[PolicyViolationException], () =>
partition.deleteRecordsOnLeader(1L))
+ }
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 66e05d90a5d..81af2d65261 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -136,9 +136,17 @@
settings.
</li>
<li>
- The <code>cleanup.policy</code> is empty and
<code>remote.storage.enable</code> is set to true, the
+ <code>cleanup.policy</code> now supports empty values, which
means infinite retention.
+ This is equivalent to setting <code>retention.ms=-1</code> and
<code>retention.bytes=-1</code>
+ <br>
+ If <code>cleanup.policy</code> is empty and
<code>remote.storage.enable</code> is set to true, the
local log segments will be cleaned based on the values of
<code>log.local.retention.bytes</code> and
<code>log.local.retention.ms</code>.
+ <br>
+ If <code>cleanup.policy</code> is empty and
<code>remote.storage.enable</code> is set to false,
+ local log segments will not be deleted automatically. However,
records can still be deleted
+ explicitly through <code>deleteRecords</code> API calls, which
will advance the log start offset
+ and remove the corresponding log segments.
</li>
</ul>
</li>
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index d190c861998..c683ffc6c55 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -1934,8 +1934,8 @@ public class UnifiedLog implements AutoCloseable {
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments();
} else {
- // If cleanup.policy is empty and remote storage is disabled,
we should not delete any local
- // log segments
+ // If cleanup.policy is empty and remote storage is disabled,
we should not delete any local log segments
+ // unless the log start offset advances through deleteRecords
return deleteLogStartOffsetBreachedSegments();
}
}