This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8b49130b928 KAFKA-19355 Remove interBrokerListenerName from
ClusterControlManager (#19866)
8b49130b928 is described below
commit 8b49130b928553ad8ee24188b591c2d006d9a394
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Mon Jun 2 01:18:15 2025 +0800
KAFKA-19355 Remove interBrokerListenerName from ClusterControlManager
(#19866)
Following the removal of the ZK-to-KRaft migration code in commit
85bfdf4, controller-to-broker communication is now handled by the
control-plane listener (`controller.listener.names`). The
`interBrokerListenerName` parameter in `ClusterControlManager` is no
longer referenced on the controller side and can be safely removed as
dead code.
Reviewers: Lan Ding <[email protected]>, Ken Huang <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/server/ControllerServer.scala | 1 -
.../org/apache/kafka/controller/ClusterControlManager.java | 13 -------------
.../java/org/apache/kafka/controller/QuorumController.java | 8 --------
3 files changed, 22 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index ff6821b1bb8..e8427fa7e53 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -252,7 +252,6 @@ class ControllerServer(
setDelegationTokenExpiryTimeMs(delegationTokenManagerConfigs.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(delegationTokenManagerConfigs.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
- setInterBrokerListenerName(config.interBrokerListenerName.value()).
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
setControllerPerformanceAlwaysLogThresholdMs(config.controllerPerformanceAlwaysLogThresholdMs)
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 64394b07586..14fce5ca0f3 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -92,7 +92,6 @@ public class ClusterControlManager {
private ReplicaPlacer replicaPlacer = null;
private FeatureControlManager featureControl = null;
private BrokerShutdownHandler brokerShutdownHandler = null;
- private String interBrokerListenerName = "PLAINTEXT";
private QuorumControllerMetrics metrics = null;
Builder setLogContext(LogContext logContext) {
@@ -135,10 +134,6 @@ public class ClusterControlManager {
return this;
}
- Builder setInterBrokerListenerName(String interBrokerListenerName) {
- this.interBrokerListenerName = interBrokerListenerName;
- return this;
- }
Builder setMetrics(QuorumControllerMetrics metrics) {
this.metrics = metrics;
@@ -175,7 +170,6 @@ public class ClusterControlManager {
replicaPlacer,
featureControl,
brokerShutdownHandler,
- interBrokerListenerName,
metrics
);
}
@@ -265,11 +259,6 @@ public class ClusterControlManager {
private final BrokerShutdownHandler brokerShutdownHandler;
- /**
- * The statically configured inter-broker listener name.
- */
- private final String interBrokerListenerName;
-
/**
* Maps controller IDs to controller registrations.
*/
@@ -294,7 +283,6 @@ public class ClusterControlManager {
ReplicaPlacer replicaPlacer,
FeatureControlManager featureControl,
BrokerShutdownHandler brokerShutdownHandler,
- String interBrokerListenerName,
QuorumControllerMetrics metrics
) {
this.logContext = logContext;
@@ -311,7 +299,6 @@ public class ClusterControlManager {
this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry,
0);
this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokerShutdownHandler = brokerShutdownHandler;
- this.interBrokerListenerName = interBrokerListenerName;
this.metrics = metrics;
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 365e9503dcb..7dee1e3cd3d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -220,7 +220,6 @@ public final class QuorumController implements Controller {
private long delegationTokenExpiryTimeMs;
private long delegationTokenExpiryCheckIntervalMs =
TimeUnit.MINUTES.toMillis(5);
private long uncleanLeaderElectionCheckIntervalMs =
TimeUnit.MINUTES.toMillis(5);
- private String interBrokerListenerName = "PLAINTEXT";
public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
@@ -381,10 +380,6 @@ public final class QuorumController implements Controller {
return this;
}
- public Builder setInterBrokerListenerName(String
interBrokerListenerName) {
- this.interBrokerListenerName = interBrokerListenerName;
- return this;
- }
public QuorumController build() throws Exception {
if (raftClient == null) {
@@ -443,7 +438,6 @@ public final class QuorumController implements Controller {
delegationTokenExpiryTimeMs,
delegationTokenExpiryCheckIntervalMs,
uncleanLeaderElectionCheckIntervalMs,
- interBrokerListenerName,
controllerPerformanceSamplePeriodMs,
controllerPerformanceAlwaysLogThresholdMs
);
@@ -1488,7 +1482,6 @@ public final class QuorumController implements Controller
{
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs,
long uncleanLeaderElectionCheckIntervalMs,
- String interBrokerListenerName,
long controllerPerformanceSamplePeriodMs,
long controllerPerformanceAlwaysLogThresholdMs
) {
@@ -1530,7 +1523,6 @@ public final class QuorumController implements Controller
{
setReplicaPlacer(replicaPlacer).
setFeatureControlManager(featureControl).
setBrokerShutdownHandler(this::handleBrokerShutdown).
- setInterBrokerListenerName(interBrokerListenerName).
setMetrics(controllerMetrics).
build();
this.configurationControl = new ConfigurationControlManager.Builder().