This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d480c4aa6e KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (#12075)
d480c4aa6e is described below

commit d480c4aa6e513e36050d8e067931de2270525d18
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Apr 21 14:58:02 2022 -0700

    KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (#12075)
    
    This PR fixes a case where we were unable to place replicas on fenced brokers in KRaft mode.
    Specifically, if we had a broker registration in the metadata log but no associated heartbeat,
    the HeartbeatManager previously would not track the fenced broker. This PR fixes this by adding
    the tracking logic to the metadata log replay path in ClusterControlManager.
    
    Reviewers: David Arthur <[email protected]>, dengziming <[email protected]>
---
 .../kafka/controller/BrokerHeartbeatManager.java   | 16 +++++++++++++++
 .../kafka/controller/ClusterControlManager.java    | 14 +++++++------
 .../controller/ReplicationControlManager.java      |  2 +-
 .../controller/ReplicationControlManagerTest.java  | 24 +++++++++++++++++++++-
 4 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index 2e71b76fcb..f31df917d7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -342,6 +342,22 @@ public class BrokerHeartbeatManager {
         }
     }
 
+    /**
+     * Register this broker if we haven't already, and make sure its fencing 
state is
+     * correct.
+     *
+     * @param brokerId          The broker ID.
+     * @param fenced            True only if the broker is currently fenced.
+     */
+    void register(int brokerId, boolean fenced) {
+        BrokerHeartbeatState broker = brokers.get(brokerId);
+        if (broker == null) {
+            touch(brokerId, fenced, -1);
+        } else if (broker.fenced() != fenced) {
+            touch(brokerId, fenced, broker.metadataOffset);
+        }
+    }
+
     /**
      * Update broker state, including lastContactNs.
      *
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index add0a53a76..538e8a13eb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -302,7 +302,6 @@ public class ClusterControlManager {
                 if (!existing.incarnationId().equals(request.incarnationId())) 
{
                     // Remove any existing session for the old broker 
incarnation.
                     heartbeatManager.remove(brokerId);
-                    existing = null;
                 }
             }
         }
@@ -334,11 +333,7 @@ public class ClusterControlManager {
                 setMaxSupportedVersion(feature.maxSupportedVersion()));
         }
 
-        if (existing == null) {
-            heartbeatManager.touch(brokerId, true, -1);
-        } else {
-            heartbeatManager.touch(brokerId, existing.fenced(), -1);
-        }
+        heartbeatManager.register(brokerId, record.fenced());
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record,
@@ -366,6 +361,10 @@ public class ClusterControlManager {
                     record.incarnationId(), listeners, features,
                     Optional.ofNullable(record.rack()), record.fenced()));
         updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
+        if (heartbeatManager != null) {
+            if (prevRegistration != null) heartbeatManager.remove(brokerId);
+            heartbeatManager.register(brokerId, record.fenced());
+        }
         if (prevRegistration == null) {
             log.info("Registered new broker: {}", record);
         } else if 
(prevRegistration.incarnationId().equals(record.incarnationId())) {
@@ -385,6 +384,7 @@ public class ClusterControlManager {
             throw new RuntimeException(String.format("Unable to replay %s: no 
broker " +
                 "registration with that epoch found", record.toString()));
         } else {
+            if (heartbeatManager != null) heartbeatManager.remove(brokerId);
             brokerRegistrations.remove(brokerId);
             updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Unregistered broker: {}", record);
@@ -401,6 +401,7 @@ public class ClusterControlManager {
             throw new RuntimeException(String.format("Unable to replay %s: no 
broker " +
                 "registration with that epoch found", record.toString()));
         } else {
+            if (heartbeatManager != null) heartbeatManager.register(brokerId, 
true);
             brokerRegistrations.put(brokerId, 
registration.cloneWithFencing(true));
             updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Fenced broker: {}", record);
@@ -417,6 +418,7 @@ public class ClusterControlManager {
             throw new RuntimeException(String.format("Unable to replay %s: no 
broker " +
                 "registration with that epoch found", record.toString()));
         } else {
+            if (heartbeatManager != null) heartbeatManager.register(brokerId, 
false);
             brokerRegistrations.put(brokerId, 
registration.cloneWithFencing(false));
             updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Unfenced broker: {}", record);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index c9d6a5997f..dfa3009ea2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -355,7 +355,7 @@ public class ReplicationControlManager {
     /**
      * A ClusterDescriber which supplies cluster information to our 
ReplicaPlacer.
      */
-    private final KRaftClusterDescriber clusterDescriber = new 
KRaftClusterDescriber();
+    final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
 
     private ReplicationControlManager(
         SnapshotRegistry snapshotRegistry,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0bfb6da7a2..356a627223 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.LeaderRecoveryState;
@@ -73,6 +74,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -251,7 +253,7 @@ public class ReplicationControlManagerTest {
         void registerBrokers(Integer... brokerIds) throws Exception {
             for (int brokerId : brokerIds) {
                 RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-                    setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
+                    setBrokerEpoch(brokerId + 
100).setBrokerId(brokerId).setRack(null);
                 brokerRecord.endPoints().add(new 
RegisterBrokerRecord.BrokerEndpoint().
                     setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
                     setPort((short) 9092 + brokerId).
@@ -1725,4 +1727,24 @@ public class ReplicationControlManagerTest {
         return response;
     }
 
+    @Test
+    public void testKRaftClusterDescriber() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(2, 3, 4);
+        ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 
1}}).topicId();
+        ctx.createTestTopic("bar", new int[][]{
+            new int[]{2, 3, 4}, new int[]{3, 4, 2}}).topicId();
+        KRaftClusterDescriber describer = replication.clusterDescriber;
+        HashSet<UsableBroker> brokers = new HashSet<>();
+        describer.usableBrokers().forEachRemaining(broker -> 
brokers.add(broker));
+        assertEquals(new HashSet<>(Arrays.asList(
+            new UsableBroker(0, Optional.empty(), true),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false),
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false))), brokers);
+    }
 }

Reply via email to