This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a03a71d7b5f KAFKA-15357: Aggregate and propagate assignments
a03a71d7b5f is described below
commit a03a71d7b5f123304d98f0de07cd56c2534d381a
Author: Igor Soarez <[email protected]>
AuthorDate: Mon Sep 11 22:44:40 2023 +0100
KAFKA-15357: Aggregate and propagate assignments
A new AssignmentsManager accumulates, batches, and sends KIP-858
assignment events to the Controller. Assignments are sent via
AssignReplicasToDirs requests.
Move QuorumTestHarness.formatDirectories into TestUtils so it can be
used in other test contexts.
Fix a bug in ControllerRegistration.java where the wrong version of the
record was being generated in ControllerRegistration.toRecord.
Reviewers: Colin P. McCabe <[email protected]>, Proven Provenzano
<[email protected]>, Omnia G H Ibrahim <[email protected]>
---
checkstyle/import-control-core.xml | 1 +
.../requests/AssignReplicasToDirsRequest.java | 7 +
.../main/java/kafka/server/AssignmentsManager.java | 394 +++++++++++++++++++++
.../server/builders/ReplicaManagerBuilder.java | 10 +-
core/src/main/scala/kafka/cluster/Partition.scala | 6 +
.../src/main/scala/kafka/server/BrokerServer.scala | 34 +-
.../main/scala/kafka/server/ReplicaManager.scala | 56 ++-
.../java/kafka/server/AssignmentsManagerTest.java | 234 ++++++++++++
.../kafka/server/QuorumTestHarness.scala | 27 +-
.../server/ReplicaManagerConcurrencyTest.scala | 8 +
.../unit/kafka/server/StopReplicaRequestTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 33 +-
.../kafka/metadata/ControllerRegistration.java | 2 +-
.../java/org/apache/kafka/queue/EventQueue.java | 19 +
.../kafka/server/common/DirectoryEventHandler.java | 44 +++
15 files changed, 839 insertions(+), 38 deletions(-)
diff --git a/checkstyle/import-control-core.xml
b/checkstyle/import-control-core.xml
index 849c45e5b19..4430b8ec9dd 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -36,6 +36,7 @@
<allow pkg="kafka.utils" />
<allow pkg="kafka.serializer" />
<allow pkg="org.apache.kafka.common" />
+ <allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the
global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable
-->
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java
index 4edd726850a..5941181ed81 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java
@@ -27,6 +27,13 @@ import java.nio.ByteBuffer;
public class AssignReplicasToDirsRequest extends AbstractRequest {
+ /**
+ * The maximum number of assignments to be included in a single request.
+ * This limit was chosen based on the maximum size of AssignReplicasToDirsRequest for
+ * 10 different directory IDs, so that it still fits in a single TCP packet, i.e. 64KB.
+ */
+ public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;
+
public static class Builder extends
AbstractRequest.Builder<AssignReplicasToDirsRequest> {
private final AssignReplicasToDirsRequestData data;
diff --git a/core/src/main/java/kafka/server/AssignmentsManager.java
b/core/src/main/java/kafka/server/AssignmentsManager.java
new file mode 100644
index 00000000000..ade476bcab2
--- /dev/null
+++ b/core/src/main/java/kafka/server/AssignmentsManager.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import
org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+    private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class);
+
+    /**
+     * Assignments are dispatched to the controller this long after
+     * being submitted to {@link AssignmentsManager}, if there
+     * is no request in flight already.
+     * The interval is reset when a new assignment is submitted.
+     * If {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST}
+     * is reached, we ignore this interval and dispatch immediately.
+     */
+    private static final long DISPATCH_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(500);
+
+    /**
+     * Upper bound, in milliseconds, for the retry backoff computed by
+     * {@link #resendExponentialBackoff}. The value must be expressed in
+     * milliseconds because the result of {@code backoff()} is converted
+     * from milliseconds to nanoseconds before a retry is scheduled.
+     */
+    private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
+
+    private final Time time;
+    private final NodeToControllerChannelManager channelManager;
+    private final int brokerId;
+    private final Supplier<Long> brokerEpochSupplier;
+    private final KafkaEventQueue eventQueue;
+
+    // These variables should only be mutated from the KafkaEventQueue thread
+
+    // Assignments sent in the request currently awaiting a response; null when no request is outstanding.
+    private Map<TopicIdPartition, AssignmentEvent> inflight = null;
+    // Assignments accumulated and not yet sent, keyed by partition.
+    private Map<TopicIdPartition, AssignmentEvent> pending = new HashMap<>();
+    // Extra delay added before re-sending after a failed request.
+    private final ExponentialBackoff resendExponentialBackoff =
+        new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+    // Number of consecutive failed requests; reset to zero on a successful response.
+    private int failedAttempts = 0;
+
+ /**
+ * Creates the manager and the event queue on which all of its events run.
+ *
+ * @param time                 clock used to timestamp assignments and to compute dispatch deadlines
+ * @param channelManager       channel used to send requests to the controller; it is shut down by {@link #close()}
+ * @param brokerId             id of this broker, included in every request and in the queue's log and thread name prefixes
+ * @param brokerEpochSupplier  queried for the broker epoch each time a request is built
+ */
+ public AssignmentsManager(Time time,
+ NodeToControllerChannelManager channelManager,
+ int brokerId,
+ Supplier<Long> brokerEpochSupplier) {
+ this.time = time;
+ this.channelManager = channelManager;
+ this.brokerId = brokerId;
+ this.brokerEpochSupplier = brokerEpochSupplier;
+ this.eventQueue = new KafkaEventQueue(time,
+ new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+ "broker-" + brokerId + "-directory-assignments-manager-");
+ }
+
+ /**
+ * Closes the event queue and then shuts down the controller channel manager.
+ *
+ * @throws InterruptedException declared by this method; presumably raised if closing the event queue is interrupted
+ */
+ public void close() throws InterruptedException {
+ eventQueue.close();
+ channelManager.shutdown();
+ }
+
+ /**
+ * Submits the assignment of a partition to a log directory for propagation to the controller.
+ * The assignment is timestamped with the current time and appended to the event queue;
+ * nothing is sent from the calling thread.
+ *
+ * @param topicPartition the partition that was assigned
+ * @param dirId          the id of the directory the partition was assigned to
+ */
+ public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+ eventQueue.append(new AssignmentEvent(time.nanoseconds(),
topicPartition, dirId));
+ }
+
+ // only for testing
+ // Delegates to the event queue's wakeup(); presumably lets a test using a mock clock
+ // make the queue re-evaluate deferred deadlines -- confirm against EventQueue.
+ void wakeup() {
+ eventQueue.wakeup();
+ }
+
+ /**
+ * Base class for all the events handled by {@link AssignmentsManager}.
+ * It only customizes how failures are reported; subclasses supply {@code run()}.
+ */
+ private abstract static class Event implements EventQueue.Event {
+ /**
+ * Override the default behavior in
+ * {@link EventQueue.Event#handleException}
+ * which swallows the exception.
+ * The failure is logged at error level together with the event;
+ * the event is not retried here.
+ */
+ @Override
+ public void handleException(Throwable e) {
+ log.error("Unexpected error handling {}", this, e);
+ }
+ }
+
+ /**
+ * Handles new generated assignments, to be propagated to the controller.
+ * Assignment events may be handled out of order, so for any two assignment
+ * events for the same topic partition, the one with the oldest timestamp
is
+ * disregarded.
+ */
+ private class AssignmentEvent extends Event {
+ final long timestampNs;
+ final TopicIdPartition partition;
+ final Uuid dirId;
+ AssignmentEvent(long timestampNs, TopicIdPartition partition, Uuid
dirId) {
+ this.timestampNs = timestampNs;
+ this.partition = partition;
+ this.dirId = dirId;
+ }
+ @Override
+ public void run() throws Exception {
+ AssignmentEvent existing = pending.getOrDefault(partition, null);
+ if (existing != null && existing.timestampNs > timestampNs) {
+ if (log.isDebugEnabled()) {
+ log.debug("Dropping assignment {} because it's older than
{}", this, existing);
+ }
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Received new assignment {}", this);
+ }
+ pending.put(partition, this);
+ if (inflight == null || inflight.isEmpty()) {
+ scheduleDispatch();
+ }
+ }
+ @Override
+ public String toString() {
+ return "AssignmentEvent{" +
+ "timestampNs=" + timestampNs +
+ ", partition=" + partition +
+ ", dirId=" + dirId +
+ '}';
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AssignmentEvent that = (AssignmentEvent) o;
+ return timestampNs == that.timestampNs &&
+ Objects.equals(partition, that.partition) &&
+ Objects.equals(dirId, that.dirId);
+ }
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestampNs, partition, dirId);
+ }
+ }
+
+    /**
+     * Collects the pending assignments and sends them to the controller in a
+     * single {@link AssignReplicasToDirsRequest}. At most
+     * {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST} assignments
+     * are sent; any surplus stays pending for a later dispatch.
+     */
+    private class DispatchEvent extends Event {
+        static final String TAG = "dispatch";
+
+        @Override
+        public void run() throws Exception {
+            if (inflight != null) {
+                throw new IllegalStateException("Bug. Should not be dispatching while there are assignments in flight");
+            }
+            if (pending.isEmpty()) {
+                log.trace("No pending assignments, no-op dispatch");
+                return;
+            }
+
+            // Take ownership of everything queued so far, then split it between
+            // the request about to be sent and a fresh pending map for the overflow.
+            Map<TopicIdPartition, AssignmentEvent> queued = pending;
+            pending = new HashMap<>();
+            inflight = new HashMap<>();
+            final int limit = AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
+            for (AssignmentEvent event : queued.values()) {
+                Map<TopicIdPartition, AssignmentEvent> target = inflight.size() < limit ? inflight : pending;
+                target.put(event.partition, event);
+            }
+            if (!pending.isEmpty()) {
+                log.warn("Too many assignments ({}) to fit in one call, sending only {} and queueing the rest",
+                    limit + pending.size(), limit);
+            }
+
+            Map<TopicIdPartition, Uuid> assignment = inflight.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().dirId));
+            log.debug("Dispatching {} assignments: {}", assignment.size(), assignment);
+
+            AssignReplicasToDirsRequestData requestData =
+                buildRequestData(brokerId, brokerEpochSupplier.get(), assignment);
+            channelManager.sendRequest(
+                new AssignReplicasToDirsRequest.Builder(requestData),
+                new AssignReplicasToDirsRequestCompletionHandler());
+        }
+    }
+
+    /**
+     * Handles the response to a dispatched {@link AssignReplicasToDirsRequest}.
+     * A {@code null} response represents a request that timed out.
+     */
+    private class AssignmentResponseEvent extends Event {
+        private final ClientResponse response;
+
+        public AssignmentResponseEvent(ClientResponse response) {
+            this.response = response;
+        }
+
+        /**
+         * On a request-level error every in-flight assignment is re-queued with backoff.
+         * Otherwise the failure counter is reset, only the assignments that the
+         * controller rejected or did not acknowledge are re-queued, and another
+         * dispatch is scheduled if anything is pending.
+         *
+         * @throws IllegalStateException if no request is in flight
+         */
+        @Override
+        public void run() throws Exception {
+            if (inflight == null) {
+                throw new IllegalStateException("Bug. Cannot be handling a client response if there are no assignments in flight");
+            }
+            if (responseIsError(response)) {
+                requeueAllAfterFailure();
+                return;
+            }
+            failedAttempts = 0;
+            AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
+            Set<AssignmentEvent> failed = filterFailures(data, inflight);
+            if (!failed.isEmpty()) {
+                // Only warn when there is actually something to retry.
+                log.warn("Re-queueing assignments: {}", failed);
+                for (AssignmentEvent event : failed) {
+                    // An assignment submitted for the same partition while the request was
+                    // in flight supersedes the one being retried, so it must not be overwritten.
+                    pending.merge(event.partition, event,
+                        (queued, retried) -> queued.timestampNs >= retried.timestampNs ? queued : retried);
+                }
+            }
+            inflight = null;
+            if (!pending.isEmpty()) {
+                scheduleDispatch();
+            }
+        }
+    }
+
+ /**
+ * Callback for a {@link AssignReplicasToDirsRequest}.
+ * Both outcomes are turned into an {@link AssignmentResponseEvent} so that
+ * the response is processed as an event on the event queue.
+ */
+ private class AssignReplicasToDirsRequestCompletionHandler extends
ControllerRequestCompletionHandler {
+ @Override
+ public void onTimeout() {
+ log.warn("Request to controller timed out");
+ // A null response is treated as an error by responseIsError.
+ appendResponseEvent(null);
+ }
+ @Override
+ public void onComplete(ClientResponse response) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received controller response: {}", response);
+ }
+ appendResponseEvent(response);
+ }
+ // Prepended rather than appended, so the response is placed at the front of the queue.
+ void appendResponseEvent(ClientResponse response) {
+ eventQueue.prepend(new AssignmentResponseEvent(response));
+ }
+ }
+
+    /**
+     * Schedules a dispatch of the pending assignments: immediately once a full
+     * request's worth has accumulated, otherwise after {@link #DISPATCH_INTERVAL_NS}.
+     */
+    private void scheduleDispatch() {
+        boolean batchIsFull = pending.size() >= AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
+        if (batchIsFull) {
+            log.debug("Too many pending assignments, dispatching immediately");
+            eventQueue.enqueue(
+                EventQueue.EventInsertionType.APPEND,
+                DispatchEvent.TAG + "-immediate",
+                new EventQueue.NoDeadlineFunction(),
+                new DispatchEvent());
+            return;
+        }
+        scheduleDispatch(DISPATCH_INTERVAL_NS);
+    }
+
+    /**
+     * Enqueues a deferred {@link DispatchEvent} under the shared dispatch tag,
+     * with a deadline {@code delayNs} nanoseconds from now.
+     *
+     * @param delayNs how long to wait before dispatching, in nanoseconds
+     */
+    private void scheduleDispatch(long delayNs) {
+        // Log at the same level the guard used to check (trace); parameterized
+        // logging makes a separate isTraceEnabled() check unnecessary.
+        log.trace("Scheduling dispatch in {}ns", delayNs);
+        eventQueue.enqueue(EventQueue.EventInsertionType.DEFERRED, DispatchEvent.TAG,
+            new EventQueue.LatestDeadlineFunction(time.nanoseconds() + delayNs), new DispatchEvent());
+    }
+
+    /**
+     * Moves every in-flight assignment back to the pending set after a failed
+     * request and schedules a retry, delayed by the dispatch interval plus an
+     * exponential backoff based on the number of consecutive failures.
+     * Does nothing when no request is in flight.
+     */
+    private void requeueAllAfterFailure() {
+        if (inflight == null) {
+            return;
+        }
+        log.debug("Re-queueing all in-flight assignments after failure");
+        for (AssignmentEvent event : inflight.values()) {
+            // An assignment submitted for the same partition while the request was
+            // in flight is more recent than the one that failed; keep it instead of
+            // overwriting it with the stale in-flight entry.
+            pending.merge(event.partition, event,
+                (queued, retried) -> queued.timestampNs >= retried.timestampNs ? queued : retried);
+        }
+        inflight = null;
+        ++failedAttempts;
+        long backoffNs = TimeUnit.MILLISECONDS.toNanos(resendExponentialBackoff.backoff(failedAttempts));
+        scheduleDispatch(DISPATCH_INTERVAL_NS + backoffNs);
+    }
+
+ private static boolean responseIsError(ClientResponse response) {
+ if (response == null) {
+ log.debug("Response is null");
+ return true;
+ }
+ if (response.authenticationException() != null) {
+ log.error("Failed to propagate directory assignments because
authentication failed", response.authenticationException());
+ return true;
+ }
+ if (response.versionMismatch() != null) {
+ log.error("Failed to propagate directory assignments because the
request version is unsupported", response.versionMismatch());
+ return true;
+ }
+ if (response.wasDisconnected()) {
+ log.error("Failed to propagate directory assignments because the
connection to the controller was disconnected");
+ return true;
+ }
+ if (response.wasTimedOut()) {
+ log.error("Failed to propagate directory assignments because the
request timed out");
+ return true;
+ }
+ if (response.responseBody() == null) {
+ log.error("Failed to propagate directory assignments because the
Controller returned an empty response");
+ return true;
+ }
+ if (!(response.responseBody() instanceof
AssignReplicasToDirsResponse)) {
+ log.error("Failed to propagate directory assignments because the
Controller returned an invalid response type");
+ return true;
+ }
+ AssignReplicasToDirsResponseData data =
((AssignReplicasToDirsResponse) response.responseBody()).data();
+ Errors error = Errors.forCode(data.errorCode());
+ if (error != Errors.NONE) {
+ log.error("Failed to propagate directory assignments because the
Controller returned error {}", error.name());
+ return true;
+ }
+ return false;
+ }
+
+    /**
+     * Works out which of the sent assignments have to be retried: those the
+     * controller answered with an error, and those it did not mention at all.
+     * Partitions present in the response but never sent are only logged.
+     *
+     * @param data the body of the controller response
+     * @param sent the assignments included in the request, keyed by partition
+     * @return the assignments that were not successfully acknowledged
+     */
+    private static Set<AssignmentEvent> filterFailures(
+            AssignReplicasToDirsResponseData data,
+            Map<TopicIdPartition, AssignmentEvent> sent) {
+        Set<AssignmentEvent> toRetry = new HashSet<>();
+        Set<TopicIdPartition> seen = new HashSet<>();
+        for (AssignReplicasToDirsResponseData.DirectoryData dir : data.directories()) {
+            for (AssignReplicasToDirsResponseData.TopicData topicData : dir.topics()) {
+                for (AssignReplicasToDirsResponseData.PartitionData partitionData : topicData.partitions()) {
+                    TopicIdPartition key = new TopicIdPartition(topicData.topicId(), partitionData.partitionIndex());
+                    AssignmentEvent original = sent.get(key);
+                    if (original == null) {
+                        log.error("AssignReplicasToDirsResponse contains unexpected partition {} into directory {}",
+                            partitionData, dir.id());
+                        continue;
+                    }
+                    seen.add(key);
+                    Errors partitionError = Errors.forCode(partitionData.errorCode());
+                    if (partitionError != Errors.NONE) {
+                        log.error("Controller returned error {} for assignment of partition {} into directory {}",
+                            partitionError.name(), partitionData, original.dirId);
+                        toRetry.add(original);
+                    }
+                }
+            }
+        }
+        // Anything the controller did not mention is treated as failed as well.
+        for (AssignmentEvent original : sent.values()) {
+            if (seen.contains(original.partition)) {
+                continue;
+            }
+            log.error("AssignReplicasToDirsResponse is missing assignment of partition {} into directory {}",
+                original.partition, original.dirId);
+            toRetry.add(original);
+        }
+        return toRetry;
+    }
+
+ // visible for testing
+ static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long
brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
+ Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
+ Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
+ for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
+ TopicIdPartition topicPartition = entry.getKey();
+ Uuid directoryId = entry.getValue();
+ DirectoryData directory =
directoryMap.computeIfAbsent(directoryId, d -> new
DirectoryData().setId(directoryId));
+ TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new
HashMap<>())
+ .computeIfAbsent(topicPartition.topicId(), topicId -> {
+ TopicData data = new TopicData().setTopicId(topicId);
+ directory.topics().add(data);
+ return data;
+ });
+ PartitionData partition = new
PartitionData().setPartitionIndex(topicPartition.partitionId());
+ topic.partitions().add(partition);
+ }
+ return new AssignReplicasToDirsRequestData()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(brokerEpoch)
+ .setDirectories(new ArrayList<>(directoryMap.values()));
+ }
+}
diff --git
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 2566e4bcfcb..6859900100e 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -35,6 +35,7 @@ import kafka.log.remote.RemoteLogManager;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.server.util.Scheduler;
import scala.compat.java8.OptionConverters;
@@ -66,6 +67,7 @@ public class ReplicaManagerBuilder {
private Optional<String> threadNamePrefix = Optional.empty();
private Long brokerEpoch = -1L;
private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager =
Optional.empty();
+ private DirectoryEventHandler directoryEventHandler =
DirectoryEventHandler.NOOP;
public ReplicaManagerBuilder setConfig(KafkaConfig config) {
this.config = config;
@@ -172,6 +174,11 @@ public class ReplicaManagerBuilder {
return this;
}
+ public ReplicaManagerBuilder
setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
+ this.directoryEventHandler = directoryEventHandler;
+ return this;
+ }
+
public ReplicaManager build() {
if (config == null) config = new KafkaConfig(Collections.emptyMap());
if (metrics == null) metrics = new Metrics();
@@ -200,6 +207,7 @@ public class ReplicaManagerBuilder {
OptionConverters.toScala(delayedRemoteFetchPurgatory),
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
-
OptionConverters.toScala(addPartitionsToTxnManager));
+
OptionConverters.toScala(addPartitionsToTxnManager),
+ directoryEventHandler);
}
}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 251f198dc57..abb69926eb3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -85,6 +85,7 @@ trait AlterPartitionListener {
def markIsrExpand(): Unit
def markIsrShrink(): Unit
def markFailed(): Unit
+ def assignDir(dir: String): Unit
}
class DelayedOperations(topicPartition: TopicPartition,
@@ -119,6 +120,10 @@ object Partition {
}
override def markFailed(): Unit =
replicaManager.failedIsrUpdatesRate.mark()
+
+ override def assignDir(dir: String): Unit = {
+ replicaManager.maybeNotifyPartitionAssignedToDirectory(topicPartition,
dir)
+ }
}
val delayedOperations = new DelayedOperations(
@@ -480,6 +485,7 @@ class Partition(val topicPartition: TopicPartition,
if (!isFutureReplica) log.setLogOffsetsListener(logOffsetsListener)
maybeLog = Some(log)
updateHighWatermark(log)
+ alterPartitionListener.assignDir(log.parentDir)
log
} finally {
logManager.finishedInitializingLog(topicPartition, maybeLog)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index e34fe4c89c9..045429c8436 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{ClusterResource, KafkaException,
TopicPartition}
+import org.apache.kafka.common.{ClusterResource, KafkaException,
TopicPartition, Uuid}
import org.apache.kafka.coordinator.group
import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics
import org.apache.kafka.coordinator.group.util.SystemTimerReaper
@@ -43,7 +43,7 @@ import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
@@ -85,6 +85,8 @@ class BrokerServer(
@volatile var lifecycleManager: BrokerLifecycleManager = _
+ var assignmentsManager: AssignmentsManager = _
+
private val isShuttingDown = new AtomicBoolean(false)
val lock = new ReentrantLock()
@@ -277,6 +279,28 @@ class BrokerServer(
time
)
+ val assignmentsChannelManager = NodeToControllerChannelManager(
+ controllerNodeProvider,
+ time,
+ metrics,
+ config,
+ "directory-assignments",
+ s"broker-${config.nodeId}-",
+ retryTimeoutMs = 60000
+ )
+ assignmentsManager = new AssignmentsManager(
+ time,
+ assignmentsChannelManager,
+ config.brokerId,
+ () => lifecycleManager.brokerEpoch
+ )
+ val directoryEventHandler = new DirectoryEventHandler {
+ override def handleAssignment(partition: TopicIdPartition,
directoryId: Uuid): Unit =
+ assignmentsManager.onAssignment(partition, directoryId)
+ override def handleFailure(directoryId: Uuid): Unit =
+ lifecycleManager.propagateDirectoryFailure(directoryId)
+ }
+
this._replicaManager = new ReplicaManager(
config = config,
metrics = metrics,
@@ -294,7 +318,8 @@ class BrokerServer(
threadNamePrefix = None, // The ReplicaManager only runs on the
broker, and already includes the ID in thread names.
delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
- addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
+ addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
+ directoryEventHandler = directoryEventHandler
)
/* start token manager */
@@ -648,6 +673,9 @@ class BrokerServer(
if (tokenManager != null)
CoreUtils.swallow(tokenManager.shutdown(), this)
+ if (assignmentsManager != null)
+ CoreUtils.swallow(assignmentsManager.close(), this)
+
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3b584f54621..dfd50fdec6c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName,
FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName,
IsrShrinksPerSecMetricName, LeaderCountMetricName,
OfflineReplicaCountMetricName, PartitionCountMetricName,
PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName,
ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName,
UnderReplicatedPartitionsMetricName}
import kafka.server.ReplicaManager.createLogReadResult
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile,
OffsetCheckpoints}
-import kafka.server.metadata.ZkMetadataCache
+import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.utils.Implicits._
import kafka.utils._
import kafka.zk.KafkaZkClient
@@ -55,6 +55,8 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
+import org.apache.kafka.server.common
+import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
@@ -269,7 +271,8 @@ class ReplicaManager(val config: KafkaConfig,
delayedRemoteFetchPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1,
- addPartitionsToTxnManager:
Option[AddPartitionsToTxnManager] = None
+ addPartitionsToTxnManager:
Option[AddPartitionsToTxnManager] = None,
+ directoryEventHandler: DirectoryEventHandler =
DirectoryEventHandler.NOOP
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@@ -2292,9 +2295,9 @@ class ReplicaManager(val config: KafkaConfig,
* The log directory failure handler for the replica
*
* @param dir the absolute path of the log directory
- * @param sendZkNotification check if we need to send notification to
zookeeper node (needed for unit test)
+ * @param notifyController check if we need to send notification to
the Controller (needed for unit test)
*/
- def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true):
Unit = {
+ def handleLogDirFailure(dir: String, notifyController: Boolean = true): Unit
= {
if (!logManager.isLogDirOnline(dir))
return
warn(s"Stopping serving replicas in dir $dir")
@@ -2323,16 +2326,57 @@ class ReplicaManager(val config: KafkaConfig,
s"for partitions
${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the
failed log directory $dir.")
}
logManager.handleLogDirFailure(dir)
+ if (dir == config.metadataLogDir) {
+ fatal(s"Shutdown broker because the metadata log dir $dir has failed")
+ Exit.halt(1)
+ }
- if (sendZkNotification)
+ if (notifyController) {
+ if (config.migrationEnabled) {
+ fatal(s"Shutdown broker because some log directory has failed during
migration mode: $dir")
+ Exit.halt(1)
+ }
if (zkClient.isEmpty) {
- warn("Unable to propagate log dir failure via Zookeeper in KRaft mode")
+ val uuid = logManager.directoryId(dir)
+ if (uuid.isDefined) {
+ directoryEventHandler.handleFailure(uuid.get)
+ } else {
+ fatal(s"Unable to propagate directory failure disabled because
directory $dir has no UUID")
+ Exit.halt(1)
+ }
} else {
zkClient.get.propagateLogDirEvent(localBrokerId)
}
+ }
warn(s"Stopped serving replicas in dir $dir")
}
+ /**
+ * Called when a topic partition is placed in a log directory.
+ * Nothing happens unless the metadata cache is a KRaftMetadataCache.
+ * Otherwise the directory event handler is notified of the assignment,
+ * identified by the partition's topic ID and the directory's Uuid.
+ *
+ * @param tp  the partition that was placed
+ * @param dir the log directory the partition was placed in, as known to the log manager
+ * @throws IllegalStateException if the directory has no Uuid, if the partition
+ *                               is not hosted online, or if its topic has no ID
+ */
+ def maybeNotifyPartitionAssignedToDirectory(tp: TopicPartition, dir:
String): Unit = {
+ if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
+ logManager.directoryId(dir) match {
+ case None => throw new IllegalStateException(s"Assignment into
unidentified directory: ${dir}")
+ case Some(dirId) =>
+ getPartition(tp) match {
+ case HostedPartition.Offline | HostedPartition.None =>
+ throw new IllegalStateException("Assignment for a partition that
is not online")
+ case HostedPartition.Online(partition) => partition.topicId match {
+ case None =>
+ throw new IllegalStateException(s"Assignment for topic without
ID: ${tp.topic()}")
+ case Some(topicId) =>
+ val topicIdPartition = new common.TopicIdPartition(topicId,
tp.partition())
+ directoryEventHandler.handleAssignment(topicIdPartition, dirId)
+ }
+ }
+ }
+ }
+ }
+
def removeMetrics(): Unit = {
ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric)
}
diff --git a/core/src/test/java/kafka/server/AssignmentsManagerTest.java
b/core/src/test/java/kafka/server/AssignmentsManagerTest.java
new file mode 100644
index 00000000000..54de654960c
--- /dev/null
+++ b/core/src/test/java/kafka/server/AssignmentsManagerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class AssignmentsManagerTest {
+
+ private static final Uuid TOPIC_1 =
Uuid.fromString("88rnFIqYSZykX4ZSKv81bg");
+ private static final Uuid TOPIC_2 =
Uuid.fromString("VKCnzHdhR5uDQc1shqBYrQ");
+ private static final Uuid DIR_1 =
Uuid.fromString("cbgD8WdLQCyzLrFIMBhv3w");
+ private static final Uuid DIR_2 =
Uuid.fromString("zO0bDc0vSuam7Db9iH7rYQ");
+ private static final Uuid DIR_3 =
Uuid.fromString("CGBWbrFkRkeJQy6Aryzq2Q");
+
+ private MockTime time;
+ private NodeToControllerChannelManager channelManager;
+ private AssignmentsManager manager;
+
+ @BeforeEach
+ public void setup() {
+ time = new MockTime();
+ channelManager = mock(NodeToControllerChannelManager.class);
+ manager = new AssignmentsManager(time, channelManager, 8, () -> 100L); // 8 and 100L match the broker id/epoch in the expected requests below; presumably broker id and broker-epoch supplier - confirm against AssignmentsManager
+ }
+
+ @AfterEach
+ void tearDown() throws InterruptedException {
+ manager.close();
+ }
+
+ @Test
+ void testBuildRequestData() {
+ Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition,
Uuid>() {{
+ put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+ put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+ put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+ put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+ put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+ }};
+ AssignReplicasToDirsRequestData built =
AssignmentsManager.buildRequestData(8, 100L, assignment);
+ AssignReplicasToDirsRequestData expected = new
AssignReplicasToDirsRequestData()
+ .setBrokerId(8)
+ .setBrokerEpoch(100L)
+ .setDirectories(Arrays.asList(
+ new AssignReplicasToDirsRequestData.DirectoryData()
+ .setId(DIR_2)
+ .setTopics(Arrays.asList(
+ new
AssignReplicasToDirsRequestData.TopicData()
+ .setTopicId(TOPIC_1)
+
.setPartitions(Collections.singletonList(
+ new
AssignReplicasToDirsRequestData.PartitionData()
+
.setPartitionIndex(2)
+ )),
+ new
AssignReplicasToDirsRequestData.TopicData()
+ .setTopicId(TOPIC_2)
+
.setPartitions(Collections.singletonList(
+ new
AssignReplicasToDirsRequestData.PartitionData()
+
.setPartitionIndex(5)
+ ))
+ )),
+ new AssignReplicasToDirsRequestData.DirectoryData()
+ .setId(DIR_3)
+ .setTopics(Collections.singletonList(
+ new
AssignReplicasToDirsRequestData.TopicData()
+ .setTopicId(TOPIC_1)
+
.setPartitions(Collections.singletonList(
+ new
AssignReplicasToDirsRequestData.PartitionData()
+
.setPartitionIndex(3)
+ ))
+ )),
+ new AssignReplicasToDirsRequestData.DirectoryData()
+ .setId(DIR_1)
+ .setTopics(Collections.singletonList(
+ new
AssignReplicasToDirsRequestData.TopicData()
+ .setTopicId(TOPIC_1)
+ .setPartitions(Arrays.asList(
+ new
AssignReplicasToDirsRequestData.PartitionData()
+
.setPartitionIndex(4),
+ new
AssignReplicasToDirsRequestData.PartitionData()
+
.setPartitionIndex(1)
+ ))
+ ))
+ ));
+ assertEquals(expected, built); // NOTE(review): expected lists directories and partitions in an order that differs from insertion order - presumably the map's iteration order; confirm the ordering produced by buildRequestData is stable
+ }
+
+ @Test
+ public void testAssignmentAggregation() throws InterruptedException {
+ CountDownLatch readyToAssert = new CountDownLatch(1); // released by the first sendRequest call on the mock
+ doAnswer(invocation -> {
+ if (readyToAssert.getCount() > 0) {
+ readyToAssert.countDown();
+ }
+ return null;
+
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
any(ControllerRequestCompletionHandler.class));
+
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+ manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+ while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) { // until a request has been sent: advance the mock clock and wake the manager
+ time.sleep(100);
+ manager.wakeup();
+ }
+
+ ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
+ verify(channelManager).sendRequest(captor.capture(),
any(ControllerRequestCompletionHandler.class));
+ verifyNoMoreInteractions(channelManager);
+ assertEquals(1, captor.getAllValues().size()); // the five assignments must arrive in a single request
+ AssignReplicasToDirsRequestData actual =
captor.getValue().build().data();
+ AssignReplicasToDirsRequestData expected =
AssignmentsManager.buildRequestData(
+ 8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+ put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+ put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+ put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+ put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+ put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+ }}
+ );
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testRequeuesFailedAssignmentPropagations() throws
InterruptedException {
+ CountDownLatch readyToAssert = new CountDownLatch(5); // one count per expected sendRequest call
+ doAnswer(invocation -> {
+ if (readyToAssert.getCount() > 0) {
+ readyToAssert.countDown();
+ }
+ if (readyToAssert.getCount() == 4) { // 1st request: report a timeout, then queue another assignment
+ invocation.getArgument(1,
ControllerRequestCompletionHandler.class).onTimeout();
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3);
+ }
+ if (readyToAssert.getCount() == 3) { // 2nd request: complete with an UnsupportedVersionException, then queue another assignment
+ invocation.getArgument(1,
ControllerRequestCompletionHandler.class).onComplete(
+ new ClientResponse(null, null, null, 0L, 0L, false,
false,
+ new UnsupportedVersionException("test
unsupported version exception"), null, null)
+ );
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 3),
Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
+ }
+ if (readyToAssert.getCount() == 2) { // 3rd request: complete with an AuthenticationException, then queue another assignment
+ invocation.getArgument(1,
ControllerRequestCompletionHandler.class).onComplete(
+ new ClientResponse(null, null, null, 0L, 0L, false,
false, null,
+ new AuthenticationException("test
authentication exception"), null)
+ );
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 4),
Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
+ }
+ if (readyToAssert.getCount() == 1) { // 4th request: complete with a NOT_CONTROLLER error response
+ invocation.getArgument(1,
ControllerRequestCompletionHandler.class).onComplete(
+ new ClientResponse(null, null, null, 0L, 0L, false,
false, null, null,
+ new AssignReplicasToDirsResponse(new
AssignReplicasToDirsResponseData()
+
.setErrorCode(Errors.NOT_CONTROLLER.code())
+ .setThrottleTimeMs(0)))
+ );
+ }
+ return null; // on the 5th request the latch reaches zero and none of the branches above run
+
}).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class),
any(ControllerRequestCompletionHandler.class));
+
+ manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+ while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) { // until all five requests were sent: advance the mock clock and wake the manager
+ time.sleep(TimeUnit.SECONDS.toMillis(1));
+ manager.wakeup();
+ }
+
+ ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor =
ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
+ verify(channelManager, times(5)).sendRequest(captor.capture(),
any(ControllerRequestCompletionHandler.class));
+ verifyNoMoreInteractions(channelManager);
+ assertEquals(5, captor.getAllValues().size());
+ assertEquals(AssignmentsManager.buildRequestData(
+ 8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+ put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+ }}
+ ), captor.getAllValues().get(0).build().data()); // 1st request: only the initial assignment
+ assertEquals(AssignmentsManager.buildRequestData(
+ 8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+ put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+ put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
+ }}
+ ), captor.getAllValues().get(1).build().data()); // 2nd request: the timed-out assignment again, plus the one queued after the timeout
+ assertEquals(AssignmentsManager.buildRequestData(
+ 8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+ put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+ put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
+ put(new TopicIdPartition(TOPIC_1, 3),
Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
+ put(new TopicIdPartition(TOPIC_1, 4),
Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
+ }}
+ ), captor.getAllValues().get(4).build().data()); // 5th request: all four assignments queued during the test, none lost to the failures
+ }
+}
\ No newline at end of file
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 19836ac299c..70ae654b6e6 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -17,17 +17,16 @@
package kafka.server
-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.File
import java.net.InetSocketAddress
import java.util
import java.util.{Collections, Optional, OptionalInt, Properties}
import java.util.concurrent.{CompletableFuture, TimeUnit}
import javax.security.auth.login.Configuration
-import kafka.tools.StorageTool
import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time}
@@ -110,6 +109,7 @@ class KRaftQuorumImplementation(
setVersion(MetaPropertiesVersion.V1).
setClusterId(clusterId).
setNodeId(config.nodeId).
+ setDirectoryId(DirectoryId.random()).
build())
})
copier.setPreWriteHandler((logDir, _, _) => {
@@ -300,25 +300,6 @@ abstract class QuorumTestHarness extends Logging {
def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None
- private def formatDirectories(directories: immutable.Seq[String],
- metaProperties: MetaProperties): Unit = {
- val stream = new ByteArrayOutputStream()
- var out: PrintStream = null
- try {
- out = new PrintStream(stream)
- val bootstrapMetadata =
StorageTool.buildBootstrapMetadata(metadataVersion,
-
optionalMetadataRecords, "format command")
- if (StorageTool.formatCommand(out, directories, metaProperties,
bootstrapMetadata, metadataVersion,
- ignoreFormatted = false) != 0) {
- throw new RuntimeException(stream.toString())
- }
- debug(s"Formatted storage directory(ies) ${directories}")
- } finally {
- if (out != null) out.close()
- stream.close()
- }
- }
-
private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
val propsList = kraftControllerConfigs()
if (propsList.size != 1) {
@@ -337,7 +318,7 @@ abstract class QuorumTestHarness extends Logging {
setClusterId(Uuid.randomUuid().toString).
setNodeId(nodeId).
build()
- formatDirectories(immutable.Seq(metadataDir.getAbsolutePath),
metaProperties)
+ TestUtils.formatDirectories(immutable.Seq(metadataDir.getAbsolutePath),
metaProperties, metadataVersion, optionalMetadataRecords)
val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 00d7d61f5e4..330ea2c0c78 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -38,6 +38,8 @@ import org.apache.kafka.common.{IsolationLevel,
TopicIdPartition, TopicPartition
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesVersion}
+import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation,
FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
@@ -147,6 +149,12 @@ class ReplicaManagerConcurrencyTest {
metadataCache: MetadataCache,
): ReplicaManager = {
val logDir = TestUtils.tempDir()
+ val metaProperties = new MetaProperties.Builder().
+ setVersion(MetaPropertiesVersion.V1).
+ setClusterId(Uuid.randomUuid().toString).
+ setNodeId(1).
+ build()
+ TestUtils.formatDirectories(Seq(logDir.getAbsolutePath), metaProperties,
MetadataVersion.latest(), None)
val props = new Properties
props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345")
diff --git a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
index d509a511456..aa9aadc22a7 100644
--- a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
@@ -46,7 +46,7 @@ class StopReplicaRequestTest extends BaseRequestTest {
val server = servers.head
val offlineDir = server.logManager.getLog(tp1).get.dir.getParent
- server.replicaManager.handleLogDirFailure(offlineDir, sendZkNotification =
false)
+ server.replicaManager.handleLogDirFailure(offlineDir, notifyController =
false)
val topicStates = Seq(
new StopReplicaTopicState()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index be859f8a17a..2e9ce677add 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger,
AtomicReference}
import java.util.concurrent.{Callable, CompletableFuture, ExecutionException,
Executors, TimeUnit}
import java.util.{Arrays, Collections, Optional, Properties}
import com.yammer.metrics.core.{Gauge, Histogram, Meter}
@@ -39,6 +39,7 @@ import kafka.network.RequestChannel
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
+import kafka.tools.StorageTool
import kafka.utils.Implicits._
import kafka.zk._
import org.apache.kafka.admin.BrokerMetadata
@@ -69,8 +70,9 @@ import
org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.controller.QuorumController
+import org.apache.kafka.metadata.properties.MetaProperties
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext,
Authorizer => JAuthorizer}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
@@ -84,7 +86,7 @@ import org.mockito.Mockito
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.collection.{Map, Seq, mutable}
+import scala.collection.{Map, Seq, immutable, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
@@ -1410,6 +1412,27 @@ object TestUtils extends Logging {
}.mkString("\n")
}
+ def formatDirectories(
+ directories: immutable.Seq[String],
+ metaProperties: MetaProperties,
+ metadataVersion: MetadataVersion,
+ optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]]
+ ): Unit = { // runs StorageTool.formatCommand on the given directories; throws RuntimeException if it returns non-zero
+ val stream = new ByteArrayOutputStream() // collects whatever the format command prints
+ var out: PrintStream = null
+ try {
+ out = new PrintStream(stream)
+ val bootstrapMetadata =
StorageTool.buildBootstrapMetadata(metadataVersion, optionalMetadataRecords,
"format command")
+ if (StorageTool.formatCommand(out, directories, metaProperties,
bootstrapMetadata, metadataVersion, ignoreFormatted = false) != 0) {
+ throw new RuntimeException(stream.toString()) // the captured command output becomes the exception message
+ }
+ debug(s"Formatted storage directory(ies) ${directories}")
+ } finally {
+ if (out != null) out.close() // out stays null if the PrintStream constructor threw
+ stream.close()
+ }
+ }
+
/**
* Create new LogManager instance with default configuration for testing
*/
@@ -1504,6 +1527,7 @@ object TestUtils extends Logging {
val expands: AtomicInteger = new AtomicInteger(0)
val shrinks: AtomicInteger = new AtomicInteger(0)
val failures: AtomicInteger = new AtomicInteger(0)
+ val directory: AtomicReference[String] = new AtomicReference[String]()
override def markIsrExpand(): Unit = expands.incrementAndGet()
@@ -1511,10 +1535,13 @@ object TestUtils extends Logging {
override def markFailed(): Unit = failures.incrementAndGet()
+ override def assignDir(dir: String): Unit = directory.set(dir)
+
def reset(): Unit = {
expands.set(0)
shrinks.set(0)
failures.set(0)
+ directory.set(null)
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
index f79515b8a10..cd2f43cc3e5 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
@@ -189,7 +189,7 @@ public class ControllerRegistration {
setMaxSupportedVersion(entry.getValue().max()));
}
return new ApiMessageAndVersion(registrationRecord,
- options.metadataVersion().registerBrokerRecordVersion());
+ options.metadataVersion().registerControllerRecordVersion());
}
@Override
diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
index df1fd0987cb..72d8026824a 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
@@ -107,6 +107,25 @@ public interface EventQueue extends AutoCloseable {
}
}
+ class LatestDeadlineFunction implements Function<OptionalLong,
OptionalLong> { // maps a previous deadline to the later of that deadline and newDeadlineNs
+ private final long newDeadlineNs; // candidate deadline; nanoseconds, going by the name
+
+ public LatestDeadlineFunction(long newDeadlineNs) {
+ this.newDeadlineNs = newDeadlineNs;
+ }
+
+ @Override
+ public OptionalLong apply(OptionalLong prevDeadlineNs) {
+ if (!prevDeadlineNs.isPresent()) { // no previous deadline: use the new one
+ return OptionalLong.of(newDeadlineNs);
+ } else if (prevDeadlineNs.getAsLong() > newDeadlineNs) { // previous deadline is strictly later: keep it
+ return prevDeadlineNs;
+ } else { // new deadline is later or equal: it replaces the previous one
+ return OptionalLong.of(newDeadlineNs);
+ }
+ }
+ }
+
class VoidEvent implements Event {
public final static VoidEvent INSTANCE = new VoidEvent();
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
new file mode 100644
index 00000000000..e4bfe6781f0
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.Uuid;
+
+public interface DirectoryEventHandler {
+
+ /**
+ * A no-op implementation of {@link DirectoryEventHandler}: both callbacks have empty bodies.
+ */
+ DirectoryEventHandler NOOP = new DirectoryEventHandler() {
+ @Override public void handleAssignment(TopicIdPartition partition,
Uuid directoryId) {}
+ @Override public void handleFailure(Uuid directoryId) {}
+ };
+
+ /**
+ * Handle the assignment of a topic partition to a directory.
+ * @param partition The topic partition that was assigned
+ * @param directoryId The ID of the directory it was assigned to
+ */
+ void handleAssignment(TopicIdPartition partition, Uuid directoryId);
+
+ /**
+ * Handle the transition of an online log directory to the offline state.
+ * @param directoryId The ID of the directory that went offline
+ */
+ void handleFailure(Uuid directoryId);
+}