This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1084d3b9c95 KAFKA-17175 Remove interface `BrokerNode` and
`ControllerNode` (#16666)
1084d3b9c95 is described below
commit 1084d3b9c95aecccbe3c82e84ae4c8f406fc68e1
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Jul 31 01:54:02 2024 +0800
KAFKA-17175 Remove interface `BrokerNode` and `ControllerNode` (#16666)
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/test/java/kafka/testkit/BrokerNode.java | 160 ---------------------
.../test/java/kafka/testkit/BrokerNodeTest.java | 43 ------
.../test/java/kafka/testkit/ControllerNode.java | 125 ----------------
.../java/kafka/testkit/KafkaClusterTestKit.java | 10 +-
.../kafka/testkit/KafkaClusterTestKitTest.java | 11 +-
core/src/test/java/kafka/testkit/TestKitNode.java | 9 +-
core/src/test/java/kafka/testkit/TestKitNodes.java | 146 +++++++++++++++----
7 files changed, 136 insertions(+), 368 deletions(-)
diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java
b/core/src/test/java/kafka/testkit/BrokerNode.java
deleted file mode 100644
index 51da3fe72ee..00000000000
--- a/core/src/test/java/kafka/testkit/BrokerNode.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.testkit;
-
-import org.apache.kafka.metadata.properties.MetaProperties;
-import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
-import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
-
-import java.io.File;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-public class BrokerNode implements TestKitNode {
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- private int id = -1;
- private String baseDirectory;
- private String clusterId;
- private int numLogDirectories = 1;
- private Map<String, String> propertyOverrides = Collections.emptyMap();
- private boolean combined;
-
- private Builder() {}
-
- public int id() {
- return id;
- }
-
- public Builder setId(int id) {
- this.id = id;
- return this;
- }
-
- public Builder setNumLogDirectories(int numLogDirectories) {
- this.numLogDirectories = numLogDirectories;
- return this;
- }
-
- public Builder setClusterId(String clusterId) {
- this.clusterId = clusterId;
- return this;
- }
-
- public Builder setBaseDirectory(String baseDirectory) {
- this.baseDirectory = baseDirectory;
- return this;
- }
-
- public Builder setCombined(boolean combined) {
- this.combined = combined;
- return this;
- }
-
- public Builder setPropertyOverrides(Map<String, String>
propertyOverrides) {
- this.propertyOverrides = Collections.unmodifiableMap(new
HashMap<>(propertyOverrides));
- return this;
- }
-
- public BrokerNode build() {
- Objects.requireNonNull(baseDirectory);
- Objects.requireNonNull(clusterId);
- if (id == -1) {
- throw new IllegalArgumentException("You must set the node
id.");
- }
- if (numLogDirectories < 1) {
- throw new IllegalArgumentException("The value of
numLogDirectories should be at least 1.");
- }
- List<String> logDataDirectories = IntStream
- .range(0, numLogDirectories)
- .mapToObj(i -> {
- if (combined) {
- return String.format("combined_%d_%d", id, i);
- }
- return String.format("broker_%d_data%d", id, i);
- })
- .map(logDir -> {
- if (Paths.get(logDir).isAbsolute()) {
- return logDir;
- }
- return new File(baseDirectory, logDir).getAbsolutePath();
- })
- .collect(Collectors.toList());
- MetaPropertiesEnsemble.Copier copier =
- new
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
- copier.setMetaLogDir(Optional.of(logDataDirectories.get(0)));
- for (String logDir : logDataDirectories) {
- copier.setLogDirProps(logDir, new MetaProperties.Builder().
- setVersion(MetaPropertiesVersion.V1).
- setClusterId(clusterId).
- setNodeId(id).
- setDirectoryId(copier.generateValidDirectoryId()).
- build());
- }
- return new BrokerNode(
- copier.copy(),
- combined,
- propertyOverrides);
- }
- }
-
- private final MetaPropertiesEnsemble initialMetaPropertiesEnsemble;
- private final boolean combined;
- private final Map<String, String> propertyOverrides;
- private final Set<String> logDataDirectories;
-
- private BrokerNode(
- MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
- boolean combined,
- Map<String, String> propertyOverrides
- ) {
- this.initialMetaPropertiesEnsemble =
Objects.requireNonNull(initialMetaPropertiesEnsemble);
- this.combined = combined;
- this.propertyOverrides = Objects.requireNonNull(propertyOverrides);
- this.logDataDirectories =
Collections.unmodifiableSet(initialMetaPropertiesEnsemble.logDirProps().keySet());
- }
-
- @Override
- public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
- return initialMetaPropertiesEnsemble;
- }
-
- @Override
- public boolean combined() {
- return combined;
- }
-
- public Set<String> logDataDirectories() {
- return logDataDirectories;
- }
-
- public Map<String, String> propertyOverrides() {
- return propertyOverrides;
- }
-}
diff --git a/core/src/test/java/kafka/testkit/BrokerNodeTest.java
b/core/src/test/java/kafka/testkit/BrokerNodeTest.java
deleted file mode 100644
index 220c8ee4e93..00000000000
--- a/core/src/test/java/kafka/testkit/BrokerNodeTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.testkit;
-
-import org.apache.kafka.common.Uuid;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class BrokerNodeTest {
-
- @Test
- public void testInvalidBuilder() {
- Assertions.assertEquals("You must set the node id.",
- Assertions.assertThrows(IllegalArgumentException.class, () ->
BrokerNode.builder()
- .setBaseDirectory("foo")
- .setClusterId(Uuid.randomUuid().toString())
- .build()).getMessage());
-
- Assertions.assertEquals("The value of numLogDirectories should be at
least 1.",
- Assertions.assertThrows(IllegalArgumentException.class, () ->
BrokerNode.builder()
- .setBaseDirectory("foo")
- .setClusterId(Uuid.randomUuid().toString())
- .setId(0)
- .setNumLogDirectories(0)
- .build()).getMessage());
- }
-}
diff --git a/core/src/test/java/kafka/testkit/ControllerNode.java
b/core/src/test/java/kafka/testkit/ControllerNode.java
deleted file mode 100644
index a52a6868e53..00000000000
--- a/core/src/test/java/kafka/testkit/ControllerNode.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.testkit;
-
-import org.apache.kafka.metadata.properties.MetaProperties;
-import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
-import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-
-public class ControllerNode implements TestKitNode {
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- private int id = -1;
- private String baseDirectory;
- private String clusterId;
- private boolean combined;
- private Map<String, String> propertyOverrides = Collections.emptyMap();
-
- private Builder() {}
-
- public int id() {
- return id;
- }
-
- public Builder setId(int id) {
- this.id = id;
- return this;
- }
-
- public Builder setClusterId(String clusterId) {
- this.clusterId = clusterId;
- return this;
- }
-
- public Builder setBaseDirectory(String baseDirectory) {
- this.baseDirectory = baseDirectory;
- return this;
- }
-
- public Builder setCombined(boolean combined) {
- this.combined = combined;
- return this;
- }
-
- public Builder setPropertyOverrides(Map<String, String>
propertyOverrides) {
- this.propertyOverrides = Collections.unmodifiableMap(new
HashMap<>(propertyOverrides));
- return this;
- }
-
- public ControllerNode build() {
- if (id == -1) {
- throw new IllegalArgumentException("You must set the node
id.");
- }
- if (baseDirectory == null) {
- throw new IllegalArgumentException("You must set the base
directory.");
- }
- String metadataDirectory = new File(baseDirectory,
- combined ? String.format("combined_%d_0", id) :
String.format("controller_%d", id)).getAbsolutePath();
- MetaPropertiesEnsemble.Copier copier =
- new
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
- copier.setMetaLogDir(Optional.of(metadataDirectory));
- copier.setLogDirProps(metadataDirectory, new
MetaProperties.Builder().
- setVersion(MetaPropertiesVersion.V1).
- setClusterId(clusterId.toString()).
- setNodeId(id).
- setDirectoryId(copier.generateValidDirectoryId()).
- build());
- return new ControllerNode(copier.copy(), combined,
propertyOverrides);
- }
- }
-
- private final MetaPropertiesEnsemble initialMetaPropertiesEnsemble;
-
- private final boolean combined;
-
- private final Map<String, String> propertyOverrides;
-
- private ControllerNode(
- MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
- boolean combined,
- Map<String, String> propertyOverrides
- ) {
- this.initialMetaPropertiesEnsemble =
Objects.requireNonNull(initialMetaPropertiesEnsemble);
- this.combined = combined;
- this.propertyOverrides = Objects.requireNonNull(propertyOverrides);
- }
-
- @Override
- public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
- return initialMetaPropertiesEnsemble;
- }
-
- @Override
- public boolean combined() {
- return combined;
- }
-
- public Map<String, String> propertyOverrides() {
- return propertyOverrides;
- }
-}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index e44f5ad5219..fa9eb64b0cc 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -162,8 +162,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
private KafkaConfig createNodeConfig(TestKitNode node) {
- BrokerNode brokerNode = nodes.brokerNodes().get(node.id());
- ControllerNode controllerNode =
nodes.controllerNodes().get(node.id());
+ TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
+ TestKitNode controllerNode =
nodes.controllerNodes().get(node.id());
Map<String, Object> props = new HashMap<>(configProps);
props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
@@ -241,7 +241,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
baseDirectory = new File(nodes.baseDirectory());
executorService =
Executors.newFixedThreadPool(numOfExecutorThreads,
ThreadUtils.createThreadFactory("kafka-cluster-test-kit-executor-%d", false));
- for (ControllerNode node : nodes.controllerNodes().values()) {
+ for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory,
node.metadataDirectory(), Collections.emptyList());
SharedServer sharedServer = new SharedServer(
createNodeConfig(node),
@@ -273,7 +273,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
});
jointServers.put(node.id(), sharedServer);
}
- for (BrokerNode node : nodes.brokerNodes().values()) {
+ for (TestKitNode node : nodes.brokerNodes().values()) {
SharedServer sharedServer = jointServers.computeIfAbsent(
node.id(),
id -> new SharedServer(
@@ -390,7 +390,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
BrokerServer broker = entry.getValue();
futures.add(executorService.submit(() -> {
formatNode(broker.sharedServer().metaPropsEnsemble(),
- !nodes().brokerNodes().get(entry.getKey()).combined());
+
!nodes.isCombined(nodes().brokerNodes().get(entry.getKey()).id()));
}));
}
for (Future<?> future: futures) {
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
b/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
index 1987c1d444a..f2b2cf2dee1 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java
@@ -96,13 +96,14 @@ public class KafkaClusterTestKitTest {
setCombined(combined).
setNumControllerNodes(3).build()).build()) {
- assertEquals(5, cluster.nodes().brokerNodes().size());
- assertEquals(3, cluster.nodes().controllerNodes().size());
+ TestKitNodes nodes = cluster.nodes();
+ assertEquals(5, nodes.brokerNodes().size());
+ assertEquals(3, nodes.controllerNodes().size());
- cluster.nodes().brokerNodes().forEach((brokerId, node) -> {
+ nodes.brokerNodes().forEach((brokerId, node) -> {
assertEquals(2, node.logDataDirectories().size());
Set<String> expected = new
HashSet<>(Arrays.asList(String.format("broker_%d_data0", brokerId),
String.format("broker_%d_data1", brokerId)));
- if (node.combined()) {
+ if (nodes.isCombined(node.id())) {
expected = new
HashSet<>(Arrays.asList(String.format("combined_%d_0", brokerId),
String.format("combined_%d_1", brokerId)));
}
assertEquals(
@@ -113,7 +114,7 @@ public class KafkaClusterTestKitTest {
);
});
- cluster.nodes().controllerNodes().forEach((controllerId, node) -> {
+ nodes.controllerNodes().forEach((controllerId, node) -> {
String expected = combined ? String.format("combined_%d_0",
controllerId) : String.format("controller_%d", controllerId);
assertEquals(expected,
Paths.get(node.metadataDirectory()).getFileName().toString());
});
diff --git a/core/src/test/java/kafka/testkit/TestKitNode.java
b/core/src/test/java/kafka/testkit/TestKitNode.java
index 17f73e82569..2293c91bd21 100644
--- a/core/src/test/java/kafka/testkit/TestKitNode.java
+++ b/core/src/test/java/kafka/testkit/TestKitNode.java
@@ -19,6 +19,9 @@ package kafka.testkit;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import java.util.Map;
+import java.util.Set;
+
public interface TestKitNode {
default int id() {
return initialMetaPropertiesEnsemble().nodeId().getAsInt();
@@ -28,7 +31,11 @@ public interface TestKitNode {
return initialMetaPropertiesEnsemble().metadataLogDir().get();
}
+ default Set<String> logDataDirectories() {
+ return initialMetaPropertiesEnsemble().logDirProps().keySet();
+ }
+
MetaPropertiesEnsemble initialMetaPropertiesEnsemble();
- boolean combined();
+ Map<String, String> propertyOverrides();
}
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java
b/core/src/test/java/kafka/testkit/TestKitNodes.java
index 1e12f25c08c..b716f664168 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -20,14 +20,20 @@ package kafka.testkit;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.properties.MetaProperties;
+import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
+import java.io.File;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -128,51 +134,47 @@ public class TestKitNodes {
.collect(Collectors.joining(", "))));
}
- TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
+ TreeMap<Integer, TestKitNode> controllerNodes = new TreeMap<>();
for (int id : controllerNodeIds) {
- ControllerNode controllerNode = ControllerNode.builder()
- .setId(id)
- .setBaseDirectory(baseDirectory)
- .setClusterId(clusterId)
- .setCombined(brokerNodeIds.contains(id))
- .setPropertyOverrides(perServerProperties.getOrDefault(id,
Collections.emptyMap()))
- .build();
+ TestKitNode controllerNode = TestKitNodes.buildControllerNode(
+ id,
+ baseDirectory,
+ clusterId,
+ brokerNodeIds.contains(id),
+ perServerProperties.getOrDefault(id,
Collections.emptyMap())
+ );
controllerNodes.put(id, controllerNode);
}
- TreeMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
+ TreeMap<Integer, TestKitNode> brokerNodes = new TreeMap<>();
for (int id : brokerNodeIds) {
- BrokerNode brokerNode = BrokerNode.builder()
- .setId(id)
- .setNumLogDirectories(numDisksPerBroker)
- .setBaseDirectory(baseDirectory)
- .setClusterId(clusterId)
- .setCombined(controllerNodeIds.contains(id))
- .setPropertyOverrides(perServerProperties.getOrDefault(id,
Collections.emptyMap()))
- .build();
+ TestKitNode brokerNode = TestKitNodes.buildBrokerNode(
+ id,
+ baseDirectory,
+ clusterId,
+ controllerNodeIds.contains(id),
+ perServerProperties.getOrDefault(id,
Collections.emptyMap()),
+ numDisksPerBroker
+ );
brokerNodes.put(id, brokerNode);
}
- return new TestKitNodes(baseDirectory,
- clusterId,
- bootstrapMetadata,
- controllerNodes,
- brokerNodes);
+ return new TestKitNodes(baseDirectory, clusterId,
bootstrapMetadata, controllerNodes, brokerNodes);
}
}
private final String baseDirectory;
private final String clusterId;
private final BootstrapMetadata bootstrapMetadata;
- private final SortedMap<Integer, ControllerNode> controllerNodes;
- private final SortedMap<Integer, BrokerNode> brokerNodes;
+ private final SortedMap<Integer, TestKitNode> controllerNodes;
+ private final SortedMap<Integer, TestKitNode> brokerNodes;
private TestKitNodes(
String baseDirectory,
String clusterId,
BootstrapMetadata bootstrapMetadata,
- SortedMap<Integer, ControllerNode> controllerNodes,
- SortedMap<Integer, BrokerNode> brokerNodes
+ SortedMap<Integer, TestKitNode> controllerNodes,
+ SortedMap<Integer, TestKitNode> brokerNodes
) {
this.baseDirectory = Objects.requireNonNull(baseDirectory);
this.clusterId = Objects.requireNonNull(clusterId);
@@ -193,7 +195,7 @@ public class TestKitNodes {
return clusterId;
}
- public SortedMap<Integer, ControllerNode> controllerNodes() {
+ public SortedMap<Integer, TestKitNode> controllerNodes() {
return controllerNodes;
}
@@ -201,7 +203,7 @@ public class TestKitNodes {
return bootstrapMetadata;
}
- public SortedMap<Integer, BrokerNode> brokerNodes() {
+ public SortedMap<Integer, TestKitNode> brokerNodes() {
return brokerNodes;
}
@@ -216,4 +218,90 @@ public class TestKitNodes {
public ListenerName controllerListenerName() {
return new ListenerName("CONTROLLER");
}
-}
+
+ private static TestKitNode buildBrokerNode(int id,
+ String baseDirectory,
+ String clusterId,
+ boolean combined,
+ Map<String, String>
propertyOverrides,
+ int numDisksPerBroker) {
+ List<String> logDataDirectories = IntStream
+ .range(0, numDisksPerBroker)
+ .mapToObj(i -> {
+ if (combined) {
+ return String.format("combined_%d_%d", id, i);
+ }
+ return String.format("broker_%d_data%d", id, i);
+ })
+ .map(logDir -> {
+ if (Paths.get(logDir).isAbsolute()) {
+ return logDir;
+ }
+ return new File(baseDirectory, logDir).getAbsolutePath();
+ })
+ .collect(Collectors.toList());
+ MetaPropertiesEnsemble.Copier copier = new
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
+
+ copier.setMetaLogDir(Optional.of(logDataDirectories.get(0)));
+ for (String logDir : logDataDirectories) {
+ copier.setLogDirProps(
+ logDir,
+ new MetaProperties.Builder()
+ .setVersion(MetaPropertiesVersion.V1)
+ .setClusterId(clusterId)
+ .setNodeId(id)
+ .setDirectoryId(copier.generateValidDirectoryId())
+ .build()
+ );
+ }
+
+ return new TestKitNode() {
+ private final MetaPropertiesEnsemble ensemble = copier.copy();
+
+ @Override
+ public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
+ return ensemble;
+ }
+
+ @Override
+ public Map<String, String> propertyOverrides() {
+ return Collections.unmodifiableMap(propertyOverrides);
+ }
+ };
+ }
+
+ private static TestKitNode buildControllerNode(int id,
+ String baseDirectory,
+ String clusterId,
+ boolean combined,
+ Map<String, String>
propertyOverrides) {
+ String metadataDirectory = new File(baseDirectory,
+ combined ? String.format("combined_%d_0", id) :
String.format("controller_%d", id)).getAbsolutePath();
+ MetaPropertiesEnsemble.Copier copier = new
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
+
+ copier.setMetaLogDir(Optional.of(metadataDirectory));
+ copier.setLogDirProps(
+ metadataDirectory,
+ new MetaProperties.Builder()
+ .setVersion(MetaPropertiesVersion.V1)
+ .setClusterId(clusterId)
+ .setNodeId(id)
+ .setDirectoryId(copier.generateValidDirectoryId())
+ .build()
+ );
+
+ return new TestKitNode() {
+ private final MetaPropertiesEnsemble ensemble = copier.copy();
+
+ @Override
+ public MetaPropertiesEnsemble initialMetaPropertiesEnsemble() {
+ return ensemble;
+ }
+
+ @Override
+ public Map<String, String> propertyOverrides() {
+ return Collections.unmodifiableMap(propertyOverrides);
+ }
+ };
+ }
+}
\ No newline at end of file