This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f7e8d478a9 KAFKA-16855: remote log disable policy in KRaft (#16653)
9f7e8d478a9 is described below

commit 9f7e8d478a9252f03f6adae3736d13799f137d6a
Author: Luke Chen <[email protected]>
AuthorDate: Sat Aug 3 17:38:41 2024 +0900

    KAFKA-16855: remote log disable policy in KRaft (#16653)
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Christo 
Lolov <[email protected]>
---
 .../apache/kafka/common/config/TopicConfig.java    |  19 ++--
 .../java/kafka/log/remote/RemoteLogManager.java    |  80 +++++++++-----
 core/src/main/scala/kafka/log/LogManager.scala     |  10 +-
 .../main/scala/kafka/server/ConfigHandler.scala    |  62 ++++++++---
 .../server/ControllerConfigurationValidator.scala  |   2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   5 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |   5 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     |  15 +--
 .../kafka/admin/RemoteTopicCrudTest.scala          |  49 +++++++--
 .../test/scala/unit/kafka/log/LogConfigTest.scala  |  75 +++++++++----
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   2 +-
 .../ControllerConfigurationValidatorTest.scala     |   4 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |  13 ++-
 .../kafka/storage/internals/log/LogConfig.java     |  62 +++++++----
 .../integration/DisableRemoteLogOnTopicTest.java   | 120 +++++++++++++++++++++
 15 files changed, 410 insertions(+), 113 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 3689227d1fa..04c6c487cd0 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -93,17 +93,14 @@ public class TopicConfig {
             "deletes the old segments. Default value is -2, it represents 
`retention.bytes` value to be used. The effective value should always be " +
             "less than or equal to `retention.bytes` value.";
 
-    public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain";
-    public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete";
-
-    public static final String REMOTE_LOG_DISABLE_POLICY_CONFIG = 
"remote.log.disable.policy";
-
-    public static final String REMOTE_LOG_DISABLE_POLICY_DOC = 
String.format("Determines whether tiered data for a topic should be retained or 
" +
-            "deleted after tiered storage disablement on a topic. The two 
valid options are \"%s\" and \"%s\". If %s is " +
-            "selected then all data in remote will be kept post-disablement 
and will only be deleted when it breaches expiration " +
-            "thresholds. If %s is selected then the data will be made 
inaccessible immediately by advancing the log start offset and will be " +
-            "deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, 
REMOTE_LOG_DISABLE_POLICY_DELETE,
-            REMOTE_LOG_DISABLE_POLICY_RETAIN, 
REMOTE_LOG_DISABLE_POLICY_DELETE);
+    public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = 
"remote.log.copy.disable";
+    public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines 
whether tiered data for a topic should become read only," +
+            " and no more data uploading on a topic.";
+
+    public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = 
"remote.log.delete.on.disable";
+    public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines 
whether tiered data for a topic should be " +
+            "deleted after tiered storage is disabled on a topic. This 
configuration should be enabled when trying to " +
+            "set `remote.storage.enable` from true to false";
 
     public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
     public static final String MAX_MESSAGE_BYTES_DOC =
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index f3f0ccc11ad..f53f5a35623 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -437,30 +437,48 @@ public class RemoteLogManager implements Closeable {
             throw new KafkaException("RemoteLogManager is not configured when 
remote storage system is enabled");
         }
 
-        Set<TopicIdPartition> leaderPartitions = 
filterPartitions(partitionsBecomeLeader)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), 
p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> leaderPartitions = 
filterPartitions(partitionsBecomeLeader)
+                .collect(Collectors.toMap(p -> new 
TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> 
log.config().remoteLogCopyDisable())));
 
-        Set<TopicIdPartition> followerPartitions = 
filterPartitions(partitionsBecomeFollower)
-                .map(p -> new TopicIdPartition(topicIds.get(p.topic()), 
p.topicPartition())).collect(Collectors.toSet());
+        Map<TopicIdPartition, Boolean> followerPartitions = 
filterPartitions(partitionsBecomeFollower)
+                .collect(Collectors.toMap(p -> new 
TopicIdPartition(topicIds.get(p.topic()), p.topicPartition()),
+                        p -> p.log().exists(log -> 
log.config().remoteLogCopyDisable())));
 
         if (!leaderPartitions.isEmpty() || !followerPartitions.isEmpty()) {
             LOGGER.debug("Effective topic partitions after filtering compact 
and internal topics, leaders: {} and followers: {}",
                     leaderPartitions, followerPartitions);
 
-            leaderPartitions.forEach(this::cacheTopicPartitionIds);
-            followerPartitions.forEach(this::cacheTopicPartitionIds);
+            leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
+            followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp));
 
-            
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions, 
followerPartitions);
-            followerPartitions.forEach(this::doHandleFollowerPartition);
+            
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(),
 followerPartitions.keySet());
+            followerPartitions.forEach((tp, __) -> 
doHandleFollowerPartition(tp));
 
             // If this node was the previous leader for the partition, then 
the RLMTask might be running in the
             // background thread and might emit metrics. So, removing the 
metrics after marking this node as follower.
-            
followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
+            followerPartitions.forEach((tp, __) -> 
removeRemoteTopicPartitionMetrics(tp));
 
             leaderPartitions.forEach(this::doHandleLeaderPartition);
         }
     }
 
+    public void stopLeaderCopyRLMTasks(Set<Partition> partitions) {
+        for (Partition partition : partitions) {
+            TopicPartition tp = partition.topicPartition();
+            if (topicIdByPartitionMap.containsKey(tp)) {
+                TopicIdPartition tpId = new 
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+                leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, 
task) -> {
+                    LOGGER.info("Cancelling the copy RLM task for tpId: {}", 
tpId);
+                    task.cancel();
+                    LOGGER.info("Resetting remote copy lag metrics for tpId: 
{}", tpId);
+                    ((RLMCopyTask) task.rlmTask).resetLagStats();
+                    return null;
+                });
+            }
+        }
+    }
+
     /**
      * Stop the remote-log-manager task for the given partitions. And, calls 
the
      * {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link 
StopPartition#deleteLocalLog()} is true.
@@ -507,16 +525,18 @@ public class RemoteLogManager implements Closeable {
                 LOGGER.error("Error while stopping the partition: {}", 
stopPartition, ex);
             }
         }
-        // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is 
true but not the other way around.
-        Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
-                .filter(sp -> sp.deleteLocalLog() && 
topicIdByPartitionMap.containsKey(sp.topicPartition()))
+
+        // We want to remote topicId map and stopPartition on RLMM for 
deleteLocalLog or stopRLMM partitions because
+        // in both case, they all mean the topic will not be held in this 
broker anymore.
+        // NOTE: In ZK mode, this#stopPartitions method is called when Replica 
state changes to Offline and ReplicaDeletionStarted
+        Set<TopicIdPartition> pendingActionsPartitions = 
stopPartitions.stream()
+                .filter(sp -> (sp.stopRemoteLogMetadataManager() || 
sp.deleteLocalLog()) && topicIdByPartitionMap.containsKey(sp.topicPartition()))
                 .map(sp -> new 
TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), 
sp.topicPartition()))
                 .collect(Collectors.toSet());
-        if (!deleteLocalPartitions.isEmpty()) {
-            // NOTE: In ZK mode, this#stopPartitions method is called when 
Replica state changes to Offline and
-            // ReplicaDeletionStarted
-            remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
-            deleteLocalPartitions.forEach(tpId -> 
topicIdByPartitionMap.remove(tpId.topicPartition()));
+
+        if (!pendingActionsPartitions.isEmpty()) {
+            pendingActionsPartitions.forEach(tpId -> 
topicIdByPartitionMap.remove(tpId.topicPartition()));
+            
remoteLogMetadataManager.onStopPartitions(pendingActionsPartitions);
         }
     }
 
@@ -986,6 +1006,13 @@ public class RemoteLogManager implements Closeable {
             }
         }
 
+        void resetLagStats() {
+            String topic = topicIdPartition.topic();
+            int partition = topicIdPartition.partition();
+            brokerTopicStats.recordRemoteCopyLagBytes(topic, partition, 0);
+            brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0);
+        }
+
         private Path toPathIfExists(File file) {
             return file.exists() ? file.toPath() : null;
         }
@@ -1794,20 +1821,23 @@ public class RemoteLogManager implements Closeable {
                 new RemoteLogReader(fetchInfo, this, callback, 
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
     }
 
-    void doHandleLeaderPartition(TopicIdPartition topicPartition) {
+    void doHandleLeaderPartition(TopicIdPartition topicPartition, Boolean 
remoteLogCopyDisable) {
         RLMTaskWithFuture followerRLMTaskWithFuture = 
followerRLMTasks.remove(topicPartition);
         if (followerRLMTaskWithFuture != null) {
             LOGGER.info("Cancelling the follower task: {}", 
followerRLMTaskWithFuture.rlmTask);
             followerRLMTaskWithFuture.cancel();
         }
 
-        leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> 
{
-            RLMCopyTask task = new RLMCopyTask(topicIdPartition, 
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
-            // set this upfront when it is getting initialized instead of 
doing it after scheduling.
-            LOGGER.info("Created a new copy task: {} and getting scheduled", 
task);
-            ScheduledFuture<?> future = 
rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, 
TimeUnit.MILLISECONDS);
-            return new RLMTaskWithFuture(task, future);
-        });
+        // Only create copy task when remoteLogCopyDisable is disabled
+        if (!remoteLogCopyDisable) {
+            leaderCopyRLMTasks.computeIfAbsent(topicPartition, 
topicIdPartition -> {
+                RLMCopyTask task = new RLMCopyTask(topicIdPartition, 
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
+                // set this upfront when it is getting initialized instead of 
doing it after scheduling.
+                LOGGER.info("Created a new copy task: {} and getting 
scheduled", task);
+                ScheduledFuture<?> future = 
rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs, 
TimeUnit.MILLISECONDS);
+                return new RLMTaskWithFuture(task, future);
+            });
+        }
 
         leaderExpirationRLMTasks.computeIfAbsent(topicPartition, 
topicIdPartition -> {
             RLMExpirationTask task = new RLMExpirationTask(topicIdPartition);
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 95d2062bf89..66fedbfae6d 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -964,15 +964,23 @@ class LogManager(logDirs: Seq[File],
    */
   def updateTopicConfig(topic: String,
                         newTopicConfig: Properties,
-                        isRemoteLogStorageSystemEnabled: Boolean): Unit = {
+                        isRemoteLogStorageSystemEnabled: Boolean,
+                        wasRemoteLogEnabled: Boolean,
+                        fromZK: Boolean): Unit = {
     topicConfigUpdated(topic)
     val logs = logsByTopic(topic)
     // Combine the default properties with the overrides in zk to create the 
new LogConfig
     val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, 
newTopicConfig)
+    val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
     // We would like to validate the configuration no matter whether the logs 
have materialised on disk or not.
     // Otherwise we risk someone creating a tiered-topic, disabling Tiered 
Storage cluster-wide and the check
     // failing since the logs for the topic are non-existent.
     LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), 
isRemoteLogStorageSystemEnabled, true)
+    // `remote.log.delete.on.disable` and `remote.log.copy.disable` are 
unsupported in ZK mode
+    if (fromZK) {
+      
LogConfig.validateNoInvalidRemoteStorageConfigsInZK(newLogConfig.values())
+    }
+    LogConfig.validateTurningOffRemoteStorageWithDelete(newLogConfig.values(), 
wasRemoteLogEnabled, isRemoteLogStorageEnabled)
     if (logs.nonEmpty) {
       logs.foreach { log =>
         val oldLogConfig = log.updateConfig(newLogConfig)
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 2d478cfc30a..9f183b63ea6 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -26,14 +26,14 @@ import kafka.server.Constants._
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Implicits._
 import kafka.utils.Logging
-import org.apache.kafka.server.config.{ReplicationConfigs, QuotaConfigs, 
ZooKeeperInternals}
+import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs, 
ZooKeeperInternals}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.metrics.Quota._
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.ClientMetricsManager
-import org.apache.kafka.storage.internals.log.ThrottledReplicaListValidator
+import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason, 
ThrottledReplicaListValidator}
 import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
 
 import scala.annotation.nowarn
@@ -68,25 +68,61 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     }
 
     val logs = logManager.logsByTopic(topic)
-    val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
+    val wasRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    val wasCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
 
-    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
+    // kafkaController is only defined in Zookeeper's mode
+    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
+      wasRemoteLogEnabled, kafkaController.isDefined)
+    maybeUpdateRemoteLogComponents(topic, logs, wasRemoteLogEnabled, 
wasCopyDisabled)
   }
 
-  private[server] def maybeBootstrapRemoteLogComponents(topic: String,
-                                                        logs: Seq[UnifiedLog],
-                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+  private[server] def maybeUpdateRemoteLogComponents(topic: String,
+                                                     logs: Seq[UnifiedLog],
+                                                     wasRemoteLogEnabled: 
Boolean,
+                                                     wasCopyDisabled: 
Boolean): Unit = {
     val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    val isCopyDisabled = logs.exists(_.config.remoteLogCopyDisable())
+    val isDeleteOnDisable = logs.exists(_.config.remoteLogDeleteOnDisable())
+
+    val (leaderPartitions, followerPartitions) =
+      logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+
     // Topic configs gets updated incrementally. This check is added to 
prevent redundant updates.
-    if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
-      val (leaderPartitions, followerPartitions) =
-        logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+    // When remote log is enabled, or remote copy is enabled, we should create 
RLM tasks accordingly via `onLeadershipChange`.
+    if (isRemoteLogEnabled && (!wasRemoteLogEnabled || (wasCopyDisabled && 
!isCopyDisabled))) {
       val topicIds = Collections.singletonMap(topic, 
replicaManager.metadataCache.getTopicId(topic))
       replicaManager.remoteLogManager.foreach(rlm =>
         rlm.onLeadershipChange(leaderPartitions.toSet.asJava, 
followerPartitions.toSet.asJava, topicIds))
-    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
-      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+
+    // When copy disabled, we should stop leaderCopyRLMTask, but keep 
expirationTask
+    if (isRemoteLogEnabled && !wasCopyDisabled && isCopyDisabled) {
+      replicaManager.remoteLogManager.foreach(rlm => {
+        rlm.stopLeaderCopyRLMTasks(leaderPartitions.toSet.asJava);
+      })
+    }
+
+    // Disabling remote log storage on this topic
+    if (wasRemoteLogEnabled && !isRemoteLogEnabled && isDeleteOnDisable) {
+      val stopPartitions: java.util.HashSet[StopPartition] = new 
java.util.HashSet[StopPartition]()
+      leaderPartitions.foreach(partition => {
+        // delete remote logs and stop RemoteLogMetadataManager
+        stopPartitions.add(StopPartition(partition.topicPartition, 
deleteLocalLog = false,
+          deleteRemoteLog = true, stopRemoteLogMetadataManager = true))
+      })
+
+      followerPartitions.foreach(partition => {
+        // we need to cancel follower tasks and stop RemoteLogMetadataManager
+        stopPartitions.add(StopPartition(partition.topicPartition, 
deleteLocalLog = false,
+          deleteRemoteLog = false, stopRemoteLogMetadataManager = true))
+      })
+
+      // update the log start offset to local log start offset for the leader 
replicas
+      logs.filter(log => leaderPartitions.exists(p => 
p.topicPartition.equals(log.topicPartition)))
+        .foreach(log => 
log.maybeIncrementLogStartOffset(log.localLogStartOffset(), 
LogStartOffsetIncrementReason.SegmentDeletion))
+
+      replicaManager.remoteLogManager.foreach(rlm => 
rlm.stopPartitions(stopPartitions, (_, _) => {}))
     }
   }
 
diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 06a60e30076..49083e3a0cd 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -109,7 +109,7 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
             nullTopicConfigs.mkString(","))
         }
         LogConfig.validate(oldConfigs, properties, 
kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
false)
       case BROKER => validateBrokerName(resource.name())
       case CLIENT_METRICS =>
         val properties = new Properties()
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6ebd151a032..11f2e4ff03c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -99,7 +99,10 @@ case class LogDeleteRecordsResult(requestedOffset: Long, 
lowWatermark: Long, exc
   }
 }
 
-case class StopPartition(topicPartition: TopicPartition, deleteLocalLog: 
Boolean, deleteRemoteLog: Boolean = false)
+case class StopPartition(topicPartition: TopicPartition,
+                         deleteLocalLog: Boolean,
+                         deleteRemoteLog: Boolean = false,
+                         stopRemoteLogMetadataManager: Boolean = false)
 
 /**
  * Result metadata of a log read operation on the log
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 8db20583e24..15c95b998b6 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -163,7 +163,8 @@ class AdminZkClient(zkClient: KafkaZkClient,
 
     LogConfig.validate(Collections.emptyMap(), config,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-      
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()),
+      true)
   }
 
   private def writeTopicPartitionAssignment(topic: String, replicaAssignment: 
Map[Int, ReplicaAssignment],
@@ -481,7 +482,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
     // remove the topic overrides
     LogConfig.validate(Collections.emptyMap(), configs,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-      
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()), 
true)
   }
 
   /**
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4e5f70f9337..f27e39c1f1d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1309,12 +1309,12 @@ public class RemoteLogManagerTest {
         verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
 
         // Evicts from topicId cache
-        remoteLogManager.stopPartitions(Collections.singleton(new 
StopPartition(leaderTopicIdPartition.topicPartition(), true, true)), (tp, ex) 
-> { });
+        remoteLogManager.stopPartitions(Collections.singleton(new 
StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)), (tp, 
ex) -> { });
         verifyNotInCache(leaderTopicIdPartition);
         verifyInCache(followerTopicIdPartition);
 
         // Evicts from topicId cache
-        remoteLogManager.stopPartitions(Collections.singleton(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, true)), (tp, ex) 
-> { });
+        remoteLogManager.stopPartitions(Collections.singleton(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)), 
(tp, ex) -> { });
         verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
     }
 
@@ -1344,7 +1344,7 @@ public class RemoteLogManagerTest {
 
         spyRemoteLogManager.onLeadershipChange(
             Collections.singleton(mockPartition(leaderTopicIdPartition)), 
Collections.emptySet(), topicIds);
-        
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition));
+        
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition), 
eq(false));
     }
 
     private MemoryRecords records(long timestamp,
@@ -1837,8 +1837,8 @@ public class RemoteLogManagerTest {
         remoteLogManager.startup();
         BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, 
throwable) -> fail("shouldn't be called");
         Set<StopPartition> partitions = new HashSet<>();
-        partitions.add(new 
StopPartition(leaderTopicIdPartition.topicPartition(), true, false));
-        partitions.add(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, false));
+        partitions.add(new 
StopPartition(leaderTopicIdPartition.topicPartition(), true, false, false));
+        partitions.add(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, false, false));
         
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
                 
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
         assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
@@ -1860,8 +1860,8 @@ public class RemoteLogManagerTest {
         BiConsumer<TopicPartition, Throwable> errorHandler =
                 (topicPartition, ex) -> fail("shouldn't be called: " + ex);
         Set<StopPartition> partitions = new HashSet<>();
-        partitions.add(new 
StopPartition(leaderTopicIdPartition.topicPartition(), true, true));
-        partitions.add(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, true));
+        partitions.add(new 
StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true));
+        partitions.add(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));
         
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
                 
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
         assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
@@ -3208,6 +3208,7 @@ public class RemoteLogManagerTest {
         when(partition.topic()).thenReturn(tp.topic());
         when(log.remoteLogEnabled()).thenReturn(true);
         when(partition.log()).thenReturn(Option.apply(log));
+        when(log.config()).thenReturn(new LogConfig(new Properties()));
         return partition;
     }
 
diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 462698734a0..25f366bfc94 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -156,12 +156,13 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
         topicConfig = topicConfig))
   }
 
+  // `remote.log.delete.on.disable` and `remote.log.copy.disable` only works 
in KRaft mode.
   @ParameterizedTest
-  @CsvSource(Array("zk,retain", "zk,delete", "kraft,retain", "kraft,delete"))
-  def testCreateRemoteTopicWithDisablePolicyRetain(quorum: String, policy: 
String): Unit = {
+  @CsvSource(Array("kraft,true,true", "kraft,true,false", "kraft,false,true", 
"kraft,false,false"))
+  def testCreateRemoteTopicWithCopyDisabledAndDeleteOnDisable(quorum: String, 
copyDisabled: Boolean, deleteOnDisable: Boolean): Unit = {
     val topicConfig = new Properties()
-    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    topicConfig.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
+    topicConfig.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, 
copyDisabled.toString)
+    topicConfig.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
deleteOnDisable.toString)
     TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, controllerServers, numPartitions, numReplicationFactor,
       topicConfig = topicConfig)
     verifyRemoteLogTopicConfigs(topicConfig)
@@ -311,6 +312,35 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
       () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling 
remote storage feature on the topic level is not supported.")
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testUpdateInvalidRemoteStorageConfigUnderZK(quorum: String): Unit = {
+    val admin = createAdminClient()
+    val errorMsg = "It is invalid to set `remote.log.delete.on.disable` or 
`remote.log.copy.disable` under Zookeeper's mode."
+    val topicConfig = new Properties
+    topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true")
+    TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+      topicConfig = topicConfig)
+
+    val configs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+          AlterConfigOp.OpType.SET),
+      ))
+    assertThrowsException(classOf[InvalidConfigurationException],
+      () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
+
+    configs.clear()
+    configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+      util.Arrays.asList(
+        new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
+          AlterConfigOp.OpType.SET),
+      ))
+    assertThrowsException(classOf[InvalidConfigurationException],
+      () => admin.incrementalAlterConfigs(configs).all().get(), errorMsg)
+  }
+
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testTopicDeletion(quorum: String): Unit = {
@@ -409,10 +439,15 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
             topicConfig.getProperty(TopicConfig.RETENTION_BYTES_CONFIG).toLong 
==
               logBuffer.head.config.retentionSize
         }
-        if 
(topicConfig.contains(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG)) {
+        if (topicConfig.contains(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG)) {
+          result = result &&
+            
topicConfig.getProperty(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG).toBoolean ==
+              logBuffer.head.config.remoteLogCopyDisable()
+        }
+        if 
(topicConfig.contains(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG)) {
           result = result &&
-            
topicConfig.getProperty(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG).equals(
-              logBuffer.head.config.remoteLogDisablePolicy())
+            
topicConfig.getProperty(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG).toBoolean
 ==
+              logBuffer.head.config.remoteLogDeleteOnDisable()
         }
       }
       result
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 4e7e4e23b38..8126bb08b07 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -100,7 +100,8 @@ class LogConfigTest {
       case TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-2")
       case TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-1")
       case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-0.1")
-      case TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "remove", "0", "true")
+      case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "remove", "0")
+      case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "remove", "0")
 
       case _ => assertPropertyInvalid(name, "not_a_number", "-1")
     })
@@ -300,7 +301,7 @@ class LogConfigTest {
     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, 
localRetentionMs.toString)
     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes.toString)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), props, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), props, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
   }
 
   @Test
@@ -312,17 +313,17 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+    LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
 
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
   }
 
   @ParameterizedTest(name = "testEnableRemoteLogStorage with 
sysRemoteStorageEnabled: {0}")
@@ -335,10 +336,10 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
     } else {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
       assertTrue(message.getMessage.contains("Tiered Storage functionality is 
disabled in the broker"))
     }
   }
@@ -355,11 +356,20 @@ class LogConfigTest {
     if (wasRemoteStorageEnabled) {
       val message = assertThrows(classOf[InvalidConfigurationException],
         () => 
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "true"),
-          logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
-      assertTrue(message.getMessage.contains("Disabling remote storage feature 
on the topic level is not supported."))
+          logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false))
+      assertTrue(message.getMessage.contains("It is invalid to disable remote 
storage without deleting remote data. " +
+        "If you want to keep the remote data and turn to read only, please set 
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
+        "If you want to disable remote storage and delete all remote data, 
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
+
+
+      // It should be able to disable the remote log storage when delete on 
disable is set to true
+      logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
+      
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "true"),
+        logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-      
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"), logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
+      
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"), logProps,
+        kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
     }
   }
 
@@ -378,10 +388,12 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true)
     }
   }
 
@@ -400,10 +412,12 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap,
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), 
true)
     }
   }
 
@@ -426,19 +440,34 @@ class LogConfigTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, 
TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE))
-  def testValidRemoteLogDisablePolicy(policy: String): Unit = {
+  @ValueSource(booleans = Array(true, false))
+  def testValidRemoteLogCopyDisabled(copyDisabled: Boolean): Unit = {
     val logProps = new Properties
-    logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
+    logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, 
copyDisabled.toString)
     LogConfig.validate(logProps)
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("keep", "remove"))
-  def testInvalidRemoteLogDisablePolicy(policy: String): Unit = {
+  @ValueSource(booleans = Array(true, false))
+  def testValidRemoteLogDeleteOnDisable(deleteOnDisable: Boolean): Unit = {
     val logProps = new Properties
-    logProps.put(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, policy)
-    assertThrows(classOf[ConfigException], () => LogConfig.validate(logProps))
+    logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
deleteOnDisable.toString)
+    LogConfig.validate(logProps)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = 
Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
+  def testInValidRemoteConfigsInZK(configKey: String): Unit = {
+    val kafkaProps = TestUtils.createDummyBrokerConfig()
+    
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true")
+    val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+    val logProps = new Properties
+    logProps.put(configKey, "true")
+
+    val message = assertThrows(classOf[InvalidConfigurationException],
+      () => LogConfig.validate(Collections.emptyMap(), logProps, 
kafkaConfig.extractLogConfigMap, true, true))
+    assertTrue(message.getMessage.contains("It is invalid to set 
`remote.log.delete.on.disable` or " +
+      "`remote.log.copy.disable` under Zookeeper's mode."))
   }
 
   /* Verify that when the deprecated config 
LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG has non default value the new 
configs
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b5a029a7411..ceb2342446f 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -797,7 +797,7 @@ class LogManagerTest {
     val newProperties = new Properties()
     newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
 
-    spyLogManager.updateTopicConfig(topic, newProperties, 
isRemoteLogStorageSystemEnabled = false)
+    spyLogManager.updateTopicConfig(topic, newProperties, 
isRemoteLogStorageSystemEnabled = false, wasRemoteLogEnabled = false, fromZK = 
false)
 
     assertTrue(log0.config.delete)
     assertTrue(log1.config.delete)
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 250f07ca23e..4cf5ad70cee 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -93,7 +93,9 @@ class ControllerConfigurationValidatorTest {
     val config = new util.TreeMap[String, String]()
     config.put(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
     if (wasRemoteStorageEnabled) {
-      assertEquals("Disabling remote storage feature on the topic level is not 
supported.",
+      assertEquals("It is invalid to disable remote storage without deleting 
remote data. " +
+        "If you want to keep the remote data and turn to read only, please set 
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
+        "If you want to disable remote storage and delete all remote data, 
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.",
         assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
           new ConfigResource(TOPIC, "foo"), config, 
util.Collections.singletonMap(REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true"))).getMessage)
     } else {
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cc4623b1277..dcec4a75d96 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
 import org.apache.kafka.server.config.{ConfigType, QuotaConfigs, 
ServerLogConfigs, ZooKeeperInternals}
+import org.apache.kafka.storage.internals.log.LogConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
@@ -659,6 +660,7 @@ class DynamicConfigChangeUnitTest {
     when(log0.remoteLogEnabled()).thenReturn(true)
     when(partition0.isLeader).thenReturn(true)
     when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0))
+    when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
 
     val tp1 = new TopicPartition(topic, 1)
     val log1: UnifiedLog = mock(classOf[UnifiedLog])
@@ -667,6 +669,7 @@ class DynamicConfigChangeUnitTest {
     when(log1.remoteLogEnabled()).thenReturn(true)
     when(partition1.isLeader).thenReturn(false)
     when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1))
+    when(log1.config).thenReturn(new LogConfig(Collections.emptyMap()))
 
     val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
     val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
@@ -674,7 +677,7 @@ class DynamicConfigChangeUnitTest {
 
     val isRemoteLogEnabledBeforeUpdate = false
     val configHandler: TopicConfigHandler = new 
TopicConfigHandler(replicaManager, null, null, None)
-    configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0, log1), 
isRemoteLogEnabledBeforeUpdate)
+    configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0, log1), 
isRemoteLogEnabledBeforeUpdate, false)
     assertEquals(Collections.singleton(partition0), 
leaderPartitionsArg.getValue)
     assertEquals(Collections.singleton(partition1), 
followerPartitionsArg.getValue)
   }
@@ -682,17 +685,23 @@ class DynamicConfigChangeUnitTest {
   @Test
   def testEnableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = {
     val topic = "test-topic"
+    val tp0 = new TopicPartition(topic, 0)
     val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
     val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    val partition: Partition = mock(classOf[Partition])
     when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+    when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition))
 
     val log0: UnifiedLog = mock(classOf[UnifiedLog])
     when(log0.remoteLogEnabled()).thenReturn(true)
     doNothing().when(rlm).onLeadershipChange(any(), any(), any())
+    when(log0.config).thenReturn(new LogConfig(Collections.emptyMap()))
+    when(log0.topicPartition).thenReturn(tp0)
+    when(partition.isLeader).thenReturn(true)
 
     val isRemoteLogEnabledBeforeUpdate = true
     val configHandler: TopicConfigHandler = new 
TopicConfigHandler(replicaManager, null, null, None)
-    configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0), 
isRemoteLogEnabledBeforeUpdate)
+    configHandler.maybeUpdateRemoteLogComponents(topic, Seq(log0), 
isRemoteLogEnabledBeforeUpdate, false)
     verify(rlm, never()).onLeadershipChange(any(), any(), any())
   }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index d026ea3b4b1..a0ca52678ad 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -113,13 +113,15 @@ public class LogConfig extends AbstractConfig {
     public static class RemoteLogConfig {
 
         public final boolean remoteStorageEnable;
-        public final String remoteLogDisablePolicy;
+        public final boolean remoteLogDeleteOnDisable;
+        public final boolean remoteLogCopyDisable;
         public final long localRetentionMs;
         public final long localRetentionBytes;
 
         private RemoteLogConfig(LogConfig config) {
             this.remoteStorageEnable = 
config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-            this.remoteLogDisablePolicy = 
config.getString(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG);
+            this.remoteLogCopyDisable = 
config.getBoolean(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG);
+            this.remoteLogDeleteOnDisable = 
config.getBoolean(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG);
             this.localRetentionMs = 
config.getLong(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG);
             this.localRetentionBytes = 
config.getLong(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG);
         }
@@ -128,7 +130,8 @@ public class LogConfig extends AbstractConfig {
         public String toString() {
             return "RemoteLogConfig{" +
                     "remoteStorageEnable=" + remoteStorageEnable +
-                    ", remoteLogDisablePolicy=" + remoteLogDisablePolicy +
+                    ", remoteLogCopyDisable=" + remoteLogCopyDisable +
+                    ", remoteLogDeleteOnDisable=" + remoteLogDeleteOnDisable +
                     ", localRetentionMs=" + localRetentionMs +
                     ", localRetentionBytes=" + localRetentionBytes +
                     '}';
@@ -204,7 +207,8 @@ public class LogConfig extends AbstractConfig {
     // Visible for testing
     public static final Set<String> CONFIGS_WITH_NO_SERVER_DEFAULTS = 
Collections.unmodifiableSet(Utils.mkSet(
             TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
-            TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG,
+            TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
+            TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
             QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG,
             QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
     ));
@@ -325,9 +329,8 @@ public class LogConfig extends AbstractConfig {
                         TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
                 .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
                         TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
-                .define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, 
TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
-                        in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, 
TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
-                        MEDIUM, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DOC);
+                .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, 
false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
+                .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
     }
 
     public final Set<String> overriddenConfigs;
@@ -508,8 +511,12 @@ public class LogConfig extends AbstractConfig {
         return remoteLogConfig.remoteStorageEnable;
     }
 
-    public String remoteLogDisablePolicy() {
-        return remoteLogConfig.remoteLogDisablePolicy;
+    public Boolean remoteLogDeleteOnDisable() {
+        return remoteLogConfig.remoteLogDeleteOnDisable;
+    }
+
+    public Boolean remoteLogCopyDisable() {
+        return remoteLogConfig.remoteLogCopyDisable;
     }
 
     public long localRetentionMs() {
@@ -613,11 +620,17 @@ public class LogConfig extends AbstractConfig {
      * @param existingConfigs                   The existing properties
      * @param newConfigs                        The new properties to be 
validated
      * @param isRemoteLogStorageSystemEnabled   true if system wise remote log 
storage is enabled
+     * @param fromZK                            true if this is a ZK cluster
      */
     private static void validateTopicLogConfigValues(Map<String, String> 
existingConfigs,
                                                      Map<?, ?> newConfigs,
-                                                     boolean 
isRemoteLogStorageSystemEnabled) {
+                                                     boolean 
isRemoteLogStorageSystemEnabled,
+                                                     boolean fromZK) {
         validateValues(newConfigs);
+
+        if (fromZK) {
+            validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
+        }
         boolean isRemoteLogStorageEnabled = (Boolean) 
newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
         if (isRemoteLogStorageEnabled) {
             validateRemoteStorageOnlyIfSystemEnabled(newConfigs, 
isRemoteLogStorageSystemEnabled, false);
@@ -626,14 +639,26 @@ public class LogConfig extends AbstractConfig {
             validateRemoteStorageRetentionTime(newConfigs);
         } else {
             // The new config "remote.storage.enable" is false, validate if 
it's turning from true to false
-            validateNotTurningOffRemoteStorage(existingConfigs);
+            boolean wasRemoteLogEnabled = 
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"));
+            validateTurningOffRemoteStorageWithDelete(newConfigs, 
wasRemoteLogEnabled, isRemoteLogStorageEnabled);
+        }
+    }
+
+    public static void validateTurningOffRemoteStorageWithDelete(Map<?, ?> 
newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
+        boolean isRemoteLogDeleteOnDisable = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
 false);
+        if (wasRemoteLogEnabled && !isRemoteLogStorageEnabled && 
!isRemoteLogDeleteOnDisable) {
+            throw new InvalidConfigurationException("It is invalid to disable 
remote storage without deleting remote data. " +
+                    "If you want to keep the remote data and turn to read 
only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
+                    "If you want to disable remote storage and delete all 
remote data, please set 
`remote.storage.enable=false,remote.log.delete.on.disable=true`.");
         }
     }
 
-    public static void validateNotTurningOffRemoteStorage(Map<String, String> 
existingConfigs) {
-        boolean wasRemoteLogEnabledBeforeUpdate = 
Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
 "false"));
-        if (wasRemoteLogEnabledBeforeUpdate) {
-            throw new InvalidConfigurationException("Disabling remote storage 
feature on the topic level is not supported.");
+    public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> 
newConfigs) {
+        boolean isRemoteLogDeleteOnDisable = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
 false);
+        boolean isRemoteLogCopyDisabled = (Boolean) 
Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG,
 false);
+        if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
+            throw new InvalidConfigurationException("It is invalid to set 
`remote.log.delete.on.disable` or " +
+                    "`remote.log.copy.disable` under Zookeeper's mode.");
         }
     }
 
@@ -694,13 +719,14 @@ public class LogConfig extends AbstractConfig {
      * Check that the given properties contain only valid log config names and 
that all values can be parsed and are valid
      */
     public static void validate(Properties props) {
-        validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
+        validate(Collections.emptyMap(), props, Collections.emptyMap(), false, 
false);
     }
 
     public static void validate(Map<String, String> existingConfigs,
                                 Properties props,
                                 Map<?, ?> configuredProps,
-                                boolean isRemoteLogStorageSystemEnabled) {
+                                boolean isRemoteLogStorageSystemEnabled,
+                                boolean fromZK) {
         validateNames(props);
         if (configuredProps == null || configuredProps.isEmpty()) {
             Map<?, ?> valueMaps = CONFIG.parse(props);
@@ -709,7 +735,7 @@ public class LogConfig extends AbstractConfig {
             Map<Object, Object> combinedConfigs = new 
HashMap<>(configuredProps);
             combinedConfigs.putAll(props);
             Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
-            validateTopicLogConfigValues(existingConfigs, valueMaps, 
isRemoteLogStorageSystemEnabled);
+            validateTopicLogConfigValues(existingConfigs, valueMaps, 
isRemoteLogStorageSystemEnabled, fromZK);
         }
     }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
new file mode 100644
index 00000000000..6bcaea6a363
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DisableRemoteLogOnTopicTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public final class DisableRemoteLogOnTopicTest extends 
TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = {"kraft"})
+    public void executeTieredStorageTest(String quorum) {
+        super.executeTieredStorageTest(quorum);
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final Map<Integer, List<Integer>> assignment = mkMap(
+                mkEntry(p0, Arrays.asList(broker0, broker1))
+        );
+        final Map<String, String> disableCopy = new HashMap<>();
+        disableCopy.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true");
+
+        final Map<String, String> deleteOnDisable = new HashMap<>();
+        deleteOnDisable.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"false");
+        deleteOnDisable.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
"true");
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, assignment,
+                        enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // disable remote log copy
+                .updateTopicConfig(topicA,
+                        
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "true"),
+                        Collections.emptyList())
+
+                // make sure we can still consume from the beginning of the 
topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+                .consume(topicA, p0, 0L, 3, 2)
+
+                // re-enable remote log copy
+                .updateTopicConfig(topicA,
+                        
Collections.singletonMap(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, "false"),
+                        Collections.emptyList())
+
+                // make sure the logs can be offloaded
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+                .produce(topicA, p0, new KeyValueSpec("k3", "v3"))
+
+                // explicitly disable remote log copy
+                .updateTopicConfig(topicA,
+                        disableCopy,
+                        Collections.emptyList())
+                // make sure we can still consume from the beginning of the 
topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 3)
+                .consume(topicA, p0, 0L, 4, 3)
+
+                // verify the remote retention policy is working.
+                // Use DELETE_RECORDS API to delete the records upto offset 1 
and expect 1 remote segment to be deleted
+                .expectDeletionInRemoteStorage(broker0, topicA, p0, 
DELETE_SEGMENT, 1)
+                .deleteRecords(topicA, p0, 1L)
+                .waitForRemoteLogSegmentDeletion(topicA)
+
+                // disabling remote log on topicA and enabling deleteOnDisable
+                .updateTopicConfig(topicA,
+                        deleteOnDisable,
+                        Collections.emptyList())
+                // make sure all remote data is deleted
+                .expectEmptyRemoteStorage(topicA, p0)
+                // verify the local log is still consumable
+                .consume(topicA, p0, 3L, 1, 0);
+    }
+}

Reply via email to