This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27647c7c7c7 MINOR: Remove the MetaLogShim namings (#20357)
27647c7c7c7 is described below

commit 27647c7c7c784f70979f6b392fb4ae60ce1c6c46
Author: OuO <mapan0...@gmail.com>
AuthorDate: Sat Aug 16 02:02:56 2025 +0800

    MINOR: Remove the MetaLogShim namings (#20357)
    
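    Correct parameter name from `logManager` to `raftClient` (leftover from
    PR #10705), and rename the remaining MetaLog-era test classes
    (`LocalLogManager`, `MockMetaLogManagerListener`, and their test/env
    companions) to `MockRaftClient*`, moving them from the
    `org.apache.kafka.metalog` package into `org.apache.kafka.controller`
    and dropping the now-unused `metalog` import-control rules.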
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 checkstyle/import-control-metadata.xml             |  16 ---
 .../apache/kafka/controller/QuorumController.java  |   6 +-
 .../MockRaftClient.java}                           |  82 ++++++-------
 .../MockRaftClientListener.java}                   |   6 +-
 .../MockRaftClientTest.java}                       |  57 ++++-----
 .../MockRaftClientTestEnv.java}                    |  80 ++++++-------
 .../QuorumControllerMetricsIntegrationTest.java    |  19 ++-
 .../kafka/controller/QuorumControllerTest.java     | 132 ++++++++++-----------
 .../kafka/controller/QuorumControllerTestEnv.java  |  23 ++--
 9 files changed, 201 insertions(+), 220 deletions(-)

diff --git a/checkstyle/import-control-metadata.xml 
b/checkstyle/import-control-metadata.xml
index c2660674e32..293801bd75f 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -83,7 +83,6 @@
         <allow pkg="org.apache.kafka.metadata" />
         <allow pkg="org.apache.kafka.metadata.authorizer" />
         <allow pkg="org.apache.kafka.metadata.migration" />
-        <allow pkg="org.apache.kafka.metalog" />
         <allow pkg="org.apache.kafka.deferred" />
         <allow pkg="org.apache.kafka.queue" />
         <allow pkg="org.apache.kafka.raft" />
@@ -160,7 +159,6 @@
         <allow pkg="org.apache.kafka.common.requests" />
         <allow pkg="org.apache.kafka.image" />
         <allow pkg="org.apache.kafka.metadata" />
-        <allow pkg="org.apache.kafka.metalog" />
         <allow pkg="org.apache.kafka.queue" />
         <allow pkg="org.apache.kafka.raft" />
         <allow pkg="org.apache.kafka.server.authorizer" />
@@ -198,18 +196,4 @@
         </subpackage>
     </subpackage>
 
-    <subpackage name="metalog">
-        <allow class="org.apache.kafka.common.compress.Compression" 
exact-match="true" />
-        <allow pkg="org.apache.kafka.common.metadata" />
-        <allow pkg="org.apache.kafka.common.protocol" />
-        <allow pkg="org.apache.kafka.common.record" />
-        <allow pkg="org.apache.kafka.metadata" />
-        <allow pkg="org.apache.kafka.metalog" />
-        <allow pkg="org.apache.kafka.raft" />
-        <allow pkg="org.apache.kafka.snapshot" />
-        <allow pkg="org.apache.kafka.queue" />
-        <allow pkg="org.apache.kafka.server.common" />
-        <allow pkg="org.apache.kafka.test" />
-    </subpackage>
-
 </import-control>
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 5f4d6142434..3e1dd69723b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -260,8 +260,8 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> 
logManager) {
-            this.raftClient = logManager;
+        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> 
raftClient) {
+            this.raftClient = raftClient;
             return this;
         }
 
@@ -1082,7 +1082,7 @@ public final class QuorumController implements Controller 
{
 
         @Override
         public void beginShutdown() {
-            queue.beginShutdown("MetaLogManager.Listener");
+            queue.beginShutdown("QuorumMetaLogListener");
         }
 
         private void appendRaftEvent(String name, Runnable runnable) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
similarity index 91%
rename from metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
rename to metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
index 013d5945d2e..21e3a645530 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
 
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.utils.BufferSupplier;
@@ -66,9 +66,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * The LocalLogManager is a test implementation that relies on the contents of 
memory.
+ * The MockRaftClient is a test implementation that relies on the contents of 
memory.
  */
-public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>, AutoCloseable {
+public final class MockRaftClient implements RaftClient<ApiMessageAndVersion>, 
AutoCloseable {
     interface LocalBatch {
         int epoch();
         int size();
@@ -158,9 +158,9 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         private static final Logger log = 
LoggerFactory.getLogger(SharedLogData.class);
 
         /**
-         * Maps node IDs to the matching log managers.
+         * Maps node IDs to the matching raft clients.
          */
-        private final HashMap<Integer, LocalLogManager> logManagers = new 
HashMap<>();
+        private final HashMap<Integer, MockRaftClient> raftClients = new 
HashMap<>();
 
         /**
          * Maps offsets to record batches.
@@ -198,17 +198,17 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
             }
         }
 
-        synchronized void registerLogManager(LocalLogManager logManager) {
-            if (logManagers.put(logManager.nodeId, logManager) != null) {
-                throw new RuntimeException("Can't have multiple 
LocalLogManagers " +
-                    "with id " + logManager.nodeId());
+        synchronized void registerRaftClient(MockRaftClient raftClient) {
+            if (raftClients.put(raftClient.nodeId, raftClient) != null) {
+                throw new RuntimeException("Can't have multiple 
MockRaftClients " +
+                    "with id " + raftClient.nodeId());
             }
             electLeaderIfNeeded();
         }
 
-        synchronized void unregisterLogManager(LocalLogManager logManager) {
-            if (!logManagers.remove(logManager.nodeId, logManager)) {
-                throw new RuntimeException("Log manager " + 
logManager.nodeId() +
+        synchronized void unregisterRaftClient(MockRaftClient raftClient) {
+            if (!raftClients.remove(raftClient.nodeId, raftClient)) {
+                throw new RuntimeException("MockRaftClient " + 
raftClient.nodeId() +
                     " was not found.");
             }
         }
@@ -259,19 +259,19 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
             if (batch instanceof LeaderChangeBatch leaderChangeBatch) {
                 leader = leaderChangeBatch.newLeader;
             }
-            for (LocalLogManager logManager : logManagers.values()) {
-                logManager.scheduleLogCheck();
+            for (MockRaftClient raftClient : raftClients.values()) {
+                raftClient.scheduleLogCheck();
             }
             prevOffset = nextEndOffset;
             return nextEndOffset;
         }
 
         synchronized void electLeaderIfNeeded() {
-            if (leader.leaderId().isPresent() || logManagers.isEmpty()) {
+            if (leader.leaderId().isPresent() || raftClients.isEmpty()) {
                 return;
             }
-            int nextLeaderIndex = 
ThreadLocalRandom.current().nextInt(logManagers.size());
-            Iterator<Integer> iter = logManagers.keySet().iterator();
+            int nextLeaderIndex = 
ThreadLocalRandom.current().nextInt(raftClients.size());
+            Iterator<Integer> iter = raftClients.keySet().iterator();
             Integer nextLeaderNode = null;
             for (int i = 0; i <= nextLeaderIndex; i++) {
                 nextLeaderNode = iter.next();
@@ -294,7 +294,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         }
 
         /**
-         * Optionally return a snapshot reader if the offset if less than the 
first batch.
+         * Optionally return a snapshot reader if the offset is less than the 
first batch.
          */
         synchronized Optional<RawSnapshotReader> nextSnapshot(long offset) {
             return Optional.ofNullable(snapshots.lastEntry()).flatMap(entry -> 
{
@@ -325,7 +325,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         /**
          * Returns the snapshot id of the latest snapshot if there is one.
          *
-         * If a snapshot doesn't exists, it return an empty Optional.
+         * If a snapshot doesn't exist, it returns an empty Optional.
          */
         synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
             return Optional.ofNullable(snapshots.lastEntry()).map(entry -> 
entry.getValue().snapshotId());
@@ -400,65 +400,65 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
     private final Logger log;
 
     /**
-     * The node ID of this local log manager. Each log manager must have a 
unique ID.
+     * The node ID of this raft client. Each raft client must have a unique ID.
      */
     private final int nodeId;
 
     /**
-     * A reference to the in-memory state that unites all the log managers in 
use.
+     * A reference to the in-memory state that unites all the raft clients in 
use.
      */
     private final SharedLogData shared;
 
     /**
-     * The event queue used by this local log manager.
+     * The event queue used by this raft client.
      */
     private final EventQueue eventQueue;
 
     /**
-     * The latest kraft version used by this local log manager.
+     * The latest kraft version used by this raft client.
      */
     private KRaftVersion lastKRaftVersion;
 
     /**
-     * Whether this LocalLogManager has been shut down.
+     * Whether this raft client has been shut down.
      */
     private boolean shutdown = false;
 
     /**
-     * An offset that the log manager will not read beyond. This exists only 
for testing
+     * An offset that the raft client will not read beyond. This exists only 
for testing
      * purposes.
      */
     private long maxReadOffset;
 
     /**
-     * The listener objects attached to this local log manager.
+     * The listener objects attached to this raft client.
      */
     private final Map<Listener<ApiMessageAndVersion>, MetaLogListenerData> 
listeners = new IdentityHashMap<>();
 
     /**
-     * The current leader, as seen by this log manager.
+     * The current leader, as seen by this raft client.
      */
     private volatile LeaderAndEpoch leader = new 
LeaderAndEpoch(OptionalInt.empty(), 0);
 
-    /*
+    /**
      * If this variable is true the next scheduleAppend will fail
      */
     private final AtomicBoolean throwOnNextAppend = new AtomicBoolean(false);
 
-    public LocalLogManager(LogContext logContext,
-                           int nodeId,
-                           SharedLogData shared,
-                           String threadNamePrefix,
-                           KRaftVersion lastKRaftVersion) {
+    public MockRaftClient(LogContext logContext,
+                          int nodeId,
+                          SharedLogData shared,
+                          String threadNamePrefix,
+                          KRaftVersion lastKRaftVersion) {
         this.logContext = logContext;
-        this.log = logContext.logger(LocalLogManager.class);
+        this.log = logContext.logger(MockRaftClient.class);
         this.nodeId = nodeId;
         this.shared = shared;
         this.maxReadOffset = shared.initialMaxReadOffset();
         this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
                 threadNamePrefix, new ShutdownEvent());
         this.lastKRaftVersion = lastKRaftVersion;
-        shared.registerLogManager(this);
+        this.shared.registerRaftClient(this);
     }
 
     private void scheduleLogCheck() {
@@ -477,7 +477,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
                                 listenerData.handleLoadSnapshot(
                                     RecordsSnapshotReader.of(
                                         snapshot.get(),
-                                        new  MetadataRecordSerde(),
+                                        new MetadataRecordSerde(),
                                         BufferSupplier.create(),
                                         Integer.MAX_VALUE,
                                         true,
@@ -556,7 +556,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
     }
 
     public void beginShutdown() {
-        eventQueue.beginShutdown("beginShutdown");
+        eventQueue.beginShutdown("MockKafkaRaftClient");
     }
 
     class ShutdownEvent implements EventQueue.Event {
@@ -569,7 +569,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
                     for (MetaLogListenerData listenerData : 
listeners.values()) {
                         listenerData.beginShutdown();
                     }
-                    shared.unregisterLogManager(LocalLogManager.this);
+                    shared.unregisterRaftClient(MockRaftClient.this);
                 }
             } catch (Exception e) {
                 log.error("Unexpected exception while sending beginShutdown 
callbacks", e);
@@ -592,7 +592,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
     }
 
     /**
-     * Shutdown the log manager.
+     * Shutdown the raft client.
      *
      * Even though the API suggests a non-blocking shutdown, this method 
always returns a completed
      * future. This means that shutdown is a blocking operation.
@@ -614,7 +614,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         CompletableFuture<Void> future = new CompletableFuture<>();
         eventQueue.append(() -> {
             if (shutdown) {
-                log.info("Node {}: can't register because local log manager 
has " +
+                log.info("Node {}: can't register because raft client has " +
                     "already been shut down.", nodeId);
                 future.complete(null);
             } else {
@@ -643,7 +643,7 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
     public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) 
{
         eventQueue.append(() -> {
             if (shutdown) {
-                log.info("Node {}: can't unregister because local log manager 
is shutdown", nodeId);
+                log.info("Node {}: can't unregister because raft client is 
shutdown", nodeId);
             } else {
                 int id = System.identityHashCode(listener);
                 if (listeners.remove(listener) == null) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
similarity index 95%
rename from 
metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
rename to 
metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
index 53bc49f8075..b4ee74346ee 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
 
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.raft.Batch;
@@ -29,7 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.OptionalInt;
 
-public class MockMetaLogManagerListener implements 
RaftClient.Listener<ApiMessageAndVersion> {
+public class MockRaftClientListener implements 
RaftClient.Listener<ApiMessageAndVersion> {
     public static final String COMMIT = "COMMIT";
     public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
     public static final String NEW_LEADER = "NEW_LEADER";
@@ -41,7 +41,7 @@ public class MockMetaLogManagerListener implements 
RaftClient.Listener<ApiMessag
     private final List<String> serializedEvents = new ArrayList<>();
     private LeaderAndEpoch leaderAndEpoch = new 
LeaderAndEpoch(OptionalInt.empty(), 0);
 
-    public MockMetaLogManagerListener(int nodeId) {
+    public MockRaftClientListener(int nodeId) {
         this.nodeId = nodeId;
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java
similarity index 72%
rename from 
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
rename to 
metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java
index b42f781e985..4217bd4ed26 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
 
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.raft.LeaderAndEpoch;
@@ -28,23 +28,23 @@ import org.junit.jupiter.api.Timeout;
 import java.util.List;
 import java.util.OptionalInt;
 
-import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
-import static 
org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
-import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
+import static org.apache.kafka.controller.MockRaftClientListener.COMMIT;
+import static 
org.apache.kafka.controller.MockRaftClientListener.LAST_COMMITTED_OFFSET;
+import static org.apache.kafka.controller.MockRaftClientListener.SHUTDOWN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 
 @Timeout(value = 40)
-public class LocalLogManagerTest {
+public class MockRaftClientTest {
 
     /**
-     * Test creating a LocalLogManager and closing it.
+     * Test creating a MockRaftClient and closing it.
      */
     @Test
     public void testCreateAndClose() throws Exception {
         try (
-            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(1).
                 buildWithMockListeners()
         ) {
             env.close();
@@ -53,12 +53,12 @@ public class LocalLogManagerTest {
     }
 
     /**
-     * Test that the local log manager will claim leadership.
+     * Test that the raft client will claim leadership.
      */
     @Test
     public void testClaimsLeadership() throws Exception {
         try (
-            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(1).
                     buildWithMockListeners()
         ) {
             assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), 
env.waitForLeader());
@@ -68,12 +68,12 @@ public class LocalLogManagerTest {
     }
 
     /**
-     * Test that we can pass leadership back and forth between log managers.
+     * Test that we can pass leadership back and forth between raft clients.
      */
     @Test
     public void testPassLeadership() throws Exception {
         try (
-            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(3).
                     buildWithMockListeners()
         ) {
             LeaderAndEpoch first = env.waitForLeader();
@@ -82,7 +82,7 @@ public class LocalLogManagerTest {
                 int currentLeaderId = cur.leaderId().orElseThrow(() ->
                     new AssertionError("Current leader is undefined")
                 );
-                env.logManagers().get(currentLeaderId).resign(cur.epoch());
+                env.raftClients().get(currentLeaderId).resign(cur.epoch());
 
                 LeaderAndEpoch next = env.waitForLeader();
                 while (next.epoch() == cur.epoch()) {
@@ -100,10 +100,9 @@ public class LocalLogManagerTest {
     }
 
     private static void waitForLastCommittedOffset(long targetOffset,
-                LocalLogManager logManager) throws InterruptedException {
+                MockRaftClient raftClient) throws InterruptedException {
         TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
-            MockMetaLogManagerListener listener =
-                (MockMetaLogManagerListener) logManager.listeners().get(0);
+            MockRaftClientListener listener = (MockRaftClientListener) 
raftClient.listeners().get(0);
             long highestOffset = -1;
             for (String event : listener.serializedEvents()) {
                 if (event.startsWith(LAST_COMMITTED_OFFSET)) {
@@ -117,19 +116,19 @@ public class LocalLogManagerTest {
                 }
             }
             if (highestOffset < targetOffset) {
-                throw new RuntimeException("Offset for log manager " +
-                    logManager.nodeId() + " only reached " + highestOffset);
+                throw new RuntimeException("Offset for raft client " +
+                    raftClient.nodeId() + " only reached " + highestOffset);
             }
         });
     }
 
     /**
-     * Test that all the log managers see all the commits.
+     * Test that all the raft clients see all the commits.
      */
     @Test
     public void testCommits() throws Exception {
         try (
-            LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(3).
                     buildWithMockListeners()
         ) {
             LeaderAndEpoch leaderInfo = env.waitForLeader();
@@ -137,22 +136,24 @@ public class LocalLogManagerTest {
                 new AssertionError("Current leader is undefined")
             );
 
-            LocalLogManager activeLogManager = env.logManagers().get(leaderId);
-            int epoch = activeLogManager.leaderAndEpoch().epoch();
+            MockRaftClient activeRaftClient = env.raftClients().get(leaderId);
+            int epoch = activeRaftClient.leaderAndEpoch().epoch();
             List<ApiMessageAndVersion> messages = List.of(
                 new ApiMessageAndVersion(new 
RegisterBrokerRecord().setBrokerId(0), (short) 0),
                 new ApiMessageAndVersion(new 
RegisterBrokerRecord().setBrokerId(1), (short) 0),
                 new ApiMessageAndVersion(new 
RegisterBrokerRecord().setBrokerId(2), (short) 0));
-            assertEquals(3, activeLogManager.prepareAppend(epoch, messages));
-            activeLogManager.schedulePreparedAppend();
-            for (LocalLogManager logManager : env.logManagers()) {
-                waitForLastCommittedOffset(3, logManager);
+            assertEquals(3, activeRaftClient.prepareAppend(epoch, messages));
+
+            activeRaftClient.schedulePreparedAppend();
+            for (MockRaftClient raftClient : env.raftClients()) {
+                waitForLastCommittedOffset(3, raftClient);
             }
-            List<MockMetaLogManagerListener> listeners = 
env.logManagers().stream().
-                map(m -> (MockMetaLogManagerListener) m.listeners().get(0)).
+
+            List<MockRaftClientListener> listeners = 
env.raftClients().stream().
+                map(m -> (MockRaftClientListener) m.listeners().get(0)).
                 toList();
             env.close();
-            for (MockMetaLogManagerListener listener : listeners) {
+            for (MockRaftClientListener listener : listeners) {
                 List<String> events = listener.serializedEvents();
                 assertEquals(SHUTDOWN, events.get(events.size() - 1));
                 int foundIndex = 0;
diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java
similarity index 73%
rename from 
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
rename to 
metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java
index a1e6742f8ef..6d1e741d1d8 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.metalog.LocalLogManager.LeaderChangeBatch;
-import org.apache.kafka.metalog.LocalLogManager.LocalRecordBatch;
-import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
+import org.apache.kafka.controller.MockRaftClient.LeaderChangeBatch;
+import org.apache.kafka.controller.MockRaftClient.LocalRecordBatch;
+import org.apache.kafka.controller.MockRaftClient.SharedLogData;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.KRaftVersion;
@@ -41,9 +41,9 @@ import java.util.OptionalInt;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
-public class LocalLogManagerTestEnv implements AutoCloseable {
+public class MockRaftClientTestEnv implements AutoCloseable {
     private static final Logger log =
-        LoggerFactory.getLogger(LocalLogManagerTestEnv.class);
+        LoggerFactory.getLogger(MockRaftClientTestEnv.class);
 
     private final String clusterId;
 
@@ -59,23 +59,23 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
     private final File dir;
 
     /**
-     * The shared data for our LocalLogManager instances.
+     * The shared data for our MockRaftClient instances.
      */
     private final SharedLogData shared;
 
     /**
-     * A list of log managers.
+     * A list of raft clients.
      */
-    private final List<LocalLogManager> logManagers;
+    private final List<MockRaftClient> raftClients;
 
     public static class Builder {
-        private final int numManagers;
+        private final int numNodes;
         private Optional<RawSnapshotReader> snapshotReader = Optional.empty();
         private Consumer<SharedLogData> sharedLogDataInitializer = __ -> { };
         private KRaftVersion lastKRaftVersion = KRaftVersion.KRAFT_VERSION_0;
 
-        public Builder(int numManagers) {
-            this.numManagers = numManagers;
+        public Builder(int numNodes) {
+            this.numNodes = numNodes;
         }
 
         public Builder setSnapshotReader(RawSnapshotReader snapshotReader) {
@@ -96,19 +96,19 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
             return this;
         }
 
-        public LocalLogManagerTestEnv build() {
-            return new LocalLogManagerTestEnv(
-                numManagers,
+        public MockRaftClientTestEnv build() {
+            return new MockRaftClientTestEnv(
+                numNodes,
                 snapshotReader,
                 sharedLogDataInitializer,
                 lastKRaftVersion);
         }
 
-        public LocalLogManagerTestEnv buildWithMockListeners() {
-            LocalLogManagerTestEnv env = build();
+        public MockRaftClientTestEnv buildWithMockListeners() {
+            MockRaftClientTestEnv env = build();
             try {
-                for (LocalLogManager logManager : env.logManagers) {
-                    logManager.register(new 
MockMetaLogManagerListener(logManager.nodeId().getAsInt()));
+                for (MockRaftClient raftClient : env.raftClients) {
+                    raftClient.register(new 
MockRaftClientListener(raftClient.nodeId().getAsInt()));
                 }
             } catch (Exception e) {
                 try {
@@ -122,8 +122,8 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
         }
     }
 
-    private LocalLogManagerTestEnv(
-        int numManagers,
+    private MockRaftClientTestEnv(
+        int numNodes,
         Optional<RawSnapshotReader> snapshotReader,
         Consumer<SharedLogData> sharedLogDataInitializer,
         KRaftVersion lastKRaftVersion
@@ -132,23 +132,23 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
         dir = TestUtils.tempDirectory();
         shared = new SharedLogData(snapshotReader);
         sharedLogDataInitializer.accept(shared);
-        List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
+        List<MockRaftClient> newRaftClients = new ArrayList<>(numNodes);
         try {
-            for (int nodeId = 0; nodeId < numManagers; nodeId++) {
-                newLogManagers.add(new LocalLogManager(
-                    new LogContext(String.format("[LocalLogManager %d] ", 
nodeId)),
+            for (int nodeId = 0; nodeId < numNodes; nodeId++) {
+                newRaftClients.add(new MockRaftClient(
+                    new LogContext(String.format("[MockRaftClient %d] ", 
nodeId)),
                     nodeId,
                     shared,
-                    String.format("LocalLogManager-%d_", nodeId),
+                    String.format("MockRaftClient-%d_", nodeId),
                     lastKRaftVersion));
             }
         } catch (Throwable t) {
-            for (LocalLogManager logManager : newLogManagers) {
-                logManager.close();
+            for (MockRaftClient raftClient : newRaftClients) {
+                raftClient.close();
             }
             throw t;
         }
-        this.logManagers = newLogManagers;
+        this.raftClients = newRaftClients;
     }
 
     /**
@@ -179,9 +179,9 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
         AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
         TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
             LeaderAndEpoch result = null;
-            for (LocalLogManager logManager : logManagers) {
-                LeaderAndEpoch leader = logManager.leaderAndEpoch();
-                int nodeId = logManager.nodeId().getAsInt();
+            for (MockRaftClient raftClient : raftClients) {
+                LeaderAndEpoch leader = raftClient.leaderAndEpoch();
+                int nodeId = raftClient.nodeId().getAsInt();
                 if (leader.isLeader(nodeId)) {
                     if (result != null) {
                         throw new RuntimeException("node " + nodeId +
@@ -198,14 +198,14 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
         return value.get();
     }
 
-    public List<LocalLogManager> logManagers() {
-        return logManagers;
+    public List<MockRaftClient> raftClients() {
+        return raftClients;
     }
 
-    public Optional<LocalLogManager> activeLogManager() {
+    public Optional<MockRaftClient> activeRaftClient() {
         OptionalInt leader = shared.leaderAndEpoch().leaderId();
         if (leader.isPresent()) {
-            return Optional.of(logManagers.get(leader.getAsInt()));
+            return Optional.of(raftClients.get(leader.getAsInt()));
         } else {
             return Optional.empty();
         }
@@ -218,11 +218,11 @@ public class LocalLogManagerTestEnv implements 
AutoCloseable {
     @Override
     public void close() throws InterruptedException {
         try {
-            for (LocalLogManager logManager : logManagers) {
-                logManager.beginShutdown();
+            for (MockRaftClient raftClient : raftClients) {
+                raftClient.beginShutdown();
             }
-            for (LocalLogManager logManager : logManagers) {
-                logManager.close();
+            for (MockRaftClient raftClient : raftClients) {
+                raftClient.close();
             }
             Utils.delete(dir);
         } catch (IOException e) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
index 7e082ac3033..894a7f739c4 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
-import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -74,9 +73,9 @@ public class QuorumControllerMetricsIntegrationTest {
     public void testClosingQuorumControllerClosesMetrics() throws Throwable {
         MockControllerMetrics metrics = new MockControllerMetrics();
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setControllerBuilderInitializer(controllerBuilder ->
                     controllerBuilder.setMetrics(metrics)
                 ).
@@ -97,9 +96,9 @@ public class QuorumControllerMetricsIntegrationTest {
         boolean forceFailoverUsingLogLayer
     ) throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             registerBrokersAndUnfence(controlEnv.activeController(), 1); // 
wait for a controller to become active.
@@ -109,7 +108,7 @@ public class QuorumControllerMetricsIntegrationTest {
                 }
             });
             if (forceFailoverUsingLogLayer) {
-                logEnv.activeLogManager().get().throwOnNextAppend();
+                clientEnv.activeRaftClient().get().throwOnNextAppend();
 
                 TestUtils.retryOnExceptionWithTimeout(30_000, () ->
                     createTopics(controlEnv.activeController(), "test_", 1, 1)
@@ -133,9 +132,9 @@ public class QuorumControllerMetricsIntegrationTest {
     @Test
     public void testTimeoutMetrics() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             QuorumController active = controlEnv.activeController();
@@ -192,9 +191,9 @@ public class QuorumControllerMetricsIntegrationTest {
     @Test
     public void testEventQueueOperationsStartedMetric() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                                                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                                                      build()
         ) {
             QuorumController active = controlEnv.activeController();
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 41ef67ab885..3e58cabeac3 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -99,8 +99,6 @@ import 
org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
 import 
org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.util.BatchFileWriter;
-import org.apache.kafka.metalog.LocalLogManager;
-import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
@@ -180,9 +178,9 @@ public class QuorumControllerTest {
     @Test
     public void testConfigurationOperations() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
@@ -191,10 +189,10 @@ public class QuorumControllerTest {
                     Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
                 setBrokerId(0).
                 setLogDirs(List.of(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
-                setClusterId(logEnv.clusterId())).get();
+                setClusterId(clientEnv.clusterId())).get();
             testConfigurationOperations(controlEnv.activeController());
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
@@ -221,9 +219,9 @@ public class QuorumControllerTest {
     @Test
     public void testDelayedConfigurationOperations() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
@@ -232,18 +230,18 @@ public class QuorumControllerTest {
                         Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
                     setBrokerId(0).
                     
setLogDirs(List.of(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
-                    setClusterId(logEnv.clusterId())).get();
-            testDelayedConfigurationOperations(logEnv, 
controlEnv.activeController());
+                    setClusterId(clientEnv.clusterId())).get();
+            testDelayedConfigurationOperations(clientEnv, 
controlEnv.activeController());
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
     private void testDelayedConfigurationOperations(
-        LocalLogManagerTestEnv logEnv,
+        MockRaftClientTestEnv clientEnv,
         QuorumController controller
     ) throws Throwable {
-        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
+        clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(1L));
         CompletableFuture<Map<ConfigResource, ApiError>> future1 =
             controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
                 BROKER0, Map.of("baz", entry(SET, "123"))), false);
@@ -252,7 +250,7 @@ public class QuorumControllerTest {
             new ResultOrError<>(Map.of())),
             controller.describeConfigs(ANONYMOUS_CONTEXT, Map.of(
                 BROKER0, List.of())).get());
-        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L));
+        clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(8L));
         assertEquals(Map.of(BROKER0, ApiError.NONE), future1.get());
     }
 
@@ -266,9 +264,9 @@ public class QuorumControllerTest {
         long sessionTimeoutMillis = 1000;
 
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
                 setBootstrapMetadata(SIMPLE_BOOTSTRAP).
                 build()
@@ -344,7 +342,7 @@ public class QuorumControllerTest {
             // Check that there are imbalanced partitions
             
assertTrue(active.replicationControl().arePartitionLeadersImbalanced());
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
@@ -352,9 +350,9 @@ public class QuorumControllerTest {
     public  void testElrEnabledByDefault() throws Throwable {
         long sessionTimeoutMillis = 500;
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
                 setBootstrapMetadata(BootstrapMetadata.fromRecords(
                     List.of(
@@ -384,9 +382,9 @@ public class QuorumControllerTest {
         long sessionTimeoutMillis = 500;
 
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
                 
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, 
"test-provided bootstrap ELR enabled")).
                 build()
@@ -521,9 +519,9 @@ public class QuorumControllerTest {
         long sessionTimeoutMillis = 500;
 
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv)
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv)
                 .setControllerBuilderInitializer(controllerBuilder ->
                     
controllerBuilder.setFenceStaleBrokerIntervalNs(TimeUnit.SECONDS.toNanos(15)))
                 .setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis))
@@ -618,8 +616,8 @@ public class QuorumControllerTest {
         long sessionTimeoutMillis = 300;
 
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).build();
+                QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
                 
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, 
"test-provided bootstrap ELR enabled")).
                 build()
@@ -747,9 +745,9 @@ public class QuorumControllerTest {
         long leaderImbalanceCheckIntervalNs = 1_000_000_000;
 
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
                 
setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)).
                 setBootstrapMetadata(SIMPLE_BOOTSTRAP).
@@ -873,7 +871,7 @@ public class QuorumControllerTest {
                 "Leaders were not balanced after unfencing all of the brokers"
             );
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
@@ -886,9 +884,9 @@ public class QuorumControllerTest {
         long maxIdleIntervalNs = TimeUnit.MICROSECONDS.toNanos(100);
         long maxReplicationDelayMs = 1_000;
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setControllerBuilderInitializer(controllerBuilder ->
                     
controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs))
                 ).
@@ -898,29 +896,29 @@ public class QuorumControllerTest {
             listeners.add(new 
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
             QuorumController active = controlEnv.activeController();
 
-            LocalLogManager localLogManager = logEnv
-                .logManagers()
+            MockRaftClient mockRaftClient = clientEnv
+                .raftClients()
                 .stream()
                 .filter(logManager -> 
logManager.nodeId().equals(OptionalInt.of(active.nodeId())))
                 .findAny()
                 .get();
             TestUtils.waitForCondition(
-                () -> localLogManager.highWatermark().isPresent(),
+                () -> mockRaftClient.highWatermark().isPresent(),
                 maxReplicationDelayMs,
                 "High watermark was not established"
             );
 
-            final long firstHighWatermark = 
localLogManager.highWatermark().getAsLong();
+            final long firstHighWatermark = 
mockRaftClient.highWatermark().getAsLong();
             TestUtils.waitForCondition(
-                () -> localLogManager.highWatermark().getAsLong() > 
firstHighWatermark,
+                () -> mockRaftClient.highWatermark().getAsLong() > 
firstHighWatermark,
                 maxReplicationDelayMs,
                 "Active controller didn't write NoOpRecord the first time"
             );
 
             // Do it again to make sure that we are not counting the leader 
change record
-            final long secondHighWatermark = 
localLogManager.highWatermark().getAsLong();
+            final long secondHighWatermark = 
mockRaftClient.highWatermark().getAsLong();
             TestUtils.waitForCondition(
-                () -> localLogManager.highWatermark().getAsLong() > 
secondHighWatermark,
+                () -> mockRaftClient.highWatermark().getAsLong() > 
secondHighWatermark,
                 maxReplicationDelayMs,
                 "Active controller didn't write NoOpRecord the second time"
             );
@@ -931,10 +929,10 @@ public class QuorumControllerTest {
     @CsvSource(value = {"0, 0", "0, 1", "1, 0", "1, 1"})
     public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion, 
short brokerMaxSupportedKraftVersion) throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 
setLastKRaftVersion(KRaftVersion.fromFeatureLevel(finalizedKraftVersion)).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setBootstrapMetadata(SIMPLE_BOOTSTRAP).
                 build()
         ) {
@@ -983,9 +981,9 @@ public class QuorumControllerTest {
     @Test
     public void testUnregisterBroker() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             ListenerCollection listeners = new ListenerCollection();
@@ -1041,7 +1039,7 @@ public class QuorumControllerTest {
                 });
             assertEquals(0, topicPartitionFuture.get().partitionId());
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
@@ -1063,14 +1061,14 @@ public class QuorumControllerTest {
         Map<Integer, Long> brokerEpochs = new HashMap<>();
         Uuid fooId;
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setBootstrapMetadata(SIMPLE_BOOTSTRAP).
                 build()
         ) {
             QuorumController active = controlEnv.activeController();
-            for (int i = 0; i < logEnv.logManagers().size(); i++) {
+            for (int i = 0; i < clientEnv.raftClients().size(); i++) {
                 active.registerController(ANONYMOUS_CONTEXT,
                     new ControllerRegistrationRequestData().
                         setControllerId(i).
@@ -1131,9 +1129,9 @@ public class QuorumControllerTest {
             active.allocateProducerIds(ANONYMOUS_CONTEXT,
                 new 
AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
             controlEnv.close();
-            assertEquals(generateTestRecords(fooId, brokerEpochs), 
logEnv.allRecords());
+            assertEquals(generateTestRecords(fooId, brokerEpochs), 
clientEnv.allRecords());
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
@@ -1269,8 +1267,8 @@ public class QuorumControllerTest {
     @Test
     public void testTimeouts() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).build();
+                QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
@@ -1315,7 +1313,7 @@ public class QuorumControllerTest {
             assertYieldsTimeout(alterReassignmentsFuture);
             assertYieldsTimeout(listReassignmentsFuture);
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
@@ -1330,9 +1328,9 @@ public class QuorumControllerTest {
     @Test
     public void testEarlyControllerResults() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(1).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
@@ -1362,16 +1360,16 @@ public class QuorumControllerTest {
             alterReassignmentsFuture.get();
             countDownLatch.countDown();
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
     @Test
     public void testConfigResourceExistenceChecker() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             QuorumController active = controlEnv.activeController();
@@ -1402,16 +1400,16 @@ public class QuorumControllerTest {
             assertThrows(UnknownTopicOrPartitionException.class,
                 () -> checker.accept(new ConfigResource(TOPIC, "bar")));
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
     @Test
     public void testFatalMetadataReplayErrorOnActive() throws Throwable {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 build()
         ) {
             QuorumController active = controlEnv.activeController();
@@ -1435,14 +1433,14 @@ public class QuorumControllerTest {
             new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
         );
 
-        LocalLogManagerTestEnv.Builder logEnvBuilder = new 
LocalLogManagerTestEnv.Builder(3)
+        MockRaftClientTestEnv.Builder clientEnvBuilder = new 
MockRaftClientTestEnv.Builder(3)
             .setSnapshotReader(FileRawSnapshotReader.open(
                 invalidSnapshot.tempDir.toPath(),
                 new OffsetAndEpoch(0, 0)
             ));
 
-        try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) {
-            try (QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).build()) {
+        try (MockRaftClientTestEnv clientEnv = clientEnvBuilder.build()) {
+            try (QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).build()) {
                 TestUtils.waitForCondition(() -> 
controlEnv.controllers().stream().allMatch(
                         controller -> 
controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null),
                         "At least one controller failed to detect the fatal 
fault"
@@ -1454,11 +1452,11 @@ public class QuorumControllerTest {
 
     @Test
     public void testFatalMetadataErrorDuringLogLoading() throws Exception {
-        try (LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).build()) {
-            logEnv.appendInitialRecords(List.of(
+        try (MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).build()) {
+            clientEnv.appendInitialRecords(List.of(
                     new ApiMessageAndVersion(new PartitionRecord(), (short) 
0)));
 
-            try (QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).build()) {
+            try (QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).build()) {
                 TestUtils.waitForCondition(() -> 
controlEnv.controllers().stream().allMatch(
                         controller -> 
controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null),
                         "At least one controller failed to detect the fatal 
fault"
@@ -1505,9 +1503,9 @@ public class QuorumControllerTest {
     @Test
     public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+            MockRaftClientTestEnv clientEnv = new 
MockRaftClientTestEnv.Builder(3).
                 build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(clientEnv).
                 setBootstrapMetadata(COMPLEX_BOOTSTRAP).
                 build()
         ) {
@@ -1538,7 +1536,7 @@ public class QuorumControllerTest {
                     Map.of("foo", "bar").equals(resultOrError.result());
             }, "Failed to see expected config change from bootstrap metadata");
 
-            testToImages(logEnv.allRecords());
+            testToImages(clientEnv.allRecords());
         }
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index b04a3e8c658..e6ececd2ba1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -19,7 +19,6 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.FakeKafkaConfigSchema;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -41,20 +40,20 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class QuorumControllerTestEnv implements AutoCloseable {
     private final List<QuorumController> controllers;
-    private final LocalLogManagerTestEnv logEnv;
+    private final MockRaftClientTestEnv clientEnv;
     private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new 
HashMap<>();
     private final Map<Integer, MockFaultHandler> nonFatalFaultHandlers = new 
HashMap<>();
 
     public static class Builder {
-        private final LocalLogManagerTestEnv logEnv;
+        private final MockRaftClientTestEnv clientEnv;
         private Consumer<QuorumController.Builder> 
controllerBuilderInitializer = __ -> { };
         private OptionalLong sessionTimeoutMillis = OptionalLong.empty();
         private OptionalLong leaderImbalanceCheckIntervalNs = 
OptionalLong.empty();
         private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
                 fromVersion(MetadataVersion.latestTesting(), "test-provided 
version");
 
-        public Builder(LocalLogManagerTestEnv logEnv) {
-            this.logEnv = logEnv;
+        public Builder(MockRaftClientTestEnv clientEnv) {
+            this.clientEnv = clientEnv;
         }
 
         public Builder 
setControllerBuilderInitializer(Consumer<QuorumController.Builder> 
controllerBuilderInitializer) {
@@ -79,7 +78,7 @@ public class QuorumControllerTestEnv implements AutoCloseable 
{
 
         public QuorumControllerTestEnv build() throws Exception {
             return new QuorumControllerTestEnv(
-                logEnv,
+                clientEnv,
                 controllerBuilderInitializer,
                 sessionTimeoutMillis,
                 leaderImbalanceCheckIntervalNs,
@@ -89,21 +88,21 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
     }
 
     private QuorumControllerTestEnv(
-        LocalLogManagerTestEnv logEnv,
+        MockRaftClientTestEnv clientEnv,
         Consumer<QuorumController.Builder> controllerBuilderInitializer,
         OptionalLong sessionTimeoutMillis,
         OptionalLong leaderImbalanceCheckIntervalNs,
         boolean eligibleLeaderReplicasEnabled,
         BootstrapMetadata bootstrapMetadata
     ) throws Exception {
-        this.logEnv = logEnv;
-        int numControllers = logEnv.logManagers().size();
+        this.clientEnv = clientEnv;
+        int numControllers = clientEnv.raftClients().size();
         this.controllers = new ArrayList<>(numControllers);
         try {
             List<Integer> nodeIds = IntStream.range(0, 
numControllers).boxed().toList();
             for (int nodeId = 0; nodeId < numControllers; nodeId++) {
-                QuorumController.Builder builder = new 
QuorumController.Builder(nodeId, logEnv.clusterId());
-                builder.setRaftClient(logEnv.logManagers().get(nodeId));
+                QuorumController.Builder builder = new 
QuorumController.Builder(nodeId, clientEnv.clusterId());
+                builder.setRaftClient(clientEnv.raftClients().get(nodeId));
                 if (eligibleLeaderReplicasEnabled) {
                     bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(
                         EligibleLeaderReplicasVersion.FEATURE_NAME,
@@ -140,7 +139,7 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
     QuorumController activeController(boolean waitForActivation) throws 
InterruptedException {
         AtomicReference<QuorumController> value = new AtomicReference<>(null);
         TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
-            LeaderAndEpoch leader = logEnv.leaderAndEpoch();
+            LeaderAndEpoch leader = clientEnv.leaderAndEpoch();
             for (QuorumController controller : controllers) {
                 if 
(OptionalInt.of(controller.nodeId()).equals(leader.leaderId()) &&
                     controller.curClaimEpoch() == leader.epoch()) {

Reply via email to