This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14a9130f6fa KAFKA-17793: Improve kcontroller robustness against long
delays (#17502)
14a9130f6fa is described below
commit 14a9130f6fa31c10cb65cc500c101148d0410306
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Mon Oct 28 08:36:07 2024 -0700
KAFKA-17793: Improve kcontroller robustness against long delays (#17502)
As described in KIP-500, the Kafka controller monitors the liveness of each
broker in the cluster. It gathers this information from heartbeats sent from
the brokers themselves.
In some rare cases, the main controller thread may get blocked for several
seconds at a time. In the current code, this will result in the controller
being unable to update the last contact times for the brokers during this time.
This PR changes the controller heartbeat handling to be partially lockless.
Specifically, the last contact time for each broker will be updated locklessly
prior to the rest of the heartbeat handling. This will ensure that heartbeats
always get through.
Additionally, this PR adds a PeriodicTaskControlManager to better manage
periodic tasks. This should help handle the very common pattern where we want
to schedule a background task at some frequency. We also want the background
task to be immediately rescheduled if there is too much work to be done in one
event.
Reviewers: Liu Zeyu <[email protected]>, David Arthur <[email protected]>
---
.../kafka/controller/BrokerHeartbeatManager.java | 247 +++---------
.../kafka/controller/BrokerHeartbeatTracker.java | 146 +++++++
.../apache/kafka/controller/BrokerIdAndEpoch.java | 58 +++
.../kafka/controller/ClusterControlManager.java | 23 +-
.../controller/DelegationTokenControlManager.java | 13 +-
.../org/apache/kafka/controller/PeriodicTask.java | 73 ++++
.../controller/PeriodicTaskControlManager.java | 250 ++++++++++++
.../apache/kafka/controller/PeriodicTaskFlag.java | 28 ++
.../apache/kafka/controller/QuorumController.java | 441 ++++++++-------------
.../controller/ReplicationControlManager.java | 38 +-
.../errors/EventHandlerExceptionInfo.java | 3 +
.../errors/PeriodicControlTaskException.java | 27 ++
.../controller/BrokerHeartbeatManagerTest.java | 144 +------
.../controller/BrokerHeartbeatTrackerTest.java | 134 +++++++
.../controller/PeriodicTaskControlManagerTest.java | 300 ++++++++++++++
.../kafka/controller/QuorumControllerTest.java | 4 +-
.../controller/ReplicationControlManagerTest.java | 13 +-
.../errors/EventHandlerExceptionInfoTest.java | 40 ++
18 files changed, 1357 insertions(+), 625 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index e1734c52fbc..e63170ca5bf 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -42,11 +42,11 @@ import static
org.apache.kafka.controller.BrokerControlState.UNFENCED;
/**
- * The BrokerHeartbeatManager manages all the soft state associated with
broker heartbeats.
- * Soft state is state which does not appear in the metadata log. This state
includes
- * things like the last time each broker sent us a heartbeat. As of KIP-841,
the controlled
- * shutdown state is no longer treated as soft state and is persisted to the
metadata log on broker
- * controlled shutdown requests.
+ * The BrokerHeartbeatManager manages some of the soft state associated with
broker heartbeats.
+ * For example, it stores the last metadata offset which each broker reported.
It contains the
+ * BrokerHeartbeatTracker, which stores the last time we received a heartbeat
from each broker.
+ * In addition to storing this soft state, the BrokerHeartbeatManager
aggregates some information
+ * about brokers (such as whether they're fenced or not) into a single place.
*
* Only the active controller has a BrokerHeartbeatManager, since only the
active
* controller handles broker heartbeats. Standby controllers will create a
heartbeat
@@ -63,17 +63,15 @@ public class BrokerHeartbeatManager {
private final int id;
/**
- * The last time we received a heartbeat from this broker, in
monotonic nanoseconds.
- * When this field is updated, we also may have to update the broker's
position in
- * the unfenced list.
+ * True if this broker is fenced.
*/
- long lastContactNs;
+ private boolean fenced;
/**
* The last metadata offset which this broker reported. When this
field is updated,
* we may also have to update the broker's position in the active set.
*/
- long metadataOffset;
+ private long metadataOffset;
/**
* The offset at which the broker should complete its controlled
shutdown, or -1
@@ -82,23 +80,16 @@ public class BrokerHeartbeatManager {
*/
private long controlledShutdownOffset;
- /**
- * The previous entry in the unfenced list, or null if the broker is
not in that list.
- */
- private BrokerHeartbeatState prev;
-
- /**
- * The next entry in the unfenced list, or null if the broker is not
in that list.
- */
- private BrokerHeartbeatState next;
-
- BrokerHeartbeatState(int id) {
+ BrokerHeartbeatState(
+ int id,
+ boolean fenced,
+ long metadataOffset,
+ long controlledShutdownOffset
+ ) {
this.id = id;
- this.lastContactNs = 0;
- this.prev = null;
- this.next = null;
- this.metadataOffset = -1;
- this.controlledShutdownOffset = -1;
+ this.fenced = fenced;
+ this.metadataOffset = metadataOffset;
+ this.controlledShutdownOffset = controlledShutdownOffset;
}
/**
@@ -112,7 +103,18 @@ public class BrokerHeartbeatManager {
* Returns true only if the broker is fenced.
*/
boolean fenced() {
- return prev == null;
+ return fenced;
+ }
+
+ /**
+ * Get the last metadata offset that was reported.
+ */
+ long metadataOffset() {
+ return metadataOffset;
+ }
+
+ void setMetadataOffset(long metadataOffset) {
+ this.metadataOffset = metadataOffset;
}
/**
@@ -142,135 +144,42 @@ public class BrokerHeartbeatManager {
}
}
- static class BrokerHeartbeatStateList {
- /**
- * The head of the list of unfenced brokers. The list is sorted in
ascending order
- * of last contact time.
- */
- private final BrokerHeartbeatState head;
-
- BrokerHeartbeatStateList() {
- this.head = new BrokerHeartbeatState(-1);
- head.prev = head;
- head.next = head;
- }
-
- /**
- * Return the head of the list, or null if the list is empty.
- */
- BrokerHeartbeatState first() {
- BrokerHeartbeatState result = head.next;
- return result == head ? null : result;
- }
-
- /**
- * Add the broker to the list. We start looking for a place to put it
at the end
- * of the list.
- */
- void add(BrokerHeartbeatState broker) {
- BrokerHeartbeatState cur = head.prev;
- while (true) {
- if (cur == head || cur.lastContactNs <= broker.lastContactNs) {
- broker.next = cur.next;
- cur.next.prev = broker;
- broker.prev = cur;
- cur.next = broker;
- break;
- }
- cur = cur.prev;
- }
- }
-
- /**
- * Remove a broker from the list.
- */
- void remove(BrokerHeartbeatState broker) {
- if (broker.next == null) {
- throw new RuntimeException(broker + " is not in the list.");
- }
- broker.prev.next = broker.next;
- broker.next.prev = broker.prev;
- broker.prev = null;
- broker.next = null;
- }
-
- BrokerHeartbeatStateIterator iterator() {
- return new BrokerHeartbeatStateIterator(head);
- }
- }
-
- static class BrokerHeartbeatStateIterator implements
Iterator<BrokerHeartbeatState> {
- private final BrokerHeartbeatState head;
- private BrokerHeartbeatState cur;
-
- BrokerHeartbeatStateIterator(BrokerHeartbeatState head) {
- this.head = head;
- this.cur = head;
- }
-
- @Override
- public boolean hasNext() {
- return cur.next != head;
- }
-
- @Override
- public BrokerHeartbeatState next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- BrokerHeartbeatState result = cur.next;
- cur = cur.next;
- return result;
- }
- }
-
private final Logger log;
/**
- * The Kafka clock object to use.
- */
- private final Time time;
-
- /**
- * The broker session timeout in nanoseconds.
+ * Tracks the last time broker heartbeats were reported for each broker.
*/
- private final long sessionTimeoutNs;
+ private final BrokerHeartbeatTracker tracker;
/**
* Maps broker IDs to heartbeat states.
*/
private final HashMap<Integer, BrokerHeartbeatState> brokers;
- /**
- * The list of unfenced brokers, sorted by last contact time.
- */
- private final BrokerHeartbeatStateList unfenced;
-
/**
* The set of active brokers. A broker is active if it is unfenced, and
not shutting
* down.
*/
private final TreeSet<BrokerHeartbeatState> active;
- BrokerHeartbeatManager(LogContext logContext,
- Time time,
- long sessionTimeoutNs) {
+ BrokerHeartbeatManager(
+ LogContext logContext,
+ Time time,
+ long sessionTimeoutNs
+ ) {
this.log = logContext.logger(BrokerHeartbeatManager.class);
- this.time = time;
- this.sessionTimeoutNs = sessionTimeoutNs;
+ this.tracker = new BrokerHeartbeatTracker(time, sessionTimeoutNs);
this.brokers = new HashMap<>();
- this.unfenced = new BrokerHeartbeatStateList();
this.active = new TreeSet<>(MetadataOffsetComparator.INSTANCE);
}
- // VisibleForTesting
- Time time() {
- return time;
+ BrokerHeartbeatTracker tracker() {
+ return tracker;
}
// VisibleForTesting
- BrokerHeartbeatStateList unfenced() {
- return unfenced;
+ Time time() {
+ return tracker.time();
}
// VisibleForTesting
@@ -287,7 +196,6 @@ public class BrokerHeartbeatManager {
return OptionalLong.of(broker.controlledShutdownOffset);
}
-
/**
* Mark a broker as fenced.
*
@@ -296,7 +204,8 @@ public class BrokerHeartbeatManager {
void fence(int brokerId) {
BrokerHeartbeatState broker = brokers.get(brokerId);
if (broker != null) {
- untrack(broker);
+ broker.fenced = true;
+ active.remove(broker);
}
}
@@ -308,7 +217,7 @@ public class BrokerHeartbeatManager {
void remove(int brokerId) {
BrokerHeartbeatState broker = brokers.remove(brokerId);
if (broker != null) {
- untrack(broker);
+ active.remove(broker);
}
}
@@ -320,7 +229,6 @@ public class BrokerHeartbeatManager {
*/
private void untrack(BrokerHeartbeatState broker) {
if (!broker.fenced()) {
- unfenced.remove(broker);
if (!broker.shuttingDown()) {
active.remove(broker);
}
@@ -331,28 +239,12 @@ public class BrokerHeartbeatManager {
* Check if the given broker has a valid session.
*
* @param brokerId The broker ID to check.
+ * @param brokerEpoch The broker epoch to check.
*
* @return True if the given broker has a valid session.
*/
- boolean hasValidSession(int brokerId) {
- BrokerHeartbeatState broker = brokers.get(brokerId);
- if (broker == null) return false;
- return hasValidSession(broker);
- }
-
- /**
- * Check if the given broker has a valid session.
- *
- * @param broker The broker to check.
- *
- * @return True if the given broker has a valid session.
- */
- private boolean hasValidSession(BrokerHeartbeatState broker) {
- if (broker.fenced()) {
- return false;
- } else {
- return broker.lastContactNs + sessionTimeoutNs >=
time.nanoseconds();
- }
+ boolean hasValidSession(int brokerId, long brokerEpoch) {
+ return tracker.hasValidSession(new BrokerIdAndEpoch(brokerId,
brokerEpoch));
}
/**
@@ -366,7 +258,7 @@ public class BrokerHeartbeatManager {
BrokerHeartbeatState broker = brokers.get(brokerId);
long metadataOffset = -1L;
if (broker == null) {
- broker = new BrokerHeartbeatState(brokerId);
+ broker = new BrokerHeartbeatState(brokerId, fenced, -1L, -1L);
brokers.put(brokerId, broker);
} else if (broker.fenced() != fenced) {
metadataOffset = broker.metadataOffset;
@@ -388,18 +280,23 @@ public class BrokerHeartbeatManager {
// position in either of those data structures depends on values we are
// changing here. We will re-add it if necessary at the end of this
function.
untrack(broker);
- broker.lastContactNs = time.nanoseconds();
+ broker.fenced = fenced;
broker.metadataOffset = metadataOffset;
+ boolean isActive = false;
if (fenced) {
// If a broker is fenced, it leaves controlled shutdown. On its
next heartbeat,
// it will shut down immediately.
broker.controlledShutdownOffset = -1;
} else {
- unfenced.add(broker);
if (!broker.shuttingDown()) {
- active.add(broker);
+ isActive = true;
}
}
+ if (isActive) {
+ active.add(broker);
+ } else {
+ active.remove(broker);
+ }
}
long lowestActiveOffset() {
@@ -431,38 +328,6 @@ public class BrokerHeartbeatManager {
}
}
- /**
- * Return the time in monotonic nanoseconds at which we should check if a
broker
- * session needs to be expired.
- */
- long nextCheckTimeNs() {
- BrokerHeartbeatState broker = unfenced.first();
- if (broker == null) {
- return Long.MAX_VALUE;
- } else {
- return broker.lastContactNs + sessionTimeoutNs;
- }
- }
-
- /**
- * Check if the oldest broker to have heartbeated has already violated the
- * sessionTimeoutNs timeout and needs to be fenced.
- *
- * @return An Optional broker node id.
- */
- Optional<Integer> findOneStaleBroker() {
- BrokerHeartbeatStateIterator iterator = unfenced.iterator();
- if (iterator.hasNext()) {
- BrokerHeartbeatState broker = iterator.next();
- // The unfenced list is sorted on last contact time from each
- // broker. If the first broker is not stale, then none is.
- if (!hasValidSession(broker)) {
- return Optional.of(broker.id);
- }
- }
- return Optional.empty();
- }
-
Iterator<UsableBroker> usableBrokers(
Function<Integer, Optional<String>> idToRack
) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
new file mode 100644
index 00000000000..722030f07ae
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatTracker.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The BrokerHeartbeatTracker stores the last time each broker sent a
heartbeat to us.
+ * This class will be present only on the active controller.
+ *
+ * UNLIKE MOST OF THE KAFKA CONTROLLER, THIS CLASS CAN BE ACCESSED FROM
MULTIPLE THREADS.
+ * Everything in here must be thread-safe. It is intended to be accessed
directly from the
+ * request handler thread pool. This ensures that the heartbeats always get
through, even
+ * if the main controller thread is busy.
+ */
+class BrokerHeartbeatTracker {
+ /**
+ * The clock to use.
+ */
+ private final Time time;
+
+ /**
+ * The broker session timeout in nanoseconds.
+ */
+ private final long sessionTimeoutNs;
+
+ /**
+ * Maps a broker ID and epoch to the last contact time in monotonic
nanoseconds.
+ */
+ private final ConcurrentHashMap<BrokerIdAndEpoch, Long> contactTimes;
+
+ BrokerHeartbeatTracker(Time time, long sessionTimeoutNs) {
+ this.time = time;
+ this.sessionTimeoutNs = sessionTimeoutNs;
+ this.contactTimes = new ConcurrentHashMap<>();
+ }
+
+ Time time() {
+ return time;
+ }
+
+ /**
+ * Update the contact time for the given broker ID and epoch to be the
current time.
+ *
+ * @param idAndEpoch The broker ID and epoch.
+ */
+ void updateContactTime(BrokerIdAndEpoch idAndEpoch) {
+ updateContactTime(idAndEpoch, time.nanoseconds());
+ }
+
+ /**
+ * Update the contact time for the given broker ID and epoch to be the
given time.
+ *
+ * @param idAndEpoch The broker ID and epoch.
+ * @param timeNs The monotonic time in nanoseconds.
+ */
+ void updateContactTime(BrokerIdAndEpoch idAndEpoch, long timeNs) {
+ contactTimes.put(idAndEpoch, timeNs);
+ }
+
+ /**
+ * Get the contact time for the given broker ID and epoch.
+ *
+ * @param idAndEpoch The broker ID and epoch.
+ * @return The contact time, or OptionalLong.empty if none is
known.
+ */
+ OptionalLong contactTime(BrokerIdAndEpoch idAndEpoch) {
+ Long value = contactTimes.get(idAndEpoch);
+ if (value == null) return OptionalLong.empty();
+ return OptionalLong.of(value);
+ }
+
+ /**
+ * Remove either one or zero expired brokers from the map.
+ *
+ * @return The expired broker that was removed, or Optional.empty if
there was none.
+ */
+ Optional<BrokerIdAndEpoch> maybeRemoveExpired() {
+ return maybeRemoveExpired(time.nanoseconds());
+ }
+
+ /**
+ * Remove either one or zero expired brokers from the map.
+ *
+ * @param nowNs The current time in monotonic nanoseconds.
+ *
+ * @return The expired broker that was removed, or Optional.empty if
there was none.
+ */
+ Optional<BrokerIdAndEpoch> maybeRemoveExpired(long nowNs) {
+ Iterator<Entry<BrokerIdAndEpoch, Long>> iterator =
+ contactTimes.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<BrokerIdAndEpoch, Long> entry = iterator.next();
+ if (isExpired(entry.getValue(), nowNs)) {
+ iterator.remove();
+ return Optional.of(entry.getKey());
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Return true if the given time is outside the expiration window.
+ * If the timestamp has undergone 64-bit rollover, we will not expire
anything.
+ *
+ * @param timeNs The provided time in monotonic nanoseconds.
+ * @param nowNs The current time in monotonic nanoseconds.
+ * @return True if the timestamp is expired.
+ */
+ boolean isExpired(long timeNs, long nowNs) {
+ return (nowNs > timeNs) && (timeNs + sessionTimeoutNs < nowNs);
+ }
+
+ /**
+ * Return true if the given broker has a session whose time has not yet
expired.
+ *
+ * @param idAndEpoch The broker id and epoch.
+ * @return True only if the broker session was found and is
still valid.
+ */
+ boolean hasValidSession(BrokerIdAndEpoch idAndEpoch) {
+ Long timeNs = contactTimes.get(idAndEpoch);
+ if (timeNs == null) return false;
+ return !isExpired(timeNs, time.nanoseconds());
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokerIdAndEpoch.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokerIdAndEpoch.java
new file mode 100644
index 00000000000..a0cf60d1f65
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerIdAndEpoch.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Objects;
+
+public class BrokerIdAndEpoch {
+ private final int id;
+ private final long epoch;
+
+ public BrokerIdAndEpoch(
+ int id,
+ long epoch
+ ) {
+ this.id = id;
+ this.epoch = epoch;
+ }
+
+ public int id() {
+ return id;
+ }
+
+ public long epoch() {
+ return epoch;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || (!(o instanceof BrokerIdAndEpoch))) return false;
+ BrokerIdAndEpoch other = (BrokerIdAndEpoch) o;
+ return id == other.id && epoch == other.epoch;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, epoch);
+ }
+
+ @Override
+ public String toString() {
+ return "BrokerIdAndEpoch(id=" + id + ", epoch=" + epoch + ")";
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 0e263a548c8..c583906d4ee 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -240,7 +240,7 @@ public class ClusterControlManager {
/**
* The broker heartbeat manager, or null if this controller is on standby.
*/
- private BrokerHeartbeatManager heartbeatManager;
+ private volatile BrokerHeartbeatManager heartbeatManager;
/**
* A future which is completed as soon as we have the given number of
brokers
@@ -356,7 +356,7 @@ public class ClusterControlManager {
Uuid prevIncarnationId = null;
if (existing != null) {
prevIncarnationId = existing.incarnationId();
- if (heartbeatManager.hasValidSession(brokerId)) {
+ if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another
broker is " +
"registered with that broker id.");
@@ -511,6 +511,22 @@ public class ClusterControlManager {
setMaxSupportedVersion(feature.maxSupportedVersion());
}
+ /**
+ * Track an incoming broker heartbeat. Unlike most functions, this one is
not called from the main
+ * controller thread, so it can only access local, volatile and atomic
data.
+ *
+ * @param brokerId The broker id to track.
+ * @param brokerEpoch The broker epoch to track.
+ *
+ * @return True only if the ClusterControlManager is active.
+ */
+ boolean trackBrokerHeartbeat(int brokerId, long brokerEpoch) {
+ BrokerHeartbeatManager manager = heartbeatManager;
+ if (manager == null) return false;
+ manager.tracker().updateContactTime(new BrokerIdAndEpoch(brokerId,
brokerEpoch));
+ return true;
+ }
+
public OptionalLong registerBrokerRecordOffset(int brokerId) {
Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
if (registrationOffset != null) {
@@ -715,6 +731,9 @@ public class ClusterControlManager {
}
BrokerHeartbeatManager heartbeatManager() {
+ // We throw RuntimeException here rather than NotControllerException
because all the callers
+ // have already verified that we are active. For example,
ControllerWriteEvent.run verifies
+ // that we are the current active controller before running any
event-specific code.
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
index 8e002da5030..e9bfe10436c 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collections;
import java.util.List;
import javax.crypto.Mac;
@@ -59,6 +60,8 @@ import static
org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
* Manages DelegationTokens.
*/
public class DelegationTokenControlManager {
+ private static final int MAX_RECORDS_PER_EXPIRATION = 1000;
+
private final Time time = Time.SYSTEM;
static class Builder {
@@ -95,6 +98,7 @@ public class DelegationTokenControlManager {
DelegationTokenControlManager build() {
if (logContext == null) logContext = new LogContext();
+ if (tokenCache == null) tokenCache = new
DelegationTokenCache(Collections.emptySet());
return new DelegationTokenControlManager(
logContext,
tokenCache,
@@ -330,9 +334,9 @@ public class DelegationTokenControlManager {
}
// Periodic call to remove expired DelegationTokens
- public List<ApiMessageAndVersion> sweepExpiredDelegationTokens() {
+ public ControllerResult<Boolean> sweepExpiredDelegationTokens() {
long now = time.milliseconds();
- List<ApiMessageAndVersion> records = new ArrayList<>();
+ List<ApiMessageAndVersion> records = new ArrayList<>(0);
for (TokenInformation oldTokenInformation: tokenCache.tokens()) {
if ((oldTokenInformation.maxTimestamp() < now) ||
@@ -341,9 +345,12 @@ public class DelegationTokenControlManager {
oldTokenInformation.tokenId(),
oldTokenInformation.ownerAsString());
records.add(new ApiMessageAndVersion(new
RemoveDelegationTokenRecord().
setTokenId(oldTokenInformation.tokenId()), (short) 0));
+ if (records.size() >= MAX_RECORDS_PER_EXPIRATION) {
+ return ControllerResult.of(records, true);
+ }
}
}
- return records;
+ return ControllerResult.of(records, false);
}
public void replay(DelegationTokenRecord record) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
new file mode 100644
index 00000000000..8fae9c782b1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.EnumSet;
+import java.util.function.Supplier;
+
+class PeriodicTask {
+ /**
+ * The name of this periodic task.
+ */
+ private final String name;
+
+ /**
+ * The callback for this task. If ControllerResult.response is true, we
will schedule the
+ * task again after only a very short delay. This is useful if we only
finished part of the
+ * work we wanted to finish.
+ */
+ private final Supplier<ControllerResult<Boolean>> op;
+
+ /**
+ * The period of the task, in nanoseconds.
+ */
+ private final long periodNs;
+
+ /**
+ * The flags used by this periodic task.
+ */
+ private final EnumSet<PeriodicTaskFlag> flags;
+
+ PeriodicTask(
+ String name,
+ Supplier<ControllerResult<Boolean>> op,
+ long periodNs,
+ EnumSet<PeriodicTaskFlag> flags
+ ) {
+ this.name = name;
+ this.op = op;
+ this.periodNs = periodNs;
+ this.flags = flags;
+ }
+
+ String name() {
+ return name;
+ }
+
+ Supplier<ControllerResult<Boolean>> op() {
+ return op;
+ }
+
+ long periodNs() {
+ return periodNs;
+ }
+
+ EnumSet<PeriodicTaskFlag> flags() {
+ return flags;
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
new file mode 100644
index 00000000000..f51fb254888
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskControlManager.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.errors.PeriodicControlTaskException;
+
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+
+class PeriodicTaskControlManager {
+ static class Builder {
+ private LogContext logContext = null;
+ private Time time = Time.SYSTEM;
+ private QueueAccessor queueAccessor = null;
+
+ Builder setLogContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ Builder setTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
+ Builder setQueueAccessor(QueueAccessor queueAccessor) {
+ this.queueAccessor = queueAccessor;
+ return this;
+ }
+
+ PeriodicTaskControlManager build() {
+ if (logContext == null) logContext = new LogContext();
+ if (queueAccessor == null) throw new RuntimeException("You must
set queueAccessor");
+ return new PeriodicTaskControlManager(logContext,
+ time,
+ queueAccessor);
+ }
+ }
+
+ interface QueueAccessor {
+ void scheduleDeferred(
+ String tag,
+ long deadlineNs,
+ Supplier<ControllerResult<Void>> op
+ );
+
+ void cancelDeferred(String tag);
+ }
+
+ class PeriodicTaskOperation implements Supplier<ControllerResult<Void>> {
+ private final PeriodicTask task;
+
+ PeriodicTaskOperation(PeriodicTask task) {
+ this.task = task;
+ }
+
+ @Override
+ public ControllerResult<Void> get() {
+ long startNs = 0;
+ if (log.isDebugEnabled() ||
task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
+ startNs = time.nanoseconds();
+ }
+ ControllerResult<Boolean> result;
+ try {
+ result = task.op().get();
+ } catch (Exception e) {
+ // Reschedule the task after a lengthy delay.
+ reschedule(task, false, true);
+ // We wrap the exception in a PeriodicControlTaskException
before throwing it to ensure
+ // that it is handled correctly in
QuorumController::handleEventException. We want it to
+ // cause the metadata error metric to be incremented, but not
cause a controller failover.
+ throw new PeriodicControlTaskException(task.name() + ":
periodic task failed: " +
+ e.getMessage(), e);
+ }
+ if (log.isDebugEnabled() ||
task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
+ long endNs = time.nanoseconds();
+ long durationUs = NANOSECONDS.toMicros(endNs - startNs);
+ if (task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
+ log.info("Periodic task {} generated {} records in {}
microseconds.",
+ task.name(), result.records().size(), durationUs);
+ } else if (log.isDebugEnabled()) {
+ log.debug("Periodic task {} generated {} records in {}
microseconds.",
+ task.name(), result.records().size(), durationUs);
+ }
+ }
+ reschedule(task, result.response(), false);
+ if (result.isAtomic()) {
+ return ControllerResult.atomicOf(result.records(), null);
+ } else {
+ return ControllerResult.of(result.records(), null);
+ }
+ }
+ }
+
+ /**
+ * The slf4j logger.
+ */
+ private final Logger log;
+
+ /**
+ * The clock.
+ */
+ private final Time time;
+
+ /**
+ * Used to schedule events on the queue.
+ */
+ private final QueueAccessor queueAccessor;
+
+ /**
+ * True if the manager is active.
+ */
+ private boolean active;
+
+ /**
+ * The currently registered periodic tasks.
+ */
+ private final Map<String, PeriodicTask> tasks;
+
+ /**
+ * Create a manager that starts out inactive and with no registered tasks.
+ *
+ * @param logContext The log context used to create this manager's logger.
+ * @param time The clock.
+ * @param queueAccessor Used to schedule and cancel deferred events.
+ */
+ private PeriodicTaskControlManager(
+ LogContext logContext,
+ Time time,
+ QueueAccessor queueAccessor
+ ) {
+ // Log under this class's own name so messages are attributed correctly.
+ this.log = logContext.logger(PeriodicTaskControlManager.class);
+ this.time = time;
+ this.queueAccessor = queueAccessor;
+ this.active = false;
+ this.tasks = new HashMap<>();
+ }
+
+ boolean active() {
+ return active;
+ }
+
+ void registerTask(PeriodicTask task) {
+ if (tasks.containsKey(task.name())) {
+ log.debug("Periodic task {} is already registered.", task.name());
+ return;
+ }
+ tasks.put(task.name(), task);
+ log.info("Registering periodic task {} to run every {} ms",
task.name(),
+ NANOSECONDS.toMillis(task.periodNs()));
+ reschedule(task, false, false);
+ }
+
+ void unregisterTask(String taskName) {
+ PeriodicTask task = tasks.remove(taskName);
+ if (task == null) {
+ log.debug("Periodic task {} is already unregistered.", taskName);
+ return;
+ }
+ log.info("Unregistering periodic task {}", taskName);
+ reschedule(task, false, false);
+ }
+
+ private long nextDelayTimeNs(PeriodicTask task, boolean immediate, boolean
error) {
+ if (immediate) {
+ // The current implementation of KafkaEventQueue always picks from
the deferred
+ // collection of operations before picking from the non-deferred
collection of
+ // operations. This can result in some unfairness if deferred
operations are
+ // scheduled for immediate execution. This delays them by a small
amount of time.
+ return MILLISECONDS.toNanos(10);
+ } else if (error) {
+ // If the periodic task hit an error, reschedule it in 5 minutes.
This is to avoid
+ // scenarios where we spin in a tight loop hitting errors, but
still give the task
+ // a chance to succeed.
+ return MINUTES.toNanos(5);
+ } else {
+ // Otherwise, use the designated period.
+ return task.periodNs();
+ }
+ }
+
+ /**
+ * Schedule the next run of the given task, or cancel its deferred event if
+ * this manager is inactive or the task is no longer in the task map.
+ *
+ * @param task The periodic task.
+ * @param immediate Passed to nextDelayTimeNs to select the short delay.
+ * @param error Passed to nextDelayTimeNs to select the error delay.
+ */
+ private void reschedule(PeriodicTask task, boolean immediate, boolean
error) {
+ if (!active) {
+ log.trace("cancelling {} because we are inactive.", task.name());
+ queueAccessor.cancelDeferred(task.name());
+ } else if (tasks.containsKey(task.name())) {
+ long nextDelayTimeNs = nextDelayTimeNs(task, immediate, error);
+ long nextRunTimeNs = time.nanoseconds() + nextDelayTimeNs;
+ // One argument per placeholder: name, delay, immediate, error.
+ log.trace("rescheduling {} in {} ns (immediate = {}, error = {})",
+ task.name(), nextDelayTimeNs, immediate, error);
+ queueAccessor.scheduleDeferred(task.name(),
+ nextRunTimeNs,
+ new PeriodicTaskOperation(task));
+ } else {
+ log.trace("cancelling {} because it does not appear in the task
map.", task.name());
+ queueAccessor.cancelDeferred(task.name());
+ }
+ }
+
+ /**
+ * Called when the QuorumController becomes active.
+ */
+ void activate() {
+ if (active) {
+ throw new RuntimeException("Can't activate already active
PeriodicTaskControlManager.");
+ }
+ active = true;
+ for (PeriodicTask task : tasks.values()) {
+ reschedule(task, false, false);
+ }
+ String[] taskNames = tasks.keySet().toArray(new String[0]);
+ Arrays.sort(taskNames);
+ log.info("Activated periodic tasks: {}", String.join(", ", taskNames));
+ }
+
+ /**
+ * Called when the QuorumController becomes inactive.
+ */
+ void deactivate() {
+ if (!active) {
+ return;
+ }
+ active = false;
+ for (PeriodicTask task : tasks.values()) {
+ reschedule(task, false, false);
+ }
+ String[] taskNames = tasks.keySet().toArray(new String[0]);
+ Arrays.sort(taskNames);
+ log.info("Deactivated periodic tasks: {}", String.join(", ",
taskNames));
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskFlag.java
b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskFlag.java
new file mode 100644
index 00000000000..da26f5bb137
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/PeriodicTaskFlag.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+/**
+ * The flags to use for a periodic task.
+ */
+enum PeriodicTaskFlag {
+ /**
+ * Set if we want to log the name and execution time on each run.
+ */
+ VERBOSE;
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index e49dbc249e9..16ffdfea296 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
-import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
@@ -129,6 +128,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -149,6 +149,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static
org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME;
@@ -218,7 +219,7 @@ public final class QuorumController implements Controller {
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
private long delegationTokenExpiryTimeMs;
- private long delegationTokenExpiryCheckIntervalMs;
+ private long delegationTokenExpiryCheckIntervalMs =
TimeUnit.MINUTES.toMillis(5);
private long uncleanLeaderElectionCheckIntervalMs =
TimeUnit.MINUTES.toMillis(5);
private String interBrokerListenerName = "PLAINTEXT";
@@ -496,6 +497,32 @@ public final class QuorumController implements Controller {
}
}
+ class PeriodicTaskControlManagerQueueAccessor implements
PeriodicTaskControlManager.QueueAccessor {
+ @Override
+ public void scheduleDeferred(
+ String tag,
+ long deadlineNs,
+ Supplier<ControllerResult<Void>> op
+ ) {
+ EnumSet<ControllerOperationFlag> flags =
EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME);
+ queue.scheduleDeferred(tag,
+ new EarliestDeadlineFunction(deadlineNs),
+ new ControllerWriteEvent<>(tag,
+ new ControllerWriteOperation<Void>() {
+ @Override
+ public ControllerResult<Void>
generateRecordsAndResult() {
+ return op.get();
+ }
+ },
+ flags));
+ }
+
+ @Override
+ public void cancelDeferred(String tag) {
+ queue.cancelDeferred(tag);
+ }
+ }
+
private OptionalInt latestController() {
return raftClient.leaderAndEpoch().leaderId();
}
@@ -811,13 +838,6 @@ public final class QuorumController implements Controller {
"reaches offset {}.", this, resultAndOffset.offset());
}
- // After every controller write event, schedule a leader rebalance
if there are any topic partition
- // with leader that is not the preferred leader.
- maybeScheduleNextBalancePartitionLeaders();
-
- // Schedule a new unclean leader election if there are partitions
that do not have a leader.
- maybeScheduleNextElectUncleanLeaders();
-
// Remember the latest offset and future if it is not already
completed
if (!future.isDone()) {
deferredEventQueue.add(resultAndOffset.offset(), this);
@@ -1140,10 +1160,7 @@ public final class QuorumController implements
Controller {
// periodic tasks here. At this point, all the records we
generated in
// generateRecordsAndResult have been applied, so we have the
correct value for
// metadata.version and other in-memory state.
- maybeScheduleNextExpiredDelegationTokenSweep();
- maybeScheduleNextBalancePartitionLeaders();
- maybeScheduleNextElectUncleanLeaders();
- maybeScheduleNextWriteNoOpRecord();
+ periodicControl.activate();
}
}
@@ -1159,255 +1176,12 @@ public final class QuorumController implements
Controller {
newWrongControllerException(OptionalInt.empty()));
offsetControl.deactivate();
clusterControl.deactivate();
- cancelMaybeFenceReplicas();
- cancelMaybeBalancePartitionLeaders();
- cancelMaybeNextElectUncleanLeaders();
- cancelNextWriteNoOpRecord();
+ periodicControl.deactivate();
} catch (Throwable e) {
fatalFaultHandler.handleFault("exception while renouncing
leadership", e);
}
}
- private <T> void scheduleDeferredWriteEvent(
- String name,
- long deadlineNs,
- ControllerWriteOperation<T> op,
- EnumSet<ControllerOperationFlag> flags
- ) {
- if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
- throw new RuntimeException("deferred events should not update the
queue time.");
- }
- ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op,
flags);
- queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs),
event);
- event.future.exceptionally(e -> {
- if (ControllerExceptions.isTimeoutException(e)) {
- log.error("Cancelling deferred write event {} because the
event queue " +
- "is now closed.", name);
- return null;
- } else if (e instanceof NotControllerException) {
- log.debug("Cancelling deferred write event {} because this
controller " +
- "is no longer active.", name);
- return null;
- }
- log.error("Unexpected exception while executing deferred write
event {}. " +
- "Rescheduling for a minute from now.", name, e);
- scheduleDeferredWriteEvent(name,
- deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op,
flags);
- return null;
- });
- }
-
- static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
-
- private void rescheduleMaybeFenceStaleBrokers() {
- long nextCheckTimeNs =
clusterControl.heartbeatManager().nextCheckTimeNs();
- if (nextCheckTimeNs == Long.MAX_VALUE) {
- cancelMaybeFenceReplicas();
- return;
- }
- scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs,
- () -> {
- ControllerResult<Void> result =
replicationControl.maybeFenceOneStaleBroker();
- // This following call ensures that if there are multiple
brokers that
- // are currently stale, then fencing for them is scheduled
immediately
- rescheduleMaybeFenceStaleBrokers();
- return result;
- },
- EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
- }
-
- private void cancelMaybeFenceReplicas() {
- queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
- }
-
- private static final String MAYBE_BALANCE_PARTITION_LEADERS =
"maybeBalancePartitionLeaders";
-
- private void maybeScheduleNextBalancePartitionLeaders() {
- if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
- leaderImbalanceCheckIntervalNs.isPresent() &&
- replicationControl.arePartitionLeadersImbalanced()) {
-
- log.debug(
- "Scheduling write event for {} because scheduled ({}),
checkIntervalNs ({}) and isImbalanced ({})",
- MAYBE_BALANCE_PARTITION_LEADERS,
- imbalancedScheduled,
- leaderImbalanceCheckIntervalNs,
- replicationControl.arePartitionLeadersImbalanced()
- );
-
- ControllerWriteEvent<Boolean> event = new
ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
- long startTimeNs = time.nanoseconds();
- ControllerResult<Boolean> result =
replicationControl.maybeBalancePartitionLeaders();
- long endTimeNs = time.nanoseconds();
- long durationNs = endTimeNs - startTimeNs;
- log.info("maybeBalancePartitionLeaders: generated {} records
in {} microseconds.{}",
- result.records().size(), NANOSECONDS.toMicros(durationNs),
- result.response() ? " Rescheduling immediately." : "");
-
- // reschedule the operation after the
leaderImbalanceCheckIntervalNs interval.
- // Mark the imbalance event as completed and reschedule if
necessary
- if (result.response()) {
- imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY;
- } else {
- imbalancedScheduled = ImbalanceSchedule.DEFERRED;
- }
-
- // Note that rescheduling this event here is not required
because MAYBE_BALANCE_PARTITION_LEADERS
- // is a ControllerWriteEvent. ControllerWriteEvent always
calls this method after the records
- // generated by a ControllerWriteEvent have been applied.
-
- return result;
- }, EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
-
- long delayNs = time.nanoseconds();
- if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
- delayNs += leaderImbalanceCheckIntervalNs.getAsLong();
- } else {
- // The current implementation of KafkaEventQueue always picks
from the deferred collection of operations
- // before picking from the non-deferred collection of
operations. This can result in some unfairness if
- // deferred operation are scheduled for immediate execution.
This delays them by a small amount of time.
- delayNs += NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
- }
-
- queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new
EarliestDeadlineFunction(delayNs), event);
-
- imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
- }
- }
-
- private void cancelMaybeBalancePartitionLeaders() {
- imbalancedScheduled = ImbalanceSchedule.DEFERRED;
- queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);
- }
-
- private static final String MAYBE_ELECT_UNCLEAN_LEADERS =
"maybeElectUncleanLeaders";
-
- private void maybeScheduleNextElectUncleanLeaders() {
- if (uncleanScheduled != ImbalanceSchedule.SCHEDULED &&
- replicationControl.areSomePartitionsLeaderless()) {
- log.debug(
- "Scheduling write event for {} because scheduled ({}), and
areSomePartitionsLeaderless ({})",
- MAYBE_ELECT_UNCLEAN_LEADERS,
- uncleanScheduled,
- replicationControl.areSomePartitionsLeaderless()
- );
-
- ControllerWriteEvent<Boolean> event = new
ControllerWriteEvent<>(MAYBE_ELECT_UNCLEAN_LEADERS, () -> {
- long startTimeNs = time.nanoseconds();
- ControllerResult<Boolean> result =
replicationControl.maybeElectUncleanLeaders();
- long endTimeNs = time.nanoseconds();
- long durationNs = endTimeNs - startTimeNs;
- log.info("maybeElectUncleanLeaders: generated {} records in {}
microseconds.{}",
- result.records().size(),
NANOSECONDS.toMicros(durationNs),
- result.response() ? " Rescheduling immediately." : "");
- if (result.response()) {
- uncleanScheduled = ImbalanceSchedule.IMMEDIATELY;
- } else {
- uncleanScheduled = ImbalanceSchedule.DEFERRED;
- }
- return result;
- }, EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
-
- long delayNs = time.nanoseconds();
- if (uncleanScheduled == ImbalanceSchedule.DEFERRED) {
- delayNs += uncleanLeaderElectionCheckIntervalNs;
- } else {
- // The current implementation of KafkaEventQueue always picks
from the deferred collection of operations
- // before picking from the non-deferred collection of
operations. This can result in some unfairness if
- // deferred operation are scheduled for immediate execution.
This delays them by a small amount of time.
- delayNs += NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
- }
- queue.scheduleDeferred(MAYBE_ELECT_UNCLEAN_LEADERS, new
EarliestDeadlineFunction(delayNs), event);
- uncleanScheduled = ImbalanceSchedule.SCHEDULED;
- }
- }
-
-
- private void cancelMaybeNextElectUncleanLeaders() {
- uncleanScheduled = ImbalanceSchedule.DEFERRED;
- queue.cancelDeferred(MAYBE_ELECT_UNCLEAN_LEADERS);
- }
-
- private static final String WRITE_NO_OP_RECORD = "writeNoOpRecord";
-
- private void maybeScheduleNextWriteNoOpRecord() {
- if (!noOpRecordScheduled &&
- maxIdleIntervalNs.isPresent() &&
- featureControl.metadataVersion().isNoOpRecordSupported()) {
-
- log.debug(
- "Scheduling write event for {} because maxIdleIntervalNs ({})
and metadataVersion ({})",
- WRITE_NO_OP_RECORD,
- maxIdleIntervalNs.getAsLong(),
- featureControl.metadataVersion()
- );
-
- ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
- WRITE_NO_OP_RECORD,
- () -> {
- noOpRecordScheduled = false;
- maybeScheduleNextWriteNoOpRecord();
-
- return ControllerResult.of(
- Collections.singletonList(new ApiMessageAndVersion(new
NoOpRecord(), (short) 0)),
- null
- );
- },
- EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME)
- );
-
- long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
- queue.scheduleDeferred(WRITE_NO_OP_RECORD, new
EarliestDeadlineFunction(delayNs), event);
- noOpRecordScheduled = true;
- }
- }
-
- private void cancelNextWriteNoOpRecord() {
- noOpRecordScheduled = false;
- queue.cancelDeferred(WRITE_NO_OP_RECORD);
- }
-
- private static final String SWEEP_EXPIRED_DELEGATION_TOKENS =
"sweepExpiredDelegationTokens";
-
- private void maybeScheduleNextExpiredDelegationTokenSweep() {
- if (featureControl.metadataVersion().isDelegationTokenSupported() &&
- delegationTokenControlManager.isEnabled()) {
-
- log.debug(
- "Scheduling write event for {} because DelegationTokens are
enabled.",
- SWEEP_EXPIRED_DELEGATION_TOKENS
- );
-
- ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(
- SWEEP_EXPIRED_DELEGATION_TOKENS,
- () -> {
- maybeScheduleNextExpiredDelegationTokenSweep();
-
- return ControllerResult.of(
-
delegationTokenControlManager.sweepExpiredDelegationTokens(), null);
- },
- EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME)
- );
-
- long delayNs = time.nanoseconds() +
- NANOSECONDS.convert(delegationTokenExpiryCheckIntervalMs,
TimeUnit.MILLISECONDS);
- queue.scheduleDeferred(SWEEP_EXPIRED_DELEGATION_TOKENS,
- new EarliestDeadlineFunction(delayNs), event);
- }
- }
-
- private void handleFeatureControlChange() {
- // The feature control maybe have changed. On the active controller
cancel or schedule noop
- // record writes accordingly.
- if (isActiveController()) {
- if (featureControl.metadataVersion().isNoOpRecordSupported()) {
- maybeScheduleNextWriteNoOpRecord();
- } else {
- cancelNextWriteNoOpRecord();
- }
- }
- }
-
/**
* Apply the metadata record to its corresponding in-memory state(s)
*
@@ -1458,7 +1232,6 @@ public final class QuorumController implements Controller
{
break;
case FEATURE_LEVEL_RECORD:
featureControl.replay((FeatureLevelRecord) message);
- handleFeatureControlChange();
break;
case CLIENT_QUOTA_RECORD:
clientQuotaControlManager.replay((ClientQuotaRecord) message);
@@ -1589,6 +1362,16 @@ public final class QuorumController implements
Controller {
*/
private final QuorumClusterFeatureSupportDescriber clusterSupportDescriber;
+ /**
+ * Handles changes to the event queue for PeriodicTaskControlManager.
+ */
+ private final PeriodicTaskControlManagerQueueAccessor queueAccessor;
+
+ /**
+ * Controls periodic tasks.
+ */
+ private final PeriodicTaskControlManager periodicControl;
+
/**
* An object which stores the controller's view of the cluster.
* This must be accessed only by the event queue thread.
@@ -1621,7 +1404,6 @@ public final class QuorumController implements Controller
{
/**
* Manages DelegationTokens, if there are any.
*/
- private final long delegationTokenExpiryCheckIntervalMs;
private final DelegationTokenControlManager delegationTokenControlManager;
/**
@@ -1661,11 +1443,6 @@ public final class QuorumController implements
Controller {
*/
private final OptionalLong leaderImbalanceCheckIntervalNs;
- /**
- * How log to delay between appending NoOpRecord to the log.
- */
- private final OptionalLong maxIdleIntervalNs;
-
private enum ImbalanceSchedule {
// The leader balancing operation has been scheduled
SCHEDULED,
@@ -1685,11 +1462,6 @@ public final class QuorumController implements
Controller {
*/
private ImbalanceSchedule uncleanScheduled = ImbalanceSchedule.DEFERRED;
- /**
- * Tracks if the write of the NoOpRecord has been scheduled.
- */
- private boolean noOpRecordScheduled = false;
-
/**
* The bootstrap metadata to use for initialization if needed.
*/
@@ -1697,11 +1469,6 @@ public final class QuorumController implements
Controller {
private final boolean eligibleLeaderReplicasEnabled;
- /**
- * The number of nanoseconds between unclean leader election checks.
- */
- private final long uncleanLeaderElectionCheckIntervalNs;
-
/**
* The maximum number of records per batch to allow.
*/
@@ -1777,6 +1544,12 @@ public final class QuorumController implements
Controller {
setSnapshotRegistry(snapshotRegistry).
build();
this.clusterSupportDescriber = new
QuorumClusterFeatureSupportDescriber();
+ this.queueAccessor = new PeriodicTaskControlManagerQueueAccessor();
+ this.periodicControl = new PeriodicTaskControlManager.Builder().
+ setLogContext(logContext).
+ setTime(time).
+ setQueueAccessor(queueAccessor).
+ build();
this.featureControl = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(quorumFeatures).
@@ -1806,7 +1579,6 @@ public final class QuorumController implements Controller
{
setClusterControlManager(clusterControl).
build();
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
- this.maxIdleIntervalNs = maxIdleIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
@@ -1823,7 +1595,6 @@ public final class QuorumController implements Controller
{
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
build();
- this.delegationTokenExpiryCheckIntervalMs =
delegationTokenExpiryCheckIntervalMs;
this.delegationTokenControlManager = new
DelegationTokenControlManager.Builder().
setLogContext(logContext).
setTokenCache(tokenCache).
@@ -1845,8 +1616,15 @@ public final class QuorumController implements
Controller {
this.curClaimEpoch = -1;
this.recordRedactor = new RecordRedactor(configSchema);
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
- this.uncleanLeaderElectionCheckIntervalNs =
-
TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs);
+ if (maxIdleIntervalNs.isPresent()) {
+ registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
+ }
+ registerMaybeFenceStaleBroker(sessionTimeoutNs);
+ if (leaderImbalanceCheckIntervalNs.isPresent()) {
+ registerElectPreferred(leaderImbalanceCheckIntervalNs.getAsLong());
+ }
+
registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(uncleanLeaderElectionCheckIntervalMs));
+
registerExpireDelegationTokens(MILLISECONDS.toNanos(delegationTokenExpiryCheckIntervalMs));
log.info("Creating new QuorumController with clusterId {}.{}",
clusterId,
@@ -1855,6 +1633,100 @@ public final class QuorumController implements
Controller {
this.raftClient.register(metaLogListener);
}
+ /**
+ * Register the writeNoOpRecord task.
+ *
+ * This task periodically writes a NoOpRecord to the metadata log, if the
MetadataVersion
+ * supports it.
+ *
+ * @param maxIdleIntervalNs The period at which to write the
NoOpRecord.
+ */
+ private void registerWriteNoOpRecord(long maxIdleIntervalNs) {
+ periodicControl.registerTask(new PeriodicTask("writeNoOpRecord",
+ () -> {
+ ArrayList<ApiMessageAndVersion> records = new ArrayList<>(1);
+ if (featureControl.metadataVersion().isNoOpRecordSupported()) {
+ records.add(new ApiMessageAndVersion(new NoOpRecord(),
(short) 0));
+ }
+ return ControllerResult.of(records, false);
+ },
+ maxIdleIntervalNs,
+ EnumSet.noneOf(PeriodicTaskFlag.class)));
+ }
+
+ /**
+ * Calculate what the period should be for the maybeFenceStaleBroker task.
+ *
+ * We sample 8 times per broker timeout period, so we'll generally fence a
broker in no more
+ * than 112.5% of the given broker session timeout.
+ *
+ * @param sessionTimeoutNs The configured broker session timeout
period in nanoseconds.
+ *
+ * @return The period for the maybeFenceStaleBroker
task in nanoseconds.
+ */
+ static long maybeFenceStaleBrokerPeriodNs(long sessionTimeoutNs) {
+ return Math.max(TimeUnit.MILLISECONDS.toNanos(1), sessionTimeoutNs /
8);
+ }
+
+ /**
+ * Register the maybeFenceStaleBroker task.
+ *
+ * This task periodically checks to see if there is a stale broker that
needs to
+ * be fenced. It will only ever remove one stale broker at a time.
+ *
+ * @param sessionTimeoutNs The broker session timeout in nanoseconds.
+ */
+ private void registerMaybeFenceStaleBroker(long sessionTimeoutNs) {
+ periodicControl.registerTask(new PeriodicTask("maybeFenceStaleBroker",
+ replicationControl::maybeFenceOneStaleBroker,
+ maybeFenceStaleBrokerPeriodNs(sessionTimeoutNs),
+ EnumSet.noneOf(PeriodicTaskFlag.class)));
+ }
+
+ /**
+ * Register the electPreferred task.
+ *
+ * This task periodically checks to see if partitions with leaders other
+ * than the preferred leader can be switched to have the preferred leader.
+ *
+ * @param checkIntervalNs The check interval in nanoseconds.
+ */
+ private void registerElectPreferred(long checkIntervalNs) {
+ periodicControl.registerTask(new PeriodicTask("electPreferred",
+ replicationControl::maybeBalancePartitionLeaders,
+ checkIntervalNs,
+ EnumSet.of(PeriodicTaskFlag.VERBOSE)));
+ }
+
+ /**
+ * Register the electUnclean task.
+ *
+ * This task periodically checks to see if partitions with no leaders can
+ * have a new leader elected uncleanly.
+ *
+ * @param checkIntervalNs The check interval in nanoseconds.
+ */
+ private void registerElectUnclean(long checkIntervalNs) {
+ periodicControl.registerTask(new PeriodicTask("electUnclean",
+ replicationControl::maybeElectUncleanLeaders,
+ checkIntervalNs,
+ EnumSet.of(PeriodicTaskFlag.VERBOSE)));
+ }
+
+ /**
+ * Register the delegation token expiration task.
+ *
+ * This task periodically expires delegation tokens.
+ *
+ * @param checkIntervalNs The check interval in nanoseconds.
+ */
+ private void registerExpireDelegationTokens(long checkIntervalNs) {
+ periodicControl.registerTask(new PeriodicTask("expireDelegationTokens",
+ delegationTokenControlManager::sweepExpiredDelegationTokens,
+ checkIntervalNs,
+ EnumSet.of(PeriodicTaskFlag.VERBOSE)));
+ }
+
@Override
public CompletableFuture<AlterPartitionResponseData> alterPartition(
ControllerRequestContext context,
@@ -2070,6 +1942,17 @@ public final class QuorumController implements
Controller {
ControllerRequestContext context,
BrokerHeartbeatRequestData request
) {
+ // We start by updating the broker heartbeat in a lockless data
structure.
+ // We do this first so that if the main controller thread is
backlogged, the
+ // last contact time update still gets through.
+ if (!clusterControl.trackBrokerHeartbeat(request.brokerId(),
request.brokerEpoch())) {
+ // Normally, ControllerWriteOperation would automatically check if
the controller is
+ // active. But since we're doing this outside of the main
controller thread, we have to
+ // do our own check here, and handle the case where we are
inactive.
+ throw
ControllerExceptions.newWrongControllerException(latestController());
+ }
+ // The next part takes place in the main controller thread and may
involve generating
+ // metadata records.
return appendWriteEvent("processBrokerHeartbeat", context.deadlineNs(),
new ControllerWriteOperation<BrokerHeartbeatReply>() {
private final int brokerId = request.brokerId();
@@ -2090,7 +1973,6 @@ public final class QuorumController implements Controller
{
ControllerResult<BrokerHeartbeatReply> result =
replicationControl.
processBrokerHeartbeat(request,
offsetForRegisterBrokerRecord.getAsLong());
inControlledShutdown =
result.response().inControlledShutdown();
- rescheduleMaybeFenceStaleBrokers();
return result;
}
@@ -2123,7 +2005,6 @@ public final class QuorumController implements Controller
{
ControllerResult<BrokerRegistrationReply> result =
clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures,
Long.MAX_VALUE));
- rescheduleMaybeFenceStaleBrokers();
return result;
},
EnumSet.noneOf(ControllerOperationFlag.class));
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 25897f74903..da53ec4594b 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1597,18 +1597,34 @@ public class ReplicationControlManager {
return ControllerResult.of(records, null);
}
- ControllerResult<Void> maybeFenceOneStaleBroker() {
- List<ApiMessageAndVersion> records = new ArrayList<>();
+ ControllerResult<Boolean> maybeFenceOneStaleBroker() {
BrokerHeartbeatManager heartbeatManager =
clusterControl.heartbeatManager();
- heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
- // Even though multiple brokers can go stale at a time, we will
process
- // fencing one at a time so that the effect of fencing each broker
is visible
- // to the system prior to processing the next one
- log.info("Fencing broker {} because its session has timed out.",
brokerId);
- handleBrokerFenced(brokerId, records);
- heartbeatManager.fence(brokerId);
- });
- return ControllerResult.of(records, null);
+ Optional<BrokerIdAndEpoch> idAndEpoch =
heartbeatManager.tracker().maybeRemoveExpired();
+ if (!idAndEpoch.isPresent()) {
+ log.debug("No stale brokers found.");
+ return ControllerResult.of(Collections.emptyList(), false);
+ }
+ int id = idAndEpoch.get().id();
+ long epoch = idAndEpoch.get().epoch();
+ if (!clusterControl.brokerRegistrations().containsKey(id)) {
+ log.info("Removing heartbeat tracker entry for unknown broker {}
at epoch {}.",
+ id, epoch);
+ heartbeatManager.remove(id);
+ return ControllerResult.of(Collections.emptyList(), true);
+ } else if (clusterControl.brokerRegistrations().get(id).epoch() !=
epoch) {
+ log.info("Removing heartbeat tracker entry for broker {} at
previous epoch {}. " +
+ "Current epoch is {}", id, epoch,
+ clusterControl.brokerRegistrations().get(id).epoch());
+ return ControllerResult.of(Collections.emptyList(), true);
+ }
+ // Even though multiple brokers can go stale at a time, we will process
+ // fencing one at a time so that the effect of fencing each broker is
visible
+ // to the system prior to processing the next one.
+ log.info("Fencing broker {} at epoch {} because its session has timed
out.", id, epoch);
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ handleBrokerFenced(id, records);
+ heartbeatManager.fence(id);
+ return ControllerResult.of(records, true);
}
boolean arePartitionLeadersImbalanced() {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
index 3a49e412ea9..fdd428495c1 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
@@ -86,6 +86,9 @@ public final class EventHandlerExceptionInfo {
return new EventHandlerExceptionInfo(false, false, internal,
new PolicyViolationException("Unable to perform excessively
large batch " +
"operation."));
+ } else if (internal instanceof PeriodicControlTaskException) {
+ // This exception indicates that a periodic task failed.
+ return new EventHandlerExceptionInfo(true, false, internal);
} else if (internal instanceof InterruptedException) {
// The controller event queue has been interrupted. This normally
only happens during
// a JUnit test that has hung. The test framework sometimes sends
an InterruptException
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/errors/PeriodicControlTaskException.java
b/metadata/src/main/java/org/apache/kafka/controller/errors/PeriodicControlTaskException.java
new file mode 100644
index 00000000000..011de5a7053
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/controller/errors/PeriodicControlTaskException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+/**
+ * An exception indicating that a periodic task managed by
PeriodicTaskControlManager failed.
+ */
+public class PeriodicControlTaskException extends RuntimeException {
+ public PeriodicControlTaskException(String message, Throwable e) {
+ super(message, e);
+ }
+}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index 34f8a0f4b7d..e6bb0940916 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -21,8 +21,6 @@ import
org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatState;
-import
org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateIterator;
-import
org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateList;
import org.apache.kafka.controller.BrokerHeartbeatManager.UsableBrokerIterator;
import org.apache.kafka.metadata.placement.UsableBroker;
@@ -43,7 +41,6 @@ import static
org.apache.kafka.controller.BrokerControlState.SHUTDOWN_NOW;
import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -60,109 +57,31 @@ public class BrokerHeartbeatManagerTest {
public void testHasValidSession() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
MockTime time = (MockTime) manager.time();
- assertFalse(manager.hasValidSession(0));
+ assertFalse(manager.hasValidSession(0, 100L));
for (int brokerId = 0; brokerId < 3; brokerId++) {
manager.register(brokerId, true);
}
+ manager.tracker().updateContactTime(new BrokerIdAndEpoch(0, 100L));
manager.touch(0, false, 0);
time.sleep(5);
+ manager.tracker().updateContactTime(new BrokerIdAndEpoch(1, 100L));
manager.touch(1, false, 0);
+ manager.tracker().updateContactTime(new BrokerIdAndEpoch(2, 200L));
manager.touch(2, false, 0);
- assertTrue(manager.hasValidSession(0));
- assertTrue(manager.hasValidSession(1));
- assertTrue(manager.hasValidSession(2));
- assertFalse(manager.hasValidSession(3));
- time.sleep(6);
- assertFalse(manager.hasValidSession(0));
- assertTrue(manager.hasValidSession(1));
- assertTrue(manager.hasValidSession(2));
- assertFalse(manager.hasValidSession(3));
- manager.remove(2);
- assertFalse(manager.hasValidSession(2));
- manager.remove(1);
- assertFalse(manager.hasValidSession(1));
- }
-
- @Test
- public void testFindOneStaleBroker() {
- BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
- MockTime time = (MockTime) manager.time();
- assertFalse(manager.hasValidSession(0));
- for (int brokerId = 0; brokerId < 3; brokerId++) {
- manager.register(brokerId, true);
- }
- manager.touch(0, false, 0);
- time.sleep(5);
- manager.touch(1, false, 0);
- time.sleep(1);
- manager.touch(2, false, 0);
-
- Iterator<BrokerHeartbeatState> iter = manager.unfenced().iterator();
- assertEquals(0, iter.next().id());
- assertEquals(1, iter.next().id());
- assertEquals(2, iter.next().id());
- assertFalse(iter.hasNext());
- assertEquals(Optional.empty(), manager.findOneStaleBroker());
-
- time.sleep(5);
- assertEquals(Optional.of(0), manager.findOneStaleBroker());
- manager.fence(0);
- assertEquals(Optional.empty(), manager.findOneStaleBroker());
- iter = manager.unfenced().iterator();
- assertEquals(1, iter.next().id());
- assertEquals(2, iter.next().id());
- assertFalse(iter.hasNext());
-
- time.sleep(20);
- assertEquals(Optional.of(1), manager.findOneStaleBroker());
- manager.fence(1);
- assertEquals(Optional.of(2), manager.findOneStaleBroker());
- manager.fence(2);
-
- assertEquals(Optional.empty(), manager.findOneStaleBroker());
- iter = manager.unfenced().iterator();
- assertFalse(iter.hasNext());
- }
-
- @Test
- public void testNextCheckTimeNs() {
- BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
- MockTime time = (MockTime) manager.time();
- assertEquals(Long.MAX_VALUE, manager.nextCheckTimeNs());
- for (int brokerId = 0; brokerId < 4; brokerId++) {
- manager.register(brokerId, true);
- }
- manager.touch(0, false, 0);
- time.sleep(2);
- manager.touch(1, false, 0);
- time.sleep(1);
- manager.touch(2, false, 0);
- time.sleep(1);
- manager.touch(3, false, 0);
- assertEquals(Optional.empty(), manager.findOneStaleBroker());
- assertEquals(10_000_000, manager.nextCheckTimeNs());
- time.sleep(7);
- assertEquals(10_000_000, manager.nextCheckTimeNs());
- assertEquals(Optional.of(0), manager.findOneStaleBroker());
- manager.fence(0);
- assertEquals(12_000_000, manager.nextCheckTimeNs());
-
- time.sleep(3);
- assertEquals(Optional.of(1), manager.findOneStaleBroker());
- manager.fence(1);
- assertEquals(Optional.of(2), manager.findOneStaleBroker());
- manager.fence(2);
-
- assertEquals(14_000_000, manager.nextCheckTimeNs());
+ assertTrue(manager.hasValidSession(0, 100L));
+ assertFalse(manager.hasValidSession(0, 200L));
+ assertTrue(manager.hasValidSession(1, 100L));
+ assertTrue(manager.hasValidSession(2, 200L));
+ assertFalse(manager.hasValidSession(3, 300L));
}
@Test
public void testMetadataOffsetComparator() {
TreeSet<BrokerHeartbeatState> set =
new
TreeSet<>(BrokerHeartbeatManager.MetadataOffsetComparator.INSTANCE);
- BrokerHeartbeatState broker1 = new BrokerHeartbeatState(1);
- BrokerHeartbeatState broker2 = new BrokerHeartbeatState(2);
- BrokerHeartbeatState broker3 = new BrokerHeartbeatState(3);
+ BrokerHeartbeatState broker1 = new BrokerHeartbeatState(1, false, -1L,
-1L);
+ BrokerHeartbeatState broker2 = new BrokerHeartbeatState(2, false, -1L,
-1L);
+ BrokerHeartbeatState broker3 = new BrokerHeartbeatState(3, false, -1L,
-1L);
set.add(broker1);
set.add(broker2);
set.add(broker3);
@@ -175,9 +94,9 @@ public class BrokerHeartbeatManagerTest {
assertTrue(set.remove(broker2));
assertTrue(set.remove(broker3));
assertTrue(set.isEmpty());
- broker1.metadataOffset = 800;
- broker2.metadataOffset = 400;
- broker3.metadataOffset = 100;
+ broker1.setMetadataOffset(800);
+ broker2.setMetadataOffset(400);
+ broker3.setMetadataOffset(100);
set.add(broker1);
set.add(broker2);
set.add(broker3);
@@ -254,39 +173,6 @@ public class BrokerHeartbeatManagerTest {
assertEquals(OptionalLong.of(101),
manager.controlledShutdownOffset(3));
}
- @Test
- public void testBrokerHeartbeatStateList() {
- BrokerHeartbeatStateList list = new BrokerHeartbeatStateList();
- assertNull(list.first());
- BrokerHeartbeatStateIterator iterator = list.iterator();
- assertFalse(iterator.hasNext());
- BrokerHeartbeatState broker0 = new BrokerHeartbeatState(0);
- broker0.lastContactNs = 200;
- BrokerHeartbeatState broker1 = new BrokerHeartbeatState(1);
- broker1.lastContactNs = 100;
- BrokerHeartbeatState broker2 = new BrokerHeartbeatState(2);
- broker2.lastContactNs = 50;
- BrokerHeartbeatState broker3 = new BrokerHeartbeatState(3);
- broker3.lastContactNs = 150;
- list.add(broker0);
- list.add(broker1);
- list.add(broker2);
- list.add(broker3);
- assertEquals(broker2, list.first());
- iterator = list.iterator();
- assertEquals(broker2, iterator.next());
- assertEquals(broker1, iterator.next());
- assertEquals(broker3, iterator.next());
- assertEquals(broker0, iterator.next());
- assertFalse(iterator.hasNext());
- list.remove(broker1);
- iterator = list.iterator();
- assertEquals(broker2, iterator.next());
- assertEquals(broker3, iterator.next());
- assertEquals(broker0, iterator.next());
- assertFalse(iterator.hasNext());
- }
-
@Test
public void testCalculateNextBrokerState() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatTrackerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatTrackerTest.java
new file mode 100644
index 00000000000..d66ed8d6547
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatTrackerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(40)
+public class BrokerHeartbeatTrackerTest {
+ private static BrokerHeartbeatTracker newBrokerHeartbeatTracker() {
+ MockTime time = new MockTime(0, 1_000_000, 0);
+ return new BrokerHeartbeatTracker(time, 10_000_000);
+ }
+
+ private static final Set<BrokerIdAndEpoch> TEST_BROKERS;
+
+ static {
+ Set<BrokerIdAndEpoch> brokers = new HashSet<>();
+ Arrays.asList(
+ new BrokerIdAndEpoch(0, 0L),
+ new BrokerIdAndEpoch(1, 100L),
+ new BrokerIdAndEpoch(2, 200L)
+ ).forEach(brokers::add);
+ TEST_BROKERS = Collections.unmodifiableSet(brokers);
+ }
+
+ @Test
+ public void testUpdateContactTime() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ assertEquals(OptionalLong.empty(), tracker.contactTime(new
BrokerIdAndEpoch(1, 100L)));
+ tracker.updateContactTime(new BrokerIdAndEpoch(1, 100L));
+ assertEquals(OptionalLong.of(0L), tracker.contactTime(new
BrokerIdAndEpoch(1, 100L)));
+ }
+
+ @Test
+ public void testMaybeRemoveExpiredWithEmptyTracker() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ assertEquals(Optional.empty(), tracker.maybeRemoveExpired());
+ }
+
+ @Test
+ public void testMaybeRemoveExpiredWithAllUpToDate() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ TEST_BROKERS.forEach(tracker::updateContactTime);
+ assertEquals(Optional.empty(), tracker.maybeRemoveExpired());
+ }
+
+ @Test
+ public void testMaybeRemoveExpiredWithAllExpired() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ TEST_BROKERS.forEach(tracker::updateContactTime);
+ tracker.time().sleep(11);
+ Set<BrokerIdAndEpoch> expired = new HashSet<>();
+ Optional<BrokerIdAndEpoch> idAndEpoch;
+ do {
+ idAndEpoch = tracker.maybeRemoveExpired();
+ idAndEpoch.ifPresent(expired::add);
+ } while (idAndEpoch.isPresent());
+ assertEquals(TEST_BROKERS, expired);
+ }
+
+ @Test
+ public void testHasValidSessionIsTrueForKnownBroker() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ TEST_BROKERS.forEach(tracker::updateContactTime);
+ assertTrue(tracker.hasValidSession(new BrokerIdAndEpoch(2, 200L)));
+ }
+
+ @Test
+ public void testHasValidSessionIsFalseForUnknownBroker() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ TEST_BROKERS.forEach(tracker::updateContactTime);
+ assertFalse(tracker.hasValidSession(new BrokerIdAndEpoch(3, 300L)));
+ }
+
+ @Test
+ public void testHasValidSessionIsFalseForUnknownBrokerEpoch() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ TEST_BROKERS.forEach(tracker::updateContactTime);
+ assertFalse(tracker.hasValidSession(new BrokerIdAndEpoch(2, 100L)));
+ }
+
+ @Test
+ public void testIsExpiredIsFalseForTheCurrentTime() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ assertFalse(tracker.isExpired(456, 456));
+ }
+
+ @Test
+ public void testIsExpiredIsFalseForTenNanosecondsAfter() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ assertFalse(tracker.isExpired(456, 466));
+ }
+
+ @Test
+ public void testIsExpiredIsTrueAfterExpirationTime() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ assertTrue(tracker.isExpired(456, 456 + 10_000_001));
+ }
+
+ @Test
+ public void testIsExpiredIsFalseForPreviousTime() {
+ BrokerHeartbeatTracker tracker = newBrokerHeartbeatTracker();
+ assertFalse(tracker.isExpired(456, 0));
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PeriodicTaskControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/PeriodicTaskControlManagerTest.java
new file mode 100644
index 00000000000..1defdec760c
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/PeriodicTaskControlManagerTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(10)
+public class PeriodicTaskControlManagerTest {
+ static class FakePeriodicTask {
+ final AtomicInteger numCalls;
+ final AtomicBoolean continuation = new AtomicBoolean(false);
+ final PeriodicTask task;
+ final AtomicBoolean shouldFail = new AtomicBoolean(false);
+
+ FakePeriodicTask(
+ String name,
+ long periodNs
+ ) {
+ this.numCalls = new AtomicInteger();
+ this.task = new PeriodicTask(name,
+ () -> {
+ numCalls.addAndGet(1);
+ if (shouldFail.getAndSet(false)) {
+ throw new NullPointerException("uh oh");
+ }
+ return ControllerResult.of(Collections.emptyList(),
+ continuation.getAndSet(false));
+ },
+ periodNs,
+ EnumSet.noneOf(PeriodicTaskFlag.class));
+ }
+ }
+
+ static class TrackedTask {
+ final String tag;
+ final long deadlineNs;
+ final Supplier<ControllerResult<Void>> op;
+
+ TrackedTask(
+ String tag,
+ long deadlineNs,
+ Supplier<ControllerResult<Void>> op
+ ) {
+ this.tag = tag;
+ this.deadlineNs = deadlineNs;
+ this.op = op;
+ }
+ }
+
+ static class PeriodicTaskControlManagerTestEnv implements
PeriodicTaskControlManager.QueueAccessor {
+ final MockTime time;
+
+ final PeriodicTaskControlManager manager;
+
+ final TreeMap<Long, List<TrackedTask>> tasks;
+
+ int numCalls = 10_000;
+
+ PeriodicTaskControlManagerTestEnv() {
+ this.time = new MockTime(0, 0, 0);
+ this.manager = new PeriodicTaskControlManager.Builder().
+ setTime(time).
+ setQueueAccessor(this).
+ build();
+ this.tasks = new TreeMap<>();
+ }
+
+ @Override
+ public void scheduleDeferred(
+ String tag,
+ long deadlineNs,
+ Supplier<ControllerResult<Void>> op
+ ) {
+ if (numCalls <= 0) {
+ throw new RuntimeException("too many deferred calls.");
+ }
+ numCalls--;
+ cancelDeferred(tag);
+ TrackedTask task = new TrackedTask(tag, deadlineNs, op);
+ tasks.computeIfAbsent(deadlineNs, __ -> new
ArrayList<>()).add(task);
+ }
+
+ @Override
+ public void cancelDeferred(String tag) {
+ Iterator<Map.Entry<Long, List<TrackedTask>>> iter =
tasks.entrySet().iterator();
+ boolean foundTask = false;
+ while (iter.hasNext() && (!foundTask)) {
+ Map.Entry<Long, List<TrackedTask>> entry = iter.next();
+ Iterator<TrackedTask> taskIter = entry.getValue().iterator();
+ while (taskIter.hasNext()) {
+ TrackedTask task = taskIter.next();
+ if (task.tag.equals(tag)) {
+ taskIter.remove();
+ foundTask = true;
+ break;
+ }
+ }
+ if (entry.getValue().isEmpty()) {
+ iter.remove();
+ }
+ }
+ }
+
+ int numDeferred() {
+ int count = 0;
+ for (List<TrackedTask> taskList : tasks.values()) {
+ count += taskList.size();
+ }
+ return count;
+ }
+
+ void advanceTime(long ms) {
+ time.sleep(ms);
+ while (true) {
+ Iterator<Map.Entry<Long, List<TrackedTask>>> iter =
tasks.entrySet().iterator();
+ if (!iter.hasNext()) {
+ return;
+ }
+ Map.Entry<Long, List<TrackedTask>> entry = iter.next();
+ if (time.nanoseconds() < entry.getKey()) {
+ return;
+ }
+ if (!entry.getValue().isEmpty()) {
+ Iterator<TrackedTask> taskIter =
entry.getValue().iterator();
+ TrackedTask task = taskIter.next();
+ taskIter.remove();
+ try {
+ task.op.get();
+ } catch (Exception e) {
+ // discard exception
+ }
+ continue;
+ }
+ iter.remove();
+ }
+ }
+ }
+
+ @Test
+ public void testActivate() {
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ assertFalse(env.manager.active());
+ env.manager.activate();
+ assertTrue(env.manager.active());
+ assertEquals(0, env.numDeferred());
+ }
+
+ @Test
+ public void testDeactivate() {
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ assertFalse(env.manager.active());
+ env.manager.activate();
+ env.manager.deactivate();
+ assertFalse(env.manager.active());
+ assertEquals(0, env.numDeferred());
+ }
+
+ @Test
+ public void testRegisterTaskWhenDeactivated() {
+ FakePeriodicTask foo = new FakePeriodicTask("foo",
MILLISECONDS.toNanos(100));
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ env.manager.registerTask(foo.task);
+ assertEquals(0, env.numDeferred());
+ }
+
+ @Test
+ public void testRegisterTaskWhenActivated() {
+ FakePeriodicTask foo = new FakePeriodicTask("foo",
MILLISECONDS.toNanos(100));
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ env.manager.activate();
+ env.manager.registerTask(foo.task);
+ assertEquals(1, env.numDeferred());
+ }
+
+ @Test
+ public void testRegisterTaskWhenActivatedThenDeactivate() {
+ FakePeriodicTask foo = new FakePeriodicTask("foo",
MILLISECONDS.toNanos(100));
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ env.manager.activate();
+ env.manager.registerTask(foo.task);
+ env.manager.deactivate();
+ assertEquals(0, env.numDeferred());
+ }
+
+ @Test
+ public void testRegisterTaskAndAdvanceTime() {
+ FakePeriodicTask foo = new FakePeriodicTask("foo",
MILLISECONDS.toNanos(100));
+ FakePeriodicTask bar = new FakePeriodicTask("bar",
MILLISECONDS.toNanos(50));
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ env.manager.activate();
+ env.manager.registerTask(foo.task);
+ env.manager.registerTask(bar.task);
+ assertEquals(2, env.numDeferred());
+ env.advanceTime(50);
+ assertEquals(0, foo.numCalls.get());
+ assertEquals(1, bar.numCalls.get());
+ assertEquals(2, env.numDeferred());
+ env.advanceTime(50);
+ assertEquals(1, foo.numCalls.get());
+ assertEquals(2, bar.numCalls.get());
+ assertEquals(2, env.numDeferred());
+ env.manager.deactivate();
+ }
+
+ @Test
+ public void testContinuation() {
+ FakePeriodicTask foo = new FakePeriodicTask("foo",
MILLISECONDS.toNanos(100));
+ FakePeriodicTask bar = new FakePeriodicTask("bar",
MILLISECONDS.toNanos(50));
+ bar.continuation.set(true);
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ env.manager.activate();
+ env.manager.registerTask(foo.task);
+ env.manager.registerTask(bar.task);
+ assertEquals(2, env.numDeferred());
+ env.advanceTime(50);
+ assertEquals(0, foo.numCalls.get());
+ assertEquals(1, bar.numCalls.get());
+ assertEquals(2, env.numDeferred());
+ env.advanceTime(10);
+ assertEquals(2, bar.numCalls.get());
+ env.advanceTime(40);
+ assertEquals(1, foo.numCalls.get());
+ assertEquals(2, bar.numCalls.get());
+ assertEquals(2, env.numDeferred());
+ env.advanceTime(10);
+ assertEquals(3, bar.numCalls.get());
+ env.manager.deactivate();
+ }
+
+ @Test
+ public void testRegisterTaskAndUnregister() {
+ FakePeriodicTask foo = new FakePeriodicTask("foo",
MILLISECONDS.toNanos(100));
+ FakePeriodicTask bar = new FakePeriodicTask("bar",
MILLISECONDS.toNanos(50));
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ env.manager.activate();
+ env.manager.registerTask(foo.task);
+ env.manager.registerTask(bar.task);
+ assertEquals(2, env.numDeferred());
+ env.advanceTime(50);
+ assertEquals(0, foo.numCalls.get());
+ assertEquals(1, bar.numCalls.get());
+ env.manager.unregisterTask(foo.task.name());
+ assertEquals(1, env.numDeferred());
+ env.manager.unregisterTask(bar.task.name());
+ assertEquals(0, env.numDeferred());
+ env.advanceTime(200);
+ assertEquals(0, foo.numCalls.get());
+ assertEquals(1, bar.numCalls.get());
+ env.manager.deactivate();
+ }
+
+ @Test
+ public void testReschedulingAfterFailure() {
+ FakePeriodicTask foo = new FakePeriodicTask("foo",
MILLISECONDS.toNanos(100));
+ foo.shouldFail.set(true);
+ PeriodicTaskControlManagerTestEnv env = new
PeriodicTaskControlManagerTestEnv();
+ env.manager.activate();
+ env.manager.registerTask(foo.task);
+ assertEquals(1, env.numDeferred());
+ env.advanceTime(100);
+ assertEquals(1, foo.numCalls.get());
+ env.advanceTime(300000);
+ assertEquals(2, foo.numCalls.get());
+ env.manager.deactivate();
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a8985d880bc..12dfba3d7ce 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -622,8 +622,8 @@ public class QuorumControllerTest {
@Test
public void testNoOpRecordWriteAfterTimeout() throws Throwable {
- long maxIdleIntervalNs = 1_000;
- long maxReplicationDelayMs = 60_000;
+ long maxIdleIntervalNs = TimeUnit.MICROSECONDS.toNanos(100);
+ long maxReplicationDelayMs = 1_000;
try (
LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
build();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 1fb688f676e..e4f045b1fa2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -448,6 +448,7 @@ public class ReplicationControlManagerTest {
void unfenceBrokers(Integer... brokerIds) {
for (int brokerId : brokerIds) {
+ clusterControl.trackBrokerHeartbeat(brokerId,
defaultBrokerEpoch(brokerId));
ControllerResult<BrokerHeartbeatReply> result =
replicationControl.
processBrokerHeartbeat(new BrokerHeartbeatRequestData().
setBrokerId(brokerId).setBrokerEpoch(defaultBrokerEpoch(brokerId)).
@@ -494,12 +495,11 @@ public class ReplicationControlManagerTest {
.collect(Collectors.toSet());
unfenceBrokers(unfencedBrokerIds.toArray(new Integer[0]));
- Optional<Integer> staleBroker =
clusterControl.heartbeatManager().findOneStaleBroker();
- while (staleBroker.isPresent()) {
- ControllerResult<Void> fenceResult =
replicationControl.maybeFenceOneStaleBroker();
+ ControllerResult<Boolean> fenceResult;
+ do {
+ fenceResult = replicationControl.maybeFenceOneStaleBroker();
replay(fenceResult.records());
- staleBroker =
clusterControl.heartbeatManager().findOneStaleBroker();
- }
+ } while (fenceResult.response().booleanValue());
assertEquals(brokerIds, clusterControl.fencedBrokerIds());
}
@@ -2844,8 +2844,7 @@ public class ReplicationControlManagerTest {
filter(broker -> broker.id() == 0).findFirst();
assertTrue(state.isPresent());
assertEquals(0, state.get().id());
- assertEquals(100000000L, state.get().lastContactNs);
- assertEquals(123, state.get().metadataOffset);
+ assertEquals(123, state.get().metadataOffset());
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java
index f854456cfad..14f644c5f9f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java
@@ -19,10 +19,12 @@ package org.apache.kafka.controller.errors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -44,6 +46,14 @@ public class EventHandlerExceptionInfoTest {
private static final EventHandlerExceptionInfo REJECTED_EXECUTION =
EventHandlerExceptionInfo.fromInternal(new
RejectedExecutionException(), OptionalInt::empty);
+ private static final EventHandlerExceptionInfo BOUNDED_LIST_TOO_LONG =
+ EventHandlerExceptionInfo.fromInternal(new
BoundedListTooLongException("too long"), OptionalInt::empty);
+
+ private static final EventHandlerExceptionInfo PERIODIC_FAILURE =
+ EventHandlerExceptionInfo.fromInternal(
+ new PeriodicControlTaskException("foo: task failed: null pointer.",
+ new NullPointerException()), OptionalInt::empty);
+
private static final EventHandlerExceptionInfo INTERRUPTED =
EventHandlerExceptionInfo.fromInternal(
new InterruptedException(),
@@ -87,6 +97,34 @@ public class EventHandlerExceptionInfoTest {
REJECTED_EXECUTION.failureMessage(123, OptionalLong.empty(), true,
456L));
}
+ @Test
+ public void testBoundedListTooLongExceptionInfo() {
+ assertEquals(new EventHandlerExceptionInfo(false, false,
+ new BoundedListTooLongException("too long"),
+ new PolicyViolationException("Unable to perform excessively large
batch operation.")),
+ BOUNDED_LIST_TOO_LONG);
+ }
+
+ @Test
+ public void testBoundedListTooLongExceptionFailureMessage() {
+ assertEquals("event failed with BoundedListTooLongException (treated
as PolicyViolationException) " +
+ "in 234 microseconds. Exception message: too long",
+ BOUNDED_LIST_TOO_LONG.failureMessage(123,
OptionalLong.of(234L), true, 456L));
+ }
+
+ @Test
+ public void testPeriodicControlTaskExceptionInfo() {
+ assertEquals(new EventHandlerExceptionInfo(true, false,
+ new PeriodicControlTaskException("foo: task failed: null
pointer.", new NullPointerException())),
+ PERIODIC_FAILURE);
+ }
+
+ @Test
+ public void testPeriodicControlTaskExceptionFailureMessage() {
+ assertEquals("event failed with PeriodicControlTaskException in 234
microseconds.",
+ PERIODIC_FAILURE.failureMessage(123, OptionalLong.of(234L),
true, 456L));
+ }
+
@Test
public void testInterruptedExceptionInfo() {
assertEquals(new EventHandlerExceptionInfo(true, true,
@@ -164,6 +202,8 @@ public class EventHandlerExceptionInfoTest {
public void testIsNotTimeoutException() {
assertFalse(TOPIC_EXISTS.isTimeoutException());
assertFalse(REJECTED_EXECUTION.isTimeoutException());
+ assertFalse(BOUNDED_LIST_TOO_LONG.isTimeoutException());
+ assertFalse(PERIODIC_FAILURE.isTimeoutException());
assertFalse(INTERRUPTED.isTimeoutException());
assertFalse(NULL_POINTER.isTimeoutException());
assertFalse(NOT_LEADER.isTimeoutException());