cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1171934758
##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -16,71 +16,79 @@
*/
package kafka.zk
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering,
VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig,
ZkAdminManager}
import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient,
ZkTopicMigrationClient}
import kafka.zookeeper._
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient,
MigrationClientAuthException, MigrationClientException,
ZkMigrationLeadershipState}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor,
TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code,
NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException,
NoAuthException, SessionClosedRequireAuthException}
import java.util
import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ZkMigrationClient {
+
val MaxBatchSize = 100
-}
-/**
- * Migration client in KRaft controller responsible for handling communication
to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use
KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something
usable by the caller.
- */
-class ZkMigrationClient(
- zkClient: KafkaZkClient,
- zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+ def apply(
+ zkClient: KafkaZkClient,
+ zkConfigEncoder: PasswordEncoder
+ ): ZkMigrationClient = {
+ val topicClient = new ZkTopicMigrationClient(zkClient)
+ val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+ val aclClient = new ZkAclMigrationClient(zkClient)
+ new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+ }
/**
* Wrap a function such that any KeeperExceptions is captured and converted
to a MigrationClientException.
* Any authentication related exception is converted to a
MigrationClientAuthException which may be treated
* differently by the caller.
*/
@throws(classOf[MigrationClientException])
- private def wrapZkException[T](fn: => T): T = {
+ def wrapZkException[T](fn: => T): T = {
try {
fn
} catch {
- case e @ (_: MigrationClientException | _: MigrationClientAuthException)
=> throw e
- case e @ (_: AuthFailedException | _: NoAuthException | _:
SessionClosedRequireAuthException) =>
+ case e@(_: MigrationClientException | _: MigrationClientAuthException)
=> throw e
Review Comment:
are these whitespace changes needed?
##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -16,71 +16,79 @@
*/
package kafka.zk
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering,
VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig,
ZkAdminManager}
import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient,
ZkTopicMigrationClient}
import kafka.zookeeper._
import org.apache.kafka.common.acl.AccessControlEntry
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient,
MigrationClientAuthException, MigrationClientException,
ZkMigrationLeadershipState}
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor,
TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code,
NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException,
NoAuthException, SessionClosedRequireAuthException}
import java.util
import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ZkMigrationClient {
+
val MaxBatchSize = 100
-}
-/**
- * Migration client in KRaft controller responsible for handling communication
to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use
KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something
usable by the caller.
- */
-class ZkMigrationClient(
- zkClient: KafkaZkClient,
- zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+ def apply(
+ zkClient: KafkaZkClient,
+ zkConfigEncoder: PasswordEncoder
+ ): ZkMigrationClient = {
+ val topicClient = new ZkTopicMigrationClient(zkClient)
+ val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+ val aclClient = new ZkAclMigrationClient(zkClient)
+ new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+ }
/**
* Wrap a function such that any KeeperExceptions is captured and converted
to a MigrationClientException.
* Any authentication related exception is converted to a
MigrationClientAuthException which may be treated
* differently by the caller.
*/
@throws(classOf[MigrationClientException])
- private def wrapZkException[T](fn: => T): T = {
+ def wrapZkException[T](fn: => T): T = {
try {
fn
} catch {
- case e @ (_: MigrationClientException | _: MigrationClientAuthException)
=> throw e
- case e @ (_: AuthFailedException | _: NoAuthException | _:
SessionClosedRequireAuthException) =>
+ case e@(_: MigrationClientException | _: MigrationClientAuthException)
=> throw e
+ case e@(_: AuthFailedException | _: NoAuthException | _:
SessionClosedRequireAuthException) =>
Review Comment:
is this whitespace change needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]