This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 130bf1054b0 MINOR: some minor cleanups in the quorum controller. (#17819)
130bf1054b0 is described below
commit 130bf1054b037d3fbc941dbf9e6d1276a8edfbad
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Mon Nov 18 11:15:38 2024 -0800
MINOR: some minor cleanups in the quorum controller. (#17819)
BrokerHeartbeatManager.java: fix an outdated comment.
Move an inefficient test method that is O(num_brokers) from
ClusterControlManager.java into ReplicationControlManagerTest.java, so that it
doesn't accidentally get used in production code.
Remove QuorumController.ImbalanceSchedule, etc. since they are no longer used.
Move the initialization of OffsetControlManager later in the
QuorumController constructor and add a comment explaining why it should come
last. This doesn't fix any bugs currently, but it's a good practice for the
future.
Reviewers: Mickael Maison <[email protected]>
---
.../kafka/controller/BrokerHeartbeatManager.java | 3 +-
.../kafka/controller/ClusterControlManager.java | 9 -----
.../apache/kafka/controller/QuorumController.java | 43 ++++++----------------
.../controller/ReplicationControlManagerTest.java | 13 ++++++-
4 files changed, 24 insertions(+), 44 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index e63170ca5bf..2762d36f487 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -75,8 +75,7 @@ public class BrokerHeartbeatManager {
/**
* The offset at which the broker should complete its controlled shutdown, or -1
- * if the broker is not performing a controlled shutdown. When this field is
- * updated, we also have to update the broker's position in the shuttingDown set.
+ * if the broker is not performing a controlled shutdown.
*/
private long controlledShutdownOffset;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index c583906d4ee..580967d97a5 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -70,7 +70,6 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -327,14 +326,6 @@ public class ClusterControlManager {
return brokerRegistrations;
}
- Set<Integer> fencedBrokerIds() {
- return brokerRegistrations.values()
- .stream()
- .filter(BrokerRegistration::fenced)
- .map(BrokerRegistration::id)
- .collect(Collectors.toSet());
- }
-
/**
* Process an incoming broker registration request.
*/
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index a541be68f35..71e43b55b25 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1431,35 +1431,14 @@ public final class QuorumController implements
Controller {
*/
private volatile int curClaimEpoch;
- /**
- * How long to delay partition leader balancing operations.
- */
- private final OptionalLong leaderImbalanceCheckIntervalNs;
-
- private enum ImbalanceSchedule {
- // The leader balancing operation has been scheduled
- SCHEDULED,
- // If the leader balancing operation should be scheduled, schedule it with a delay
- DEFERRED,
- // If the leader balancing operation should be scheduled, schedule it immediately
- IMMEDIATELY
- }
-
- /**
- * Tracks the scheduling state for partition leader balancing operations.
- */
- private final ImbalanceSchedule imbalancedScheduled =
ImbalanceSchedule.DEFERRED;
-
- /**
- * Tracks the scheduling state for unclean leader election operations.
- */
- private final ImbalanceSchedule uncleanScheduled =
ImbalanceSchedule.DEFERRED;
-
/**
* The bootstrap metadata to use for initialization if needed.
*/
private final BootstrapMetadata bootstrapMetadata;
+ /**
+ * True if the KIP-966 eligible leader replicas feature is enabled.
+ */
private final boolean eligibleLeaderReplicasEnabled;
/**
@@ -1515,12 +1494,6 @@ public final class QuorumController implements
Controller {
this.controllerMetrics = controllerMetrics;
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.deferredEventQueue = new DeferredEventQueue(logContext);
- this.offsetControl = new OffsetControlManager.Builder().
- setLogContext(logContext).
- setSnapshotRegistry(snapshotRegistry).
- setMetrics(controllerMetrics).
- setTime(time).
- build();
this.resourceExists = new ConfigResourceExistenceChecker();
this.configurationControl = new ConfigurationControlManager.Builder().
setLogContext(logContext).
@@ -1571,7 +1544,6 @@ public final class QuorumController implements Controller
{
setSnapshotRegistry(snapshotRegistry).
setClusterControlManager(clusterControl).
build();
- this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
@@ -1619,6 +1591,15 @@ public final class QuorumController implements
Controller {
registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs));
registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs));
+ // OffsetControlManager must be initialized last, because its constructor will take the
+ // initial in-memory snapshot of all extant timeline data structures.
+ this.offsetControl = new OffsetControlManager.Builder().
+ setLogContext(logContext).
+ setSnapshotRegistry(snapshotRegistry).
+ setMetrics(controllerMetrics).
+ setTime(time).
+ build();
+
log.info("Creating new QuorumController with clusterId {}.{}",
clusterId,
eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : "");
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 194c295de28..c53f068e9d6 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -501,7 +501,7 @@ public class ReplicationControlManagerTest {
replay(fenceResult.records());
} while (fenceResult.response().booleanValue());
- assertEquals(brokerIds, clusterControl.fencedBrokerIds());
+ assertEquals(brokerIds, fencedBrokerIds());
}
long currentBrokerEpoch(int brokerId) {
@@ -525,6 +525,15 @@ public class ReplicationControlManagerTest {
replay(result.records());
return result;
}
+
+ Set<Integer> fencedBrokerIds() {
+ return clusterControl.brokerRegistrations().values()
+ .stream()
+ .filter(BrokerRegistration::fenced)
+ .map(BrokerRegistration::id)
+ .collect(Collectors.toSet());
+ }
+
}
static CreateTopicsResponseData withoutConfigs(CreateTopicsResponseData
data) {
@@ -2443,7 +2452,7 @@ public class ReplicationControlManagerTest {
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2,
1}}).topicId();
- assertTrue(ctx.clusterControl.fencedBrokerIds().isEmpty());
+ assertTrue(ctx.fencedBrokerIds().isEmpty());
ctx.fenceBrokers(Set.of(2, 3));
PartitionRegistration partition0 = replication.getPartition(fooId, 0);