[ 
https://issues.apache.org/jira/browse/KAFKA-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-15670:
------------------------------
    Description: 
During ZK migrating to KRaft, before entering dual-write mode, the KRaft 
controller will send RPCs (i.e. UpdateMetadataRequest, LeaderAndIsrRequest, and 
StopReplicaRequest) to the brokers. Currently, we use the inter broker listener 
to send the RPC to brokers from the controller. Although these RPCs are used 
for ZK brokers, in our case, the sender is actually KRaft controller. In KRaft 
mode, the controller should talk with brokers via `controller.listener.names`, 
not `inter.broker.listener.names`. 

This issue can also be fixed by adding `inter.broker.listener.names` config in 
KRaft controller configuration file, but it would be surprised that the KRaft 
controller config should contain `inter.broker.listener.names`.

 
The error while sending RPCs to brokers while the broker doesn't contain 
PLAINTEXT listener.
{code:java}
[2023-10-23 17:12:36,788] ERROR Encountered zk migration fault: Unhandled error 
in SendRPCsToBrokersEvent (org.apache.kafka.server.fault.LoggingFaultHandler)
kafka.common.BrokerEndPointNotAvailableException: End point with listener name 
PLAINTEXT not found for broker 0
        at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
        at scala.Option.getOrElse(Option.scala:201)
        at kafka.cluster.Broker.node(Broker.scala:93)
        at 
kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
        at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
        at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:97)
        at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:97)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:168)
        at 
kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:97)
        at 
kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:217)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:723)

{code}

  was:
During ZK migrating to KRaft, before entering dual-write mode, the KRaft 
controller will send RPCs (i.e. UpdateMetadataRequest, LeaderAndIsrRequest, and 
StopReplicaRequest) to the brokers. Currently, we use the inter broker listener 
to send the RPC to brokers from the controller. Although these RPCs are used 
for ZK brokers, in our case, the sender is actually KRaft controller. In KRaft 
mode, the controller should talk with brokers via `controller.listener.names`, 
not `inter.broker.listener.names`. It would be surprised that the KRaft 
controller config should contain `inter.broker.listener.names`.

 
{code:java}
[2023-10-23 17:12:36,788] ERROR Encountered zk migration fault: Unhandled error 
in SendRPCsToBrokersEvent (org.apache.kafka.server.fault.LoggingFaultHandler)
kafka.common.BrokerEndPointNotAvailableException: End point with listener name 
PLAINTEXT not found for broker 0
        at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
        at scala.Option.getOrElse(Option.scala:201)
        at kafka.cluster.Broker.node(Broker.scala:93)
        at 
kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
        at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
        at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:97)
        at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:97)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:168)
        at 
kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:97)
        at 
kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:217)
        at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:723)

{code}


> KRaft controller using wrong listener to send RPC to brokers in dual-write 
> mode 
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-15670
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15670
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.6.0
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> During ZK migrating to KRaft, before entering dual-write mode, the KRaft 
> controller will send RPCs (i.e. UpdateMetadataRequest, LeaderAndIsrRequest, 
> and StopReplicaRequest) to the brokers. Currently, we use the inter broker 
> listener to send the RPC to brokers from the controller. Although these RPCs 
> are used for ZK brokers, in our case, the sender is actually KRaft 
> controller. In KRaft mode, the controller should talk with brokers via 
> `controller.listener.names`, not `inter.broker.listener.names`. 
> This issue can also be fixed by adding `inter.broker.listener.names` config 
> in KRaft controller configuration file, but it would be surprised that the 
> KRaft controller config should contain `inter.broker.listener.names`.
>  
> The error while sending RPCs to brokers while the broker doesn't contain 
> PLAINTEXT listener.
> {code:java}
> [2023-10-23 17:12:36,788] ERROR Encountered zk migration fault: Unhandled 
> error in SendRPCsToBrokersEvent 
> (org.apache.kafka.server.fault.LoggingFaultHandler)
> kafka.common.BrokerEndPointNotAvailableException: End point with listener 
> name PLAINTEXT not found for broker 0
>       at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
>       at scala.Option.getOrElse(Option.scala:201)
>       at kafka.cluster.Broker.node(Broker.scala:93)
>       at 
> kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
>       at 
> kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
>       at 
> kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:97)
>       at 
> kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:97)
>       at scala.collection.immutable.Set$Set1.foreach(Set.scala:168)
>       at 
> kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:97)
>       at 
> kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:217)
>       at 
> org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:723)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to