This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 27647c7c7c7 MINOR: Remove the MetaLogShim namings (#20357) 27647c7c7c7 is described below commit 27647c7c7c784f70979f6b392fb4ae60ce1c6c46 Author: OuO <mapan0...@gmail.com> AuthorDate: Sat Aug 16 02:02:56 2025 +0800 MINOR: Remove the MetaLogShim namings (#20357) Correct parameter name from `logManager` to `raftClient` (leftover from PR #10705) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- checkstyle/import-control-metadata.xml | 16 --- .../apache/kafka/controller/QuorumController.java | 6 +- .../MockRaftClient.java} | 82 ++++++------- .../MockRaftClientListener.java} | 6 +- .../MockRaftClientTest.java} | 57 ++++----- .../MockRaftClientTestEnv.java} | 80 ++++++------- .../QuorumControllerMetricsIntegrationTest.java | 19 ++- .../kafka/controller/QuorumControllerTest.java | 132 ++++++++++----------- .../kafka/controller/QuorumControllerTestEnv.java | 23 ++-- 9 files changed, 201 insertions(+), 220 deletions(-) diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index c2660674e32..293801bd75f 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -83,7 +83,6 @@ <allow pkg="org.apache.kafka.metadata" /> <allow pkg="org.apache.kafka.metadata.authorizer" /> <allow pkg="org.apache.kafka.metadata.migration" /> - <allow pkg="org.apache.kafka.metalog" /> <allow pkg="org.apache.kafka.deferred" /> <allow pkg="org.apache.kafka.queue" /> <allow pkg="org.apache.kafka.raft" /> @@ -160,7 +159,6 @@ <allow pkg="org.apache.kafka.common.requests" /> <allow pkg="org.apache.kafka.image" /> <allow pkg="org.apache.kafka.metadata" /> - <allow pkg="org.apache.kafka.metalog" /> <allow pkg="org.apache.kafka.queue" /> <allow pkg="org.apache.kafka.raft" /> <allow pkg="org.apache.kafka.server.authorizer" /> @@ -198,18 +196,4 @@ </subpackage> </subpackage> - <subpackage name="metalog"> - <allow class="org.apache.kafka.common.compress.Compression" exact-match="true" /> - <allow pkg="org.apache.kafka.common.metadata" /> - <allow pkg="org.apache.kafka.common.protocol" /> - <allow pkg="org.apache.kafka.common.record" /> - <allow pkg="org.apache.kafka.metadata" /> - <allow pkg="org.apache.kafka.metalog" /> - <allow pkg="org.apache.kafka.raft" /> - <allow pkg="org.apache.kafka.snapshot" /> - <allow pkg="org.apache.kafka.queue" /> - <allow pkg="org.apache.kafka.server.common" /> - <allow pkg="org.apache.kafka.test" /> - </subpackage> - </import-control> diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 5f4d6142434..3e1dd69723b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -260,8 +260,8 @@ public final class QuorumController implements Controller { return this; } - public Builder setRaftClient(RaftClient<ApiMessageAndVersion> logManager) { - this.raftClient = logManager; + public Builder setRaftClient(RaftClient<ApiMessageAndVersion> raftClient) { + this.raftClient = raftClient; return this; } @@ -1082,7 +1082,7 @@ public final class QuorumController implements Controller { @Override public void beginShutdown() { - queue.beginShutdown("MetaLogManager.Listener"); + queue.beginShutdown("QuorumMetaLogListener"); } private void appendRaftEvent(String name, Runnable runnable) { diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java similarity index 91% rename from metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java rename to metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java index 013d5945d2e..21e3a645530 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.metalog; +package org.apache.kafka.controller; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.utils.BufferSupplier; @@ -66,9 +66,9 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; /** - * The LocalLogManager is a test implementation that relies on the contents of memory. + * The MockRaftClient is a test implementation that relies on the contents of memory. */ -public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, AutoCloseable { +public final class MockRaftClient implements RaftClient<ApiMessageAndVersion>, AutoCloseable { interface LocalBatch { int epoch(); int size(); @@ -158,9 +158,9 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, private static final Logger log = LoggerFactory.getLogger(SharedLogData.class); /** - * Maps node IDs to the matching log managers. + * Maps node IDs to the matching raft clients. */ - private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>(); + private final HashMap<Integer, MockRaftClient> raftClients = new HashMap<>(); /** * Maps offsets to record batches. @@ -198,17 +198,17 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, } } - synchronized void registerLogManager(LocalLogManager logManager) { - if (logManagers.put(logManager.nodeId, logManager) != null) { - throw new RuntimeException("Can't have multiple LocalLogManagers " + - "with id " + logManager.nodeId()); + synchronized void registerRaftClient(MockRaftClient raftClient) { + if (raftClients.put(raftClient.nodeId, raftClient) != null) { + throw new RuntimeException("Can't have multiple MockRaftClients " + + "with id " + raftClient.nodeId()); } electLeaderIfNeeded(); } - synchronized void unregisterLogManager(LocalLogManager logManager) { - if (!logManagers.remove(logManager.nodeId, logManager)) { - throw new RuntimeException("Log manager " + logManager.nodeId() + + synchronized void unregisterRaftClient(MockRaftClient raftClient) { + if (!raftClients.remove(raftClient.nodeId, raftClient)) { + throw new RuntimeException("MockRaftClient " + raftClient.nodeId() + " was not found."); } } @@ -259,19 +259,19 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, if (batch instanceof LeaderChangeBatch leaderChangeBatch) { leader = leaderChangeBatch.newLeader; } - for (LocalLogManager logManager : logManagers.values()) { - logManager.scheduleLogCheck(); + for (MockRaftClient raftClient : raftClients.values()) { + raftClient.scheduleLogCheck(); } prevOffset = nextEndOffset; return nextEndOffset; } synchronized void electLeaderIfNeeded() { - if (leader.leaderId().isPresent() || logManagers.isEmpty()) { + if (leader.leaderId().isPresent() || raftClients.isEmpty()) { return; } - int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size()); - Iterator<Integer> iter = logManagers.keySet().iterator(); + int nextLeaderIndex = ThreadLocalRandom.current().nextInt(raftClients.size()); + Iterator<Integer> iter = raftClients.keySet().iterator(); Integer nextLeaderNode = null; for (int i = 0; i <= nextLeaderIndex; i++) { nextLeaderNode = iter.next(); @@ -294,7 +294,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, } /** - * Optionally return a snapshot reader if the offset if less than the first batch. + * Optionally return a snapshot reader if the offset is less than the first batch. */ synchronized Optional<RawSnapshotReader> nextSnapshot(long offset) { return Optional.ofNullable(snapshots.lastEntry()).flatMap(entry -> { @@ -325,7 +325,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, /** * Returns the snapshot id of the latest snapshot if there is one. * - * If a snapshot doesn't exists, it return an empty Optional. + * If a snapshot doesn't exist, it returns an empty Optional. */ synchronized Optional<OffsetAndEpoch> latestSnapshotId() { return Optional.ofNullable(snapshots.lastEntry()).map(entry -> entry.getValue().snapshotId()); @@ -400,65 +400,65 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, private final Logger log; /** - * The node ID of this local log manager. Each log manager must have a unique ID. + * The node ID of this raft client. Each raft client must have a unique ID. */ private final int nodeId; /** - * A reference to the in-memory state that unites all the log managers in use. + * A reference to the in-memory state that unites all the raft clients in use. */ private final SharedLogData shared; /** - * The event queue used by this local log manager. + * The event queue used by this raft client. */ private final EventQueue eventQueue; /** - * The latest kraft version used by this local log manager. + * The latest kraft version used by this raft client. */ private KRaftVersion lastKRaftVersion; /** - * Whether this LocalLogManager has been shut down. + * Whether this raft client has been shut down. */ private boolean shutdown = false; /** - * An offset that the log manager will not read beyond. This exists only for testing + * An offset that the raft client will not read beyond. This exists only for testing * purposes. */ private long maxReadOffset; /** - * The listener objects attached to this local log manager. + * The listener objects attached to this raft client. */ private final Map<Listener<ApiMessageAndVersion>, MetaLogListenerData> listeners = new IdentityHashMap<>(); /** - * The current leader, as seen by this log manager. + * The current leader, as seen by this raft client. */ private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0); - /* + /** * If this variable is true the next scheduleAppend will fail */ private final AtomicBoolean throwOnNextAppend = new AtomicBoolean(false); - public LocalLogManager(LogContext logContext, - int nodeId, - SharedLogData shared, - String threadNamePrefix, - KRaftVersion lastKRaftVersion) { + public MockRaftClient(LogContext logContext, + int nodeId, + SharedLogData shared, + String threadNamePrefix, + KRaftVersion lastKRaftVersion) { this.logContext = logContext; - this.log = logContext.logger(LocalLogManager.class); + this.log = logContext.logger(MockRaftClient.class); this.nodeId = nodeId; this.shared = shared; this.maxReadOffset = shared.initialMaxReadOffset(); this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix, new ShutdownEvent()); this.lastKRaftVersion = lastKRaftVersion; - shared.registerLogManager(this); + this.shared.registerRaftClient(this); } private void scheduleLogCheck() { @@ -477,7 +477,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, listenerData.handleLoadSnapshot( RecordsSnapshotReader.of( snapshot.get(), - new MetadataRecordSerde(), + new MetadataRecordSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true, @@ -556,7 +556,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, } public void beginShutdown() { - eventQueue.beginShutdown("beginShutdown"); + eventQueue.beginShutdown("MockKafkaRaftClient"); } class ShutdownEvent implements EventQueue.Event { @@ -569,7 +569,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, for (MetaLogListenerData listenerData : listeners.values()) { listenerData.beginShutdown(); } - shared.unregisterLogManager(LocalLogManager.this); + shared.unregisterRaftClient(MockRaftClient.this); } } catch (Exception e) { log.error("Unexpected exception while sending beginShutdown callbacks", e); @@ -592,7 +592,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, } /** - * Shutdown the log manager. + * Shutdown the raft client. * * Even though the API suggests a non-blocking shutdown, this method always returns a completed * future. This means that shutdown is a blocking operation. @@ -614,7 +614,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, CompletableFuture<Void> future = new CompletableFuture<>(); eventQueue.append(() -> { if (shutdown) { - log.info("Node {}: can't register because local log manager has " + + log.info("Node {}: can't register because raft client has " + "already been shut down.", nodeId); future.complete(null); } else { @@ -643,7 +643,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener) { eventQueue.append(() -> { if (shutdown) { - log.info("Node {}: can't unregister because local log manager is shutdown", nodeId); + log.info("Node {}: can't unregister because raft client is shutdown", nodeId); } else { int id = System.identityHashCode(listener); if (listeners.remove(listener) == null) { diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java similarity index 95% rename from metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java rename to metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java index 53bc49f8075..b4ee74346ee 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.metalog; +package org.apache.kafka.controller; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.raft.Batch; @@ -29,7 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.OptionalInt; -public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessageAndVersion> { +public class MockRaftClientListener implements RaftClient.Listener<ApiMessageAndVersion> { public static final String COMMIT = "COMMIT"; public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET"; public static final String NEW_LEADER = "NEW_LEADER"; @@ -41,7 +41,7 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag private final List<String> serializedEvents = new ArrayList<>(); private LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0); - public MockMetaLogManagerListener(int nodeId) { + public MockRaftClientListener(int nodeId) { this.nodeId = nodeId; } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java similarity index 72% rename from metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java rename to metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java index b42f781e985..4217bd4ed26 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.metalog; +package org.apache.kafka.controller; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.raft.LeaderAndEpoch; @@ -28,23 +28,23 @@ import org.junit.jupiter.api.Timeout; import java.util.List; import java.util.OptionalInt; -import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT; -import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET; -import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN; +import static org.apache.kafka.controller.MockRaftClientListener.COMMIT; +import static org.apache.kafka.controller.MockRaftClientListener.LAST_COMMITTED_OFFSET; +import static org.apache.kafka.controller.MockRaftClientListener.SHUTDOWN; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @Timeout(value = 40) -public class LocalLogManagerTest { +public class MockRaftClientTest { /** - * Test creating a LocalLogManager and closing it. + * Test creating a MockRaftClient and closing it. */ @Test public void testCreateAndClose() throws Exception { try ( - LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(1). buildWithMockListeners() ) { env.close(); @@ -53,12 +53,12 @@ public class LocalLogManagerTest { } /** - * Test that the local log manager will claim leadership. + * Test that the raft client will claim leadership. */ @Test public void testClaimsLeadership() throws Exception { try ( - LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(1). buildWithMockListeners() ) { assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader()); @@ -68,12 +68,12 @@ public class LocalLogManagerTest { } /** - * Test that we can pass leadership back and forth between log managers. + * Test that we can pass leadership back and forth between raft clients. */ @Test public void testPassLeadership() throws Exception { try ( - LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(3). buildWithMockListeners() ) { LeaderAndEpoch first = env.waitForLeader(); @@ -82,7 +82,7 @@ public class LocalLogManagerTest { int currentLeaderId = cur.leaderId().orElseThrow(() -> new AssertionError("Current leader is undefined") ); - env.logManagers().get(currentLeaderId).resign(cur.epoch()); + env.raftClients().get(currentLeaderId).resign(cur.epoch()); LeaderAndEpoch next = env.waitForLeader(); while (next.epoch() == cur.epoch()) { @@ -100,10 +100,9 @@ public class LocalLogManagerTest { } private static void waitForLastCommittedOffset(long targetOffset, - LocalLogManager logManager) throws InterruptedException { + MockRaftClient raftClient) throws InterruptedException { TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { - MockMetaLogManagerListener listener = - (MockMetaLogManagerListener) logManager.listeners().get(0); + MockRaftClientListener listener = (MockRaftClientListener) raftClient.listeners().get(0); long highestOffset = -1; for (String event : listener.serializedEvents()) { if (event.startsWith(LAST_COMMITTED_OFFSET)) { @@ -117,19 +116,19 @@ public class LocalLogManagerTest { } } if (highestOffset < targetOffset) { - throw new RuntimeException("Offset for log manager " + - logManager.nodeId() + " only reached " + highestOffset); + throw new RuntimeException("Offset for raft client " + + raftClient.nodeId() + " only reached " + highestOffset); } }); } /** - * Test that all the log managers see all the commits. + * Test that all the raft clients see all the commits. */ @Test public void testCommits() throws Exception { try ( - LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(3). buildWithMockListeners() ) { LeaderAndEpoch leaderInfo = env.waitForLeader(); @@ -137,22 +136,24 @@ public class LocalLogManagerTest { new AssertionError("Current leader is undefined") ); - LocalLogManager activeLogManager = env.logManagers().get(leaderId); - int epoch = activeLogManager.leaderAndEpoch().epoch(); + MockRaftClient activeRaftClient = env.raftClients().get(leaderId); + int epoch = activeRaftClient.leaderAndEpoch().epoch(); List<ApiMessageAndVersion> messages = List.of( new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0), new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0), new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0)); - assertEquals(3, activeLogManager.prepareAppend(epoch, messages)); - activeLogManager.schedulePreparedAppend(); - for (LocalLogManager logManager : env.logManagers()) { - waitForLastCommittedOffset(3, logManager); + assertEquals(3, activeRaftClient.prepareAppend(epoch, messages)); + + activeRaftClient.schedulePreparedAppend(); + for (MockRaftClient raftClient : env.raftClients()) { + waitForLastCommittedOffset(3, raftClient); } - List<MockMetaLogManagerListener> listeners = env.logManagers().stream(). - map(m -> (MockMetaLogManagerListener) m.listeners().get(0)). + + List<MockRaftClientListener> listeners = env.raftClients().stream(). + map(m -> (MockRaftClientListener) m.listeners().get(0)). toList(); env.close(); - for (MockMetaLogManagerListener listener : listeners) { + for (MockRaftClientListener listener : listeners) { List<String> events = listener.serializedEvents(); assertEquals(SHUTDOWN, events.get(events.size() - 1)); int foundIndex = 0; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java similarity index 73% rename from metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java rename to metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java index a1e6742f8ef..6d1e741d1d8 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.kafka.metalog; +package org.apache.kafka.controller; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.metalog.LocalLogManager.LeaderChangeBatch; -import org.apache.kafka.metalog.LocalLogManager.LocalRecordBatch; -import org.apache.kafka.metalog.LocalLogManager.SharedLogData; +import org.apache.kafka.controller.MockRaftClient.LeaderChangeBatch; +import org.apache.kafka.controller.MockRaftClient.LocalRecordBatch; +import org.apache.kafka.controller.MockRaftClient.SharedLogData; import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.KRaftVersion; @@ -41,9 +41,9 @@ import java.util.OptionalInt; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -public class LocalLogManagerTestEnv implements AutoCloseable { +public class MockRaftClientTestEnv implements AutoCloseable { private static final Logger log = - LoggerFactory.getLogger(LocalLogManagerTestEnv.class); + LoggerFactory.getLogger(MockRaftClientTestEnv.class); private final String clusterId; @@ -59,23 +59,23 @@ public class LocalLogManagerTestEnv implements AutoCloseable { private final File dir; /** - * The shared data for our LocalLogManager instances. + * The shared data for our MockRaftClient instances. */ private final SharedLogData shared; /** - * A list of log managers. + * A list of raft clients. */ - private final List<LocalLogManager> logManagers; + private final List<MockRaftClient> raftClients; public static class Builder { - private final int numManagers; + private final int numNodes; private Optional<RawSnapshotReader> snapshotReader = Optional.empty(); private Consumer<SharedLogData> sharedLogDataInitializer = __ -> { }; private KRaftVersion lastKRaftVersion = KRaftVersion.KRAFT_VERSION_0; - public Builder(int numManagers) { - this.numManagers = numManagers; + public Builder(int numNodes) { + this.numNodes = numNodes; } public Builder setSnapshotReader(RawSnapshotReader snapshotReader) { @@ -96,19 +96,19 @@ public class LocalLogManagerTestEnv implements AutoCloseable { return this; } - public LocalLogManagerTestEnv build() { - return new LocalLogManagerTestEnv( - numManagers, + public MockRaftClientTestEnv build() { + return new MockRaftClientTestEnv( + numNodes, snapshotReader, sharedLogDataInitializer, lastKRaftVersion); } - public LocalLogManagerTestEnv buildWithMockListeners() { - LocalLogManagerTestEnv env = build(); + public MockRaftClientTestEnv buildWithMockListeners() { + MockRaftClientTestEnv env = build(); try { - for (LocalLogManager logManager : env.logManagers) { - logManager.register(new MockMetaLogManagerListener(logManager.nodeId().getAsInt())); + for (MockRaftClient raftClient : env.raftClients) { + raftClient.register(new MockRaftClientListener(raftClient.nodeId().getAsInt())); } } catch (Exception e) { try { @@ -122,8 +122,8 @@ public class LocalLogManagerTestEnv implements AutoCloseable { } } - private LocalLogManagerTestEnv( - int numManagers, + private MockRaftClientTestEnv( + int numNodes, Optional<RawSnapshotReader> snapshotReader, Consumer<SharedLogData> sharedLogDataInitializer, KRaftVersion lastKRaftVersion @@ -132,23 +132,23 @@ public class LocalLogManagerTestEnv implements AutoCloseable { dir = TestUtils.tempDirectory(); shared = new SharedLogData(snapshotReader); sharedLogDataInitializer.accept(shared); - List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers); + List<MockRaftClient> newRaftClients = new ArrayList<>(numNodes); try { - for (int nodeId = 0; nodeId < numManagers; nodeId++) { - newLogManagers.add(new LocalLogManager( - new LogContext(String.format("[LocalLogManager %d] ", nodeId)), + for (int nodeId = 0; nodeId < numNodes; nodeId++) { + newRaftClients.add(new MockRaftClient( + new LogContext(String.format("[MockRaftClient %d] ", nodeId)), nodeId, shared, - String.format("LocalLogManager-%d_", nodeId), + String.format("MockRaftClient-%d_", nodeId), lastKRaftVersion)); } } catch (Throwable t) { - for (LocalLogManager logManager : newLogManagers) { - logManager.close(); + for (MockRaftClient raftClient : newRaftClients) { + raftClient.close(); } throw t; } - this.logManagers = newLogManagers; + this.raftClients = newRaftClients; } /** @@ -179,9 +179,9 @@ public class LocalLogManagerTestEnv implements AutoCloseable { AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null); TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { LeaderAndEpoch result = null; - for (LocalLogManager logManager : logManagers) { - LeaderAndEpoch leader = logManager.leaderAndEpoch(); - int nodeId = logManager.nodeId().getAsInt(); + for (MockRaftClient raftClient : raftClients) { + LeaderAndEpoch leader = raftClient.leaderAndEpoch(); + int nodeId = raftClient.nodeId().getAsInt(); if (leader.isLeader(nodeId)) { if (result != null) { throw new RuntimeException("node " + nodeId + @@ -198,14 +198,14 @@ public class LocalLogManagerTestEnv implements AutoCloseable { return value.get(); } - public List<LocalLogManager> logManagers() { - return logManagers; + public List<MockRaftClient> raftClients() { + return raftClients; } - public Optional<LocalLogManager> activeLogManager() { + public Optional<MockRaftClient> activeRaftClient() { OptionalInt leader = shared.leaderAndEpoch().leaderId(); if (leader.isPresent()) { - return Optional.of(logManagers.get(leader.getAsInt())); + return Optional.of(raftClients.get(leader.getAsInt())); } else { return Optional.empty(); } @@ -218,11 +218,11 @@ public class LocalLogManagerTestEnv implements AutoCloseable { @Override public void close() throws InterruptedException { try { - for (LocalLogManager logManager : logManagers) { - logManager.beginShutdown(); + for (MockRaftClient raftClient : raftClients) { + raftClient.beginShutdown(); } - for (LocalLogManager logManager : logManagers) { - logManager.close(); + for (MockRaftClient raftClient : raftClients) { + raftClient.close(); } Utils.delete(dir); } catch (IOException e) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java index 7e082ac3033..894a7f739c4 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.Time; import org.apache.kafka.controller.metrics.QuorumControllerMetrics; import org.apache.kafka.metadata.BrokerHeartbeatReply; -import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; @@ -74,9 +73,9 @@ public class QuorumControllerMetricsIntegrationTest { public void testClosingQuorumControllerClosesMetrics() throws Throwable { MockControllerMetrics metrics = new MockControllerMetrics(); try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setMetrics(metrics) ). @@ -97,9 +96,9 @@ public class QuorumControllerMetricsIntegrationTest { boolean forceFailoverUsingLogLayer ) throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { registerBrokersAndUnfence(controlEnv.activeController(), 1); // wait for a controller to become active. @@ -109,7 +108,7 @@ public class QuorumControllerMetricsIntegrationTest { } }); if (forceFailoverUsingLogLayer) { - logEnv.activeLogManager().get().throwOnNextAppend(); + clientEnv.activeRaftClient().get().throwOnNextAppend(); TestUtils.retryOnExceptionWithTimeout(30_000, () -> createTopics(controlEnv.activeController(), "test_", 1, 1) @@ -133,9 +132,9 @@ public class QuorumControllerMetricsIntegrationTest { @Test public void testTimeoutMetrics() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { QuorumController active = controlEnv.activeController(); @@ -192,9 +191,9 @@ public class QuorumControllerMetricsIntegrationTest { @Test public void testEventQueueOperationsStartedMetric() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { QuorumController active = controlEnv.activeController(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 41ef67ab885..3e58cabeac3 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -99,8 +99,6 @@ import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair; import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.util.BatchFileWriter; -import org.apache.kafka.metalog.LocalLogManager; -import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.Batch; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; @@ -180,9 +178,9 @@ public class QuorumControllerTest { @Test public void testConfigurationOperations() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, @@ -191,10 +189,10 @@ public class QuorumControllerTest { Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))). setBrokerId(0). setLogDirs(List.of(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))). - setClusterId(logEnv.clusterId())).get(); + setClusterId(clientEnv.clusterId())).get(); testConfigurationOperations(controlEnv.activeController()); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @@ -221,9 +219,9 @@ public class QuorumControllerTest { @Test public void testDelayedConfigurationOperations() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, @@ -232,18 +230,18 @@ public class QuorumControllerTest { Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))). setBrokerId(0). setLogDirs(List.of(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))). - setClusterId(logEnv.clusterId())).get(); - testDelayedConfigurationOperations(logEnv, controlEnv.activeController()); + setClusterId(clientEnv.clusterId())).get(); + testDelayedConfigurationOperations(clientEnv, controlEnv.activeController()); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } private void testDelayedConfigurationOperations( - LocalLogManagerTestEnv logEnv, + MockRaftClientTestEnv clientEnv, QuorumController controller ) throws Throwable { - logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L)); + clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(1L)); CompletableFuture<Map<ConfigResource, ApiError>> future1 = controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of( BROKER0, Map.of("baz", entry(SET, "123"))), false); @@ -252,7 +250,7 @@ public class QuorumControllerTest { new ResultOrError<>(Map.of())), controller.describeConfigs(ANONYMOUS_CONTEXT, Map.of( BROKER0, List.of())).get()); - logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L)); + clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(8L)); assertEquals(Map.of(BROKER0, ApiError.NONE), future1.get()); } @@ -266,9 +264,9 @@ public class QuorumControllerTest { long sessionTimeoutMillis = 1000; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). setBootstrapMetadata(SIMPLE_BOOTSTRAP). build() @@ -344,7 +342,7 @@ public class QuorumControllerTest { // Check that there are imbalaned partitions assertTrue(active.replicationControl().arePartitionLeadersImbalanced()); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @@ -352,9 +350,9 @@ public class QuorumControllerTest { public void testElrEnabledByDefault() throws Throwable { long sessionTimeoutMillis = 500; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). setBootstrapMetadata(BootstrapMetadata.fromRecords( List.of( @@ -384,9 +382,9 @@ public class QuorumControllerTest { long sessionTimeoutMillis = 500; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, "test-provided bootstrap ELR enabled")). build() @@ -521,9 +519,9 @@ public class QuorumControllerTest { long sessionTimeoutMillis = 500; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv) + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv) .setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setFenceStaleBrokerIntervalNs(TimeUnit.SECONDS.toNanos(15))) .setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)) @@ -618,8 +616,8 @@ public class QuorumControllerTest { long sessionTimeoutMillis = 300; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1).build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, "test-provided bootstrap ELR enabled")). build() @@ -747,9 +745,9 @@ public class QuorumControllerTest { long leaderImbalanceCheckIntervalNs = 1_000_000_000; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)). setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)). setBootstrapMetadata(SIMPLE_BOOTSTRAP). @@ -873,7 +871,7 @@ public class QuorumControllerTest { "Leaders were not balanced after unfencing all of the brokers" ); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @@ -886,9 +884,9 @@ public class QuorumControllerTest { long maxIdleIntervalNs = TimeUnit.MICROSECONDS.toNanos(100); long maxReplicationDelayMs = 1_000; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setControllerBuilderInitializer(controllerBuilder -> controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs)) ). @@ -898,29 +896,29 @@ public class QuorumControllerTest { listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); QuorumController active = controlEnv.activeController(); - LocalLogManager localLogManager = logEnv - .logManagers() + MockRaftClient mockRaftClient = clientEnv + .raftClients() .stream() .filter(logManager -> logManager.nodeId().equals(OptionalInt.of(active.nodeId()))) .findAny() .get(); TestUtils.waitForCondition( - () -> localLogManager.highWatermark().isPresent(), + () -> mockRaftClient.highWatermark().isPresent(), maxReplicationDelayMs, "High watermark was not established" ); - final long firstHighWatermark = localLogManager.highWatermark().getAsLong(); + final long firstHighWatermark = mockRaftClient.highWatermark().getAsLong(); TestUtils.waitForCondition( - () -> localLogManager.highWatermark().getAsLong() > firstHighWatermark, + () -> mockRaftClient.highWatermark().getAsLong() > firstHighWatermark, maxReplicationDelayMs, "Active controller didn't write NoOpRecord the first time" ); // Do it again to make sure that we are not counting the leader change record - final long secondHighWatermark = localLogManager.highWatermark().getAsLong(); + final long secondHighWatermark = mockRaftClient.highWatermark().getAsLong(); TestUtils.waitForCondition( - () -> localLogManager.highWatermark().getAsLong() > secondHighWatermark, + () -> mockRaftClient.highWatermark().getAsLong() > secondHighWatermark, maxReplicationDelayMs, "Active controller didn't write NoOpRecord the second time" ); @@ -931,10 +929,10 @@ public class QuorumControllerTest { @CsvSource(value = {"0, 0", "0, 1", "1, 0", "1, 1"}) public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion, short brokerMaxSupportedKraftVersion) throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). setLastKRaftVersion(KRaftVersion.fromFeatureLevel(finalizedKraftVersion)). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setBootstrapMetadata(SIMPLE_BOOTSTRAP). build() ) { @@ -983,9 +981,9 @@ public class QuorumControllerTest { @Test public void testUnregisterBroker() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { ListenerCollection listeners = new ListenerCollection(); @@ -1041,7 +1039,7 @@ public class QuorumControllerTest { }); assertEquals(0, topicPartitionFuture.get().partitionId()); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @@ -1063,14 +1061,14 @@ public class QuorumControllerTest { Map<Integer, Long> brokerEpochs = new HashMap<>(); Uuid fooId; try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setBootstrapMetadata(SIMPLE_BOOTSTRAP). build() ) { QuorumController active = controlEnv.activeController(); - for (int i = 0; i < logEnv.logManagers().size(); i++) { + for (int i = 0; i < clientEnv.raftClients().size(); i++) { active.registerController(ANONYMOUS_CONTEXT, new ControllerRegistrationRequestData(). setControllerId(i). @@ -1131,9 +1129,9 @@ public class QuorumControllerTest { active.allocateProducerIds(ANONYMOUS_CONTEXT, new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); controlEnv.close(); - assertEquals(generateTestRecords(fooId, brokerEpochs), logEnv.allRecords()); + assertEquals(generateTestRecords(fooId, brokerEpochs), clientEnv.allRecords()); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @@ -1269,8 +1267,8 @@ public class QuorumControllerTest { @Test public void testTimeouts() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1).build(); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { QuorumController controller = controlEnv.activeController(); @@ -1315,7 +1313,7 @@ public class QuorumControllerTest { assertYieldsTimeout(alterReassignmentsFuture); assertYieldsTimeout(listReassignmentsFuture); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @@ -1330,9 +1328,9 @@ public class QuorumControllerTest { @Test public void testEarlyControllerResults() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(1). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { QuorumController controller = controlEnv.activeController(); @@ -1362,16 +1360,16 @@ public class QuorumControllerTest { alterReassignmentsFuture.get(); countDownLatch.countDown(); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @Test public void testConfigResourceExistenceChecker() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { QuorumController active = controlEnv.activeController(); @@ -1402,16 +1400,16 @@ public class QuorumControllerTest { assertThrows(UnknownTopicOrPartitionException.class, () -> checker.accept(new ConfigResource(TOPIC, "bar"))); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } @Test public void testFatalMetadataReplayErrorOnActive() throws Throwable { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). build() ) { QuorumController active = controlEnv.activeController(); @@ -1435,14 +1433,14 @@ public class QuorumControllerTest { new ApiMessageAndVersion(new PartitionRecord(), (short) 0)) ); - LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3) + MockRaftClientTestEnv.Builder clientEnvBuilder = new MockRaftClientTestEnv.Builder(3) .setSnapshotReader(FileRawSnapshotReader.open( invalidSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0) )); - try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) { - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) { + try (MockRaftClientTestEnv clientEnv = clientEnvBuilder.build()) { + try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv).build()) { TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch( controller -> controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null), "At least one controller failed to detect the fatal fault" @@ -1454,11 +1452,11 @@ public class QuorumControllerTest { @Test public void testFatalMetadataErrorDuringLogLoading() throws Exception { - try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) { - logEnv.appendInitialRecords(List.of( + try (MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3).build()) { + clientEnv.appendInitialRecords(List.of( new ApiMessageAndVersion(new PartitionRecord(), (short) 0))); - try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) { + try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv).build()) { TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch( controller -> controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null), "At least one controller failed to detect the fatal fault" @@ -1505,9 +1503,9 @@ public class QuorumControllerTest { @Test public void testInsertBootstrapRecordsToEmptyLog() throws Exception { try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3). + MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3). build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv). setBootstrapMetadata(COMPLEX_BOOTSTRAP). build() ) { @@ -1538,7 +1536,7 @@ public class QuorumControllerTest { Map.of("foo", "bar").equals(resultOrError.result()); }, "Failed to see expected config change from bootstrap metadata"); - testToImages(logEnv.allRecords()); + testToImages(clientEnv.allRecords()); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index b04a3e8c658..e6ececd2ba1 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -19,7 +19,6 @@ package org.apache.kafka.controller; import org.apache.kafka.metadata.FakeKafkaConfigSchema; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; -import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.MetadataVersion; @@ -41,20 +40,20 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; public class QuorumControllerTestEnv implements AutoCloseable { private final List<QuorumController> controllers; - private final LocalLogManagerTestEnv logEnv; + private final MockRaftClientTestEnv clientEnv; private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new HashMap<>(); private final Map<Integer, MockFaultHandler> nonFatalFaultHandlers = new HashMap<>(); public static class Builder { - private final LocalLogManagerTestEnv logEnv; + private final MockRaftClientTestEnv clientEnv; private Consumer<QuorumController.Builder> controllerBuilderInitializer = __ -> { }; private OptionalLong sessionTimeoutMillis = OptionalLong.empty(); private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty(); private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. fromVersion(MetadataVersion.latestTesting(), "test-provided version"); - public Builder(LocalLogManagerTestEnv logEnv) { - this.logEnv = logEnv; + public Builder(MockRaftClientTestEnv clientEnv) { + this.clientEnv = clientEnv; } public Builder setControllerBuilderInitializer(Consumer<QuorumController.Builder> controllerBuilderInitializer) { @@ -79,7 +78,7 @@ public class QuorumControllerTestEnv implements AutoCloseable { public QuorumControllerTestEnv build() throws Exception { return new QuorumControllerTestEnv( - logEnv, + clientEnv, controllerBuilderInitializer, sessionTimeoutMillis, leaderImbalanceCheckIntervalNs, @@ -89,21 +88,21 @@ public class QuorumControllerTestEnv implements AutoCloseable { } private QuorumControllerTestEnv( - LocalLogManagerTestEnv logEnv, + MockRaftClientTestEnv clientEnv, Consumer<QuorumController.Builder> controllerBuilderInitializer, OptionalLong sessionTimeoutMillis, OptionalLong leaderImbalanceCheckIntervalNs, boolean eligibleLeaderReplicasEnabled, BootstrapMetadata bootstrapMetadata ) throws Exception { - this.logEnv = logEnv; - int numControllers = logEnv.logManagers().size(); + this.clientEnv = clientEnv; + int numControllers = clientEnv.raftClients().size(); this.controllers = new ArrayList<>(numControllers); try { List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().toList(); for (int nodeId = 0; nodeId < numControllers; nodeId++) { - QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId()); - builder.setRaftClient(logEnv.logManagers().get(nodeId)); + QuorumController.Builder builder = new QuorumController.Builder(nodeId, clientEnv.clusterId()); + builder.setRaftClient(clientEnv.raftClients().get(nodeId)); if (eligibleLeaderReplicasEnabled) { bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord( EligibleLeaderReplicasVersion.FEATURE_NAME, @@ -140,7 +139,7 @@ public class QuorumControllerTestEnv implements AutoCloseable { QuorumController activeController(boolean waitForActivation) throws InterruptedException { AtomicReference<QuorumController> value = new AtomicReference<>(null); TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { - LeaderAndEpoch leader = logEnv.leaderAndEpoch(); + LeaderAndEpoch leader = clientEnv.leaderAndEpoch(); for (QuorumController controller : controllers) { if (OptionalInt.of(controller.nodeId()).equals(leader.leaderId()) && controller.curClaimEpoch() == leader.epoch()) {