This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new df9f405ec89 KAFKA-15290: Handle topic-level dynamic remote storage
enable configuration (#14238)
df9f405ec89 is described below
commit df9f405ec8910f16070ab9312cc3fba01c44f806
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Aug 24 21:05:48 2023 +0530
KAFKA-15290: Handle topic-level dynamic remote storage enable configuration
(#14238)
* KAFKA-15290: Handle topic-level dynamic remote log storage enable
configuration.
To onboard existing topics to tiered storage, bootstrap the
remote-log-components when updating the dynamic `remote.storage.enable` config
on the topic.
Reviewers: Christo Lolov <[email protected]>, Divij Vaidya
<[email protected]>, Luke Chen <[email protected]>, Satish Duggana
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../main/scala/kafka/server/ConfigHandler.scala | 39 +++++++++++--
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../kafka/server/DynamicConfigChangeTest.scala | 64 +++++++++++++++++++++-
4 files changed, 97 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index bd55cc2571a..80155f65433 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -303,7 +303,7 @@ class BrokerServer(
groupCoordinator, transactionCoordinator)
dynamicConfigHandlers = Map[String, ConfigHandler](
- ConfigType.Topic -> new TopicConfigHandler(logManager, config,
quotaManagers, None),
+ ConfigType.Topic -> new TopicConfigHandler(replicaManager, config,
quotaManagers, None),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
val networkListeners = new ListenerCollection()
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 60de2c5d04f..0e8c55a1d69 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -18,10 +18,10 @@
package kafka.server
import java.net.{InetAddress, UnknownHostException}
-import java.util.Properties
+import java.util.{Collections, Properties}
import DynamicConfig.Broker._
import kafka.controller.KafkaController
-import kafka.log.LogManager
+import kafka.log.UnifiedLog
import kafka.network.ConnectionQuotas
import kafka.security.CredentialProvider
import kafka.server.Constants._
@@ -52,17 +52,44 @@ trait ConfigHandler {
* The TopicConfigHandler will process topic config changes from ZooKeeper or
the metadata log.
* The callback provides the topic name and the full properties set.
*/
-class TopicConfigHandler(private val logManager: LogManager, kafkaConfig:
KafkaConfig,
- val quotas: QuotaManagers, kafkaController:
Option[KafkaController]) extends ConfigHandler with Logging {
-
- def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
+class TopicConfigHandler(private val replicaManager: ReplicaManager,
+ kafkaConfig: KafkaConfig,
+ val quotas: QuotaManagers,
+ kafkaController: Option[KafkaController]) extends
ConfigHandler with Logging {
+
+ private def updateLogConfig(topic: String,
+ topicConfig: Properties): Unit = {
+ val logManager = replicaManager.logManager
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)
val props = new Properties()
topicConfig.asScala.forKeyValue { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
+ val logs = logManager.logsByTopic(topic)
+ val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
logManager.updateTopicConfig(topic, props)
+ maybeBootstrapRemoteLogComponents(topic, logs,
wasRemoteLogEnabledBeforeUpdate)
+ }
+
+ private[server] def maybeBootstrapRemoteLogComponents(topic: String,
+ logs: Seq[UnifiedLog],
+
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+ val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+ // Topic configs get updated incrementally. This check is added to
prevent redundant updates.
+ if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
+ val (leaderPartitions, followerPartitions) =
+ logs.flatMap(log =>
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+ val topicIds = Collections.singletonMap(topic,
replicaManager.metadataCache.getTopicId(topic))
+ replicaManager.remoteLogManager.foreach(rlm =>
+ rlm.onLeadershipChange(leaderPartitions.toSet.asJava,
followerPartitions.toSet.asJava, topicIds))
+ } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
+ warn(s"Disabling remote log on the topic: $topic is not supported.")
+ }
+ }
+
+ def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
+ updateLogConfig(topic, topicConfig)
def updateThrottledList(prop: String, quotaManager:
ReplicationQuotaManager): Unit = {
if (topicConfig.containsKey(prop) &&
topicConfig.getProperty(prop).nonEmpty) {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 645ad40f9e6..3b2ad089316 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -560,7 +560,7 @@ class KafkaServer(
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
/* start dynamic config manager */
- dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic ->
new TopicConfigHandler(logManager, config, quotaManagers,
Some(kafkaController)),
+ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic ->
new TopicConfigHandler(replicaManager, config, quotaManagers,
Some(kafkaController)),
ConfigType.Client
-> new ClientIdConfigHandler(quotaManagers),
ConfigType.User ->
new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker
-> new BrokerConfigHandler(config, quotaManagers),
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 1a699a2a165..2751f0c71af 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -16,6 +16,8 @@
*/
package kafka.server
+import kafka.cluster.Partition
+
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
@@ -23,13 +25,15 @@ import java.util.Collections.{singletonList, singletonMap}
import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException
import kafka.integration.KafkaServerTestHarness
+import kafka.log.UnifiedLog
+import kafka.log.remote.RemoteLogManager
import kafka.utils._
import kafka.server.Constants._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config,
ConfigEntry}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.{InvalidRequestException,
UnknownTopicOrPartitionException}
@@ -45,8 +49,9 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyString}
-import org.mockito.Mockito.{mock, verify}
+import org.mockito.Mockito.{doNothing, mock, never, verify, when}
import scala.annotation.nowarn
import scala.collection.{Map, Seq}
@@ -539,4 +544,59 @@ class DynamicConfigChangeUnitTest {
//Then
assertEquals(Seq(), result)
}
+
+ @Test
+ def testEnableRemoteLogStorageOnTopic(): Unit = {
+ val topic = "test-topic"
+ val topicUuid = Uuid.randomUuid()
+ val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ val metadataCache = mock(classOf[MetadataCache])
+ when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+ when(replicaManager.metadataCache).thenReturn(metadataCache)
+ when(metadataCache.getTopicId(topic)).thenReturn(topicUuid)
+
+ val tp0 = new TopicPartition(topic, 0)
+ val log0: UnifiedLog = mock(classOf[UnifiedLog])
+ val partition0: Partition = mock(classOf[Partition])
+ when(log0.topicPartition).thenReturn(tp0)
+ when(log0.remoteLogEnabled()).thenReturn(true)
+ when(partition0.isLeader).thenReturn(true)
+ when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0))
+
+ val tp1 = new TopicPartition(topic, 1)
+ val log1: UnifiedLog = mock(classOf[UnifiedLog])
+ val partition1: Partition = mock(classOf[Partition])
+ when(log1.topicPartition).thenReturn(tp1)
+ when(log1.remoteLogEnabled()).thenReturn(true)
+ when(partition1.isLeader).thenReturn(false)
+ when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1))
+
+ val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] =
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+ val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] =
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+ doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(),
followerPartitionsArg.capture(), any())
+
+ val isRemoteLogEnabledBeforeUpdate = false
+ val configHandler: TopicConfigHandler = new
TopicConfigHandler(replicaManager, null, null, None)
+ configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0, log1),
isRemoteLogEnabledBeforeUpdate)
+ assertEquals(Collections.singleton(partition0),
leaderPartitionsArg.getValue)
+ assertEquals(Collections.singleton(partition1),
followerPartitionsArg.getValue)
+ }
+
+ @Test
+ def testEnableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = {
+ val topic = "test-topic"
+ val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+
+ val log0: UnifiedLog = mock(classOf[UnifiedLog])
+ when(log0.remoteLogEnabled()).thenReturn(true)
+ doNothing().when(rlm).onLeadershipChange(any(), any(), any())
+
+ val isRemoteLogEnabledBeforeUpdate = true
+ val configHandler: TopicConfigHandler = new
TopicConfigHandler(replicaManager, null, null, None)
+ configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0),
isRemoteLogEnabledBeforeUpdate)
+ verify(rlm, never()).onLeadershipChange(any(), any(), any())
+ }
}