This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 7fe3cec4eb2 KAFKA-17236: Handle local log deletion when 
remote.log.copy.disabled=true (#16765)
7fe3cec4eb2 is described below

commit 7fe3cec4eb20ac60291e437a3dd9fa46f9fb2afc
Author: Luke Chen <[email protected]>
AuthorDate: Thu Aug 8 20:37:40 2024 +0900

    KAFKA-17236: Handle local log deletion when remote.log.copy.disabled=true 
(#16765)
    
    Handle local log deletion when remote.log.copy.disable=true, based on 
KIP-950.
    
    When tiered storage is disabled or becomes read-only on a topic, the local 
retention configuration becomes irrelevant, and all data expiration follows the 
topic-wide retention configuration exclusively.
    
    - Added remoteLogEnabledAndRemoteCopyEnabled method to check whether this 
topic enables tiered storage and has remote log copy enabled. We should adopt 
local.retention.ms/bytes only when remote.storage.enable=true and 
remote.log.copy.disable=false.
    - Changed to use retention.bytes/retention.ms when remote copy disabled.
    - Added validation: when remote.log.copy.disable=true, local.retention.ms 
and local.retention.bytes must be -2 or equal to retention.ms and retention.bytes
    - Added tests
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Satish 
Duggana <[email protected]>, Christo Lolov <[email protected]>
---
 .../apache/kafka/common/config/TopicConfig.java    |   4 +-
 core/src/main/scala/kafka/log/LogManager.scala     |   1 +
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  24 ++--
 .../kafka/admin/RemoteTopicCrudTest.scala          | 132 +++++++++++++++++++++
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |   7 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |  82 +++++++++++++
 .../kafka/storage/internals/log/LogConfig.java     |  21 ++++
 .../integration/DisableRemoteLogOnTopicTest.java   |  20 +++-
 8 files changed, 275 insertions(+), 16 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 04c6c487cd0..b3cea19c551 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -95,7 +95,9 @@ public class TopicConfig {
 
     public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = 
"remote.log.copy.disable";
     public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines 
whether tiered data for a topic should become read only," +
-            " and no more data uploading on a topic.";
+            " and no more data uploading on a topic. Once this config is set 
to true, the local retention configuration " +
+            "(i.e. local.retention.ms/bytes) becomes irrelevant, and all data 
expiration follows the topic-wide retention configuration" +
+            "(i.e. retention.ms/bytes).";
 
     public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = 
"remote.log.delete.on.disable";
     public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines 
whether tiered data for a topic should be " +
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 66fedbfae6d..7c04dee1f78 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -981,6 +981,7 @@ class LogManager(logDirs: Seq[File],
       
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
     }
     LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), 
wasRemoteLogEnabled, isRemoteLogStorageEnabled)
+    
LogConfig.validateRetentionConfigsWhenRemoteCopyDisabled(newLogConfig.values(), 
isRemoteLogStorageEnabled)
     if (logs.nonEmpty) {
       logs.foreach { log =>
         val oldLogConfig = log.updateConfig(newLogConfig)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index bef18806b0d..69b6787ca63 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1463,6 +1463,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  /**
+   * @return true if this topic enables tiered storage and remote log copy is 
enabled (i.e. remote.log.copy.disable=false)
+   */
+  private def remoteLogEnabledAndRemoteCopyEnabled(): Boolean = {
+    remoteLogEnabled() && !config.remoteLogCopyDisable()
+  }
+
   /**
    * Find segments starting from the oldest until the user-supplied predicate 
is false.
    * A final segment that is empty will never be returned.
@@ -1477,7 +1484,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       // Segments are eligible for deletion when:
       //    1. they are uploaded to the remote storage
       //    2. log-start-offset was incremented higher than the largest offset 
in the candidate segment
-      if (remoteLogEnabled()) {
+      // Note: when remote log copy is disabled, we will fall back to local 
log check using retention.ms/bytes
+      if (remoteLogEnabledAndRemoteCopyEnabled()) {
         (upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage()) ||
           allowDeletionDueToLogStartOffsetIncremented
       } else {
@@ -1522,7 +1530,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def incrementStartOffset(startOffset: Long, reason: 
LogStartOffsetIncrementReason): Unit = {
-    if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(startOffset, 
reason)
+    if (remoteLogEnabledAndRemoteCopyEnabled()) 
maybeIncrementLocalLogStartOffset(startOffset, reason)
     else maybeIncrementLogStartOffset(startOffset, reason)
   }
 
@@ -1570,7 +1578,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteRetentionMsBreachedSegments(): Int = {
-    val retentionMs = localRetentionMs(config, remoteLogEnabled())
+    val retentionMs = localRetentionMs(config, 
remoteLogEnabledAndRemoteCopyEnabled())
     if (retentionMs < 0) return 0
     val startMs = time.milliseconds
 
@@ -1582,7 +1590,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteRetentionSizeBreachedSegments(): Int = {
-    val retentionSize: Long = localRetentionSize(config, remoteLogEnabled())
+    val retentionSize: Long = localRetentionSize(config, 
remoteLogEnabledAndRemoteCopyEnabled())
     if (retentionSize < 0 || size < retentionSize) return 0
     var diff = size - retentionSize
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
@@ -2338,12 +2346,12 @@ object UnifiedLog extends Logging {
     }
   }
 
-  private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled: 
Boolean): Long = {
-    if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else 
config.retentionMs
+  private[log] def localRetentionMs(config: LogConfig, 
remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
+    if (remoteLogEnabledAndRemoteCopyEnabled) 
config.remoteLogConfig.localRetentionMs else config.retentionMs
   }
 
-  private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled: 
Boolean): Long = {
-    if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else 
config.retentionSize
+  private[log] def localRetentionSize(config: LogConfig, 
remoteLogEnabledAndRemoteCopyEnabled: Boolean): Long = {
+    if (remoteLogEnabledAndRemoteCopyEnabled) 
config.remoteLogConfig.localRetentionBytes else config.retentionSize
   }
 
 }
diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index c88962cb845..2983ba0e307 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -168,6 +168,138 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     verifyRemoteLogTopicConfigs(topicConfig)
   }
 
+  // `remote.log.copy.disable` only works in KRaft mode.
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testCreateTopicRetentionMsValidationWithRemoteCopyDisabled(quorum: 
String): Unit = {
+    val testTopicName2 = testTopicName + "2"
+    val testTopicName3 = testTopicName + "3"
+    val errorMsgMs = "When `remote.log.copy.disable` is set to true, the 
`local.retention.ms` and `retention.ms` " +
+      "must be set to the identical value because there will be no more logs 
copied to the remote storage."
+
+    // 1. create a topic with `remote.log.copy.disable=true` and have 
different local.retention.ms and retention.ms value,
+    //    it should fail to create the topic
+    val topicConfig = new Properties()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
+
+    val admin = createAdminClient()
+    val err = assertThrowsException(classOf[InvalidConfigurationException],
+            () => TestUtils.createTopicWithAdmin(admin, testTopicName, 
brokers, controllerServers, numPartitions,
+              numReplicationFactor, topicConfig = topicConfig))
+    assertEquals(errorMsgMs, err.getMessage)
+
+    // 2. change the local.retention.ms value to the same value as 
retention.ms should successfully create the topic
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000")
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    // 3. change the local.retention.ms value to "-2" should also successfully 
create the topic
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
+    TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    // 4. create a topic with `remote.log.copy.disable=false` and have 
different local.retention.ms and retention.ms value,
+    //    it should successfully create the topic.
+    topicConfig.clear()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
+    TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    // 5. alter the config to `remote.log.copy.disable=true`, it should fail 
the config change
+    val configs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+          AlterConfigOp.OpType.SET),
+      ))
+    val err2 = assertThrowsException(classOf[InvalidConfigurationException],
+      () => admin.incrementalAlterConfigs(configs).all().get())
+    assertEquals(errorMsgMs, err2.getMessage)
+
+    // 6. alter the config to `remote.log.copy.disable=true` and 
local.retention.ms == retention.ms, it should work without error
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1000"),
+          AlterConfigOp.OpType.SET),
+      ))
+
+    admin.incrementalAlterConfigs(configs).all().get()
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testCreateTopicRetentionBytesValidationWithRemoteCopyDisabled(quorum: 
String): Unit = {
+    val testTopicName2 = testTopicName + "2"
+    val testTopicName3 = testTopicName + "3"
+    val errorMsgBytes = "When `remote.log.copy.disable` is set to true, the 
`local.retention.bytes` and `retention.bytes` " +
+      "must be set to the identical value because there will be no more logs 
copied to the remote storage."
+
+    // 1. create a topic with `remote.log.copy.disable=true` and have 
different local.retention.bytes and retention.bytes value,
+    //    it should fail to create the topic
+    val topicConfig = new Properties()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
+    topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
+
+    val admin = createAdminClient()
+    val err = assertThrowsException(classOf[InvalidConfigurationException],
+      () => TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions,
+        numReplicationFactor, topicConfig = topicConfig))
+    assertEquals(errorMsgBytes, err.getMessage)
+
+    // 2. change the local.retention.bytes value to the same value as 
retention.bytes should successfully create the topic
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000")
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    // 3. change the local.retention.bytes value to "-2" should also 
successfully create the topic
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
+    TestUtils.createTopicWithAdmin(admin, testTopicName2, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    // 4. create a topic with `remote.log.copy.disable=false` and have 
different local.retention.bytes and retention.bytes value,
+    //    it should successfully create the topic.
+    topicConfig.clear()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "100")
+    topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
+    TestUtils.createTopicWithAdmin(admin, testTopicName3, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    // 5. alter the config to `remote.log.copy.disable=true`, it should fail 
the config change
+    val configs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+          AlterConfigOp.OpType.SET),
+      ))
+    val err2 = assertThrowsException(classOf[InvalidConfigurationException],
+      () => admin.incrementalAlterConfigs(configs).all().get())
+    assertEquals(errorMsgBytes, err2.getMessage)
+
+    // 6. alter the config to `remote.log.copy.disable=true` and 
local.retention.bytes == retention.bytes, it should work without error
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName3),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+          AlterConfigOp.OpType.SET),
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1000"),
+          AlterConfigOp.OpType.SET),
+      ))
+    admin.incrementalAlterConfigs(configs).all().get()
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala 
b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 706a5ec30f2..264dfb36cfe 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -36,6 +36,7 @@ import 
org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
+import 
org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
 DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
FetchDataInfo, FetchIsolation, LazyIndex, LogAppendInfo, LogConfig, 
LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, 
ProducerStateManager, ProducerStateManagerConfig, TransactionIndex}
 
 import scala.jdk.CollectionConverters._
@@ -68,7 +69,9 @@ object LogTestUtils {
                       indexIntervalBytes: Int = 
ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT,
                       segmentIndexBytes: Int = 
ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT,
                       fileDeleteDelayMs: Long = 
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
-                      remoteLogStorageEnable: Boolean = 
LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE): LogConfig = {
+                      remoteLogStorageEnable: Boolean = 
LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE,
+                      remoteLogCopyDisable: Boolean = 
DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG,
+                      remoteLogDeleteOnDisable: Boolean = 
DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG): LogConfig = {
     val logProps = new Properties()
     logProps.put(TopicConfig.SEGMENT_MS_CONFIG, segmentMs: java.lang.Long)
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes: Integer)
@@ -83,6 +86,8 @@ object LogTestUtils {
     logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, segmentIndexBytes: 
Integer)
     logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, fileDeleteDelayMs: 
java.lang.Long)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
remoteLogStorageEnable: java.lang.Boolean)
+    logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, 
remoteLogCopyDisable: java.lang.Boolean)
+    logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
remoteLogDeleteOnDisable: java.lang.Boolean)
     new LogConfig(logProps)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index c18371854fe..b45c9171d66 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4187,6 +4187,88 @@ class UnifiedLogTest {
     assertEquals(1, log.logSegments.size)
   }
 
+  @Test
+  def testRetentionOnLocalLogDeletionWhenRemoteLogCopyDisabled(): Unit = {
+    def createRecords = TestUtils.records(List(new 
SimpleRecord(mockTime.milliseconds(), "a".getBytes)))
+    val segmentBytes = createRecords.sizeInBytes()
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = segmentBytes, 
localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+          fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    // Given 6 segments of 1 message each
+    for (_ <- 0 until 6) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(6, log.logSegments.size)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    // Should not delete local log because highest remote storage offset is -1 
(default value)
+    log.deleteOldSegments()
+    assertEquals(6, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(0, log.localLogStartOffset())
+
+    // simulate calls to upload 2 segments to remote storage
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    log.deleteOldSegments()
+    assertEquals(4, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // add remoteCopyDisabled = true
+    val copyDisabledLogConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, localRetentionBytes = 1, retentionBytes = segmentBytes * 5,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true, 
remoteLogCopyDisable = true)
+    log.updateConfig(copyDisabledLogConfig)
+
+    // No local logs will be deleted even though local retention bytes is 1 
because we'll adopt retention.ms/bytes
+    // when remote.log.copy.disable = true
+    log.deleteOldSegments()
+    assertEquals(4, log.logSegments.size())
+    assertEquals(0, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // simulate the remote logs are all deleted due to retention policy
+    log.updateLogStartOffsetFromRemoteTier(2)
+    assertEquals(4, log.logSegments.size())
+    assertEquals(2, log.logStartOffset)
+    assertEquals(2, log.localLogStartOffset())
+
+    // produce 3 more segments
+    for (_ <- 0 until 3) {
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
+    }
+    assertEquals(7, log.logSegments.size)
+    log.updateHighWatermark(log.logEndOffset)
+
+    // try to delete local logs again, 2 segments will be deleted this time 
because we'll adopt retention.ms/bytes (retention.bytes = 5)
+    // when remote.log.copy.disable = true
+    log.deleteOldSegments()
+    assertEquals(5, log.logSegments.size())
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.localLogStartOffset())
+
+    // add localRetentionMs = 1, retentionMs = 1000
+    val retentionMsConfig = LogTestUtils.createLogConfig(segmentBytes = 
segmentBytes, localRetentionMs = 1, retentionMs = 1000,
+      fileDeleteDelayMs = 0, remoteLogStorageEnable = true, 
remoteLogCopyDisable = true)
+    log.updateConfig(retentionMsConfig)
+
+    // Should not delete any logs because no local logs expired using 
retention.ms = 1000
+    mockTime.sleep(10)
+    log.deleteOldSegments()
+    assertEquals(5, log.logSegments.size())
+    assertEquals(4, log.logStartOffset)
+    assertEquals(4, log.localLogStartOffset())
+
+    // Should delete all logs because all of them are expired based on 
retentionMs = 1000
+    mockTime.sleep(1000)
+    log.deleteOldSegments()
+    assertEquals(1, log.logSegments.size())
+    assertEquals(9, log.logStartOffset)
+    assertEquals(9, log.localLogStartOffset())
+  }
+
   @Test
   def testIncrementLocalLogStartOffsetAfterLocalLogDeletion(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(localRetentionBytes = 1, 
fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 8d6df1ee93e..f3f151ddc18 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -184,6 +184,8 @@ public class LogConfig extends AbstractConfig {
     public static final long DEFAULT_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS = 
ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DEFAULT;
 
     public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
+    public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false;
+    public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = 
false;
     public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It 
indicates the value to be derived from RetentionBytes
     public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates 
the value to be derived from RetentionMs
 
@@ -635,6 +637,7 @@ public class LogConfig extends AbstractConfig {
             validateNoRemoteStorageForCompactedTopic(newConfigs);
             validateRemoteStorageRetentionSize(newConfigs);
             validateRemoteStorageRetentionTime(newConfigs);
+            validateRetentionConfigsWhenRemoteCopyDisabled(newConfigs, 
isRemoteLogStorageEnabled);
         } else {
             // The new config "remote.storage.enable" is false, validate if 
it's turning from true to false
             boolean wasRemoteLogEnabled = 
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"));
@@ -651,6 +654,24 @@ public class LogConfig extends AbstractConfig {
         }
     }
 
+    public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?, 
?> newConfigs, boolean isRemoteLogStorageEnabled) {
+        boolean isRemoteLogCopyDisabled = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
 false);
+        long retentionMs = (Long) 
newConfigs.get(TopicConfig.RETENTION_MS_CONFIG);
+        long localRetentionMs = (Long) 
newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
+        long retentionBytes = (Long) 
newConfigs.get(TopicConfig.RETENTION_BYTES_CONFIG);
+        long localRetentionBytes = (Long) 
newConfigs.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
+        if (isRemoteLogStorageEnabled && isRemoteLogCopyDisabled) {
+            if (localRetentionBytes != -2 && localRetentionBytes != 
retentionBytes) {
+                throw new InvalidConfigurationException("When 
`remote.log.copy.disable` is set to true, the `local.retention.bytes` " +
+                        "and `retention.bytes` must be set to the identical 
value because there will be no more logs copied to the remote storage.");
+            }
+            if (localRetentionMs != -2 && localRetentionMs != retentionMs) {
+                throw new InvalidConfigurationException("When 
`remote.log.copy.disable` is set to true, the `local.retention.ms` " +
+                        "and `retention.ms` must be set to the identical value 
because there will be no more logs copied to the remote storage.");
+            }
+        }
+    }
+
     public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> 
newConfigs) {
         boolean isRemoteLogDeleteOnDisable = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
 false);
         boolean isRemoteLogCopyDisabled = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
 false);
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
index 6bcaea6a363..2655f600b3b 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
@@ -60,8 +60,16 @@ public final class DisableRemoteLogOnTopicTest extends 
TieredStorageTestHarness
         final Map<Integer, List<Integer>> assignment = mkMap(
                 mkEntry(p0, Arrays.asList(broker0, broker1))
         );
-        final Map<String, String> disableCopy = new HashMap<>();
-        disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
+        // local.retention.ms/bytes need to be set to the same value as 
retention.ms/bytes when disabling remote log copy
+        final Map<String, String> disableRemoteCopy = new HashMap<>();
+        disableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, 
"true");
+        disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
"-2");
+        disableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2");
+
+        // revert the change to local.retention.bytes
+        final Map<String, String> enableRemoteCopy = new HashMap<>();
+        enableRemoteCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, 
"false");
+        enableRemoteCopy.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
"1");
 
         final Map<String, String> deleteOnDisable = new HashMap<>();
         deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"false");
@@ -78,7 +86,7 @@ public final class DisableRemoteLogOnTopicTest extends 
TieredStorageTestHarness
                         new KeyValueSpec("k2", "v2"))
                 // disable remote log copy
                 .updateTopicConfig(topicA,
-                        
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+                        disableRemoteCopy,
                         Collections.emptyList())
 
                 // make sure we can still consume from the beginning of the 
topic to read data from local and remote storage
@@ -87,16 +95,16 @@ public final class DisableRemoteLogOnTopicTest extends 
TieredStorageTestHarness
 
                 // re-enable remote log copy
                 .updateTopicConfig(topicA,
-                        
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"),
+                        enableRemoteCopy,
                         Collections.emptyList())
 
                 // make sure the logs can be offloaded
                 .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
                 .produce(topicA, p0, new KeyValueSpec("k3", "v3"))
 
-                // explicitly disable remote log copy
+                // disable remote log copy again
                 .updateTopicConfig(topicA,
-                        disableCopy,
+                        disableRemoteCopy,
                         Collections.emptyList())
                 // make sure we can still consume from the beginning of the 
topic to read data from local and remote storage
                 .expectFetchFromTieredStorage(broker0, topicA, p0, 3)

Reply via email to