This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new df9f405ec89 KAFKA-15290: Handle topic-level dynamic remote storage 
enable configuration (#14238)
df9f405ec89 is described below

commit df9f405ec8910f16070ab9312cc3fba01c44f806
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Aug 24 21:05:48 2023 +0530

    KAFKA-15290: Handle topic-level dynamic remote storage enable configuration 
(#14238)
    
    * KAFKA-15290: Handle topic-level dynamic remote log storage enable 
configuration.
    
    To onboard existing topics to tiered storage, bootstrap the 
remote-log-components when updating the dynamic `remote.storage.enable` config 
on the topic.
    
    Reviewers: Christo Lolov <[email protected]>, Divij Vaidya 
<[email protected]>, Luke Chen <[email protected]>, Satish Duggana 
<[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../main/scala/kafka/server/ConfigHandler.scala    | 39 +++++++++++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../kafka/server/DynamicConfigChangeTest.scala     | 64 +++++++++++++++++++++-
 4 files changed, 97 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index bd55cc2571a..80155f65433 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -303,7 +303,7 @@ class BrokerServer(
         groupCoordinator, transactionCoordinator)
 
       dynamicConfigHandlers = Map[String, ConfigHandler](
-        ConfigType.Topic -> new TopicConfigHandler(logManager, config, 
quotaManagers, None),
+        ConfigType.Topic -> new TopicConfigHandler(replicaManager, config, 
quotaManagers, None),
         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
       val networkListeners = new ListenerCollection()
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 60de2c5d04f..0e8c55a1d69 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -18,10 +18,10 @@
 package kafka.server
 
 import java.net.{InetAddress, UnknownHostException}
-import java.util.Properties
+import java.util.{Collections, Properties}
 import DynamicConfig.Broker._
 import kafka.controller.KafkaController
-import kafka.log.LogManager
+import kafka.log.UnifiedLog
 import kafka.network.ConnectionQuotas
 import kafka.security.CredentialProvider
 import kafka.server.Constants._
@@ -52,17 +52,44 @@ trait ConfigHandler {
   * The TopicConfigHandler will process topic config changes from ZooKeeper or 
the metadata log.
   * The callback provides the topic name and the full properties set.
   */
-class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: 
KafkaConfig,
-                         val quotas: QuotaManagers, kafkaController: 
Option[KafkaController]) extends ConfigHandler with Logging  {
-
-  def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
+class TopicConfigHandler(private val replicaManager: ReplicaManager,
+                         kafkaConfig: KafkaConfig,
+                         val quotas: QuotaManagers,
+                         kafkaController: Option[KafkaController]) extends 
ConfigHandler with Logging  {
+
+  private def updateLogConfig(topic: String,
+                              topicConfig: Properties): Unit = {
+    val logManager = replicaManager.logManager
     // Validate the configurations.
     val configNamesToExclude = excludedConfigs(topic, topicConfig)
     val props = new Properties()
     topicConfig.asScala.forKeyValue { (key, value) =>
       if (!configNamesToExclude.contains(key)) props.put(key, value)
     }
+    val logs = logManager.logsByTopic(topic)
+    val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
     logManager.updateTopicConfig(topic, props)
+    maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
+  }
+
+  private[server] def maybeBootstrapRemoteLogComponents(topic: String,
+                                                        logs: Seq[UnifiedLog],
+                                                        
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
+    val isRemoteLogEnabled = logs.exists(_.remoteLogEnabled())
+    // Topic configs get updated incrementally. This check is added to 
prevent redundant updates.
+    if (!wasRemoteLogEnabledBeforeUpdate && isRemoteLogEnabled) {
+      val (leaderPartitions, followerPartitions) =
+        logs.flatMap(log => 
replicaManager.onlinePartition(log.topicPartition)).partition(_.isLeader)
+      val topicIds = Collections.singletonMap(topic, 
replicaManager.metadataCache.getTopicId(topic))
+      replicaManager.remoteLogManager.foreach(rlm =>
+        rlm.onLeadershipChange(leaderPartitions.toSet.asJava, 
followerPartitions.toSet.asJava, topicIds))
+    } else if (wasRemoteLogEnabledBeforeUpdate && !isRemoteLogEnabled) {
+      warn(s"Disabling remote log on the topic: $topic is not supported.")
+    }
+  }
+
+  def processConfigChanges(topic: String, topicConfig: Properties): Unit = {
+    updateLogConfig(topic, topicConfig)
 
     def updateThrottledList(prop: String, quotaManager: 
ReplicationQuotaManager): Unit = {
       if (topicConfig.containsKey(prop) && 
topicConfig.getProperty(prop).nonEmpty) {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 645ad40f9e6..3b2ad089316 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -560,7 +560,7 @@ class KafkaServer(
         
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
 
         /* start dynamic config manager */
-        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> 
new TopicConfigHandler(logManager, config, quotaManagers, 
Some(kafkaController)),
+        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> 
new TopicConfigHandler(replicaManager, config, quotaManagers, 
Some(kafkaController)),
                                                            ConfigType.Client 
-> new ClientIdConfigHandler(quotaManagers),
                                                            ConfigType.User -> 
new UserConfigHandler(quotaManagers, credentialProvider),
                                                            ConfigType.Broker 
-> new BrokerConfigHandler(config, quotaManagers),
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 1a699a2a165..2751f0c71af 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -16,6 +16,8 @@
   */
 package kafka.server
 
+import kafka.cluster.Partition
+
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.util
@@ -23,13 +25,15 @@ import java.util.Collections.{singletonList, singletonMap}
 import java.util.{Collections, Properties}
 import java.util.concurrent.ExecutionException
 import kafka.integration.KafkaServerTestHarness
+import kafka.log.UnifiedLog
+import kafka.log.remote.RemoteLogManager
 import kafka.utils._
 import kafka.server.Constants._
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, 
ConfigEntry}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.config.internals.QuotaConfigs
 import org.apache.kafka.common.errors.{InvalidRequestException, 
UnknownTopicOrPartitionException}
@@ -45,8 +49,9 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, anyString}
-import org.mockito.Mockito.{mock, verify}
+import org.mockito.Mockito.{doNothing, mock, never, verify, when}
 
 import scala.annotation.nowarn
 import scala.collection.{Map, Seq}
@@ -539,4 +544,59 @@ class DynamicConfigChangeUnitTest {
     //Then
     assertEquals(Seq(), result)
   }
+
+  @Test
+  def testEnableRemoteLogStorageOnTopic(): Unit = {
+    val topic = "test-topic"
+    val topicUuid = Uuid.randomUuid()
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    val metadataCache = mock(classOf[MetadataCache])
+    when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+    when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(metadataCache.getTopicId(topic)).thenReturn(topicUuid)
+
+    val tp0 = new TopicPartition(topic, 0)
+    val log0: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition0: Partition = mock(classOf[Partition])
+    when(log0.topicPartition).thenReturn(tp0)
+    when(log0.remoteLogEnabled()).thenReturn(true)
+    when(partition0.isLeader).thenReturn(true)
+    when(replicaManager.onlinePartition(tp0)).thenReturn(Some(partition0))
+
+    val tp1 = new TopicPartition(topic, 1)
+    val log1: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition1: Partition = mock(classOf[Partition])
+    when(log1.topicPartition).thenReturn(tp1)
+    when(log1.remoteLogEnabled()).thenReturn(true)
+    when(partition1.isLeader).thenReturn(false)
+    when(replicaManager.onlinePartition(tp1)).thenReturn(Some(partition1))
+
+    val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), 
followerPartitionsArg.capture(), any())
+
+    val isRemoteLogEnabledBeforeUpdate = false
+    val configHandler: TopicConfigHandler = new 
TopicConfigHandler(replicaManager, null, null, None)
+    configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0, log1), 
isRemoteLogEnabledBeforeUpdate)
+    assertEquals(Collections.singleton(partition0), 
leaderPartitionsArg.getValue)
+    assertEquals(Collections.singleton(partition1), 
followerPartitionsArg.getValue)
+  }
+
+  @Test
+  def testEnableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = {
+    val topic = "test-topic"
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+
+    val log0: UnifiedLog = mock(classOf[UnifiedLog])
+    when(log0.remoteLogEnabled()).thenReturn(true)
+    doNothing().when(rlm).onLeadershipChange(any(), any(), any())
+
+    val isRemoteLogEnabledBeforeUpdate = true
+    val configHandler: TopicConfigHandler = new 
TopicConfigHandler(replicaManager, null, null, None)
+    configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0), 
isRemoteLogEnabledBeforeUpdate)
+    verify(rlm, never()).onLeadershipChange(any(), any(), any())
+  }
 }

Reply via email to