This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a8cdbaf4b30 KAFKA-18138: The controller must add all extant brokers to 
BrokerHeartbeatTracker when activating (#18009)
a8cdbaf4b30 is described below

commit a8cdbaf4b30a119c66a9a85f6cab32610cd17fa9
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Dec 3 07:33:52 2024 -0800

    KAFKA-18138: The controller must add all extant brokers to 
BrokerHeartbeatTracker when activating (#18009)
    
    The controller must add all extant brokers to BrokerHeartbeatTracker when 
activating. Otherwise, we
    could end up in a situation where a broker fails exactly as a controller 
failover occurs, and we
    never fence it.
    
    Also, fix a bug where the slf4j logger object in PeriodicTaskControlManager 
was initialized as
    though it belonged to OffsetControlManager.
    
    Reviewers: David Mao <[email protected]>, David Arthur <[email protected]>
---
 .../kafka/controller/ClusterControlManager.java    |  3 ++
 .../controller/PeriodicTaskControlManager.java     |  2 +-
 .../controller/ClusterControlManagerTest.java      | 32 +++++++++++++++++++++-
 3 files changed, 35 insertions(+), 2 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 580967d97a5..8576b14ebc2 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -306,8 +306,11 @@ public class ClusterControlManager {
      */
     public void activate() {
         heartbeatManager = new BrokerHeartbeatManager(logContext, time, 
sessionTimeoutNs);
+        long nowNs = time.nanoseconds();
         for (BrokerRegistration registration : brokerRegistrations.values()) {
             heartbeatManager.register(registration.id(), 
registration.fenced());
+            heartbeatManager.tracker().updateContactTime(
+                new BrokerIdAndEpoch(registration.id(), registration.epoch()), 
nowNs);
         }
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
index a184a0f4f04..821fa47df20 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
@@ -148,7 +148,7 @@ class PeriodicTaskControlManager {
         Time time,
         QueueAccessor queueAccessor
     ) {
-        this.log = logContext.logger(OffsetControlManager.class);
+        this.log = logContext.logger(PeriodicTaskControlManager.class);
         this.time = time;
         this.queueAccessor = queueAccessor;
         this.active = false;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 0646d4aaa56..cc147c0ce6d 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -71,6 +71,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
@@ -768,7 +769,8 @@ public class ClusterControlManagerTest {
     void registerNewBrokerWithDirs(ClusterControlManager clusterControl, int 
brokerId, List<Uuid> dirs) {
         BrokerRegistrationRequestData data = new 
BrokerRegistrationRequestData().setBrokerId(brokerId)
                 .setClusterId(clusterControl.clusterId())
-                .setIncarnationId(Uuid.randomUuid()).setLogDirs(dirs);
+                .setIncarnationId(new Uuid(brokerId, brokerId))
+                .setLogDirs(dirs);
         FinalizedControllerFeatures finalizedFeatures = new 
FinalizedControllerFeatures(Collections.emptyMap(), 456L);
         ControllerResult<BrokerRegistrationReply> result = 
clusterControl.registerBroker(data, 123L, finalizedFeatures);
         RecordTestUtils.replayAll(clusterControl, result.records());
@@ -852,4 +854,32 @@ public class ClusterControlManagerTest {
                     clusterControl.brokerRegistrations().get(1).epoch());
         }
     }
+
+    @Test
+    public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() {
+        MockTime time = new MockTime(0L, 20L, 1000L);
+        ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
+            setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+            setFeatureControlManager(new 
FeatureControlManager.Builder().build()).
+            setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
+            setTime(time).
+            build();
+        clusterControl.replay(new RegisterBrokerRecord().
+            setBrokerEpoch(100).
+            setBrokerId(0).
+            setLogDirs(asList(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))), 
10002);
+        clusterControl.replay(new RegisterBrokerRecord().
+            setBrokerEpoch(123).
+            setBrokerId(1).
+            setLogDirs(asList(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))), 
10005);
+        clusterControl.activate();
+        assertEquals(OptionalLong.of(1000L), 
clusterControl.heartbeatManager().tracker().
+            contactTime(new BrokerIdAndEpoch(0, 100)));
+        assertEquals(OptionalLong.of(1000L), 
clusterControl.heartbeatManager().tracker().
+            contactTime(new BrokerIdAndEpoch(1, 123)));
+        assertEquals(OptionalLong.empty(), 
clusterControl.heartbeatManager().tracker().
+            contactTime(new BrokerIdAndEpoch(1, 124)));
+        assertEquals(OptionalLong.empty(), 
clusterControl.heartbeatManager().tracker().
+            contactTime(new BrokerIdAndEpoch(2, 100)));
+    }
 }

Reply via email to