kamalcph commented on code in PR #14161: URL: https://github.com/apache/kafka/pull/14161#discussion_r1296143313
########## core/src/test/scala/unit/kafka/server/KafkaServerTest.scala: ########## @@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness { server.shutdown() } + @Test + def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit = { + val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + + val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps)) + server.remoteLogManagerOpt match { + case Some(_) => + case None => fail("RemoteLogManager should be initialized") + } + + val topicProps = new Properties() + topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, true.toString) + + TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + + server.shutdown() + + val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + + assertThrows(classOf[ConfigException], () => TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))) + } + + @Test + def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit = { + val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + 
"org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + + var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps)) + server.remoteLogManagerOpt match { + case Some(_) => + case None => fail("RemoteLogManager should be initialized") + } + + val topicProps = new Properties() + topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString) + + TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + + server.shutdown() + + val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + + server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)) + + server.shutdown() + } + + @Test + def testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit = { + val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head + + val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps)) + + val topicProps = new Properties() + topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, true.toString) + + TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + + server.shutdown() + + assertThrows(classOf[ConfigException], () => TestUtils.createServer(KafkaConfig.fromProps(serverProps))) + } + + @Test + def testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringDisabled(): Unit = { Review Comment: Can the test name be changed? The test doesn't fail anywhere when `remote.storage.enable=false`. Could you remove the `FailsOnStartup` from test name? 
########## core/src/main/scala/kafka/server/ConfigHandler.scala: ########## @@ -62,6 +62,15 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } + + val remoteLogStorageEnable = topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG) + + if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem() + && remoteLogStorageEnable != null + && remoteLogStorageEnable.toBoolean) { + throw new ConfigException(s"You have to delete all topics with the property remote.storage.enable (i.e. $topic) before disabling tiered storage cluster-wide") Review Comment: Please change `delete` to `disable` in this message. We can hold off on this until KAFKA-15290 gets reviewed. ########## core/src/test/scala/unit/kafka/server/KafkaServerTest.scala: ########## @@ -154,6 +155,96 @@ class KafkaServerTest extends QuorumTestHarness { server.shutdown() } + @Test + def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(): Unit = { + val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + + val server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps)) + server.remoteLogManagerOpt match { + case Some(_) => + case None => fail("RemoteLogManager should be initialized") + } + + val topicProps = new Properties() + topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, true.toString) + + TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + 
+ server.shutdown() + + val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + + assertThrows(classOf[ConfigException], () => TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps))) + } + + @Test + def testClusterWideDisablementOfTieredStorageWithDisabledTieredTopic(): Unit = { + val tsEnabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString) + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager") + tsEnabledProps.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + + var server = TestUtils.createServer(KafkaConfig.fromProps(tsEnabledProps)) + server.remoteLogManagerOpt match { + case Some(_) => + case None => fail("RemoteLogManager should be initialized") + } + + val topicProps = new Properties() + topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString) + + TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) + + server.shutdown() + + val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnect).head + + server = TestUtils.createServer(KafkaConfig.fromProps(tsDisabledProps)) + + server.shutdown() + } + + @Test + def testClusterWithoutTieredStorageFailsOnStartupIfTopicWithTieringEnabled(): Unit = { + val serverProps = TestUtils.createBrokerConfigs(1, zkConnect).head + + val server = TestUtils.createServer(KafkaConfig.fromProps(serverProps)) + + val topicProps = new Properties() + topicProps.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, true.toString) + + TestUtils.createTopic(zkClient = server.zkClient, topic = "batman", servers = Seq(server), topicConfig = topicProps) Review Comment: If system-level tiered storage is not enabled, then 
topic creation with `remote.storage.enable=true` should fail now. ########## core/src/main/scala/kafka/server/ConfigHandler.scala: ########## @@ -62,6 +62,15 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } + + val remoteLogStorageEnable = topicConfig.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG) + + if (!kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem() Review Comment: Instead of `kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()`, use `kafkaConfig.isRemoteLogStorageSystemEnabled`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org