This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a03a71d7b5f KAFKA-15357: Aggregate and propagate assignments
a03a71d7b5f is described below

commit a03a71d7b5f123304d98f0de07cd56c2534d381a
Author: Igor Soarez <[email protected]>
AuthorDate: Mon Sep 11 22:44:40 2023 +0100

    KAFKA-15357: Aggregate and propagate assignments
    
    A new AssignmentsManager accumulates, batches, and sends KIP-858
    assignment events to the Controller. Assignments are sent via
    AssignReplicasToDirs requests.
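    
    For illustration, a minimal usage sketch of the new manager (variable
    names here are placeholders; the real wiring is in BrokerServer below):
    
        AssignmentsManager manager = new AssignmentsManager(
            Time.SYSTEM, channelManager, brokerId, () -> brokerEpoch);
        // Assignments submitted within the same 500ms dispatch window are
        // batched into a single AssignReplicasToDirsRequest.
        manager.onAssignment(new TopicIdPartition(topicId, 0), dirId);
        manager.onAssignment(new TopicIdPartition(topicId, 1), dirId);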
    
    Move QuorumTestHarness.formatDirectories into TestUtils so it can be
    used in other test contexts.
    
    Fix a bug in ControllerRegistration.java where the wrong version of the
    record was being generated in ControllerRegistration.toRecord.
    
    Reviewers: Colin P. McCabe <[email protected]>, Proven Provenzano <[email protected]>, Omnia G H Ibrahim <[email protected]>
---
 checkstyle/import-control-core.xml                 |   1 +
 .../requests/AssignReplicasToDirsRequest.java      |   7 +
 .../main/java/kafka/server/AssignmentsManager.java | 394 +++++++++++++++++++++
 .../server/builders/ReplicaManagerBuilder.java     |  10 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |   6 +
 .../src/main/scala/kafka/server/BrokerServer.scala |  34 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  56 ++-
 .../java/kafka/server/AssignmentsManagerTest.java  | 234 ++++++++++++
 .../kafka/server/QuorumTestHarness.scala           |  27 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |   8 +
 .../unit/kafka/server/StopReplicaRequestTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  33 +-
 .../kafka/metadata/ControllerRegistration.java     |   2 +-
 .../java/org/apache/kafka/queue/EventQueue.java    |  19 +
 .../kafka/server/common/DirectoryEventHandler.java |  44 +++
 15 files changed, 839 insertions(+), 38 deletions(-)

diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 849c45e5b19..4430b8ec9dd 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -36,6 +36,7 @@
   <allow pkg="kafka.utils" />
   <allow pkg="kafka.serializer" />
   <allow pkg="org.apache.kafka.common" />
+  <allow pkg="org.mockito" class="AssignmentsManagerTest"/>
 
  <!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
       https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java
index 4edd726850a..5941181ed81 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsRequest.java
@@ -27,6 +27,13 @@ import java.nio.ByteBuffer;
 
 public class AssignReplicasToDirsRequest extends AbstractRequest {
 
+    /**
+     * The maximum number of assignments to be included in a single request.
+     * This limit was chosen based on the maximum size of an AssignReplicasToDirsRequest
+     * covering 10 different directory IDs, so that the request still fits in a
+     * single 64KB TCP packet.
+     */
+    public static final int MAX_ASSIGNMENTS_PER_REQUEST = 2250;
+
     public static class Builder extends AbstractRequest.Builder<AssignReplicasToDirsRequest> {
 
         private final AssignReplicasToDirsRequestData data;
diff --git a/core/src/main/java/kafka/server/AssignmentsManager.java b/core/src/main/java/kafka/server/AssignmentsManager.java
new file mode 100644
index 00000000000..ade476bcab2
--- /dev/null
+++ b/core/src/main/java/kafka/server/AssignmentsManager.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class AssignmentsManager {
+
+    private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class);
+
+    /**
+     * Assignments are dispatched to the controller this long after
+     * being submitted to {@link AssignmentsManager}, if there
+     * is no request in flight already.
+     * The interval is reset when a new assignment is submitted.
+     * If {@link AssignReplicasToDirsRequest#MAX_ASSIGNMENTS_PER_REQUEST}
+     * is reached, we ignore this interval and dispatch immediately.
+     */
+    private static final long DISPATCH_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(500);
+
+    private static final long MAX_BACKOFF_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
+
+    private final Time time;
+    private final NodeToControllerChannelManager channelManager;
+    private final int brokerId;
+    private final Supplier<Long> brokerEpochSupplier;
+    private final KafkaEventQueue eventQueue;
+
+    // These variables should only be mutated from the KafkaEventQueue thread
+    private Map<TopicIdPartition, AssignmentEvent> inflight = null;
+    private Map<TopicIdPartition, AssignmentEvent> pending = new HashMap<>();
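+    // Retry backoff starts at 100ms and doubles on each failed attempt, up to
+    // MAX_BACKOFF_INTERVAL_MS, with 2% jitter.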
+    private final ExponentialBackoff resendExponentialBackoff =
+            new ExponentialBackoff(100, 2, MAX_BACKOFF_INTERVAL_MS, 0.02);
+    private int failedAttempts = 0;
+
+    public AssignmentsManager(Time time,
+                              NodeToControllerChannelManager channelManager,
+                              int brokerId,
+                              Supplier<Long> brokerEpochSupplier) {
+        this.time = time;
+        this.channelManager = channelManager;
+        this.brokerId = brokerId;
+        this.brokerEpochSupplier = brokerEpochSupplier;
+        this.eventQueue = new KafkaEventQueue(time,
+                new LogContext("[AssignmentsManager id=" + brokerId + "]"),
+                "broker-" + brokerId + "-directory-assignments-manager-");
+    }
+
+    public void close() throws InterruptedException {
+        eventQueue.close();
+        channelManager.shutdown();
+    }
+
+    public void onAssignment(TopicIdPartition topicPartition, Uuid dirId) {
+        eventQueue.append(new AssignmentEvent(time.nanoseconds(), topicPartition, dirId));
+    }
+
+    // only for testing
+    void wakeup() {
+        eventQueue.wakeup();
+    }
+
+    /**
+     * Base class for all the events handled by {@link AssignmentsManager}.
+     */
+    private abstract static class Event implements EventQueue.Event {
+        /**
+         * Override the default behavior in
+         * {@link EventQueue.Event#handleException}
+         * which swallows the exception.
+         */
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Unexpected error handling {}", this, e);
+        }
+    }
+
+    /**
+     * Handles newly generated assignments, which are to be propagated to the
+     * controller. Assignment events may be handled out of order, so for any two
+     * assignment events for the same topic partition, the one with the older
+     * timestamp is disregarded.
+     */
+    private class AssignmentEvent extends Event {
+        final long timestampNs;
+        final TopicIdPartition partition;
+        final Uuid dirId;
+        AssignmentEvent(long timestampNs, TopicIdPartition partition, Uuid dirId) {
+            this.timestampNs = timestampNs;
+            this.partition = partition;
+            this.dirId = dirId;
+        }
+        @Override
+        public void run() throws Exception {
+            AssignmentEvent existing = pending.getOrDefault(partition, null);
+            if (existing != null && existing.timestampNs > timestampNs) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Dropping assignment {} because it's older than 
{}", this, existing);
+                }
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Received new assignment {}", this);
+            }
+            pending.put(partition, this);
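+            // Only (re)arm the dispatch timer when no request is in flight;
+            // otherwise the response handler schedules the next dispatch.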
+            if (inflight == null || inflight.isEmpty()) {
+                scheduleDispatch();
+            }
+        }
+        @Override
+        public String toString() {
+            return "AssignmentEvent{" +
+                    "timestampNs=" + timestampNs +
+                    ", partition=" + partition +
+                    ", dirId=" + dirId +
+                    '}';
+        }
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            AssignmentEvent that = (AssignmentEvent) o;
+            return timestampNs == that.timestampNs &&
+                    Objects.equals(partition, that.partition) &&
+                    Objects.equals(dirId, that.dirId);
+        }
+        @Override
+        public int hashCode() {
+            return Objects.hash(timestampNs, partition, dirId);
+        }
+    }
+
+    /**
+     * Gathers pending assignments and pushes them to the controller in a {@link AssignReplicasToDirsRequest}.
+     */
+    private class DispatchEvent extends Event {
+        static final String TAG = "dispatch";
+        @Override
+        public void run() throws Exception {
+            if (inflight != null) {
+                throw new IllegalStateException("Bug. Should not be 
dispatching while there are assignments in flight");
+            }
+            if (pending.isEmpty()) {
+                log.trace("No pending assignments, no-op dispatch");
+                return;
+            }
+            Collection<AssignmentEvent> events = pending.values();
+            pending = new HashMap<>();
+            inflight = new HashMap<>();
+            for (AssignmentEvent event : events) {
+                if (inflight.size() < AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
+                    inflight.put(event.partition, event);
+                } else {
+                    pending.put(event.partition, event);
+                }
+            }
+            if (!pending.isEmpty()) {
+                log.warn("Too many assignments ({}) to fit in one call, 
sending only {} and queueing the rest",
+                        
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST + pending.size(),
+                        
AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST);
+            }
+            Map<TopicIdPartition, Uuid> assignment = inflight.entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().dirId));
+            if (log.isDebugEnabled()) {
+                log.debug("Dispatching {} assignments: {}", assignment.size(), assignment);
+            }
+            channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
+                    buildRequestData(brokerId, brokerEpochSupplier.get(), assignment)),
+                    new AssignReplicasToDirsRequestCompletionHandler());
+        }
+    }
+
+    /**
+     * Handles the response to a dispatched {@link AssignReplicasToDirsRequest}.
+     */
+    private class AssignmentResponseEvent extends Event {
+        private final ClientResponse response;
+        public AssignmentResponseEvent(ClientResponse response) {
+            this.response = response;
+        }
+        @Override
+        public void run() throws Exception {
+            if (inflight == null) {
+                throw new IllegalStateException("Bug. Cannot not be handling a 
client response if there is are no assignments in flight");
+            }
+            if (responseIsError(response)) {
+                requeueAllAfterFailure();
+            } else {
+                failedAttempts = 0;
+                AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
+                Set<AssignmentEvent> failed = filterFailures(data, inflight);
+                if (!failed.isEmpty()) {
+                    log.warn("Re-queueing assignments: {}", failed);
+                    for (AssignmentEvent event : failed) {
+                        pending.put(event.partition, event);
+                    }
+                }
+                inflight = null;
+                if (!pending.isEmpty()) {
+                    scheduleDispatch();
+                }
+            }
+        }
+    }
+
+    /**
+     * Callback for an {@link AssignReplicasToDirsRequest}.
+     */
+    private class AssignReplicasToDirsRequestCompletionHandler extends ControllerRequestCompletionHandler {
+        @Override
+        public void onTimeout() {
+            log.warn("Request to controller timed out");
+            appendResponseEvent(null);
+        }
+        @Override
+        public void onComplete(ClientResponse response) {
+            if (log.isDebugEnabled()) {
+                log.debug("Received controller response: {}", response);
+            }
+            appendResponseEvent(response);
+        }
+        void appendResponseEvent(ClientResponse response) {
+            eventQueue.prepend(new AssignmentResponseEvent(response));
+        }
+    }
+
+    private void scheduleDispatch() {
+        if (pending.size() < AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST) {
+            scheduleDispatch(DISPATCH_INTERVAL_NS);
+        } else {
+            log.debug("Too many pending assignments, dispatching immediately");
+            eventQueue.enqueue(EventQueue.EventInsertionType.APPEND, DispatchEvent.TAG + "-immediate",
+                    new EventQueue.NoDeadlineFunction(), new DispatchEvent());
+        }
+    }
+
+    private void scheduleDispatch(long delayNs) {
+        if (log.isTraceEnabled()) {
+            log.trace("Scheduling dispatch in {}ns", delayNs);
+        }
+        eventQueue.enqueue(EventQueue.EventInsertionType.DEFERRED, DispatchEvent.TAG,
+                new EventQueue.LatestDeadlineFunction(time.nanoseconds() + delayNs), new DispatchEvent());
+    }
+
+    private void requeueAllAfterFailure() {
+        if (inflight != null) {
+            log.debug("Re-queueing all in-flight assignments after failure");
+            for (AssignmentEvent event : inflight.values()) {
+                pending.put(event.partition, event);
+            }
+            inflight = null;
+            ++failedAttempts;
+            long backoffNs = TimeUnit.MILLISECONDS.toNanos(resendExponentialBackoff.backoff(failedAttempts));
+            scheduleDispatch(DISPATCH_INTERVAL_NS + backoffNs);
+        }
+    }
+
+    private static boolean responseIsError(ClientResponse response) {
+        if (response == null) {
+            log.debug("Response is null");
+            return true;
+        }
+        if (response.authenticationException() != null) {
+            log.error("Failed to propagate directory assignments because authentication failed", response.authenticationException());
+            return true;
+        }
+        if (response.versionMismatch() != null) {
+            log.error("Failed to propagate directory assignments because the request version is unsupported", response.versionMismatch());
+            return true;
+        }
+        if (response.wasDisconnected()) {
+            log.error("Failed to propagate directory assignments because the connection to the controller was disconnected");
+            return true;
+        }
+        if (response.wasTimedOut()) {
+            log.error("Failed to propagate directory assignments because the request timed out");
+            return true;
+        }
+        if (response.responseBody() == null) {
+            log.error("Failed to propagate directory assignments because the Controller returned an empty response");
+            return true;
+        }
+        if (!(response.responseBody() instanceof AssignReplicasToDirsResponse)) {
+            log.error("Failed to propagate directory assignments because the Controller returned an invalid response type");
+            return true;
+        }
+        AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
+        Errors error = Errors.forCode(data.errorCode());
+        if (error != Errors.NONE) {
+            log.error("Failed to propagate directory assignments because the Controller returned error {}", error.name());
+            return true;
+        }
+        return false;
+    }
+
+    private static Set<AssignmentEvent> filterFailures(
+            AssignReplicasToDirsResponseData data,
+            Map<TopicIdPartition, AssignmentEvent> sent) {
+        Set<AssignmentEvent> failures = new HashSet<>();
+        Set<TopicIdPartition> acknowledged = new HashSet<>();
+        for (AssignReplicasToDirsResponseData.DirectoryData directory : data.directories()) {
+            for (AssignReplicasToDirsResponseData.TopicData topic : directory.topics()) {
+                for (AssignReplicasToDirsResponseData.PartitionData partition : topic.partitions()) {
+                    TopicIdPartition topicPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex());
+                    AssignmentEvent event = sent.get(topicPartition);
+                    if (event == null) {
+                        log.error("AssignReplicasToDirsResponse contains unexpected assignment of partition {} into directory {}", partition, directory.id());
+                    } else {
+                        acknowledged.add(topicPartition);
+                        Errors error = Errors.forCode(partition.errorCode());
+                        if (error != Errors.NONE) {
+                            log.error("Controller returned error {} for assignment of partition {} into directory {}",
+                                    error.name(), partition, event.dirId);
+                            failures.add(event);
+                        }
+                    }
+                }
+            }
+        }
+        for (AssignmentEvent event : sent.values()) {
+            if (!acknowledged.contains(event.partition)) {
+                log.error("AssignReplicasToDirsResponse is missing assignment of partition {} into directory {}", event.partition, event.dirId);
+                failures.add(event);
+            }
+        }
+        return failures;
+    }
+
+    // visible for testing
+    static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
+        Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
+        Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
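+        // Group assignments by directory, then by topic within each directory,
+        // mirroring the nested layout of AssignReplicasToDirsRequestData.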
+        for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
+            TopicIdPartition topicPartition = entry.getKey();
+            Uuid directoryId = entry.getValue();
+            DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new DirectoryData().setId(directoryId));
+            TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
+                    .computeIfAbsent(topicPartition.topicId(), topicId -> {
+                        TopicData data = new TopicData().setTopicId(topicId);
+                        directory.topics().add(data);
+                        return data;
+                    });
+            PartitionData partition = new PartitionData().setPartitionIndex(topicPartition.partitionId());
+            topic.partitions().add(partition);
+        }
+        return new AssignReplicasToDirsRequestData()
+                .setBrokerId(brokerId)
+                .setBrokerEpoch(brokerEpoch)
+                .setDirectories(new ArrayList<>(directoryMap.values()));
+    }
+}
diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 2566e4bcfcb..6859900100e 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -35,6 +35,7 @@ import kafka.log.remote.RemoteLogManager;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.DirectoryEventHandler;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.server.util.Scheduler;
 import scala.compat.java8.OptionConverters;
@@ -66,6 +67,7 @@ public class ReplicaManagerBuilder {
     private Optional<String> threadNamePrefix = Optional.empty();
     private Long brokerEpoch = -1L;
     private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
+    private DirectoryEventHandler directoryEventHandler = DirectoryEventHandler.NOOP;
 
     public ReplicaManagerBuilder setConfig(KafkaConfig config) {
         this.config = config;
@@ -172,6 +174,11 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
+    public ReplicaManagerBuilder setDirectoryEventHandler(DirectoryEventHandler directoryEventHandler) {
+        this.directoryEventHandler = directoryEventHandler;
+        return this;
+    }
+
     public ReplicaManager build() {
         if (config == null) config = new KafkaConfig(Collections.emptyMap());
         if (metrics == null) metrics = new Metrics();
@@ -200,6 +207,7 @@ public class ReplicaManagerBuilder {
                              OptionConverters.toScala(delayedRemoteFetchPurgatory),
                              OptionConverters.toScala(threadNamePrefix),
                              () -> brokerEpoch,
-                             OptionConverters.toScala(addPartitionsToTxnManager));
+                             OptionConverters.toScala(addPartitionsToTxnManager),
+                             directoryEventHandler);
     }
 }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 251f198dc57..abb69926eb3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -85,6 +85,7 @@ trait AlterPartitionListener {
   def markIsrExpand(): Unit
   def markIsrShrink(): Unit
   def markFailed(): Unit
+  def assignDir(dir: String): Unit
 }
 
 class DelayedOperations(topicPartition: TopicPartition,
@@ -119,6 +120,10 @@ object Partition {
       }
 
       override def markFailed(): Unit = replicaManager.failedIsrUpdatesRate.mark()
+
+      override def assignDir(dir: String): Unit = {
+        replicaManager.maybeNotifyPartitionAssignedToDirectory(topicPartition, dir)
+      }
     }
 
     val delayedOperations = new DelayedOperations(
@@ -480,6 +485,7 @@ class Partition(val topicPartition: TopicPartition,
       if (!isFutureReplica) log.setLogOffsetsListener(logOffsetsListener)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      alterPartitionListener.assignDir(log.parentDir)
       log
     } finally {
       logManager.finishedInitializingLog(topicPartition, maybeLog)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index e34fe4c89c9..045429c8436 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition}
+import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics
 import org.apache.kafka.coordinator.group.util.SystemTimerReaper
@@ -43,7 +43,7 @@ import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, TopicIdPartition}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
@@ -85,6 +85,8 @@ class BrokerServer(
 
   @volatile var lifecycleManager: BrokerLifecycleManager = _
 
+  var assignmentsManager: AssignmentsManager = _
+
   private val isShuttingDown = new AtomicBoolean(false)
 
   val lock = new ReentrantLock()
@@ -277,6 +279,28 @@ class BrokerServer(
         time
       )
 
+      val assignmentsChannelManager = NodeToControllerChannelManager(
+        controllerNodeProvider,
+        time,
+        metrics,
+        config,
+        "directory-assignments",
+        s"broker-${config.nodeId}-",
+        retryTimeoutMs = 60000
+      )
+      assignmentsManager = new AssignmentsManager(
+        time,
+        assignmentsChannelManager,
+        config.brokerId,
+        () => lifecycleManager.brokerEpoch
+      )
+      val directoryEventHandler = new DirectoryEventHandler {
+        override def handleAssignment(partition: TopicIdPartition, directoryId: Uuid): Unit =
+          assignmentsManager.onAssignment(partition, directoryId)
+        override def handleFailure(directoryId: Uuid): Unit =
+          lifecycleManager.propagateDirectoryFailure(directoryId)
+      }
+
       this._replicaManager = new ReplicaManager(
         config = config,
         metrics = metrics,
@@ -294,7 +318,8 @@ class BrokerServer(
         threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
         delayedRemoteFetchPurgatoryParam = None,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
-        addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
+        addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
+        directoryEventHandler = directoryEventHandler
       )
 
       /* start token manager */
@@ -648,6 +673,9 @@ class BrokerServer(
       if (tokenManager != null)
         CoreUtils.swallow(tokenManager.shutdown(), this)
 
+      if (assignmentsManager != null)
+        CoreUtils.swallow(assignmentsManager.close(), this)
+
       if (replicaManager != null)
         CoreUtils.swallow(replicaManager.shutdown(), this)
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3b584f54621..dfd50fdec6c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName}
 import kafka.server.ReplicaManager.createLogReadResult
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import kafka.server.metadata.ZkMetadataCache
+import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
 import kafka.utils.Implicits._
 import kafka.utils._
 import kafka.zk.KafkaZkClient
@@ -55,6 +55,8 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
+import org.apache.kafka.server.common
+import org.apache.kafka.server.common.DirectoryEventHandler
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
@@ -269,7 +271,8 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
                      threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
-                     addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None
+                     addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
+                     directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP
                      ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
@@ -2292,9 +2295,9 @@ class ReplicaManager(val config: KafkaConfig,
    * The log directory failure handler for the replica
    *
    * @param dir                     the absolute path of the log directory
-   * @param sendZkNotification      check if we need to send notification to zookeeper node (needed for unit test)
+   * @param notifyController        whether to notify the Controller of the failure (needed for unit tests)
    */
-  def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
+  def handleLogDirFailure(dir: String, notifyController: Boolean = true): Unit = {
     if (!logManager.isLogDirOnline(dir))
       return
     warn(s"Stopping serving replicas in dir $dir")
@@ -2323,16 +2326,57 @@ class ReplicaManager(val config: KafkaConfig,
            s"for partitions 
${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the 
failed log directory $dir.")
     }
     logManager.handleLogDirFailure(dir)
+    if (dir == config.metadataLogDir) {
+      fatal(s"Shutdown broker because the metadata log dir $dir has failed")
+      Exit.halt(1)
+    }
 
-    if (sendZkNotification)
+    if (notifyController) {
+      if (config.migrationEnabled) {
+        fatal(s"Shutdown broker because log directory $dir has failed during migration mode")
+        Exit.halt(1)
+      }
       if (zkClient.isEmpty) {
-        warn("Unable to propagate log dir failure via Zookeeper in KRaft mode")
+        val uuid = logManager.directoryId(dir)
+        if (uuid.isDefined) {
+          directoryEventHandler.handleFailure(uuid.get)
+        } else {
+          fatal(s"Unable to propagate directory failure because directory $dir has no UUID")
+          Exit.halt(1)
+        }
       } else {
         zkClient.get.propagateLogDirEvent(localBrokerId)
       }
+    }
     warn(s"Stopped serving replicas in dir $dir")
   }
 
+  /**
+   * Called when a topic partition is placed in a log directory.
+   * If a directory event listener is configured,
+   * and if the selected log directory has an assigned Uuid,
+   * then the listener is notified of the assignment.
+   */
+  def maybeNotifyPartitionAssignedToDirectory(tp: TopicPartition, dir: String): Unit = {
+    if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
+      logManager.directoryId(dir) match {
+        case None => throw new IllegalStateException(s"Assignment into unidentified directory: ${dir}")
+        case Some(dirId) =>
+          getPartition(tp) match {
+            case HostedPartition.Offline | HostedPartition.None =>
+              throw new IllegalStateException("Assignment for a partition that is not online")
+            case HostedPartition.Online(partition) => partition.topicId match {
+              case None =>
+                throw new IllegalStateException(s"Assignment for topic without ID: ${tp.topic()}")
+              case Some(topicId) =>
+                val topicIdPartition = new common.TopicIdPartition(topicId, tp.partition())
+                directoryEventHandler.handleAssignment(topicIdPartition, dirId)
+            }
+          }
+      }
+    }
+  }
+
   def removeMetrics(): Unit = {
     ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric)
   }
diff --git a/core/src/test/java/kafka/server/AssignmentsManagerTest.java b/core/src/test/java/kafka/server/AssignmentsManagerTest.java
new file mode 100644
index 00000000000..54de654960c
--- /dev/null
+++ b/core/src/test/java/kafka/server/AssignmentsManagerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
+import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class AssignmentsManagerTest {
+
+    private static final Uuid TOPIC_1 = Uuid.fromString("88rnFIqYSZykX4ZSKv81bg");
+    private static final Uuid TOPIC_2 = Uuid.fromString("VKCnzHdhR5uDQc1shqBYrQ");
+    private static final Uuid DIR_1 = Uuid.fromString("cbgD8WdLQCyzLrFIMBhv3w");
+    private static final Uuid DIR_2 = Uuid.fromString("zO0bDc0vSuam7Db9iH7rYQ");
+    private static final Uuid DIR_3 = Uuid.fromString("CGBWbrFkRkeJQy6Aryzq2Q");
+
+    private MockTime time;
+    private NodeToControllerChannelManager channelManager;
+    private AssignmentsManager manager;
+
+    @BeforeEach
+    public void setup() {
+        time = new MockTime();
+        channelManager = mock(NodeToControllerChannelManager.class);
+        manager = new AssignmentsManager(time, channelManager, 8, () -> 100L);
+    }
+
+    @AfterEach
+    void tearDown() throws InterruptedException {
+        manager.close();
+    }
+
+    @Test
+    void testBuildRequestData() {
+        Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition, Uuid>() {{
+                put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+                put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+                put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+                put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+                put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+            }};
+        AssignReplicasToDirsRequestData built = AssignmentsManager.buildRequestData(8, 100L, assignment);
+        AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData()
+                .setBrokerId(8)
+                .setBrokerEpoch(100L)
+                .setDirectories(Arrays.asList(
+                        new AssignReplicasToDirsRequestData.DirectoryData()
+                                .setId(DIR_2)
+                                .setTopics(Arrays.asList(
+                                        new AssignReplicasToDirsRequestData.TopicData()
+                                                .setTopicId(TOPIC_1)
+                                                .setPartitions(Collections.singletonList(
+                                                        new AssignReplicasToDirsRequestData.PartitionData()
+                                                                .setPartitionIndex(2)
+                                                )),
+                                        new AssignReplicasToDirsRequestData.TopicData()
+                                                .setTopicId(TOPIC_2)
+                                                .setPartitions(Collections.singletonList(
+                                                        new AssignReplicasToDirsRequestData.PartitionData()
+                                                                .setPartitionIndex(5)
+                                                ))
+                                )),
+                        new AssignReplicasToDirsRequestData.DirectoryData()
+                                .setId(DIR_3)
+                                .setTopics(Collections.singletonList(
+                                        new AssignReplicasToDirsRequestData.TopicData()
+                                                .setTopicId(TOPIC_1)
+                                                .setPartitions(Collections.singletonList(
+                                                        new AssignReplicasToDirsRequestData.PartitionData()
+                                                                .setPartitionIndex(3)
+                                                ))
+                                )),
+                        new AssignReplicasToDirsRequestData.DirectoryData()
+                                .setId(DIR_1)
+                                .setTopics(Collections.singletonList(
+                                        new AssignReplicasToDirsRequestData.TopicData()
+                                                .setTopicId(TOPIC_1)
+                                                .setPartitions(Arrays.asList(
+                                                        new AssignReplicasToDirsRequestData.PartitionData()
+                                                                .setPartitionIndex(4),
+                                                        new AssignReplicasToDirsRequestData.PartitionData()
+                                                                .setPartitionIndex(1)
+                                                ))
+                                ))
+                ));
+        assertEquals(expected, built);
+    }
+
+    @Test
+    public void testAssignmentAggregation() throws InterruptedException {
+        CountDownLatch readyToAssert = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            if (readyToAssert.getCount() > 0) {
+                readyToAssert.countDown();
+            }
+            return null;
+        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class));
+
+        manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+        manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+        manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+        manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+        manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+        while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) {
+            time.sleep(100);
+            manager.wakeup();
+        }
+
+        ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor = ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
+        verify(channelManager).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class));
+        verifyNoMoreInteractions(channelManager);
+        assertEquals(1, captor.getAllValues().size());
+        AssignReplicasToDirsRequestData actual = captor.getValue().build().data();
+        AssignReplicasToDirsRequestData expected = AssignmentsManager.buildRequestData(
+                8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+                        put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+                        put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
+                        put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
+                        put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
+                        put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
+                    }}
+        );
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    void testRequeuesFailedAssignmentPropagations() throws InterruptedException {
+        CountDownLatch readyToAssert = new CountDownLatch(5);
+        doAnswer(invocation -> {
+            if (readyToAssert.getCount() > 0) {
+                readyToAssert.countDown();
+            }
+            if (readyToAssert.getCount() == 4) {
+                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onTimeout();
+                manager.onAssignment(new TopicIdPartition(TOPIC_1, 2), DIR_3);
+            }
+            if (readyToAssert.getCount() == 3) {
+                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
+                        new ClientResponse(null, null, null, 0L, 0L, false, false,
+                                new UnsupportedVersionException("test unsupported version exception"), null, null)
+                );
+                manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
+            }
+            if (readyToAssert.getCount() == 2) {
+                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
+                        new ClientResponse(null, null, null, 0L, 0L, false, false, null,
+                                new AuthenticationException("test authentication exception"), null)
+                );
+                manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
+            }
+            if (readyToAssert.getCount() == 1) {
+                invocation.getArgument(1, ControllerRequestCompletionHandler.class).onComplete(
+                        new ClientResponse(null, null, null, 0L, 0L, false, false, null, null,
+                                new AssignReplicasToDirsResponse(new AssignReplicasToDirsResponseData()
+                                        .setErrorCode(Errors.NOT_CONTROLLER.code())
+                                        .setThrottleTimeMs(0)))
+                );
+            }
+            return null;
+        }).when(channelManager).sendRequest(any(AssignReplicasToDirsRequest.Builder.class), any(ControllerRequestCompletionHandler.class));
+
+        manager.onAssignment(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+        while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) {
+            time.sleep(TimeUnit.SECONDS.toMillis(1));
+            manager.wakeup();
+        }
+
+        ArgumentCaptor<AssignReplicasToDirsRequest.Builder> captor = ArgumentCaptor.forClass(AssignReplicasToDirsRequest.Builder.class);
+        verify(channelManager, times(5)).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class));
+        verifyNoMoreInteractions(channelManager);
+        assertEquals(5, captor.getAllValues().size());
+        assertEquals(AssignmentsManager.buildRequestData(
+                8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+                        put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+                    }}
+        ), captor.getAllValues().get(0).build().data());
+        assertEquals(AssignmentsManager.buildRequestData(
+                8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+                        put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+                        put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
+                    }}
+        ), captor.getAllValues().get(1).build().data());
+        assertEquals(AssignmentsManager.buildRequestData(
+                8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
+                        put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
+                        put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
+                        put(new TopicIdPartition(TOPIC_1, 3), Uuid.fromString("xHLCnG54R9W3lZxTPnpk1Q"));
+                        put(new TopicIdPartition(TOPIC_1, 4), Uuid.fromString("RCYu1A0CTa6eEIpuKDOfxw"));
+                    }}
+        ), captor.getAllValues().get(4).build().data());
+    }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 19836ac299c..70ae654b6e6 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -17,17 +17,16 @@
 
 package kafka.server
 
-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.File
 import java.net.InetSocketAddress
 import java.util
 import java.util.{Collections, Optional, OptionalInt, Properties}
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import javax.security.auth.login.Configuration
-import kafka.tools.StorageTool
 import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
 import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.{DirectoryId, Uuid}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Exit, Time}
@@ -110,6 +109,7 @@ class KRaftQuorumImplementation(
           setVersion(MetaPropertiesVersion.V1).
           setClusterId(clusterId).
           setNodeId(config.nodeId).
+          setDirectoryId(DirectoryId.random()).
           build())
       })
       copier.setPreWriteHandler((logDir, _, _) => {
@@ -300,25 +300,6 @@ abstract class QuorumTestHarness extends Logging {
 
   def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None
 
-  private def formatDirectories(directories: immutable.Seq[String],
-                                metaProperties: MetaProperties): Unit = {
-    val stream = new ByteArrayOutputStream()
-    var out: PrintStream = null
-    try {
-      out = new PrintStream(stream)
-      val bootstrapMetadata = StorageTool.buildBootstrapMetadata(metadataVersion,
-                                                                 optionalMetadataRecords, "format command")
-      if (StorageTool.formatCommand(out, directories, metaProperties, bootstrapMetadata, metadataVersion,
-                                    ignoreFormatted = false) != 0) {
-        throw new RuntimeException(stream.toString())
-      }
-      debug(s"Formatted storage directory(ies) ${directories}")
-    } finally {
-      if (out != null) out.close()
-      stream.close()
-    }
-  }
-
   private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
     val propsList = kraftControllerConfigs()
     if (propsList.size != 1) {
@@ -337,7 +318,7 @@ abstract class QuorumTestHarness extends Logging {
       setClusterId(Uuid.randomUuid().toString).
       setNodeId(nodeId).
       build()
-    formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), metaProperties)
+    TestUtils.formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), metaProperties, metadataVersion, optionalMetadataRecords)
 
     val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
     metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 00d7d61f5e4..330ea2c0c78 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -38,6 +38,8 @@ import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesVersion}
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
 import org.junit.jupiter.api.Assertions._
@@ -147,6 +149,12 @@ class ReplicaManagerConcurrencyTest {
     metadataCache: MetadataCache,
   ): ReplicaManager = {
     val logDir = TestUtils.tempDir()
+    val metaProperties = new MetaProperties.Builder().
+      setVersion(MetaPropertiesVersion.V1).
+      setClusterId(Uuid.randomUuid().toString).
+      setNodeId(1).
+      build()
+    TestUtils.formatDirectories(Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latest(), None)
 
     val props = new Properties
     props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345")
diff --git a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
index d509a511456..aa9aadc22a7 100644
--- a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala
@@ -46,7 +46,7 @@ class StopReplicaRequestTest extends BaseRequestTest {
 
     val server = servers.head
     val offlineDir = server.logManager.getLog(tp1).get.dir.getParent
-    server.replicaManager.handleLogDirFailure(offlineDir, sendZkNotification = false)
+    server.replicaManager.handleLogDirFailure(offlineDir, notifyController = false)
 
     val topicStates = Seq(
       new StopReplicaTopicState()
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index be859f8a17a..2e9ce677add 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 import java.util.concurrent.{Callable, CompletableFuture, ExecutionException, Executors, TimeUnit}
 import java.util.{Arrays, Collections, Optional, Properties}
 import com.yammer.metrics.core.{Gauge, Histogram, Meter}
@@ -39,6 +39,7 @@ import kafka.network.RequestChannel
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
+import kafka.tools.StorageTool
 import kafka.utils.Implicits._
 import kafka.zk._
 import org.apache.kafka.admin.BrokerMetadata
@@ -69,8 +70,9 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.controller.QuorumController
+import org.apache.kafka.metadata.properties.MetaProperties
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@@ -84,7 +86,7 @@ import org.mockito.Mockito
 
 import scala.annotation.nowarn
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.collection.{Map, Seq, mutable}
+import scala.collection.{Map, Seq, immutable, mutable}
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.jdk.CollectionConverters._
@@ -1410,6 +1412,27 @@ object TestUtils extends Logging {
     }.mkString("\n")
   }
 
+  def formatDirectories(
+    directories: immutable.Seq[String],
+    metaProperties: MetaProperties,
+    metadataVersion: MetadataVersion,
+    optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]]
+  ): Unit = {
+    val stream = new ByteArrayOutputStream()
+    var out: PrintStream = null
+    try {
+      out = new PrintStream(stream)
+      val bootstrapMetadata = StorageTool.buildBootstrapMetadata(metadataVersion, optionalMetadataRecords, "format command")
+      if (StorageTool.formatCommand(out, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted = false) != 0) {
+        throw new RuntimeException(stream.toString())
+      }
+      debug(s"Formatted storage directory(ies) ${directories}")
+    } finally {
+      if (out != null) out.close()
+      stream.close()
+    }
+  }
+
   /**
    * Create new LogManager instance with default configuration for testing
    */
@@ -1504,6 +1527,7 @@ object TestUtils extends Logging {
     val expands: AtomicInteger = new AtomicInteger(0)
     val shrinks: AtomicInteger = new AtomicInteger(0)
     val failures: AtomicInteger = new AtomicInteger(0)
+    val directory: AtomicReference[String] = new AtomicReference[String]()
 
     override def markIsrExpand(): Unit = expands.incrementAndGet()
 
@@ -1511,10 +1535,13 @@ object TestUtils extends Logging {
 
     override def markFailed(): Unit = failures.incrementAndGet()
 
+    override def assignDir(dir: String): Unit = directory.set(dir)
+
     def reset(): Unit = {
       expands.set(0)
       shrinks.set(0)
       failures.set(0)
+      directory.set(null)
     }
   }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
index f79515b8a10..cd2f43cc3e5 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
@@ -189,7 +189,7 @@ public class ControllerRegistration {
                 setMaxSupportedVersion(entry.getValue().max()));
         }
         return new ApiMessageAndVersion(registrationRecord,
-            options.metadataVersion().registerBrokerRecordVersion());
+            options.metadataVersion().registerControllerRecordVersion());
     }
 
     @Override
diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
index df1fd0987cb..72d8026824a 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
@@ -107,6 +107,25 @@ public interface EventQueue extends AutoCloseable {
         }
     }
 
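+    /**
+     * A deadline function that keeps whichever deadline is later: the
+     * previously scheduled one or the newly proposed one, deferring an
+     * already-queued event rather than bringing it forward.
+     */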
+    class LatestDeadlineFunction implements Function<OptionalLong, OptionalLong> {
+        private final long newDeadlineNs;
+
+        public LatestDeadlineFunction(long newDeadlineNs) {
+            this.newDeadlineNs = newDeadlineNs;
+        }
+
+        @Override
+        public OptionalLong apply(OptionalLong prevDeadlineNs) {
+            if (!prevDeadlineNs.isPresent()) {
+                return OptionalLong.of(newDeadlineNs);
+            } else if (prevDeadlineNs.getAsLong() > newDeadlineNs) {
+                return prevDeadlineNs;
+            } else {
+                return OptionalLong.of(newDeadlineNs);
+            }
+        }
+    }
+
     class VoidEvent implements Event {
         public final static VoidEvent INSTANCE = new VoidEvent();
 
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
new file mode 100644
index 00000000000..e4bfe6781f0
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.Uuid;
+
+public interface DirectoryEventHandler {
+
+    /**
+     * A no-op implementation of {@link DirectoryEventHandler}.
+     */
+    DirectoryEventHandler NOOP = new DirectoryEventHandler() {
+        @Override public void handleAssignment(TopicIdPartition partition, Uuid directoryId) {}
+        @Override public void handleFailure(Uuid directoryId) {}
+    };
+
+    /**
+     * Handle the assignment of a topic partition to a directory.
+     * @param partition    The topic partition
+     * @param directoryId  The directory ID
+     */
+    void handleAssignment(TopicIdPartition partition, Uuid directoryId);
+
+    /**
+     * Handle the transition of an online log directory to the offline state.
+     * @param directoryId  The directory ID
+     */
+    void handleFailure(Uuid directoryId);
+}
