This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 979fcc3162b KAFKA-20574 SimpleConfigRepository in the storage module
could be replaced by MockConfigRepository (#22280)
979fcc3162b is described below
commit 979fcc3162beb7845140e23a0b416bc7a3d6d6e4
Author: Murali Basani <[email protected]>
AuthorDate: Fri May 15 09:20:35 2026 +0200
KAFKA-20574 SimpleConfigRepository in the storage module could be replaced
by MockConfigRepository (#22280)
Ref: https://issues.apache.org/jira/browse/KAFKA-20574
Now that the metadata module provides test fixtures, the storage module
can import MockConfigRepository from them and remove its duplicate
SimpleConfigRepository.
Reviewers: Chia-Ping Tsai <[email protected]>, Ken Huang
<[email protected]>
---
build.gradle | 1 +
.../storage/internals/log/LogManagerTest.java | 46 ++++++----------------
2 files changed, 13 insertions(+), 34 deletions(-)
diff --git a/build.gradle b/build.gradle
index af5955316e6..9f1329dccff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2472,6 +2472,7 @@ project(':storage') {
testImplementation project(':server')
testImplementation project(':server-common')
testImplementation testFixtures(project(':server-common'))
+ testImplementation testFixtures(project(':metadata'))
testImplementation project(':transaction-coordinator')
testImplementation libs.hamcrest
testImplementation libs.jacksonDataformatYaml
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
index a442edb1908..eb522be44d4 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogManagerTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
@@ -29,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.metadata.ConfigRepository;
+import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
@@ -61,7 +61,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -466,7 +465,7 @@ public class LogManagerTest {
Properties properties = new Properties();
properties.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG,
String.valueOf(segmentBytes));
properties.put(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(5L *
10L * setSize + 10L));
- ConfigRepository configRepository =
SimpleConfigRepository.forTopic(NAME, properties);
+ ConfigRepository configRepository =
MockConfigRepository.forTopic(NAME, properties);
logManager = createLogManager(configRepository);
logManager.startup(Set.of());
@@ -520,7 +519,7 @@ public class LogManagerTest {
private void testDoesntCleanLogs(String policy) throws IOException {
logManager.shutdown();
- ConfigRepository configRepository =
SimpleConfigRepository.forTopic(NAME, TopicConfig.CLEANUP_POLICY_CONFIG,
policy);
+ ConfigRepository configRepository =
MockConfigRepository.forTopic(NAME, TopicConfig.CLEANUP_POLICY_CONFIG, policy);
logManager = createLogManager(configRepository);
UnifiedLog log = logManager.getOrCreateLog(new TopicPartition(NAME,
0), Optional.empty());
@@ -544,7 +543,7 @@ public class LogManagerTest {
@Test
public void testTimeBasedFlush() throws Exception {
logManager.shutdown();
- ConfigRepository configRepository =
SimpleConfigRepository.forTopic(NAME, TopicConfig.FLUSH_MS_CONFIG, "1000");
+ ConfigRepository configRepository =
MockConfigRepository.forTopic(NAME, TopicConfig.FLUSH_MS_CONFIG, "1000");
logManager = createLogManager(configRepository);
logManager.startup(Set.of());
@@ -684,7 +683,7 @@ public class LogManagerTest {
}
private LogManager createLogManager(List<File> logDirs) throws IOException
{
- return createLogManager(logDirs, new SimpleConfigRepository(), 1);
+ return createLogManager(logDirs, new MockConfigRepository(), 1);
}
private LogManager createLogManager(ConfigRepository configRepository)
throws IOException {
@@ -692,7 +691,7 @@ public class LogManagerTest {
}
private LogManager createLogManager(List<File> logDirs, int
recoveryThreadsPerDataDir) throws IOException {
- return createLogManager(logDirs, new SimpleConfigRepository(),
recoveryThreadsPerDataDir);
+ return createLogManager(logDirs, new MockConfigRepository(),
recoveryThreadsPerDataDir);
}
private LogManager createLogManager(List<File> logDirs, ConfigRepository
configRepository, int recoveryThreadsPerDataDir) throws IOException {
@@ -799,7 +798,7 @@ public class LogManagerTest {
@Test
public void testTopicConfigChangeUpdatesLogConfig() throws IOException {
logManager.shutdown();
- ConfigRepository spyConfigRepository = spy(new
SimpleConfigRepository());
+ ConfigRepository spyConfigRepository = spy(new MockConfigRepository());
logManager = createLogManager(spyConfigRepository);
LogManager spyLogManager = spy(logManager);
UnifiedLog mockLog = mock(UnifiedLog.class);
@@ -835,7 +834,7 @@ public class LogManagerTest {
@Test
public void testConfigChangeGetsCleanedUp() throws IOException {
logManager.shutdown();
- ConfigRepository spyConfigRepository = spy(new
SimpleConfigRepository());
+ ConfigRepository spyConfigRepository = spy(new MockConfigRepository());
logManager = createLogManager(spyConfigRepository);
LogManager spyLogManager = spy(logManager);
@@ -854,7 +853,7 @@ public class LogManagerTest {
@Test
public void testBrokerConfigChangeDeliveredToAllLogs() throws IOException {
logManager.shutdown();
- ConfigRepository spyConfigRepository = spy(new
SimpleConfigRepository());
+ ConfigRepository spyConfigRepository = spy(new MockConfigRepository());
logManager = createLogManager(spyConfigRepository);
LogManager spyLogManager = spy(logManager);
UnifiedLog mockLog = mock(UnifiedLog.class);
@@ -882,7 +881,7 @@ public class LogManagerTest {
@Test
public void testTopicConfigChangeStopCleaningIfCompactIsRemoved() throws
IOException {
logManager.shutdown();
- logManager = createLogManager(new SimpleConfigRepository());
+ logManager = createLogManager(new MockConfigRepository());
LogManager spyLogManager = spy(logManager);
String topic = "topic";
@@ -1350,7 +1349,7 @@ public class LogManagerTest {
logManager = LogTestUtils.createLogManager(
List.of(this.logDir),
logConfig,
- new SimpleConfigRepository(),
+ new MockConfigRepository(),
time,
1,
true
@@ -1465,7 +1464,7 @@ public class LogManagerTest {
KafkaScheduler scheduler = new KafkaScheduler(1, true,
"log-manager-test");
LogManager tmpLogManager = new
LogManager(List.of(tmpLogDir.getAbsoluteFile()),
List.of(),
- new SimpleConfigRepository(),
+ new MockConfigRepository(),
new LogConfig(tmpProperties),
new CleanerConfig(false),
1,
@@ -1586,25 +1585,4 @@ public class LogManagerTest {
assertTrue(LogManager.isStrayReplica(List.of(1, 2, 3), 0, log));
assertFalse(LogManager.isStrayReplica(List.of(0, 1, 2), 0, log));
}
-
- private static class SimpleConfigRepository implements ConfigRepository {
- private final Map<ConfigResource, Properties> configs = new
HashMap<>();
-
- public static SimpleConfigRepository forTopic(String topic, String
key, String value) {
- Properties properties = new Properties();
- properties.put(key, value);
- return forTopic(topic, properties);
- }
-
- public static SimpleConfigRepository forTopic(String topic, Properties
properties) {
- SimpleConfigRepository repository = new SimpleConfigRepository();
- repository.configs.put(new
ConfigResource(ConfigResource.Type.TOPIC, topic), properties);
- return repository;
- }
-
- @Override
- public Properties config(ConfigResource configResource) {
- return configs.getOrDefault(configResource, new Properties());
- }
- }
}