This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 788cc11f455 KAFKA-14462; [3/N] Add `onNewMetadataImage` to
`GroupCoordinator` interface (#13357)
788cc11f455 is described below
commit 788cc11f4554f515e4002992936c36b68c989f2d
Author: David Jacot <[email protected]>
AuthorDate: Wed Mar 8 08:52:01 2023 +0100
KAFKA-14462; [3/N] Add `onNewMetadataImage` to `GroupCoordinator` interface
(#13357)
The new group coordinator needs to access cluster metadata (e.g. topics,
partitions, etc.) and it needs a mechanism to be notified when the metadata
changes (e.g. to trigger a rebalance). In KRaft clusters, the easiest approach is to
subscribe to metadata changes via the MetadataPublisher.
Reviewers: Justine Olshan <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control.xml | 1 +
.../group/GroupCoordinatorAdapter.scala | 8 ++++
.../server/metadata/BrokerMetadataPublisher.scala | 43 +++++++++++--------
.../metadata/BrokerMetadataPublisherTest.scala | 50 ++++++++++++++++++++--
.../kafka/coordinator/group/GroupCoordinator.java | 13 ++++++
6 files changed, 95 insertions(+), 21 deletions(-)
diff --git a/build.gradle b/build.gradle
index f7a071680b8..ac37efb6cc2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1231,6 +1231,7 @@ project(':group-coordinator') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
+ implementation project(':metadata')
implementation libs.slf4jApi
testImplementation project(':clients').sourceSets.test.output
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b4f679b004b..9e6b33ab9a6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -345,6 +345,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.requests" />
+ <allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.server.util"/>
</subpackage>
</subpackage>
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index e49ca2dc5cc..3211c160a07 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext,
TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.util.FutureUtils
import java.util
@@ -580,6 +581,13 @@ private[group] class GroupCoordinatorAdapter(
coordinator.onResignation(groupMetadataPartitionIndex,
groupMetadataPartitionLeaderEpoch)
}
+ override def onNewMetadataImage(
+ newImage: MetadataImage,
+ delta: MetadataDelta
+ ): Unit = {
+ // The metadata image is not used in the old group coordinator.
+ }
+
override def groupMetadataTopicConfigs(): Properties = {
coordinator.offsetsTopicConfigs
}
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index e7a81f1d1dc..38c3d1a8e13 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -96,7 +96,7 @@ object BrokerMetadataPublisher extends Logging {
}
class BrokerMetadataPublisher(
- conf: KafkaConfig,
+ config: KafkaConfig,
metadataCache: KRaftMetadataCache,
logManager: LogManager,
replicaManager: ReplicaManager,
@@ -109,14 +109,14 @@ class BrokerMetadataPublisher(
fatalFaultHandler: FaultHandler,
metadataPublishingFaultHandler: FaultHandler
) extends MetadataPublisher with Logging {
- logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
+ logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] "
import BrokerMetadataPublisher._
/**
* The broker ID.
*/
- val brokerId: Int = conf.nodeId
+ val brokerId: Int = config.nodeId
/**
* True if this is the first time we have published metadata.
@@ -169,7 +169,7 @@ class BrokerMetadataPublisher(
replicaManager.applyDelta(topicsDelta, newImage)
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error applying topics " +
- s"delta in ${deltaName}", t)
+ s"delta in $deltaName", t)
}
try {
// Update the group coordinator of local changes
@@ -181,7 +181,7 @@ class BrokerMetadataPublisher(
)
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating group " +
- s"coordinator with local changes in ${deltaName}", t)
+ s"coordinator with local changes in $deltaName", t)
}
try {
// Update the transaction coordinator of local changes
@@ -192,7 +192,7 @@ class BrokerMetadataPublisher(
txnCoordinator.onResignation)
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating txn " +
- s"coordinator with local changes in ${deltaName}", t)
+ s"coordinator with local changes in $deltaName", t)
}
try {
// Notify the group coordinator about deleted topics.
@@ -208,7 +208,7 @@ class BrokerMetadataPublisher(
}
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating group " +
- s"coordinator with deleted partitions in ${deltaName}", t)
+ s"coordinator with deleted partitions in $deltaName", t)
}
}
@@ -222,7 +222,7 @@ class BrokerMetadataPublisher(
}
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error
updating client " +
- s"quotas in ${deltaName}", t)
+ s"quotas in $deltaName", t)
}
// Apply changes to SCRAM credentials.
@@ -246,7 +246,7 @@ class BrokerMetadataPublisher(
// if the user created a DENY ALL acl and then created an ALLOW ACL for
topic foo,
// we want to apply those changes in that order, not the reverse order!
Otherwise
// there could be a window during which incorrect authorization results
are returned.
- Option(delta.aclsDelta()).foreach( aclsDelta =>
+ Option(delta.aclsDelta()).foreach { aclsDelta =>
_authorizer match {
case Some(authorizer: ClusterMetadataAuthorizer) => if
(aclsDelta.isSnapshotDelta) {
try {
@@ -257,7 +257,7 @@ class BrokerMetadataPublisher(
authorizer.loadSnapshot(newImage.acls().acls())
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error loading " +
- s"authorizer snapshot in ${deltaName}", t)
+ s"authorizer snapshot in $deltaName", t)
}
} else {
try {
@@ -271,11 +271,20 @@ class BrokerMetadataPublisher(
})
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error loading " +
- s"authorizer changes in ${deltaName}", t)
+ s"authorizer changes in $deltaName", t)
}
}
case _ => // No ClusterMetadataAuthorizer is configured. There is
nothing to do.
- })
+ }
+ }
+
+ try {
+ // Propagate the new image to the group coordinator.
+ groupCoordinator.onNewMetadataImage(newImage, delta)
+ } catch {
+ case t: Throwable => metadataPublishingFaultHandler.handleFault("Error
updating group " +
+ s"coordinator with local changes in $deltaName", t)
+ }
if (_firstPublish) {
finishInitializingReplicaManager(newImage)
@@ -283,7 +292,7 @@ class BrokerMetadataPublisher(
publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
- s"publishing broker metadata from ${deltaName}", t)
+ s"publishing broker metadata from $deltaName", t)
} finally {
_firstPublish = false
}
@@ -299,7 +308,7 @@ class BrokerMetadataPublisher(
override def publishedOffset: Long = publishedOffsetAtomic.get()
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
- conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
+ config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}
/**
@@ -356,7 +365,7 @@ class BrokerMetadataPublisher(
// Make the LogCleaner available for reconfiguration. We can't do this
prior to this
// point because LogManager#startup creates the LogCleaner object, if
// log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
-
Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
+
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting
LogManager", t)
}
@@ -369,14 +378,14 @@ class BrokerMetadataPublisher(
try {
// Start the group coordinator.
groupCoordinator.startup(() =>
metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
- .getOrElse(conf.offsetsTopicPartitions))
+ .getOrElse(config.offsetsTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting
GroupCoordinator", t)
}
try {
// Start the transaction coordinator.
txnCoordinator.startup(() => metadataCache.numPartitions(
-
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(conf.transactionTopicPartitions))
+
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions))
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error starting
TransactionCoordinator", t)
}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 317da428888..f522d11d532 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,11 +17,14 @@
package kafka.server.metadata
+import kafka.coordinator.transaction.TransactionCoordinator
+
import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import kafka.log.UnifiedLog
-import kafka.server.{BrokerServer, KafkaConfig}
+import kafka.log.{LogManager, UnifiedLog}
+import kafka.security.CredentialProvider
+import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@@ -30,7 +33,8 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.BROKER
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
+import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataImageTest, TopicImage, TopicsImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.server.fault.FaultHandler
@@ -38,7 +42,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals,
assertNotNull, assertTrue
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
-import org.mockito.Mockito.doThrow
+import org.mockito.Mockito.{doThrow, mock, verify}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -263,4 +267,42 @@ class BrokerMetadataPublisherTest {
cluster.close()
}
}
+
+ @Test
+ def testNewImagePushedToGroupCoordinator(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
+ val metadataCache = new KRaftMetadataCache(0)
+ val logManager = mock(classOf[LogManager])
+ val replicaManager = mock(classOf[ReplicaManager])
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val txnCoordinator = mock(classOf[TransactionCoordinator])
+ val quotaManager = mock(classOf[ClientQuotaMetadataManager])
+ val configPublisher = mock(classOf[DynamicConfigPublisher])
+ val credentialProvider = mock(classOf[CredentialProvider])
+ val faultHandler = mock(classOf[FaultHandler])
+
+ val metadataPublisher = new BrokerMetadataPublisher(
+ config,
+ metadataCache,
+ logManager,
+ replicaManager,
+ groupCoordinator,
+ txnCoordinator,
+ quotaManager,
+ configPublisher,
+ None,
+ credentialProvider,
+ faultHandler,
+ faultHandler
+ )
+
+ val image = MetadataImage.EMPTY
+ val delta = new MetadataDelta.Builder()
+ .setImage(image)
+ .build()
+
+ metadataPublisher.publish(delta, image)
+
+ verify(groupCoordinator).onNewMetadataImage(image, delta)
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index f448346a116..643bbdadb4d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -42,6 +42,8 @@ import
org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import java.util.List;
import java.util.OptionalInt;
@@ -300,6 +302,17 @@ public interface GroupCoordinator {
OptionalInt groupMetadataPartitionLeaderEpoch
);
+ /**
+ * A new metadata image is available.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The metadata delta.
+ */
+ void onNewMetadataImage(
+ MetadataImage newImage,
+ MetadataDelta delta
+ );
+
/**
* Return the configuration properties of the internal group
* metadata topic.