This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ff55c316aa KAFKA-18106: Generate LeaderAndIsrUpdates on unclean 
shutdown (#18045)
0ff55c316aa is described below

commit 0ff55c316aab9b54adcb1cfc4d3d3dbedc279270
Author: David Mao <[email protected]>
AuthorDate: Thu Dec 5 16:19:05 2024 -0800

    KAFKA-18106: Generate LeaderAndIsrUpdates on unclean shutdown (#18045)
    
    Generate LeaderAndISR change records when a broker re-registers and the 
quorum controller detects an unclean shutdown.
    
    This is necessary to ensure that we perform the expected partition state 
transitions, e.g., bumping leader epochs and so on.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../kafka/controller/ClusterControlManager.java    |  6 +-
 .../apache/kafka/controller/QuorumController.java  | 20 ++++-
 .../controller/ReplicationControlManager.java      | 16 ++--
 .../kafka/controller/QuorumControllerTest.java     | 98 +++++++++++++++++++++-
 4 files changed, 128 insertions(+), 12 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 8576b14ebc2..43edafb77f4 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -454,7 +454,11 @@ public class ClusterControlManager {
         }
         heartbeatManager.register(brokerId, record.fenced());
 
-        return ControllerResult.atomicOf(records, new 
BrokerRegistrationReply(record.brokerEpoch()));
+        // A broker registration that cleans up a previous incarnation's 
unclean shutdown may generate a large number of records.
+        // It is safe to return these records as a non-atomic batch as long as 
the registration record is added last.
+        // This ensures that in case of a controller failure, the broker will 
re-register and the new controller
+        // can retry the unclean shutdown cleanup.
+        return ControllerResult.of(records, new 
BrokerRegistrationReply(record.brokerEpoch()));
     }
 
     ControllerResult<Void> 
registerController(ControllerRegistrationRequestData request) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index bd98f7c8096..aaffb9084ef 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -207,6 +207,7 @@ public final class QuorumController implements Controller {
         private OptionalLong leaderImbalanceCheckIntervalNs = 
OptionalLong.empty();
         private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
         private long sessionTimeoutNs = 
ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
+        private OptionalLong fenceStaleBrokerIntervalNs = OptionalLong.empty();
         private QuorumControllerMetrics controllerMetrics = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = 
Optional.empty();
         private Optional<AlterConfigPolicy> alterConfigPolicy = 
Optional.empty();
@@ -301,6 +302,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setFenceStaleBrokerIntervalNs(long 
fenceStaleBrokerIntervalNs) {
+            this.fenceStaleBrokerIntervalNs = 
OptionalLong.of(fenceStaleBrokerIntervalNs);
+            return this;
+        }
+
         public Builder setMetrics(QuorumControllerMetrics controllerMetrics) {
             this.controllerMetrics = controllerMetrics;
             return this;
@@ -414,6 +420,7 @@ public final class QuorumController implements Controller {
                     leaderImbalanceCheckIntervalNs,
                     maxIdleIntervalNs,
                     sessionTimeoutNs,
+                    fenceStaleBrokerIntervalNs,
                     controllerMetrics,
                     createTopicPolicy,
                     alterConfigPolicy,
@@ -1456,6 +1463,7 @@ public final class QuorumController implements Controller 
{
         OptionalLong leaderImbalanceCheckIntervalNs,
         OptionalLong maxIdleIntervalNs,
         long sessionTimeoutNs,
+        OptionalLong fenceStaleBrokerIntervalNs,
         QuorumControllerMetrics controllerMetrics,
         Optional<CreateTopicPolicy> createTopicPolicy,
         Optional<AlterConfigPolicy> alterConfigPolicy,
@@ -1569,7 +1577,11 @@ public final class QuorumController implements 
Controller {
         if (maxIdleIntervalNs.isPresent()) {
             registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
         }
-        registerMaybeFenceStaleBroker(sessionTimeoutNs);
+        if (fenceStaleBrokerIntervalNs.isPresent()) {
+            
registerMaybeFenceStaleBroker(fenceStaleBrokerIntervalNs.getAsLong());
+        } else {
+            
registerMaybeFenceStaleBroker(maybeFenceStaleBrokerPeriodNs(sessionTimeoutNs));
+        }
         if (leaderImbalanceCheckIntervalNs.isPresent()) {
             registerElectPreferred(leaderImbalanceCheckIntervalNs.getAsLong());
         }
@@ -1630,12 +1642,12 @@ public final class QuorumController implements 
Controller {
      * This task periodically checks to see if there is a stale broker that 
needs to
      * be fenced. It will only ever remove one stale broker at a time.
      *
-     * @param sessionTimeoutNs      The broker session timeout in nanoseconds.
+     * @param fenceStaleBrokerIntervalNs The interval to check for stale 
brokers in nanoseconds
      */
-    private void registerMaybeFenceStaleBroker(long sessionTimeoutNs) {
+    private void registerMaybeFenceStaleBroker(long 
fenceStaleBrokerIntervalNs) {
         periodicControl.registerTask(new PeriodicTask("maybeFenceStaleBroker",
             replicationControl::maybeFenceOneStaleBroker,
-            maybeFenceStaleBrokerPeriodNs(sessionTimeoutNs),
+            fenceStaleBrokerIntervalNs,
             EnumSet.noneOf(PeriodicTaskFlag.class)));
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index a995068eccd..6bbbd03b48c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1422,11 +1422,17 @@ public class ReplicationControlManager {
      * @param records       The record list to append to.
      */
     void handleBrokerUncleanShutdown(int brokerId, List<ApiMessageAndVersion> 
records) {
-        if (!featureControl.metadataVersion().isElrSupported()) return;
-        generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, 
NO_LEADER, brokerId, records,
-            brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
-        generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, 
NO_LEADER, brokerId, records,
-            brokersToElrs.partitionsWithBrokerInElr(brokerId));
+        if (featureControl.metadataVersion().isElrSupported()) {
+            // ELR is enabled, generate unclean shutdown partition change 
records
+            generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", 
NO_LEADER, NO_LEADER, brokerId, records,
+                brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+            generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", 
NO_LEADER, NO_LEADER, brokerId, records,
+                brokersToElrs.partitionsWithBrokerInElr(brokerId));
+        } else {
+            // ELR is not enabled, handle the unclean shutdown as if the 
broker was fenced
+            generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", 
brokerId, NO_LEADER, NO_LEADER, records,
+                brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+        }
     }
 
     /**
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 44c701f2eec..5b8cb44f0a6 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -166,6 +166,7 @@ import static 
org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -353,7 +354,7 @@ public class QuorumControllerTest {
     }
 
     @Test
-    public void testUncleanShutdownBroker() throws Throwable {
+    public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
         List<Integer> allBrokers = Arrays.asList(1, 2, 3);
         short replicationFactor = (short) allBrokers.size();
         long sessionTimeoutMillis = 500;
@@ -363,7 +364,6 @@ public class QuorumControllerTest {
                 build();
             QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
-
                 
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, 
"test-provided bootstrap ELR enabled")).
                 build()
         ) {
@@ -482,6 +482,100 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testUncleanShutdownElrDisabled() throws Exception {
+        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
+        short replicationFactor = (short) allBrokers.size();
+        long sessionTimeoutMillis = 500;
+
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv)
+                .setControllerBuilderInitializer(controllerBuilder ->
+                    
controllerBuilder.setFenceStaleBrokerIntervalNs(TimeUnit.SECONDS.toNanos(15)))
+                .setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis))
+                
.setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV0,
 "test-provided bootstrap ELR not supported"))
+                .build()
+        ) {
+            ListenerCollection listeners = new ListenerCollection();
+            listeners.add(new 
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = new HashMap<>();
+            BrokerRegistrationRequestData.FeatureCollection features =
+                brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_4_0_IV0,
+                    Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_0.featureLevel()));
+            for (Integer brokerId : allBrokers) {
+                CompletableFuture<BrokerRegistrationReply> reply = 
active.registerBroker(
+                    anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(brokerId).
+                        setClusterId(active.clusterId()).
+                        setFeatures(features).
+                        setIncarnationId(Uuid.randomUuid()).
+                        
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
+                        setListeners(listeners));
+                brokerEpochs.put(brokerId, reply.get().epoch());
+            }
+
+            // Brokers are only registered and should still be fenced
+            allBrokers.forEach(brokerId ->
+                assertFalse(active.clusterControl().isUnfenced(brokerId), 
"Broker " + brokerId + " should have been fenced")
+            );
+
+            // Unfence all brokers and create a topic foo
+            sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, 
brokerEpochs);
+            CreateTopicsRequestData createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                new CreatableTopicCollection(Collections.singleton(
+                    new CreatableTopic().setName("foo").setNumPartitions(1).
+                        setReplicationFactor(replicationFactor)).iterator()));
+            CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                ANONYMOUS_CONTEXT, createTopicsRequestData,
+                Collections.singleton("foo")).get();
+            assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+            Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+
+            // wait for brokers to become inactive
+            active.time().sleep(sessionTimeoutMillis);
+
+            // unclean shutdown for each replica
+            for (int i = 0; i < (int) replicationFactor; i++) {
+                // Verify that ELR is disabled
+                PartitionRegistration partition = 
active.replicationControl().getPartition(topicIdFoo, 0);
+                assertEquals(0, partition.elr.length, partition.toString());
+                assertEquals(0, partition.lastKnownElr.length, 
partition.toString());
+
+                boolean lastStandingIsr = i == (replicationFactor - 1);
+                int prevLeader = partition.leader;
+                int prevLeaderEpoch = partition.leaderEpoch;
+                // Unclean shutdown should remove the broker from the ISR and 
reassign leadership
+                active.registerBroker(
+                    anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(prevLeader).
+                        setClusterId(active.clusterId()).
+                        setFeatures(features).
+                        setIncarnationId(Uuid.randomUuid()).
+                        
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
+                        setListeners(listeners)).get();
+                partition = 
active.replicationControl().getPartition(topicIdFoo, 0);
+                // leader should always change, leader epoch should always be 
incremented
+                int currentLeader = partition.leader;
+                int currentLeaderEpoch = partition.leaderEpoch;
+                assertNotEquals(currentLeader, prevLeader);
+                assertNotEquals(currentLeaderEpoch, prevLeaderEpoch);
+                // if the broker is not the last standing ISR, it should be 
removed from the ISR
+                if (lastStandingIsr) {
+                    assertArrayEquals(new int[]{prevLeader}, partition.isr);
+                    assertEquals(NO_LEADER, currentLeader);
+                } else {
+                    List<Integer> isr = 
Arrays.stream(partition.isr).boxed().toList();
+                    assertFalse(isr.contains(prevLeader));
+                }
+            }
+        }
+    }
+
     @Test
     public void testBalancePartitionLeaders() throws Throwable {
         List<Integer> allBrokers = Arrays.asList(1, 2, 3);

Reply via email to