This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41611b4bd2e MINOR: Followup KAFKA-19112 document updated (#20492)
41611b4bd2e is described below

commit 41611b4bd2e708a3a30c7b8f5d89aa274a64676c
Author: Ken Huang <[email protected]>
AuthorDate: Sun Sep 28 19:06:06 2025 +0800

    MINOR: Followup KAFKA-19112 document updated (#20492)
    
    Some sections are not very clear, and we need to update the
    documentation.
    
    Reviewers: TengYao Chi <[email protected]>, Jun Rao
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  2 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 46 +++++++++++++++++++++-
 docs/upgrade.html                                  | 10 ++++-
 .../kafka/storage/internals/log/UnifiedLog.java    |  4 +-
 4 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b45a08b067..44b28a1f07e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1658,7 +1658,7 @@ class Partition(val topicPartition: TopicPartition,
   def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = inReadLock(leaderIsrUpdateLock) {
     leaderLogIfLocal match {
       case Some(leaderLog) =>
-        if (!leaderLog.config.delete)
+        if (!leaderLog.config.delete && leaderLog.config.compact)
           throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy")
 
         val convertedOffset = if (offset == DeleteRecordsRequest.HIGH_WATERMARK)
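
The change above narrows the gate in deleteRecordsOnLeader: previously the call was rejected whenever the delete policy was absent, whereas it is now rejected only for compact-only topics, so an empty cleanup.policy permits explicit record deletion. A minimal Java sketch of the new condition; the class name, method name, and the Set-of-strings representation of the parsed policy are illustrative, not the actual Partition/LogConfig code:

    import java.util.Set;

    public class DeleteRecordsPolicyCheck {
        // "policies" stands in for the parsed cleanup.policy values of the topic.
        static boolean deleteRecordsAllowed(Set<String> policies) {
            boolean delete = policies.contains("delete");
            boolean compact = policies.contains("compact");
            // Old behavior rejected whenever delete was absent; the new check
            // rejects only when compact is set without delete.
            return delete || !compact;
        }

        public static void main(String[] args) {
            System.out.println(deleteRecordsAllowed(Set.of("delete")));            // true
            System.out.println(deleteRecordsAllowed(Set.of()));                    // true: empty policy is now allowed
            System.out.println(deleteRecordsAllowed(Set.of("compact")));           // false: compact-only is still rejected
            System.out.println(deleteRecordsAllowed(Set.of("compact", "delete"))); // true
        }
    }
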
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 8e512ad4d01..5662c2d2276 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric
 import kafka.log.LogManager
 import kafka.server._
 import kafka.utils._
-import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
+import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException}
 import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -61,7 +61,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -4030,4 +4030,46 @@ class PartitionTest extends AbstractPartitionTest {
       alterPartitionManager)
     partition.tryCompleteDelayedRequests()
   }
+
+  @Test
+  def testDeleteRecordsOnLeaderWithEmptyPolicy(): Unit = {
+    val leaderEpoch = 5
+    val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+
+    val emptyPolicyConfig = new LogConfig(util.Map.of(
+      TopicConfig.CLEANUP_POLICY_CONFIG, ""
+    ))
+
+    val mockLog = mock(classOf[UnifiedLog])
+    when(mockLog.config).thenReturn(emptyPolicyConfig)
+    when(mockLog.logEndOffset).thenReturn(2L)
+    when(mockLog.logStartOffset).thenReturn(0L)
+    when(mockLog.highWatermark).thenReturn(2L)
+    when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
+
+    partition.setLog(mockLog, false)
+
+    val result = partition.deleteRecordsOnLeader(1L)
+    assertEquals(1L, result.requestedOffset)
+  }
+
+  @Test
+  def testDeleteRecordsOnLeaderWithCompactPolicy(): Unit = {
+    val leaderEpoch = 5
+    val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+
+    val emptyPolicyConfig = new LogConfig(util.Map.of(
+      TopicConfig.CLEANUP_POLICY_CONFIG, "compact"
+    ))
+
+    val mockLog = mock(classOf[UnifiedLog])
+    when(mockLog.config).thenReturn(emptyPolicyConfig)
+    when(mockLog.logEndOffset).thenReturn(2L)
+    when(mockLog.logStartOffset).thenReturn(0L)
+    when(mockLog.highWatermark).thenReturn(2L)
+    when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
+
+    partition.setLog(mockLog, false)
+    assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L))
+  }
 }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 66e05d90a5d..81af2d65261 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -136,9 +136,17 @@
                 settings.
             </li>
             <li>
-                The <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
+                <code>cleanup.policy</code> now supports empty values, which means infinite retention.
+                This is equivalent to setting <code>retention.ms=-1</code> and <code>retention.bytes=-1</code>.
+                <br>
+                If <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
                 local log segments will be cleaned based on the values of <code>log.local.retention.bytes</code> and
                 <code>log.local.retention.ms</code>.
+                <br>
+                If <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to false,
+                local log segments will not be deleted automatically. However, records can still be deleted
+                explicitly through <code>deleteRecords</code> API calls, which will advance the log start offset
+                and remove the corresponding log segments.
             </li>
         </ul>
     </li>
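
The documented behavior can be exercised from the Admin client: set cleanup.policy to the empty string on a topic, then call deleteRecords to advance the log start offset. A hedged sketch; the topic name "my-topic", the bootstrap address, and the target offset are illustrative assumptions:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.config.TopicConfig;

    public class EmptyCleanupPolicyExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Set an empty cleanup.policy: neither compact nor delete, i.e. infinite retention.
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
                AlterConfigOp setEmptyPolicy = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, ""), AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(topic, List.of(setEmptyPolicy))).all().get();

                // Records can still be removed explicitly: advance the log start offset to 100
                // for partition 0; segments lying entirely below the new start offset become deletable.
                TopicPartition tp = new TopicPartition("my-topic", 0);
                admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(100L))).all().get();
            }
        }
    }
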
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index d190c861998..c683ffc6c55 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -1934,8 +1934,8 @@ public class UnifiedLog implements AutoCloseable {
                         deleteRetentionSizeBreachedSegments() +
                         deleteRetentionMsBreachedSegments();
             } else {
-                // If cleanup.policy is empty and remote storage is disabled, we should not delete any local
-                // log segments
+                // If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments
+                // unless the log start offset advances through deleteRecords
                 return deleteLogStartOffsetBreachedSegments();
             }
         }
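
With an empty policy and remote storage disabled, the remaining retention path is deleteLogStartOffsetBreachedSegments(): a segment becomes deletable once it lies entirely below the log start offset that a deleteRecords call advanced. An illustrative predicate for that condition, not the UnifiedLog implementation, assuming a segment's upper bound is the base offset of the following segment:

    public class LogStartOffsetBreach {
        // A segment spanning [baseOffset, nextSegmentBaseOffset) is breached when
        // its entire offset range lies before the current log start offset.
        static boolean breachesLogStartOffset(long nextSegmentBaseOffset, long logStartOffset) {
            return nextSegmentBaseOffset <= logStartOffset;
        }

        public static void main(String[] args) {
            // After deleteRecords advances the start offset to 100, a segment
            // covering offsets [0, 50) is deletable, one covering [50, 150) is not.
            System.out.println(breachesLogStartOffset(50L, 100L));   // true
            System.out.println(breachesLogStartOffset(150L, 100L));  // false
        }
    }
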
