This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 788cc11f455 KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface (#13357)
788cc11f455 is described below

commit 788cc11f4554f515e4002992936c36b68c989f2d
Author: David Jacot <[email protected]>
AuthorDate: Wed Mar 8 08:52:01 2023 +0100

    KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface (#13357)
    
    The new group coordinator needs to access cluster metadata (e.g. topics,
    partitions, etc.) and it needs a mechanism to be notified when the metadata
    changes (e.g. to trigger a rebalance). In KRaft clusters, the easiest way is
    to subscribe to metadata changes via the MetadataPublisher.
    
    Reviewers: Justine Olshan <[email protected]>
---
 build.gradle                                       |  1 +
 checkstyle/import-control.xml                      |  1 +
 .../group/GroupCoordinatorAdapter.scala            |  8 ++++
 .../server/metadata/BrokerMetadataPublisher.scala  | 43 +++++++++++--------
 .../metadata/BrokerMetadataPublisherTest.scala     | 50 ++++++++++++++++++++--
 .../kafka/coordinator/group/GroupCoordinator.java  | 13 ++++++
 6 files changed, 95 insertions(+), 21 deletions(-)

diff --git a/build.gradle b/build.gradle
index f7a071680b8..ac37efb6cc2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1231,6 +1231,7 @@ project(':group-coordinator') {
   dependencies {
     implementation project(':server-common')
     implementation project(':clients')
+    implementation project(':metadata')
     implementation libs.slf4jApi
 
     testImplementation project(':clients').sourceSets.test.output
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b4f679b004b..9e6b33ab9a6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -345,6 +345,7 @@
       <allow pkg="org.apache.kafka.common.message" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.requests" />
+      <allow pkg="org.apache.kafka.image"/>
       <allow pkg="org.apache.kafka.server.util"/>
     </subpackage>
   </subpackage>
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index e49ca2dc5cc..3211c160a07 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, 
TransactionResult}
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.server.util.FutureUtils
 
 import java.util
@@ -580,6 +581,13 @@ private[group] class GroupCoordinatorAdapter(
     coordinator.onResignation(groupMetadataPartitionIndex, 
groupMetadataPartitionLeaderEpoch)
   }
 
+  override def onNewMetadataImage(
+    newImage: MetadataImage,
+    delta: MetadataDelta
+  ): Unit = {
+    // The metadata image is not used in the old group coordinator.
+  }
+
   override def groupMetadataTopicConfigs(): Properties = {
     coordinator.offsetsTopicConfigs
   }
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index e7a81f1d1dc..38c3d1a8e13 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -96,7 +96,7 @@ object BrokerMetadataPublisher extends Logging {
 }
 
 class BrokerMetadataPublisher(
-  conf: KafkaConfig,
+  config: KafkaConfig,
   metadataCache: KRaftMetadataCache,
   logManager: LogManager,
   replicaManager: ReplicaManager,
@@ -109,14 +109,14 @@ class BrokerMetadataPublisher(
   fatalFaultHandler: FaultHandler,
   metadataPublishingFaultHandler: FaultHandler
 ) extends MetadataPublisher with Logging {
-  logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
+  logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
 
   import BrokerMetadataPublisher._
 
   /**
    * The broker ID.
    */
-  val brokerId: Int = conf.nodeId
+  val brokerId: Int = config.nodeId
 
   /**
    * True if this is the first time we have published metadata.
@@ -169,7 +169,7 @@ class BrokerMetadataPublisher(
           replicaManager.applyDelta(topicsDelta, newImage)
         } catch {
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error applying topics " +
-            s"delta in ${deltaName}", t)
+            s"delta in $deltaName", t)
         }
         try {
           // Update the group coordinator of local changes
@@ -181,7 +181,7 @@ class BrokerMetadataPublisher(
           )
         } catch {
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating group " +
-            s"coordinator with local changes in ${deltaName}", t)
+            s"coordinator with local changes in $deltaName", t)
         }
         try {
           // Update the transaction coordinator of local changes
@@ -192,7 +192,7 @@ class BrokerMetadataPublisher(
             txnCoordinator.onResignation)
         } catch {
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating txn " +
-            s"coordinator with local changes in ${deltaName}", t)
+            s"coordinator with local changes in $deltaName", t)
         }
         try {
           // Notify the group coordinator about deleted topics.
@@ -208,7 +208,7 @@ class BrokerMetadataPublisher(
           }
         } catch {
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating group " +
-            s"coordinator with deleted partitions in ${deltaName}", t)
+            s"coordinator with deleted partitions in $deltaName", t)
         }
       }
 
@@ -222,7 +222,7 @@ class BrokerMetadataPublisher(
         }
       } catch {
         case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
updating client " +
-          s"quotas in ${deltaName}", t)
+          s"quotas in $deltaName", t)
       }
 
       // Apply changes to SCRAM credentials.
@@ -246,7 +246,7 @@ class BrokerMetadataPublisher(
       // if the user created a DENY ALL acl and then created an ALLOW ACL for 
topic foo,
       // we want to apply those changes in that order, not the reverse order! 
Otherwise
       // there could be a window during which incorrect authorization results 
are returned.
-      Option(delta.aclsDelta()).foreach( aclsDelta =>
+      Option(delta.aclsDelta()).foreach { aclsDelta =>
         _authorizer match {
           case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {
             try {
@@ -257,7 +257,7 @@ class BrokerMetadataPublisher(
               authorizer.loadSnapshot(newImage.acls().acls())
             } catch {
               case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error loading " +
-                s"authorizer snapshot in ${deltaName}", t)
+                s"authorizer snapshot in $deltaName", t)
             }
           } else {
             try {
@@ -271,11 +271,20 @@ class BrokerMetadataPublisher(
                 })
             } catch {
               case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error loading " +
-                s"authorizer changes in ${deltaName}", t)
+                s"authorizer changes in $deltaName", t)
             }
           }
           case _ => // No ClusterMetadataAuthorizer is configured. There is 
nothing to do.
-        })
+        }
+      }
+
+      try {
+        // Propagate the new image to the group coordinator.
+        groupCoordinator.onNewMetadataImage(newImage, delta)
+      } catch {
+        case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
updating group " +
+          s"coordinator with local changes in $deltaName", t)
+      }
 
       if (_firstPublish) {
         finishInitializingReplicaManager(newImage)
@@ -283,7 +292,7 @@ class BrokerMetadataPublisher(
       publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
     } catch {
       case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
-        s"publishing broker metadata from ${deltaName}", t)
+        s"publishing broker metadata from $deltaName", t)
     } finally {
       _firstPublish = false
     }
@@ -299,7 +308,7 @@ class BrokerMetadataPublisher(
   override def publishedOffset: Long = publishedOffsetAtomic.get()
 
   def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
-    conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
+    config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
   }
 
   /**
@@ -356,7 +365,7 @@ class BrokerMetadataPublisher(
       // Make the LogCleaner available for reconfiguration. We can't do this 
prior to this
       // point because LogManager#startup creates the LogCleaner object, if
       // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
-      
Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
+      
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
LogManager", t)
     }
@@ -369,14 +378,14 @@ class BrokerMetadataPublisher(
     try {
       // Start the group coordinator.
       groupCoordinator.startup(() => 
metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
-        .getOrElse(conf.offsetsTopicPartitions))
+        .getOrElse(config.offsetsTopicPartitions))
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
GroupCoordinator", t)
     }
     try {
       // Start the transaction coordinator.
       txnCoordinator.startup(() => metadataCache.numPartitions(
-        
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(conf.transactionTopicPartitions))
+        
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
     } catch {
       case t: Throwable => fatalFaultHandler.handleFault("Error starting 
TransactionCoordinator", t)
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 317da428888..f522d11d532 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,11 +17,14 @@
 
 package kafka.server.metadata
 
+import kafka.coordinator.transaction.TransactionCoordinator
+
 import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import kafka.log.UnifiedLog
-import kafka.server.{BrokerServer, KafkaConfig}
+import kafka.log.{LogManager, UnifiedLog}
+import kafka.security.CredentialProvider
+import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@@ -30,7 +33,8 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
+import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataImageTest, TopicImage, TopicsImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.server.fault.FaultHandler
@@ -38,7 +42,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertNotNull, assertTrue
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
-import org.mockito.Mockito.doThrow
+import org.mockito.Mockito.{doThrow, mock, verify}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -263,4 +267,42 @@ class BrokerMetadataPublisherTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testNewImagePushedToGroupCoordinator(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
+    val metadataCache = new KRaftMetadataCache(0)
+    val logManager = mock(classOf[LogManager])
+    val replicaManager = mock(classOf[ReplicaManager])
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val txnCoordinator = mock(classOf[TransactionCoordinator])
+    val quotaManager = mock(classOf[ClientQuotaMetadataManager])
+    val configPublisher = mock(classOf[DynamicConfigPublisher])
+    val credentialProvider = mock(classOf[CredentialProvider])
+    val faultHandler = mock(classOf[FaultHandler])
+
+    val metadataPublisher = new BrokerMetadataPublisher(
+      config,
+      metadataCache,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      txnCoordinator,
+      quotaManager,
+      configPublisher,
+      None,
+      credentialProvider,
+      faultHandler,
+      faultHandler
+    )
+
+    val image = MetadataImage.EMPTY
+    val delta = new MetadataDelta.Builder()
+      .setImage(image)
+      .build()
+
+    metadataPublisher.publish(delta, image)
+
+    verify(groupCoordinator).onNewMetadataImage(image, delta)
+  }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index f448346a116..643bbdadb4d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -42,6 +42,8 @@ import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
 
 import java.util.List;
 import java.util.OptionalInt;
@@ -300,6 +302,17 @@ public interface GroupCoordinator {
         OptionalInt groupMetadataPartitionLeaderEpoch
     );
 
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The metadata delta.
+     */
+    void onNewMetadataImage(
+        MetadataImage newImage,
+        MetadataDelta delta
+    );
+
     /**
      * Return the configuration properties of the internal group
      * metadata topic.

Reply via email to