This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a8cdbaf4b30 KAFKA-18138: The controller must add all extant brokers to
BrokerHeartbeatTracker when activating (#18009)
a8cdbaf4b30 is described below
commit a8cdbaf4b30a119c66a9a85f6cab32610cd17fa9
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Dec 3 07:33:52 2024 -0800
KAFKA-18138: The controller must add all extant brokers to
BrokerHeartbeatTracker when activating (#18009)
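The controller must add all extant brokers to BrokerHeartbeatTracker when
activating. Otherwise, we could end up in a situation where a broker fails
exactly as a controller failover occurs, and we never fence it. To prevent
this, activate() now records the current time as the last contact time for
every registered broker, so a broker that never heartbeats to the new
controller can still time out and be fenced.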
Also, fix a bug where the SLF4J logger in PeriodicTaskControlManager was
initialized as though it belonged to OffsetControlManager.
Reviewers: David Mao <[email protected]>, David Arthur <[email protected]>
---
.../kafka/controller/ClusterControlManager.java | 3 ++
.../controller/PeriodicTaskControlManager.java | 2 +-
.../controller/ClusterControlManagerTest.java | 32 +++++++++++++++++++++-
3 files changed, 35 insertions(+), 2 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 580967d97a5..8576b14ebc2 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -306,8 +306,11 @@ public class ClusterControlManager {
*/
public void activate() {
heartbeatManager = new BrokerHeartbeatManager(logContext, time,
sessionTimeoutNs);
+ long nowNs = time.nanoseconds();
for (BrokerRegistration registration : brokerRegistrations.values()) {
heartbeatManager.register(registration.id(),
registration.fenced());
+ heartbeatManager.tracker().updateContactTime(
+ new BrokerIdAndEpoch(registration.id(), registration.epoch()),
nowNs);
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
index a184a0f4f04..821fa47df20 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
@@ -148,7 +148,7 @@ class PeriodicTaskControlManager {
Time time,
QueueAccessor queueAccessor
) {
- this.log = logContext.logger(OffsetControlManager.class);
+ this.log = logContext.logger(PeriodicTaskControlManager.class);
this.time = time;
this.queueAccessor = queueAccessor;
this.active = false;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 0646d4aaa56..cc147c0ce6d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -71,6 +71,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
@@ -768,7 +769,8 @@ public class ClusterControlManagerTest {
void registerNewBrokerWithDirs(ClusterControlManager clusterControl, int
brokerId, List<Uuid> dirs) {
BrokerRegistrationRequestData data = new
BrokerRegistrationRequestData().setBrokerId(brokerId)
.setClusterId(clusterControl.clusterId())
- .setIncarnationId(Uuid.randomUuid()).setLogDirs(dirs);
+ .setIncarnationId(new Uuid(brokerId, brokerId))
+ .setLogDirs(dirs);
FinalizedControllerFeatures finalizedFeatures = new
FinalizedControllerFeatures(Collections.emptyMap(), 456L);
ControllerResult<BrokerRegistrationReply> result =
clusterControl.registerBroker(data, 123L, finalizedFeatures);
RecordTestUtils.replayAll(clusterControl, result.records());
@@ -852,4 +854,32 @@ public class ClusterControlManagerTest {
clusterControl.brokerRegistrations().get(1).epoch());
}
}
+
+ @Test
+ public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() {
+ MockTime time = new MockTime(0L, 20L, 1000L);
+ ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
+ setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+ setFeatureControlManager(new
FeatureControlManager.Builder().build()).
+ setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
+ setTime(time).
+ build();
+ clusterControl.replay(new RegisterBrokerRecord().
+ setBrokerEpoch(100).
+ setBrokerId(0).
+ setLogDirs(asList(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))),
10002);
+ clusterControl.replay(new RegisterBrokerRecord().
+ setBrokerEpoch(123).
+ setBrokerId(1).
+ setLogDirs(asList(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))),
10005);
+ clusterControl.activate();
+ assertEquals(OptionalLong.of(1000L),
clusterControl.heartbeatManager().tracker().
+ contactTime(new BrokerIdAndEpoch(0, 100)));
+ assertEquals(OptionalLong.of(1000L),
clusterControl.heartbeatManager().tracker().
+ contactTime(new BrokerIdAndEpoch(1, 123)));
+ assertEquals(OptionalLong.empty(),
clusterControl.heartbeatManager().tracker().
+ contactTime(new BrokerIdAndEpoch(1, 124)));
+ assertEquals(OptionalLong.empty(),
clusterControl.heartbeatManager().tracker().
+ contactTime(new BrokerIdAndEpoch(2, 100)));
+ }
}