This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 051fb66fe6f KAFKA-19938: Fix combined node log dir format in 
KafkaClusterTestKit (#21017)
051fb66fe6f is described below

commit 051fb66fe6f3990293c51e996179511b4b152502
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Dec 3 17:30:13 2025 +0100

    KAFKA-19938: Fix combined node log dir format in KafkaClusterTestKit 
(#21017)
    
    The previous logic did not format all log directories on combined nodes.
    It was only formatting the metadata log directory, which is not the
    expected result; all log directories should be formatted.
    
    I stumbled upon this issue while implementing
    [KIP-1066](https://cwiki.apache.org/confluence/x/Lg_TEg)
    
    Reviewers: PoAn Yang <[email protected]>
---
 .../kafka/common/test/KafkaClusterTestKit.java     | 25 +++-------
 .../org/apache/kafka/common/test/TestKitNodes.java | 38 +++++++++++-----
 .../kafka/common/test/KafkaClusterTestKitTest.java | 53 ++++++++++++++++------
 3 files changed, 73 insertions(+), 43 deletions(-)

diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index a8440ff32fd..c958f144f62 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -75,7 +75,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
 import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
@@ -441,12 +440,13 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
         List<Future<?>> futures = new ArrayList<>();
         try {
             for (ControllerServer controller : controllers.values()) {
-                futures.add(executorService.submit(() -> 
formatNode(controller.sharedServer().metaPropsEnsemble(), true)));
+                futures.add(executorService.submit(() -> 
formatNode(controller.sharedServer().metaPropsEnsemble())));
             }
             for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
                 BrokerServer broker = entry.getValue();
-                futures.add(executorService.submit(() -> 
formatNode(broker.sharedServer().metaPropsEnsemble(),
-                    
!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()))));
+                if 
(!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id())) {
+                    futures.add(executorService.submit(() -> 
formatNode(broker.sharedServer().metaPropsEnsemble())));
+                }
             }
             for (Future<?> future: futures) {
                 future.get();
@@ -460,21 +460,14 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
     }
 
     private void formatNode(
-        MetaPropertiesEnsemble ensemble,
-        boolean writeMetadataDirectory
+        MetaPropertiesEnsemble ensemble
     ) {
         try {
             final var nodeId = ensemble.nodeId().getAsInt();
             Formatter formatter = new Formatter();
             formatter.setNodeId(nodeId);
             formatter.setClusterId(ensemble.clusterId().get());
-            if (writeMetadataDirectory) {
-                formatter.setDirectories(ensemble.logDirProps().keySet());
-            } else {
-                
formatter.setDirectories(ensemble.logDirProps().keySet().stream().
-                    filter(d -> !ensemble.metadataLogDir().get().equals(d)).
-                    collect(Collectors.toSet()));
-            }
+            formatter.setDirectories(ensemble.logDirProps().keySet());
             if (formatter.directories().isEmpty()) {
                 return;
             }
@@ -482,11 +475,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             formatter.setUnstableFeatureVersionsEnabled(true);
             formatter.setIgnoreFormatted(false);
             formatter.setControllerListenerName(controllerListenerName);
-            if (writeMetadataDirectory) {
-                
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
-            } else {
-                formatter.setMetadataLogDirectory(Optional.empty());
-            }
+            formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
             StringBuilder dynamicVotersBuilder = new StringBuilder();
             String prefix = "";
             if (standalone) {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 3622430f487..30c597915c1 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -196,7 +196,8 @@ public class TestKitNodes {
                     baseDirectory.toFile().getAbsolutePath(),
                     clusterId,
                     brokerNodeIds.contains(id),
-                    perServerProperties.getOrDefault(id, Map.of())
+                    perServerProperties.getOrDefault(id, Map.of()),
+                    numDisksPerBroker
                 );
                 controllerNodes.put(id, controllerNode);
             }
@@ -346,21 +347,36 @@ public class TestKitNodes {
                                                   String baseDirectory,
                                                   String clusterId,
                                                   boolean combined,
-                                                  Map<String, String> 
propertyOverrides) {
+                                                  Map<String, String> 
propertyOverrides,
+                                                  int numDisksPerController) {
+        List<String> logDataDirectories = combined
+            ? IntStream
+                .range(0, numDisksPerController)
+                .mapToObj(i -> String.format("combined_%d_%d", id, i))
+                .map(logDir -> {
+                    if (Paths.get(logDir).isAbsolute()) {
+                        return logDir;
+                    }
+                    return new File(baseDirectory, logDir).getAbsolutePath();
+                })
+                .toList()
+            : List.of(new File(baseDirectory, String.format("controller_%d", 
id)).getAbsolutePath());
         String metadataDirectory = new File(baseDirectory,
             combined ? String.format("combined_%d_0", id) : 
String.format("controller_%d", id)).getAbsolutePath();
         MetaPropertiesEnsemble.Copier copier = new 
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
 
         copier.setMetaLogDir(Optional.of(metadataDirectory));
-        copier.setLogDirProps(
-            metadataDirectory,
-            new MetaProperties.Builder()
-                .setVersion(MetaPropertiesVersion.V1)
-                .setClusterId(clusterId)
-                .setNodeId(id)
-                .setDirectoryId(copier.generateValidDirectoryId())
-                .build()
-        );
+        for (String logDir : logDataDirectories) {
+            copier.setLogDirProps(
+                logDir,
+                new MetaProperties.Builder()
+                    .setVersion(MetaPropertiesVersion.V1)
+                    .setClusterId(clusterId)
+                    .setNodeId(id)
+                    .setDirectoryId(copier.generateValidDirectoryId())
+                    .build()
+            );
+        }
 
         return new TestKitNode() {
             private final MetaPropertiesEnsemble ensemble = copier.copy();
diff --git 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
index fa287aaaf4a..3272ab3fa78 100644
--- 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
+++ 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/KafkaClusterTestKitTest.java
@@ -17,16 +17,22 @@
 
 package org.apache.kafka.common.test;
 
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -87,37 +93,56 @@ public class KafkaClusterTestKitTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) 
throws Exception {
+    @CsvSource({
+        "true,1,1,2", /* 1 combined node */
+        "true,5,7,2", /* 5 combined nodes + 2 controllers */
+        "true,7,5,2", /* 7 combined nodes */
+        "false,1,1,2", /* 1 broker + 1 controller */
+        "false,5,7,2", /* 5 brokers + 7 controllers */
+        "false,7,5,2", /* 7 brokers + 5 controllers */
+    })
+    public void testCreateClusterFormatAndCloseWithMultipleLogDirs(boolean 
combined, int numBrokers, int numControllers, int numDisks) throws Exception {
         try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
             new TestKitNodes.Builder().
-                setNumBrokerNodes(5).
-                setNumDisksPerBroker(2).
+                setNumBrokerNodes(numBrokers).
+                setNumDisksPerBroker(numDisks).
                 setCombined(combined).
-                setNumControllerNodes(3).build()).build()) {
+                setNumControllerNodes(numControllers).build()).build()) {
 
             TestKitNodes nodes = cluster.nodes();
-            assertEquals(5, nodes.brokerNodes().size());
-            assertEquals(3, nodes.controllerNodes().size());
+            assertEquals(numBrokers, nodes.brokerNodes().size());
+            assertEquals(numControllers, nodes.controllerNodes().size());
 
+            Set<String> logDirs = new HashSet<>();
             nodes.brokerNodes().forEach((brokerId, node) -> {
-                assertEquals(2, node.logDataDirectories().size());
-                Set<String> expected = Set.of(String.format("broker_%d_data0", 
brokerId), String.format("broker_%d_data1", brokerId));
-                if (nodes.isCombined(node.id())) {
-                    expected = Set.of(String.format("combined_%d_0", 
brokerId), String.format("combined_%d_1", brokerId));
-                }
+                assertEquals(numDisks, node.logDataDirectories().size());
+                Set<String> expectedDisks = IntStream.range(0, numDisks)
+                        .mapToObj(i -> {
+                            if (nodes.isCombined(node.id())) {
+                                return String.format("combined_%d_%d", 
brokerId, i);
+                            } else {
+                                return String.format("broker_%d_data%d", 
brokerId, i);
+                            }
+                        }).collect(Collectors.toSet());
                 assertEquals(
-                    expected,
+                    expectedDisks,
                     node.logDataDirectories().stream()
                         .map(p -> Paths.get(p).getFileName().toString())
                         .collect(Collectors.toSet())
                 );
+                logDirs.addAll(node.logDataDirectories());
             });
 
             nodes.controllerNodes().forEach((controllerId, node) -> {
-                String expected = combined ? String.format("combined_%d_0", 
controllerId) : String.format("controller_%d", controllerId);
+                String expected = nodes.isCombined(node.id()) ? 
String.format("combined_%d_0", controllerId) : String.format("controller_%d", 
controllerId);
                 assertEquals(expected, 
Paths.get(node.metadataDirectory()).getFileName().toString());
+                logDirs.addAll(node.logDataDirectories());
             });
+
+            cluster.format();
+            logDirs.forEach(logDir ->
+                assertTrue(Files.exists(Paths.get(logDir, 
MetaPropertiesEnsemble.META_PROPERTIES_NAME)))
+            );
         }
     }
 

Reply via email to