mumrah commented on code in PR #13028:
URL: https://github.com/apache/kafka/pull/13028#discussion_r1053840330


##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -313,35 +313,76 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(config: KafkaConfig,
-                                   controllerChannelManager: ControllerChannelManager,
-                                   controllerEventManager: ControllerEventManager,
-                                   controllerContext: ControllerContext,
-                                   stateChangeLogger: StateChangeLogger)
-  extends AbstractControllerBrokerRequestBatch(config, controllerContext, stateChangeLogger) {
+class ControllerBrokerRequestBatch(
+  config: KafkaConfig,
+  controllerChannelManager: ControllerChannelManager,
+  controllerEventManager: ControllerEventManager,
+  controllerContext: ControllerContext,
+  stateChangeLogger: StateChangeLogger
+) extends AbstractControllerBrokerRequestBatch(
+  config,
+  () => controllerContext,
+  () => config.interBrokerProtocolVersion,
+  stateChangeLogger
+) {
 
   def sendEvent(event: ControllerEvent): Unit = {
     controllerEventManager.put(event)
   }
-
   def sendRequest(brokerId: Int,
                  request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
                   callback: AbstractResponse => Unit = null): Unit = {
     controllerChannelManager.sendRequest(brokerId, request, callback)
   }
 
+  override def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit = {
+    sendEvent(LeaderAndIsrResponseReceived(response, broker))
+  }
+
+  override def handleUpdateMetadataResponse(response: UpdateMetadataResponse, broker: Int): Unit = {
+    sendEvent(UpdateMetadataResponseReceived(response, broker))
+  }
+
+  override def handleStopReplicaResponse(stopReplicaResponse: StopReplicaResponse, brokerId: Int,
+                                         partitionErrorsForDeletingTopics: Map[TopicPartition, Errors]): Unit = {
+    if (partitionErrorsForDeletingTopics.nonEmpty)
+      sendEvent(TopicDeletionStopReplicaResponseReceived(brokerId, stopReplicaResponse.error, partitionErrorsForDeletingTopics))
+  }
+}
+
+abstract class ControllerBrokerRequestMetadata {
+  def isTopicDeletionInProgress(topicName: String): Boolean
+
+  def topicIds: collection.Map[String, Uuid]
+
+  def liveBrokerIdAndEpochs: collection.Map[Int, Long]
+
+  def liveOrShuttingDownBrokers: collection.Set[Broker]
+
+  def isTopicQueuedUpForDeletion(topic: String): Boolean
+
+  def isReplicaOnline(brokerId: Int, partition: TopicPartition): Boolean
+
+  def partitionReplicaAssignment(partition: TopicPartition): collection.Seq[Int]
+
+  def leaderEpoch(topicPartition: TopicPartition): Int
+
+  def liveOrShuttingDownBrokerIds: collection.Set[Int]
+
+  def partitionLeadershipInfo(topicPartition: TopicPartition): Option[LeaderIsrAndControllerEpoch]
 }
 
 abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,

Review Comment:
   It might be worthwhile to document the arguments now that we have a few that vary between KRaft and ZK modes.
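   
   For example (a rough sketch only; `metadataProvider` matches the name used later in the diff, while `metadataVersionProvider` and the wording are illustrative):
   
   ```scala
   /**
    * @param config                  the controller's KafkaConfig
    * @param metadataProvider        supplies the metadata view the batch reads cluster
    *                                state from: the ControllerContext in ZK mode, a view
    *                                over the metadata image in KRaft mode
    * @param metadataVersionProvider supplies the inter-broker protocol version used to
    *                                choose request versions
    * @param stateChangeLogger       logger for controller-to-broker state changes
    */
   abstract class AbstractControllerBrokerRequestBatch(
     config: KafkaConfig,
     metadataProvider: () => ControllerBrokerRequestMetadata,
     metadataVersionProvider: () => MetadataVersion,
     stateChangeLogger: StateChangeLogger
   )
   ```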



##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -427,51 +464,64 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                          partitions: collection.Set[TopicPartition]): Unit = {
-
-    def updateMetadataRequestPartitionInfo(partition: TopicPartition, beingDeleted: Boolean): Unit = {
-      controllerContext.partitionLeadershipInfo(partition) match {
+    updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
+    partitions.foreach { partition =>
+      val beingDeleted = metadataInstance.isTopicQueuedUpForDeletion(partition.topic())
+      metadataInstance.partitionLeadershipInfo(partition) match {
         case Some(LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) =>
-          val replicas = controllerContext.partitionReplicaAssignment(partition)
-          val offlineReplicas = replicas.filterNot(controllerContext.isReplicaOnline(_, partition))
-          val updatedLeaderAndIsr =
-            if (beingDeleted) LeaderAndIsr.duringDelete(leaderAndIsr.isr)
-            else leaderAndIsr
-
-          val partitionStateInfo = new UpdateMetadataPartitionState()
-            .setTopicName(partition.topic)
-            .setPartitionIndex(partition.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(updatedLeaderAndIsr.leader)
-            .setLeaderEpoch(updatedLeaderAndIsr.leaderEpoch)
-            .setIsr(updatedLeaderAndIsr.isr.map(Integer.valueOf).asJava)
-            .setZkVersion(updatedLeaderAndIsr.partitionEpoch)
-            .setReplicas(replicas.map(Integer.valueOf).asJava)
-            .setOfflineReplicas(offlineReplicas.map(Integer.valueOf).asJava)
-          updateMetadataRequestPartitionInfoMap.put(partition, partitionStateInfo)
-
+          val updatedLeaderAndIsr = if (beingDeleted) LeaderAndIsr.duringDelete(leaderAndIsr.isr) else leaderAndIsr
+          val replicas = metadataInstance.partitionReplicaAssignment(partition)
+          val offlineReplicas = replicas.filter(!metadataInstance.isReplicaOnline(_, partition))

Review Comment:
   Did these need to get reordered? Seems like the only difference here is "controllerContext" -> "metadataInstance".



##########
metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java:
##########
@@ -56,6 +58,15 @@ public BrokerRegistration broker(int nodeId) {
         return image.broker(nodeId);
     }
 
+    public Set<Integer> liveZkBrokerIdChanges() {
+        return changedBrokers
+            .values()
+            .stream()
            .filter(broker -> broker.isPresent() && broker.get().isMigratingZkBroker())

Review Comment:
   We can do `filter(Optional::isPresent)` followed by `map(Optional::get)` to make this a little neater (after we drop Java 8 support, we'll be able to use `Stream.flatMap` with `Optional.stream`).
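   
   Something like (a sketch only, assuming the usual `java.util`/`java.util.stream` imports; the trailing `map`/`collect` steps are inferred since the hunk is truncated here, and `BrokerRegistration.id()` is assumed):
   
   ```java
   public Set<Integer> liveZkBrokerIdChanges() {
       return changedBrokers.values().stream()
           .filter(Optional::isPresent)                      // drop tombstones for removed brokers
           .map(Optional::get)                               // unwrap the registration
           .filter(BrokerRegistration::isMigratingZkBroker)  // keep only migrating ZK brokers
           .map(BrokerRegistration::id)
           .collect(Collectors.toSet());
   }
   ```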



##########
metadata/src/main/java/org/apache/kafka/image/ClusterImage.java:
##########
@@ -34,9 +34,15 @@ public final class ClusterImage {
     public static final ClusterImage EMPTY = new ClusterImage(Collections.emptyMap());
 
     private final Map<Integer, BrokerRegistration> brokers;
+    private final Map<Integer, BrokerRegistration> zkBrokers;
 
     public ClusterImage(Map<Integer, BrokerRegistration> brokers) {
         this.brokers = Collections.unmodifiableMap(brokers);
+        this.zkBrokers = Collections.unmodifiableMap(brokers

Review Comment:
   Generally, we try not to store the same metadata twice in memory. Can we just use an accessor that filters for ZK brokers rather than storing them here?
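   
   Something like (a sketch; this recomputes the map on each call, which should be fine unless it is on a hot path):
   
   ```java
   public Map<Integer, BrokerRegistration> zkBrokers() {
       // Filter on demand rather than holding a second copy of the registrations.
       return brokers.entrySet().stream()
           .filter(entry -> entry.getValue().isMigratingZkBroker())
           .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
   ```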



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/BrokersRpcClient.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+
+public interface BrokersRpcClient {
+
+    void startup();
+
+    void shutdown();
+
+    void publishMetadata(MetadataImage image);
+
+    void sendRPCsToBrokersFromMetadataDelta(MetadataDelta delta,
+                                            MetadataImage image,
+                                            int controllerEpoch);

Review Comment:
   Let's name this `zkControllerEpoch` to avoid confusion with the KRaft controller epoch. Same for the method below.
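   
   i.e. (sketch of just the rename):
   
   ```java
   void sendRPCsToBrokersFromMetadataDelta(MetadataDelta delta,
                                           MetadataImage image,
                                           int zkControllerEpoch);
   ```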



##########
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##########
@@ -206,6 +206,9 @@ public void replay(ApiMessage record) {
                  * updating the highest offset and epoch.
                  */
                 break;
+            case ZK_MIGRATION_STATE_RECORD:
+                // TODO handle this
+                break;

Review Comment:
   Do we need this for this PR?



##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -313,35 +313,76 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(config: KafkaConfig,
-                                   controllerChannelManager: ControllerChannelManager,
-                                   controllerEventManager: ControllerEventManager,
-                                   controllerContext: ControllerContext,
-                                   stateChangeLogger: StateChangeLogger)
-  extends AbstractControllerBrokerRequestBatch(config, controllerContext, stateChangeLogger) {
+class ControllerBrokerRequestBatch(
+  config: KafkaConfig,
+  controllerChannelManager: ControllerChannelManager,
+  controllerEventManager: ControllerEventManager,
+  controllerContext: ControllerContext,
+  stateChangeLogger: StateChangeLogger
+) extends AbstractControllerBrokerRequestBatch(
+  config,
+  () => controllerContext,
+  () => config.interBrokerProtocolVersion,
+  stateChangeLogger
+) {
 
   def sendEvent(event: ControllerEvent): Unit = {
     controllerEventManager.put(event)
   }
-
   def sendRequest(brokerId: Int,
                  request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
                   callback: AbstractResponse => Unit = null): Unit = {
     controllerChannelManager.sendRequest(brokerId, request, callback)
   }
 
+  override def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit = {
+    sendEvent(LeaderAndIsrResponseReceived(response, broker))
+  }
+
+  override def handleUpdateMetadataResponse(response: UpdateMetadataResponse, broker: Int): Unit = {
+    sendEvent(UpdateMetadataResponseReceived(response, broker))
+  }
+
+  override def handleStopReplicaResponse(stopReplicaResponse: StopReplicaResponse, brokerId: Int,
+                                         partitionErrorsForDeletingTopics: Map[TopicPartition, Errors]): Unit = {
+    if (partitionErrorsForDeletingTopics.nonEmpty)
+      sendEvent(TopicDeletionStopReplicaResponseReceived(brokerId, stopReplicaResponse.error, partitionErrorsForDeletingTopics))
+  }
+}
+
+abstract class ControllerBrokerRequestMetadata {

Review Comment:
   Few things with this one ^
   
   * Can it be a trait? 
   * Can we avoid "Metadata" in the name? Maybe ControllerBrokerRequestContext?
   * Can we define it in a separate file?
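   
   Roughly (a sketch combining the three suggestions; the file name is illustrative, and since the class has no constructor parameters or concrete members, a trait should be a drop-in change):
   
   ```scala
   // ControllerBrokerRequestContext.scala
   trait ControllerBrokerRequestContext {
     def isTopicDeletionInProgress(topicName: String): Boolean
     def topicIds: collection.Map[String, Uuid]
     def liveBrokerIdAndEpochs: collection.Map[Int, Long]
     // ... remaining accessors unchanged from the abstract class above
   }
   ```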



##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -359,13 +400,15 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         s"new one. Some UpdateMetadata state changes to brokers $updateMetadataRequestBrokerSet with partition info " +
         s"$updateMetadataRequestPartitionInfoMap might be lost ")
+    metadataInstance = metadataProvider()

Review Comment:
   For ZK, the provider always just returns the (one and only) ControllerContext, right?


