This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1084d3b9c95 KAFKA-17175 Remove interface `BrokerNode` and `ControllerNode` (#16666)
1084d3b9c95 is described below

commit 1084d3b9c95aecccbe3c82e84ae4c8f406fc68e1
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Jul 31 01:54:02 2024 +0800

    KAFKA-17175 Remove interface `BrokerNode` and `ControllerNode` (#16666)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/test/java/kafka/testkit/BrokerNode.java   | 160 ---------------------
 .../test/java/kafka/testkit/BrokerNodeTest.java    |  43 ------
 .../test/java/kafka/testkit/ControllerNode.java    | 125 ----------------
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  10 +-
 .../kafka/testkit/KafkaClusterTestKitTest.java     |  11 +-
 core/src/test/java/kafka/testkit/TestKitNode.java  |   9 +-
 core/src/test/java/kafka/testkit/TestKitNodes.java | 146 +++++++++++++++----
 7 files changed, 136 insertions(+), 368 deletions(-)

diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java b/core/src/test/java/kafka/testkit/BrokerNode.java
deleted file mode 100644
index 51da3fe72ee..00000000000
--- a/core/src/test/java/kafka/testkit/BrokerNode.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.testkit;
-
-import org.apache.kafka.metadata.properties.MetaProperties;
-import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
-import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
-
-import java.io.File;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public class BrokerNode implements TestKitNode {
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-        private int id = -1;
-        private String baseDirectory;
-        private String clusterId;
-        private int numLogDirectories = 1;
-        private Map<String, String> propertyOverrides = Collections.emptyMap();
-        private boolean combined;
-
-        private Builder() {}
-
-        public int id() {
-            return id;
-        }
-
-        public Builder setId(int id) {
-            this.id = id;
-            return this;
-        }
-
-        public Builder setNumLogDirectories(int numLogDirectories) {
-            this.numLogDirectories = numLogDirectories;
-            return this;
-        }
-
-        public Builder setClusterId(String clusterId) {
-            this.clusterId = clusterId;
-            return this;
-        }
-
-        public Builder setBaseDirectory(String baseDirectory) {
-            this.baseDirectory = baseDirectory;
-            return this;
-        }
-
-        public Builder setCombined(boolean combined) {
-            this.combined = combined;
-            return this;
-        }
-
-        public Builder setPropertyOverrides(Map<String, String> propertyOverrides) {
-            this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides));
-            return this;
-        }
-
-        public BrokerNode build() {
-            Objects.requireNonNull(baseDirectory);
-            Objects.requireNonNull(clusterId);
-            if (id == -1) {
-                throw new IllegalArgumentException("You must set the node id.");
-            }
-            if (numLogDirectories < 1) {
-                throw new IllegalArgumentException("The value of numLogDirectories should be at least 1.");
-            }
-            List<String> logDataDirectories = IntStream
-                .range(0, numLogDirectories)
-                .mapToObj(i -> {
-                    if (combined) {
-                        return String.format("combined_%d_%d", id, i);
-                    }
-                    return String.format("broker_%d_data%d", id, i);
-                })
-                .map(logDir -> {
-                    if (Paths.get(logDir).isAbsolute()) {
-                        return logDir;
-                    }
-                    return new File(baseDirectory, logDir).getAbsolutePath();
-                })
-                .collect(Collectors.toList());
-            MetaPropertiesEnsemble.Copier copier =
-                    new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
-            copier.setMetaLogDir(Optional.of(logDataDirectories.get(0)));
-            for (String logDir : logDataDirectories) {
-                copier.setLogDirProps(logDir, new MetaProperties.Builder().
-                    setVersion(MetaPropertiesVersion.V1).
-                    setClusterId(clusterId).
-                    setNodeId(id).
-                    setDirectoryId(copier.generateValidDirectoryId()).
-                    build());
-            }
-            return new BrokerNode(
-                copier.copy(),
-                combined,
-                propertyOverrides);
-        }
-    }
-
-    private final MetaPropertiesEnsemble initialMetaPropertiesEnsemble;
-    private final boolean combined;
-    private final Map<String, String> propertyOverrides;
-    private final Set<String> logDataDirectories;
-
-    private BrokerNode(
-        MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
-        boolean combined,
-        Map<String, String> propertyOverrides
-    ) {
-        this.initialMetaPropertiesEnsemble = Objects.requireNonNull(initialMetaPropertiesEnsemble);
-        this.combined = combined;
-        this.propertyOverrides = Objects.requireNonNull(propertyOverrides);
-        this.logDataDirectories = Collections.unmodifiableSet(initialMetaPropertiesEnsemble.logDirProps().keySet());
-    }
-
-    @Override
-    public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
-        return initialMetaPropertiesEnsemble;
-    }
-
-    @Override
-    public boolean combined() {
-        return combined;
-    }
-
-    public Set<String> logDataDirectories() {
-        return logDataDirectories;
-    }
-
-    public Map<String, String> propertyOverrides() {
-        return propertyOverrides;
-    }
-}
diff --git a/core/src/test/java/kafka/testkit/BrokerNodeTest.java b/core/src/test/java/kafka/testkit/BrokerNodeTest.java
deleted file mode 100644
index 220c8ee4e93..00000000000
--- a/core/src/test/java/kafka/testkit/BrokerNodeTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.testkit;
-
-import org.apache.kafka.common.Uuid;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class BrokerNodeTest {
-
-    @Test
-    public void testInvalidBuilder() {
-        Assertions.assertEquals("You must set the node id.",
-                Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
-                        .setBaseDirectory("foo")
-                        .setClusterId(Uuid.randomUuid().toString())
-                        .build()).getMessage());
-
-        Assertions.assertEquals("The value of numLogDirectories should be at least 1.",
-                Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
-                        .setBaseDirectory("foo")
-                        .setClusterId(Uuid.randomUuid().toString())
-                        .setId(0)
-                        .setNumLogDirectories(0)
-                        .build()).getMessage());
-    }
-}
diff --git a/core/src/test/java/kafka/testkit/ControllerNode.java b/core/src/test/java/kafka/testkit/ControllerNode.java
deleted file mode 100644
index a52a6868e53..00000000000
--- a/core/src/test/java/kafka/testkit/ControllerNode.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.testkit;
-
-import org.apache.kafka.metadata.properties.MetaProperties;
-import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
-import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-public class ControllerNode implements TestKitNode {
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-        private int id = -1;
-        private String baseDirectory;
-        private String clusterId;
-        private boolean combined;
-        private Map<String, String> propertyOverrides = Collections.emptyMap();
-
-        private Builder() {}
-
-        public int id() {
-            return id;
-        }
-
-        public Builder setId(int id) {
-            this.id = id;
-            return this;
-        }
-
-        public Builder setClusterId(String clusterId) {
-            this.clusterId = clusterId;
-            return this;
-        }
-
-        public Builder setBaseDirectory(String baseDirectory) {
-            this.baseDirectory = baseDirectory;
-            return this;
-        }
-
-        public Builder setCombined(boolean combined) {
-            this.combined = combined;
-            return this;
-        }
-
-        public Builder setPropertyOverrides(Map<String, String> propertyOverrides) {
-            this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides));
-            return this;
-        }
-
-        public ControllerNode build() {
-            if (id == -1) {
-                throw new IllegalArgumentException("You must set the node id.");
-            }
-            if (baseDirectory == null) {
-                throw new IllegalArgumentException("You must set the base directory.");
-            }
-            String metadataDirectory = new File(baseDirectory,
-                combined ? String.format("combined_%d_0", id) : String.format("controller_%d", id)).getAbsolutePath();
-            MetaPropertiesEnsemble.Copier copier =
-                new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
-            copier.setMetaLogDir(Optional.of(metadataDirectory));
-            copier.setLogDirProps(metadataDirectory, new MetaProperties.Builder().
-                setVersion(MetaPropertiesVersion.V1).
-                setClusterId(clusterId.toString()).
-                setNodeId(id).
-                setDirectoryId(copier.generateValidDirectoryId()).
-                build());
-            return new ControllerNode(copier.copy(), combined, propertyOverrides);
-        }
-    }
-
-    private final MetaPropertiesEnsemble initialMetaPropertiesEnsemble;
-
-    private final boolean combined;
-
-    private final Map<String, String> propertyOverrides;
-
-    private ControllerNode(
-        MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
-        boolean combined,
-        Map<String, String> propertyOverrides
-    ) {
-        this.initialMetaPropertiesEnsemble = Objects.requireNonNull(initialMetaPropertiesEnsemble);
-        this.combined = combined;
-        this.propertyOverrides = Objects.requireNonNull(propertyOverrides);
-    }
-
-    @Override
-    public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
-        return initialMetaPropertiesEnsemble;
-    }
-
-    @Override
-    public boolean combined() {
-        return combined;
-    }
-
-    public Map<String, String> propertyOverrides() {
-        return propertyOverrides;
-    }
-}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index e44f5ad5219..fa9eb64b0cc 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -162,8 +162,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         }
 
         private KafkaConfig createNodeConfig(TestKitNode node) {
-            BrokerNode brokerNode = nodes.brokerNodes().get(node.id());
-            ControllerNode controllerNode = nodes.controllerNodes().get(node.id());
+            TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
+            TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
 
             Map<String, Object> props = new HashMap<>(configProps);
             props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
@@ -241,7 +241,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 baseDirectory = new File(nodes.baseDirectory());
                 executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
                     ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
-                for (ControllerNode node : nodes.controllerNodes().values()) {
+                for (TestKitNode node : nodes.controllerNodes().values()) {
                     setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
                     SharedServer sharedServer = new SharedServer(
                         createNodeConfig(node),
@@ -273,7 +273,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     });
                     jointServers.put(node.id(), sharedServer);
                 }
-                for (BrokerNode node : nodes.brokerNodes().values()) {
+                for (TestKitNode node : nodes.brokerNodes().values()) {
                     SharedServer sharedServer = jointServers.computeIfAbsent(
                         node.id(),
                         id -> new SharedServer(
@@ -390,7 +390,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 BrokerServer broker = entry.getValue();
                 futures.add(executorService.submit(() -> {
                     formatNode(broker.sharedServer().metaPropsEnsemble(),
-                        !nodes().brokerNodes().get(entry.getKey()).combined());
+                        !nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()));
                 }));
             }
             for (Future<?> future: futures) {
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
index 1987c1d444a..f2b2cf2dee1 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
@@ -96,13 +96,14 @@ public class KafkaClusterTestKitTest {
                         setCombined(combined).
                         setNumControllerNodes(3).build()).build()) {
 
-            assertEquals(5, cluster.nodes().brokerNodes().size());
-            assertEquals(3, cluster.nodes().controllerNodes().size());
+            TestKitNodes nodes = cluster.nodes();
+            assertEquals(5, nodes.brokerNodes().size());
+            assertEquals(3, nodes.controllerNodes().size());
 
-            cluster.nodes().brokerNodes().forEach((brokerId, node) -> {
+            nodes.brokerNodes().forEach((brokerId, node) -> {
                 assertEquals(2, node.logDataDirectories().size());
                 Set<String> expected = new HashSet<>(Arrays.asList(String.format("broker_%d_data0", brokerId), String.format("broker_%d_data1", brokerId)));
-                if (node.combined()) {
+                if (nodes.isCombined(node.id())) {
                     expected = new HashSet<>(Arrays.asList(String.format("combined_%d_0", brokerId), String.format("combined_%d_1", brokerId)));
                 }
                 assertEquals(
@@ -113,7 +114,7 @@ public class KafkaClusterTestKitTest {
                 );
             });
 
-            cluster.nodes().controllerNodes().forEach((controllerId, node) -> {
+            nodes.controllerNodes().forEach((controllerId, node) -> {
                 String expected = combined ? String.format("combined_%d_0", controllerId) : String.format("controller_%d", controllerId);
                 assertEquals(expected, Paths.get(node.metadataDirectory()).getFileName().toString());
             });
diff --git a/core/src/test/java/kafka/testkit/TestKitNode.java b/core/src/test/java/kafka/testkit/TestKitNode.java
index 17f73e82569..2293c91bd21 100644
--- a/core/src/test/java/kafka/testkit/TestKitNode.java
+++ b/core/src/test/java/kafka/testkit/TestKitNode.java
@@ -19,6 +19,9 @@ package kafka.testkit;
 
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 
+import java.util.Map;
+import java.util.Set;
+
 public interface TestKitNode {
     default int id() {
         return initialMetaPropertiesEnsemble().nodeId().getAsInt();
@@ -28,7 +31,11 @@ public interface TestKitNode {
         return initialMetaPropertiesEnsemble().metadataLogDir().get();
     }
 
+    default Set<String> logDataDirectories() {
+        return initialMetaPropertiesEnsemble().logDirProps().keySet();
+    }
+
     MetaPropertiesEnsemble initialMetaPropertiesEnsemble();
 
-    boolean combined();
+    Map<String, String> propertyOverrides();
 }
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index 1e12f25c08c..b716f664168 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -20,14 +20,20 @@ package kafka.testkit;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -128,51 +134,47 @@ public class TestKitNodes {
                                         .collect(Collectors.joining(", "))));
             }
 
-            TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
+            TreeMap<Integer, TestKitNode> controllerNodes = new TreeMap<>();
             for (int id : controllerNodeIds) {
-                ControllerNode controllerNode = ControllerNode.builder()
-                    .setId(id)
-                    .setBaseDirectory(baseDirectory)
-                    .setClusterId(clusterId)
-                    .setCombined(brokerNodeIds.contains(id))
-                    .setPropertyOverrides(perServerProperties.getOrDefault(id, Collections.emptyMap()))
-                    .build();
+                TestKitNode controllerNode = TestKitNodes.buildControllerNode(
+                    id,
+                    baseDirectory,
+                    clusterId,
+                    brokerNodeIds.contains(id),
+                    perServerProperties.getOrDefault(id, Collections.emptyMap())
+                );
                 controllerNodes.put(id, controllerNode);
             }
 
-            TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
+            TreeMap<Integer, TestKitNode> brokerNodes = new TreeMap<>();
             for (int id : brokerNodeIds) {
-                BrokerNode brokerNode = BrokerNode.builder()
-                    .setId(id)
-                    .setNumLogDirectories(numDisksPerBroker)
-                    .setBaseDirectory(baseDirectory)
-                    .setClusterId(clusterId)
-                    .setCombined(controllerNodeIds.contains(id))
-                    .setPropertyOverrides(perServerProperties.getOrDefault(id, Collections.emptyMap()))
-                    .build();
+                TestKitNode brokerNode = TestKitNodes.buildBrokerNode(
+                    id,
+                    baseDirectory,
+                    clusterId,
+                    controllerNodeIds.contains(id),
+                    perServerProperties.getOrDefault(id, Collections.emptyMap()),
+                    numDisksPerBroker
+                );
                 brokerNodes.put(id, brokerNode);
             }
 
-            return new TestKitNodes(baseDirectory,
-                clusterId,
-                bootstrapMetadata,
-                controllerNodes,
-                brokerNodes);
+            return new TestKitNodes(baseDirectory, clusterId, bootstrapMetadata, controllerNodes, brokerNodes);
         }
     }
 
     private final String baseDirectory;
     private final String clusterId;
     private final BootstrapMetadata bootstrapMetadata;
-    private final SortedMap<Integer, ControllerNode> controllerNodes;
-    private final SortedMap<Integer, BrokerNode> brokerNodes;
+    private final SortedMap<Integer, TestKitNode> controllerNodes;
+    private final SortedMap<Integer, TestKitNode> brokerNodes;
 
     private TestKitNodes(
         String baseDirectory,
         String clusterId,
         BootstrapMetadata bootstrapMetadata,
-        SortedMap<Integer, ControllerNode> controllerNodes,
-        SortedMap<Integer, BrokerNode> brokerNodes
+        SortedMap<Integer, TestKitNode> controllerNodes,
+        SortedMap<Integer, TestKitNode> brokerNodes
     ) {
         this.baseDirectory = Objects.requireNonNull(baseDirectory);
         this.clusterId = Objects.requireNonNull(clusterId);
@@ -193,7 +195,7 @@ public class TestKitNodes {
         return clusterId;
     }
 
-    public SortedMap<Integer, ControllerNode> controllerNodes() {
+    public SortedMap<Integer, TestKitNode> controllerNodes() {
         return controllerNodes;
     }
 
@@ -201,7 +203,7 @@ public class TestKitNodes {
         return bootstrapMetadata;
     }
 
-    public SortedMap<Integer, BrokerNode> brokerNodes() {
+    public SortedMap<Integer, TestKitNode> brokerNodes() {
         return brokerNodes;
     }
 
@@ -216,4 +218,90 @@ public class TestKitNodes {
     public ListenerName controllerListenerName() {
         return new ListenerName("CONTROLLER");
     }
-}
+
+    private static TestKitNode buildBrokerNode(int id,
+                                              String baseDirectory,
+                                              String clusterId,
+                                              boolean combined,
+                                              Map<String, String> propertyOverrides,
+                                              int numDisksPerBroker) {
+        List<String> logDataDirectories = IntStream
+            .range(0, numDisksPerBroker)
+            .mapToObj(i -> {
+                if (combined) {
+                    return String.format("combined_%d_%d", id, i);
+                }
+                return String.format("broker_%d_data%d", id, i);
+            })
+            .map(logDir -> {
+                if (Paths.get(logDir).isAbsolute()) {
+                    return logDir;
+                }
+                return new File(baseDirectory, logDir).getAbsolutePath();
+            })
+            .collect(Collectors.toList());
+        MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
+
+        copier.setMetaLogDir(Optional.of(logDataDirectories.get(0)));
+        for (String logDir : logDataDirectories) {
+            copier.setLogDirProps(
+                logDir,
+                new MetaProperties.Builder()
+                    .setVersion(MetaPropertiesVersion.V1)
+                    .setClusterId(clusterId)
+                    .setNodeId(id)
+                    .setDirectoryId(copier.generateValidDirectoryId())
+                    .build()
+            );
+        }
+
+        return new TestKitNode() {
+            private final MetaPropertiesEnsemble ensemble = copier.copy();
+
+            @Override
+            public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
+                return ensemble;
+            }
+
+            @Override
+            public Map<String, String> propertyOverrides() {
+                return Collections.unmodifiableMap(propertyOverrides);
+            }
+        };
+    }
+
+    private static TestKitNode buildControllerNode(int id,
+                                                  String baseDirectory,
+                                                  String clusterId,
+                                                  boolean combined,
+                                                  Map<String, String> propertyOverrides) {
+        String metadataDirectory = new File(baseDirectory,
+            combined ? String.format("combined_%d_0", id) : String.format("controller_%d", id)).getAbsolutePath();
+        MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
+
+        copier.setMetaLogDir(Optional.of(metadataDirectory));
+        copier.setLogDirProps(
+            metadataDirectory,
+            new MetaProperties.Builder()
+                .setVersion(MetaPropertiesVersion.V1)
+                .setClusterId(clusterId)
+                .setNodeId(id)
+                .setDirectoryId(copier.generateValidDirectoryId())
+                .build()
+        );
+
+        return new TestKitNode() {
+            private final MetaPropertiesEnsemble ensemble = copier.copy();
+
+            @Override
+            public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
+                return ensemble;
+            }
+
+            @Override
+            public Map<String, String> propertyOverrides() {
+                return Collections.unmodifiableMap(propertyOverrides);
+            }
+        };
+    }
+}
\ No newline at end of file

Reply via email to