This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new efec0f57565 KAFKA-15267: Do not allow Tiered Storage to be disabled 
while topics have remote.storage.enable property (#14161)
efec0f57565 is described below

commit efec0f5756510bb02ee578b1a01dd8388237c14b
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Aug 30 01:04:20 2023 +0100

    KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have 
remote.storage.enable property (#14161)
    
    The purpose of this change is to not allow a broker to start up with Tiered 
Storage disabled (remote.log.storage.system.enable=false) while there are still 
topics that have 'remote.storage.enable' set.
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Divij 
Vaidya <[email protected]>, Satish Duggana <[email protected]>, Luke Chen 
<[email protected]>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 11 ++++--
 .../main/scala/kafka/server/ConfigHandler.scala    |  4 ++-
 .../kafka/admin/RemoteTopicCrudTest.scala          | 42 ++++++++++++++++++++--
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  2 +-
 .../kafka/storage/internals/log/LogConfig.java     | 17 +++++----
 5 files changed, 62 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 78e48010e52..216063ae6ea 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -871,12 +871,17 @@ class LogManager(logDirs: Seq[File],
    * Update the configuration of the provided topic.
    */
   def updateTopicConfig(topic: String,
-                        newTopicConfig: Properties): Unit = {
+                        newTopicConfig: Properties,
+                        isRemoteLogStorageSystemEnabled: Boolean): Unit = {
     topicConfigUpdated(topic)
     val logs = logsByTopic(topic)
+    // Combine the default properties with the overrides in zk to create the 
new LogConfig
+    val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, 
newTopicConfig)
+    // We would like to validate the configuration no matter whether the logs 
have materialised on disk or not.
+    // Otherwise we risk someone creating a tiered-topic, disabling Tiered 
Storage cluster-wide and the check
+    // failing since the logs for the topic are non-existent.
+    LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), 
isRemoteLogStorageSystemEnabled, true)
     if (logs.nonEmpty) {
-      // Combine the default properties with the overrides in zk to create the 
new LogConfig
-      val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, 
newTopicConfig)
       logs.foreach { log =>
         val oldLogConfig = log.updateConfig(newLogConfig)
         if (oldLogConfig.compact && !newLogConfig.compact) {
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 0e8c55a1d69..02e57b4009f 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -66,9 +66,11 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     topicConfig.asScala.forKeyValue { (key, value) =>
       if (!configNamesToExclude.contains(key)) props.put(key, value)
     }
+
     val logs = logManager.logsByTopic(topic)
     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
-    logManager.updateTopicConfig(topic, props)
+
+    logManager.updateTopicConfig(topic, props, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
     maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 23439e120cf..6d8fbe1bbe7 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -21,11 +21,10 @@ import kafka.server.KafkaConfig
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.config.{ConfigException, ConfigResource, 
TopicConfig}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.utils.MockTime
-import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager,
-  RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, 
RemoteLogSegmentState}
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, 
RemoteLogSegmentMetadata, RemoteLogSegmentState}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
@@ -299,6 +298,43 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
       "Remote log segments should be deleted only once by the leader")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum: 
String): Unit = {
+    val topicConfig = new Properties()
+    topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true")
+
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, brokerCount,
+      topicConfig = topicConfig)
+
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).head
+    instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
+
+    if (isKRaftTest()) {
+      recreateBrokers(startup = true)
+      
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
+      // Normally the exception is thrown as part of the TearDown method of 
the parent class(es). We would like to not do this.
+      faultHandler.setIgnore(true)
+    } else {
+      assertThrows(classOf[ConfigException], () => recreateBrokers(startup = 
true))
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def 
testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum:
 String): Unit = {
+    val topicConfig = new Properties()
+    topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
false.toString)
+
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, brokerCount,
+      topicConfig = topicConfig)
+
+    val tsDisabledProps = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).head
+    instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
+
+    recreateBrokers(startup = true)
+  }
+
   private def assertThrowsException(exceptionType: Class[_ <: Throwable],
                                     executable: Executable,
                                     message: String = ""): Throwable = {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 5cab80b1864..824ce7ea327 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -630,7 +630,7 @@ class LogManagerTest {
     val newProperties = new Properties()
     newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
 
-    spyLogManager.updateTopicConfig(topic, newProperties)
+    spyLogManager.updateTopicConfig(topic, newProperties, false)
 
     assertTrue(log0.config.delete)
     assertTrue(log1.config.delete)
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index e177dfcfcb7..d73a37485cb 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -548,21 +548,26 @@ public class LogConfig extends AbstractConfig {
      * @param props The properties to be validated
      */
     private static void validateTopicLogConfigValues(Map<?, ?> props,
-                                                     boolean 
isRemoteLogStorageSystemEnabled) {
+                                                    boolean 
isRemoteLogStorageSystemEnabled) {
         validateValues(props);
         boolean isRemoteLogStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
         if (isRemoteLogStorageEnabled) {
-            
validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled);
+            validateRemoteStorageOnlyIfSystemEnabled(props, 
isRemoteLogStorageSystemEnabled, false);
             validateNoRemoteStorageForCompactedTopic(props);
             validateRemoteStorageRetentionSize(props);
             validateRemoteStorageRetentionTime(props);
         }
     }
 
-    private static void validateRemoteStorageOnlyIfSystemEnabled(boolean 
isRemoteLogStorageSystemEnabled) {
-        if (!isRemoteLogStorageSystemEnabled) {
-            throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
-                    "Topic cannot be configured with remote log storage.");
+    public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> 
props, boolean isRemoteLogStorageSystemEnabled, boolean 
isReceivingConfigFromStore) {
+        boolean isRemoteLogStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+        if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
+            if (isReceivingConfigFromStore) {
+                throw new ConfigException("You have to delete all topics with 
the property remote.storage.enable=true before disabling tiered storage 
cluster-wide");
+            } else {
+                throw new ConfigException("Tiered Storage functionality is 
disabled in the broker. " +
+                        "Topic cannot be configured with remote log storage.");
+            }
         }
     }
 

Reply via email to