This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0ff55c316aa KAFKA-18106: Generate LeaderAndIsrUpdates on unclean
shutdown (#18045)
0ff55c316aa is described below
commit 0ff55c316aab9b54adcb1cfc4d3d3dbedc279270
Author: David Mao <[email protected]>
AuthorDate: Thu Dec 5 16:19:05 2024 -0800
KAFKA-18106: Generate LeaderAndIsrUpdates on unclean shutdown (#18045)
Generate LeaderAndISR change records when a broker re-registers and the
quorum controller detects an unclean shutdown.
This is necessary to ensure that we perform the expected partition state
transitions, e.g., bumping leader epochs and so on.
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/controller/ClusterControlManager.java | 6 +-
.../apache/kafka/controller/QuorumController.java | 20 ++++-
.../controller/ReplicationControlManager.java | 16 ++--
.../kafka/controller/QuorumControllerTest.java | 98 +++++++++++++++++++++-
4 files changed, 128 insertions(+), 12 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 8576b14ebc2..43edafb77f4 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -454,7 +454,11 @@ public class ClusterControlManager {
}
heartbeatManager.register(brokerId, record.fenced());
- return ControllerResult.atomicOf(records, new
BrokerRegistrationReply(record.brokerEpoch()));
+ // A broker registration that cleans up a previous incarnation's
unclean shutdown may generate a large number of records.
+ // It is safe to return these records as a non-atomic batch as long as
the registration record is added last.
+ // This ensures that in case of a controller failure, the broker will
re-register and the new controller
+ // can retry the unclean shutdown cleanup.
+ return ControllerResult.of(records, new
BrokerRegistrationReply(record.brokerEpoch()));
}
ControllerResult<Void>
registerController(ControllerRegistrationRequestData request) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index bd98f7c8096..aaffb9084ef 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -207,6 +207,7 @@ public final class QuorumController implements Controller {
private OptionalLong leaderImbalanceCheckIntervalNs =
OptionalLong.empty();
private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
private long sessionTimeoutNs =
ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
+ private OptionalLong fenceStaleBrokerIntervalNs = OptionalLong.empty();
private QuorumControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy =
Optional.empty();
private Optional<AlterConfigPolicy> alterConfigPolicy =
Optional.empty();
@@ -301,6 +302,11 @@ public final class QuorumController implements Controller {
return this;
}
+ public Builder setFenceStaleBrokerIntervalNs(long
fenceStaleBrokerIntervalNs) {
+ this.fenceStaleBrokerIntervalNs =
OptionalLong.of(fenceStaleBrokerIntervalNs);
+ return this;
+ }
+
public Builder setMetrics(QuorumControllerMetrics controllerMetrics) {
this.controllerMetrics = controllerMetrics;
return this;
@@ -414,6 +420,7 @@ public final class QuorumController implements Controller {
leaderImbalanceCheckIntervalNs,
maxIdleIntervalNs,
sessionTimeoutNs,
+ fenceStaleBrokerIntervalNs,
controllerMetrics,
createTopicPolicy,
alterConfigPolicy,
@@ -1456,6 +1463,7 @@ public final class QuorumController implements Controller
{
OptionalLong leaderImbalanceCheckIntervalNs,
OptionalLong maxIdleIntervalNs,
long sessionTimeoutNs,
+ OptionalLong fenceStaleBrokerIntervalNs,
QuorumControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy,
Optional<AlterConfigPolicy> alterConfigPolicy,
@@ -1569,7 +1577,11 @@ public final class QuorumController implements
Controller {
if (maxIdleIntervalNs.isPresent()) {
registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
}
- registerMaybeFenceStaleBroker(sessionTimeoutNs);
+ if (fenceStaleBrokerIntervalNs.isPresent()) {
+
registerMaybeFenceStaleBroker(fenceStaleBrokerIntervalNs.getAsLong());
+ } else {
+
registerMaybeFenceStaleBroker(maybeFenceStaleBrokerPeriodNs(sessionTimeoutNs));
+ }
if (leaderImbalanceCheckIntervalNs.isPresent()) {
registerElectPreferred(leaderImbalanceCheckIntervalNs.getAsLong());
}
@@ -1630,12 +1642,12 @@ public final class QuorumController implements
Controller {
* This task periodically checks to see if there is a stale broker that
needs to
* be fenced. It will only ever remove one stale broker at a time.
*
- * @param sessionTimeoutNs The broker session timeout in nanoseconds.
+ * @param fenceStaleBrokerIntervalNs The interval to check for stale
brokers in nanoseconds
*/
- private void registerMaybeFenceStaleBroker(long sessionTimeoutNs) {
+ private void registerMaybeFenceStaleBroker(long
fenceStaleBrokerIntervalNs) {
periodicControl.registerTask(new PeriodicTask("maybeFenceStaleBroker",
replicationControl::maybeFenceOneStaleBroker,
- maybeFenceStaleBrokerPeriodNs(sessionTimeoutNs),
+ fenceStaleBrokerIntervalNs,
EnumSet.noneOf(PeriodicTaskFlag.class)));
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index a995068eccd..6bbbd03b48c 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1422,11 +1422,17 @@ public class ReplicationControlManager {
* @param records The record list to append to.
*/
void handleBrokerUncleanShutdown(int brokerId, List<ApiMessageAndVersion>
records) {
- if (!featureControl.metadataVersion().isElrSupported()) return;
- generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER,
NO_LEADER, brokerId, records,
- brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
- generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER,
NO_LEADER, brokerId, records,
- brokersToElrs.partitionsWithBrokerInElr(brokerId));
+ if (featureControl.metadataVersion().isElrSupported()) {
+ // ELR is enabled, generate unclean shutdown partition change
records
+ generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown",
NO_LEADER, NO_LEADER, brokerId, records,
+ brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+ generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown",
NO_LEADER, NO_LEADER, brokerId, records,
+ brokersToElrs.partitionsWithBrokerInElr(brokerId));
+ } else {
+ // ELR is not enabled, handle the unclean shutdown as if the
broker was fenced
+ generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown",
brokerId, NO_LEADER, NO_LEADER, records,
+ brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+ }
}
/**
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 44c701f2eec..5b8cb44f0a6 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -166,6 +166,7 @@ import static
org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -353,7 +354,7 @@ public class QuorumControllerTest {
}
@Test
- public void testUncleanShutdownBroker() throws Throwable {
+ public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
short replicationFactor = (short) allBrokers.size();
long sessionTimeoutMillis = 500;
@@ -363,7 +364,6 @@ public class QuorumControllerTest {
build();
QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
-
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1,
"test-provided bootstrap ELR enabled")).
build()
) {
@@ -482,6 +482,100 @@ public class QuorumControllerTest {
}
}
+ @Test
+ public void testUncleanShutdownElrDisabled() throws Exception {
+ List<Integer> allBrokers = Arrays.asList(1, 2, 3);
+ short replicationFactor = (short) allBrokers.size();
+ long sessionTimeoutMillis = 500;
+
+ try (
+ LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ build();
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv)
+ .setControllerBuilderInitializer(controllerBuilder ->
+
controllerBuilder.setFenceStaleBrokerIntervalNs(TimeUnit.SECONDS.toNanos(15)))
+ .setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis))
+
.setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV0,
"test-provided bootstrap ELR not supported"))
+ .build()
+ ) {
+ ListenerCollection listeners = new ListenerCollection();
+ listeners.add(new
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+ QuorumController active = controlEnv.activeController();
+ Map<Integer, Long> brokerEpochs = new HashMap<>();
+ BrokerRegistrationRequestData.FeatureCollection features =
+ brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.IBP_4_0_IV0,
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_0.featureLevel()));
+ for (Integer brokerId : allBrokers) {
+ CompletableFuture<BrokerRegistrationReply> reply =
active.registerBroker(
+ anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
+ new BrokerRegistrationRequestData().
+ setBrokerId(brokerId).
+ setClusterId(active.clusterId()).
+ setFeatures(features).
+ setIncarnationId(Uuid.randomUuid()).
+
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
+ setListeners(listeners));
+ brokerEpochs.put(brokerId, reply.get().epoch());
+ }
+
+ // Brokers are only registered and should still be fenced
+ allBrokers.forEach(brokerId ->
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced")
+ );
+
+ // Unfence all brokers and create a topic foo
+ sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers,
brokerEpochs);
+ CreateTopicsRequestData createTopicsRequestData = new
CreateTopicsRequestData().setTopics(
+ new CreatableTopicCollection(Collections.singleton(
+ new CreatableTopic().setName("foo").setNumPartitions(1).
+ setReplicationFactor(replicationFactor)).iterator()));
+ CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
+ ANONYMOUS_CONTEXT, createTopicsRequestData,
+ Collections.singleton("foo")).get();
+ assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+ Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
+
+ // wait for brokers to become inactive
+ active.time().sleep(sessionTimeoutMillis);
+
+ // unclean shutdown for each replica
+ for (int i = 0; i < (int) replicationFactor; i++) {
+ // Verify that ELR is disabled
+ PartitionRegistration partition =
active.replicationControl().getPartition(topicIdFoo, 0);
+ assertEquals(0, partition.elr.length, partition.toString());
+ assertEquals(0, partition.lastKnownElr.length,
partition.toString());
+
+ boolean lastStandingIsr = i == (replicationFactor - 1);
+ int prevLeader = partition.leader;
+ int prevLeaderEpoch = partition.leaderEpoch;
+ // Unclean shutdown should remove the broker from the ISR and
reassign leadership
+ active.registerBroker(
+ anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
+ new BrokerRegistrationRequestData().
+ setBrokerId(prevLeader).
+ setClusterId(active.clusterId()).
+ setFeatures(features).
+ setIncarnationId(Uuid.randomUuid()).
+
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
+ setListeners(listeners)).get();
+ partition =
active.replicationControl().getPartition(topicIdFoo, 0);
+ // leader should always change, leader epoch should always be
incremented
+ int currentLeader = partition.leader;
+ int currentLeaderEpoch = partition.leaderEpoch;
+ assertNotEquals(currentLeader, prevLeader);
+ assertNotEquals(currentLeaderEpoch, prevLeaderEpoch);
+ // if the broker is not the last standing ISR, it should be
removed from the ISR
+ if (lastStandingIsr) {
+ assertArrayEquals(new int[]{prevLeader}, partition.isr);
+ assertEquals(NO_LEADER, currentLeader);
+ } else {
+ List<Integer> isr =
Arrays.stream(partition.isr).boxed().toList();
+ assertFalse(isr.contains(prevLeader));
+ }
+ }
+ }
+ }
+
@Test
public void testBalancePartitionLeaders() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3);