This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 5cfd248f8e4 KAFKA-13959: Controller should unfence Broker with busy
metadata log (#12274)
5cfd248f8e4 is described below
commit 5cfd248f8e4be267ced8fbf200db45ec041a93e9
Author: dengziming <[email protected]>
AuthorDate: Sat Aug 13 00:06:24 2022 +0800
KAFKA-13959: Controller should unfence Broker with busy metadata log
(#12274)
The cause of KAFKA-13959 is a little complex. The two keys to this
problem are:
KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms. We
rely on fetchPurgatory to complete a FetchRequest; in detail, if
FetchRequest.fetchOffset >= log.endOffset, we wait for 500ms before sending a
FetchResponse. The follower needs to send one more FetchRequest to get the HW.
Here is the sequence of events:
1. When starting, the leader (active controller) has LEO=m+1 (m is the offset
of the last record) and leader HW=m (because more than half of the voters
need to reach m+1).
2. The follower (standby controller) and the observer (broker) send
FetchRequest(fetchOffset=m).
2.1. The leader receives the FetchRequest, sets leader HW=m, and waits 500ms
before sending the FetchResponse.
2.2. The leader sends FetchResponse(HW=m).
2.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
3. The leader appends a NoOpRecord, so LEO=m+2 while leader HW=m.
4. Steps 1-3 repeat in a loop.
If we set MAX_FETCH_WAIT_MS=200 (less than half of
MetadataMaxIdleIntervalMs), this problem can be worked around temporarily.
We plan to address this problem in two ways. First, in this PR, we change
the controller to unfence a broker when the broker's high-watermark has reached
the broker registration record for that broker. Second, we will propagate the
HWM to the replicas as quickly as possible in KAFKA-14145.
Reviewers: Luke Chen <[email protected]>, José Armando García Sancio
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../kafka/controller/BrokerHeartbeatManager.java | 26 ++++++++---------
.../kafka/controller/ClusterControlManager.java | 25 ++++++++++++++--
.../apache/kafka/controller/QuorumController.java | 27 +++++++++++++-----
.../controller/ReplicationControlManager.java | 6 ++--
.../controller/ClusterControlManagerTest.java | 19 +++++++------
.../controller/ProducerIdControlManagerTest.java | 2 +-
.../org/apache/kafka/metadata/RecordTestUtils.java | 33 +++++++++++++---------
.../kafka/raft/internals/FuturePurgatory.java | 4 +--
9 files changed, 92 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4e253047ee6..860056f9a3e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -81,7 +81,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
- val MetadataMaxIdleIntervalMs = 5000
+ val MetadataMaxIdleIntervalMs = 500
/** KRaft mode configs */
val EmptyNodeId: Int = -1
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index f31df917d76..428f1c5833e 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -511,17 +511,17 @@ public class BrokerHeartbeatManager {
/**
* Calculate the next broker state for a broker that just sent a heartbeat
request.
*
- * @param brokerId The broker id.
- * @param request The incoming heartbeat request.
- * @param lastCommittedOffset The last committed offset of the quorum
controller.
- * @param hasLeaderships A callback which evaluates to true if the
broker leads
- * at least one partition.
+ * @param brokerId The broker id.
+ * @param request The incoming heartbeat request.
+ * @param registerBrokerRecordOffset The offset of the broker's {@link
org.apache.kafka.common.metadata.RegisterBrokerRecord}.
+ * @param hasLeaderships A callback which evaluates to true
if the broker leads
+ * at least one partition.
*
- * @return The current and next broker states.
+ * @return The current and next broker states.
*/
BrokerControlStates calculateNextBrokerState(int brokerId,
BrokerHeartbeatRequestData
request,
- long lastCommittedOffset,
+ long
registerBrokerRecordOffset,
Supplier<Boolean>
hasLeaderships) {
BrokerHeartbeatState broker = brokers.getOrDefault(brokerId,
new BrokerHeartbeatState(brokerId));
@@ -533,17 +533,17 @@ public class BrokerHeartbeatManager {
"shutdown.", brokerId);
return new BrokerControlStates(currentState, SHUTDOWN_NOW);
} else if (!request.wantFence()) {
- if (request.currentMetadataOffset() >=
lastCommittedOffset) {
+ if (request.currentMetadataOffset() >=
registerBrokerRecordOffset) {
log.info("The request from broker {} to unfence has
been granted " +
- "because it has caught up with the last
committed metadata " +
- "offset {}.", brokerId, lastCommittedOffset);
+ "because it has caught up with the offset of
it's register " +
+ "broker record {}.", brokerId,
registerBrokerRecordOffset);
return new BrokerControlStates(currentState, UNFENCED);
} else {
if (log.isDebugEnabled()) {
log.debug("The request from broker {} to unfence
cannot yet " +
- "be granted because it has not caught up with
the last " +
- "committed metadata offset {}. It is still at
offset {}.",
- brokerId, lastCommittedOffset,
request.currentMetadataOffset());
+ "be granted because it has not caught up with
the offset of " +
+ "it's register broker record {}. It is still
at offset {}.",
+ brokerId, registerBrokerRecordOffset,
request.currentMetadataOffset());
}
return new BrokerControlStates(currentState, FENCED);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 235f077cfff..d30f4324217 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -217,6 +218,14 @@ public class ClusterControlManager {
*/
private final TimelineHashMap<Integer, BrokerRegistration>
brokerRegistrations;
+ /**
+ * Save the offset of each broker registration record, we will only
unfence a
+ * broker when its high watermark has reached its broker registration
record,
+ * this is not necessarily the exact offset of each broker registration
record
+ * but should not be smaller than it.
+ */
+ private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
+
/**
* A reference to the controller's metrics registry.
*/
@@ -255,6 +264,7 @@ public class ClusterControlManager {
this.sessionTimeoutNs = sessionTimeoutNs;
this.replicaPlacer = replicaPlacer;
this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.registerBrokerRecordOffsets = new
TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
this.controllerMetrics = metrics;
@@ -366,7 +376,15 @@ public class ClusterControlManager {
return ControllerResult.atomicOf(records, new
BrokerRegistrationReply(brokerEpoch));
}
- public void replay(RegisterBrokerRecord record) {
+ public OptionalLong registerBrokerRecordOffset(int brokerId) {
+ if (registerBrokerRecordOffsets.containsKey(brokerId)) {
+ return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
+ }
+ return OptionalLong.empty();
+ }
+
+ public void replay(RegisterBrokerRecord record, long offset) {
+ registerBrokerRecordOffsets.put(record.brokerId(), offset);
int brokerId = record.brokerId();
List<Endpoint> listeners = new ArrayList<>();
for (BrokerEndpoint endpoint : record.endPoints()) {
@@ -401,14 +419,15 @@ public class ClusterControlManager {
}
public void replay(UnregisterBrokerRecord record) {
+ registerBrokerRecordOffsets.remove(record.brokerId());
int brokerId = record.brokerId();
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) {
throw new RuntimeException(String.format("Unable to replay %s: no
broker " +
- "registration found for that id", record.toString()));
+ "registration found for that id", record));
} else if (registration.epoch() != record.brokerEpoch()) {
throw new RuntimeException(String.format("Unable to replay %s: no
broker " +
- "registration with that epoch found", record.toString()));
+ "registration with that epoch found", record));
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ef87248f134..3fee25841ba 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
@@ -759,7 +760,7 @@ public final class QuorumController implements Controller {
int i = 1;
for (ApiMessageAndVersion message : result.records()) {
try {
- replay(message.message(), Optional.empty());
+ replay(message.message(), Optional.empty(),
writeOffset + result.records().size());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply
%s record, which was " +
"%d of %d record(s) in the batch following last
writeOffset %d.",
@@ -883,7 +884,7 @@ public final class QuorumController implements Controller {
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
- replay(message.message(),
Optional.empty());
+ replay(message.message(),
Optional.empty(), offset);
} catch (Throwable e) {
String failureMessage =
String.format("Unable to apply %s record on standby " +
"controller, which was %d of %d
record(s) in the batch with baseOffset %d.",
@@ -938,7 +939,7 @@ public final class QuorumController implements Controller {
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
- replay(message.message(),
Optional.of(reader.snapshotId()));
+ replay(message.message(),
Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
} catch (Throwable e) {
String failureMessage = String.format("Unable
to apply %s record " +
"from snapshot %s on standby
controller, which was %d of " +
@@ -1305,12 +1306,19 @@ public final class QuorumController implements
Controller {
}
}
- @SuppressWarnings("unchecked")
- private void replay(ApiMessage message, Optional<OffsetAndEpoch>
snapshotId) {
+ /**
+ * Apply the metadata record to its corresponding in-memory state(s)
+ *
+ * @param message The metadata record
+ * @param snapshotId The snapshotId if this record is from a
snapshot
+ * @param batchLastOffset The offset of the last record in the log
batch, or the lastContainedLogOffset
+ * if this record is from a snapshot, this is
used along with RegisterBrokerRecord
+ */
+ private void replay(ApiMessage message, Optional<OffsetAndEpoch>
snapshotId, long batchLastOffset) {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
- clusterControl.replay((RegisterBrokerRecord) message);
+ clusterControl.replay((RegisterBrokerRecord) message,
batchLastOffset);
break;
case UNREGISTER_BROKER_RECORD:
clusterControl.replay((UnregisterBrokerRecord) message);
@@ -1874,8 +1882,13 @@ public final class QuorumController implements
Controller {
@Override
public ControllerResult<BrokerHeartbeatReply>
generateRecordsAndResult() {
+ OptionalLong offsetForRegisterBrokerRecord =
clusterControl.registerBrokerRecordOffset(brokerId);
+ if (!offsetForRegisterBrokerRecord.isPresent()) {
+ throw new StaleBrokerEpochException(
+ String.format("Receive a heartbeat from broker %d
before registration", brokerId));
+ }
ControllerResult<BrokerHeartbeatReply> result =
replicationControl.
- processBrokerHeartbeat(request, lastCommittedOffset);
+ processBrokerHeartbeat(request,
offsetForRegisterBrokerRecord.getAsLong());
inControlledShutdown =
result.response().inControlledShutdown();
rescheduleMaybeFenceStaleBrokers();
return result;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index bf3a679d2ce..4ffb339967c 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1355,13 +1355,13 @@ public class ReplicationControlManager {
}
ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
- BrokerHeartbeatRequestData request, long lastCommittedOffset) {
+ BrokerHeartbeatRequestData request, long
registerBrokerRecordOffset) {
int brokerId = request.brokerId();
long brokerEpoch = request.brokerEpoch();
clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
BrokerHeartbeatManager heartbeatManager =
clusterControl.heartbeatManager();
BrokerControlStates states =
heartbeatManager.calculateNextBrokerState(brokerId,
- request, lastCommittedOffset, () ->
brokersToIsrs.hasLeaderships(brokerId));
+ request, registerBrokerRecordOffset, () ->
brokersToIsrs.hasLeaderships(brokerId));
List<ApiMessageAndVersion> records = new ArrayList<>();
if (states.current() != states.next()) {
switch (states.next()) {
@@ -1382,7 +1382,7 @@ public class ReplicationControlManager {
heartbeatManager.touch(brokerId,
states.next().fenced(),
request.currentMetadataOffset());
- boolean isCaughtUp = request.currentMetadataOffset() >=
lastCommittedOffset;
+ boolean isCaughtUp = request.currentMetadataOffset() >=
registerBrokerRecordOffset;
BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
states.next().fenced(),
states.next().inControlledShutdown(),
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 97d6c883772..e47def81e6d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -96,7 +96,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
clusterControl.checkBrokerEpoch(1, 100);
assertThrows(StaleBrokerEpochException.class,
() -> clusterControl.checkBrokerEpoch(1, 101));
@@ -165,19 +165,20 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertFalse(clusterControl.unfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
brokerRecord.setInControlledShutdown(false);
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertFalse(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
+ assertEquals(100L,
clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
brokerRecord.setFenced(false);
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertTrue(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
@@ -217,7 +218,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertTrue(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
@@ -341,17 +342,19 @@ public class ClusterControlManagerTest {
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
assertEquals(new BrokerRegistration(1, 100,
Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"),
Collections.singletonMap("PLAINTEXT",
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT,
"example.com", 9092)),
Collections.emptyMap(), Optional.of("arack"), true, false),
clusterControl.brokerRegistrations().get(1));
+ assertEquals(100L,
clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord().
setBrokerId(1).
setBrokerEpoch(100);
clusterControl.replay(unregisterRecord);
assertFalse(clusterControl.brokerRegistrations().containsKey(1));
+
assertFalse(clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).isPresent());
}
@ParameterizedTest
@@ -382,7 +385,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
UnfenceBrokerRecord unfenceRecord =
new UnfenceBrokerRecord().setId(i).setEpoch(100);
clusterControl.replay(unfenceRecord);
@@ -442,7 +445,7 @@ public class ClusterControlManagerTest {
setPort((short) 9092 + i).
setName("PLAINTEXT").
setHost("example.com"));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
}
for (int i = 0; i < 2; i++) {
UnfenceBrokerRecord unfenceBrokerRecord =
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index ccdd3a5b233..80c5c505ae0 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -76,7 +76,7 @@ public class ProducerIdControlManagerTest {
setPort((short) 9092).
setName("PLAINTEXT").
setHost(String.format("broker-%02d.example.org", i)));
- clusterControl.replay(brokerRecord);
+ clusterControl.replay(brokerRecord, 100L);
}
this.producerIdControlManager = new
ProducerIdControlManager(clusterControl, snapshotRegistry);
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index f5a8da5f8a2..c21bdb54478 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -57,20 +57,25 @@ public class RecordTestUtils {
for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
ApiMessage record = recordAndVersion.message();
try {
- Method method = target.getClass().getMethod("replay",
record.getClass());
- method.invoke(target, record);
- } catch (NoSuchMethodException e) {
try {
- Method method = target.getClass().getMethod("replay",
- record.getClass(),
- Optional.class);
- method.invoke(target, record, Optional.empty());
- } catch (NoSuchMethodException t) {
- // ignore
- } catch (InvocationTargetException t) {
- throw new RuntimeException(t);
- } catch (IllegalAccessException t) {
- throw new RuntimeException(t);
+ Method method = target.getClass().getMethod("replay",
record.getClass());
+ method.invoke(target, record);
+ } catch (NoSuchMethodException e) {
+ try {
+ Method method = target.getClass().getMethod("replay",
+ record.getClass(),
+ Optional.class);
+ method.invoke(target, record, Optional.empty());
+ } catch (NoSuchMethodException t) {
+ try {
+ Method method =
target.getClass().getMethod("replay",
+ record.getClass(),
+ long.class);
+ method.invoke(target, record, 0L);
+ } catch (NoSuchMethodException i) {
+ // ignore
+ }
+ }
}
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
@@ -119,7 +124,7 @@ public class RecordTestUtils {
* @param delta the metadata delta on which to replay the records
* @param highestOffset highest offset from the list of record batches
* @param highestEpoch highest epoch from the list of record batches
- * @param recordsAndVersions list of batches of records
+ * @param batches list of batches of records
*/
public static void replayAllBatches(
MetadataDelta delta,
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
index b37fb3a3847..e5dceeaa0c3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
@@ -56,8 +56,8 @@ public interface FuturePurgatory<T extends Comparable<T>> {
CompletableFuture<Long> await(T threshold, long maxWaitTimeMs);
/**
- * Complete awaiting futures whose associated values are larger than the
given threshold value.
- * The completion callbacks will be triggered from the calling thread.
+ * Complete awaiting futures whose threshold value from {@link
FuturePurgatory#await} are smaller
+ * than the given threshold value. The completion callbacks will be
triggered from the calling thread.
*
* @param value the threshold value used to determine which
futures can be completed
* @param currentTimeMs the current time in milliseconds that will be
passed to