This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 67c3d966f5c KAFKA-15267: Do not allow Tiered Storage to be disabled
while topics have remote.storage.enable property (#14161)
67c3d966f5c is described below
commit 67c3d966f5c6d32cfd8b410a98867190932c22d3
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Aug 30 01:04:20 2023 +0100
KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have
remote.storage.enable property (#14161)
The purpose of this change is to prevent a broker from starting up with Tiered
Storage disabled (remote.log.storage.system.enable=false) while there are still
topics that have 'remote.storage.enable' set to true.
Reviewers: Kamal Chandraprakash <[email protected]>, Divij
Vaidya <[email protected]>, Satish Duggana <[email protected]>, Luke Chen
<[email protected]>
---
core/src/main/scala/kafka/log/LogManager.scala | 11 ++++--
.../main/scala/kafka/server/ConfigHandler.scala | 4 ++-
.../kafka/admin/RemoteTopicCrudTest.scala | 42 ++++++++++++++++++++--
.../test/scala/unit/kafka/log/LogManagerTest.scala | 2 +-
.../kafka/storage/internals/log/LogConfig.java | 17 +++++----
5 files changed, 62 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 78e48010e52..216063ae6ea 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -871,12 +871,17 @@ class LogManager(logDirs: Seq[File],
* Update the configuration of the provided topic.
*/
def updateTopicConfig(topic: String,
- newTopicConfig: Properties): Unit = {
+ newTopicConfig: Properties,
+ isRemoteLogStorageSystemEnabled: Boolean): Unit = {
topicConfigUpdated(topic)
val logs = logsByTopic(topic)
+ // Combine the default properties with the overrides in zk to create the
new LogConfig
+ val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals,
newTopicConfig)
+ // We would like to validate the configuration no matter whether the logs
have materialised on disk or not.
+ // Otherwise we risk someone creating a tiered-topic, disabling Tiered
Storage cluster-wide and the check
+ // failing since the logs for the topic are non-existent.
+ LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(),
isRemoteLogStorageSystemEnabled, true)
if (logs.nonEmpty) {
- // Combine the default properties with the overrides in zk to create the
new LogConfig
- val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals,
newTopicConfig)
logs.foreach { log =>
val oldLogConfig = log.updateConfig(newLogConfig)
if (oldLogConfig.compact && !newLogConfig.compact) {
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 0e8c55a1d69..02e57b4009f 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -66,9 +66,11 @@ class TopicConfigHandler(private val replicaManager:
ReplicaManager,
topicConfig.asScala.forKeyValue { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
+
val logs = logManager.logsByTopic(topic)
val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
- logManager.updateTopicConfig(topic, props)
+
+ logManager.updateTopicConfig(topic, props,
kafkaConfig.isRemoteLogStorageSystemEnabled)
maybeBootstrapRemoteLogComponents(topic, logs,
wasRemoteLogEnabledBeforeUpdate)
}
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 23439e120cf..6d8fbe1bbe7 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -21,11 +21,10 @@ import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.config.{ConfigException, ConfigResource,
TopicConfig}
import org.apache.kafka.common.errors.{InvalidConfigurationException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.utils.MockTime
-import
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
NoOpRemoteStorageManager,
- RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata,
RemoteLogSegmentState}
+import
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId,
RemoteLogSegmentMetadata, RemoteLogSegmentState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
@@ -299,6 +298,43 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
"Remote log segments should be deleted only once by the leader")
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum:
String): Unit = {
+ val topicConfig = new Properties()
+ topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"true")
+
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, brokerCount,
+ topicConfig = topicConfig)
+
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1,
zkConnectOrNull).head
+ instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
+
+ if (isKRaftTest()) {
+ recreateBrokers(startup = true)
+
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
+ // Normally the exception is thrown as part of the TearDown method of
the parent class(es). We would like to not do this.
+ faultHandler.setIgnore(true)
+ } else {
+ assertThrows(classOf[ConfigException], () => recreateBrokers(startup =
true))
+ }
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def
testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum:
String): Unit = {
+ val topicConfig = new Properties()
+ topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
false.toString)
+
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, brokerCount,
+ topicConfig = topicConfig)
+
+ val tsDisabledProps = TestUtils.createBrokerConfigs(1,
zkConnectOrNull).head
+ instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
+
+ recreateBrokers(startup = true)
+ }
+
private def assertThrowsException(exceptionType: Class[_ <: Throwable],
executable: Executable,
message: String = ""): Throwable = {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 5cab80b1864..824ce7ea327 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -630,7 +630,7 @@ class LogManagerTest {
val newProperties = new Properties()
newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE)
- spyLogManager.updateTopicConfig(topic, newProperties)
+ spyLogManager.updateTopicConfig(topic, newProperties, false)
assertTrue(log0.config.delete)
assertTrue(log1.config.delete)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index e177dfcfcb7..d73a37485cb 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -548,21 +548,26 @@ public class LogConfig extends AbstractConfig {
* @param props The properties to be validated
*/
private static void validateTopicLogConfigValues(Map<?, ?> props,
- boolean
isRemoteLogStorageSystemEnabled) {
+ boolean
isRemoteLogStorageSystemEnabled) {
validateValues(props);
boolean isRemoteLogStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
-
validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled);
+ validateRemoteStorageOnlyIfSystemEnabled(props,
isRemoteLogStorageSystemEnabled, false);
validateNoRemoteStorageForCompactedTopic(props);
validateRemoteStorageRetentionSize(props);
validateRemoteStorageRetentionTime(props);
}
}
- private static void validateRemoteStorageOnlyIfSystemEnabled(boolean
isRemoteLogStorageSystemEnabled) {
- if (!isRemoteLogStorageSystemEnabled) {
- throw new ConfigException("Tiered Storage functionality is
disabled in the broker. " +
- "Topic cannot be configured with remote log storage.");
+ public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?>
props, boolean isRemoteLogStorageSystemEnabled, boolean
isReceivingConfigFromStore) {
+ boolean isRemoteLogStorageEnabled = (Boolean)
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
+ if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
+ if (isReceivingConfigFromStore) {
+ throw new ConfigException("You have to delete all topics with
the property remote.storage.enable=true before disabling tiered storage
cluster-wide");
+ } else {
+ throw new ConfigException("Tiered Storage functionality is
disabled in the broker. " +
+ "Topic cannot be configured with remote log storage.");
+ }
}
}