This is an automated email from the ASF dual-hosted git repository.
myskov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 45f9138734 HDDS-11394. Fix pipeline close --all command (#7138)
45f9138734 is described below
commit 45f9138734083733bf1152f9eadcc9b5f1a20dde
Author: Alexandr Juncevich <[email protected]>
AuthorDate: Fri Sep 20 09:23:52 2024 +0300
HDDS-11394. Fix pipeline close --all command (#7138)
---
.../scm/cli/pipeline/ClosePipelineSubcommand.java | 18 ++-
.../cli/pipeline/TestClosePipelinesSubCommand.java | 178 +++++++++++++++++++++
2 files changed, 192 insertions(+), 4 deletions(-)
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
index 7c70456995..e5392ef618 100644
---
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java
@@ -59,12 +59,22 @@ public class ClosePipelineSubcommand extends ScmSubcommand {
List<Pipeline> pipelineList = new ArrayList<>();
Predicate<? super Pipeline> predicate = replicationFilter.orElse(null);
- for (Pipeline pipeline : scmClient.listPipelines()) {
- boolean filterPassed = (predicate != null) && predicate.test(pipeline);
- if (pipeline.getPipelineState() != Pipeline.PipelineState.CLOSED &&
filterPassed) {
- pipelineList.add(pipeline);
+ List<Pipeline> pipelines = scmClient.listPipelines();
+ if (predicate == null) {
+ for (Pipeline pipeline : pipelines) {
+ if (pipeline.getPipelineState() != Pipeline.PipelineState.CLOSED) {
+ pipelineList.add(pipeline);
+ }
+ }
+ } else {
+ for (Pipeline pipeline : pipelines) {
+ boolean filterPassed = predicate.test(pipeline);
+ if (pipeline.getPipelineState() != Pipeline.PipelineState.CLOSED &&
filterPassed) {
+ pipelineList.add(pipeline);
+ }
}
}
+
System.out.println("Sending close command for " + pipelineList.size() +
" pipelines...");
pipelineList.forEach(pipeline -> {
try {
diff --git
a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestClosePipelinesSubCommand.java
b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestClosePipelinesSubCommand.java
new file mode 100644
index 0000000000..013350fe87
--- /dev/null
+++
b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestClosePipelinesSubCommand.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import picocli.CommandLine;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the ClosePipelineSubcommand class.
+ */
+class TestClosePipelinesSubCommand {
+
+ private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
+ private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+ private final PrintStream originalOut = System.out;
+ private final PrintStream originalErr = System.err;
+ private ClosePipelineSubcommand cmd;
+ private ScmClient scmClient;
+
+ public static Stream<Arguments> values() {
+ return Stream.of(
+ arguments(
+ new String[]{"--all"},
+ "Sending close command for 2 pipelines...\n",
+ "with empty parameters"
+ ),
+ arguments(
+ new String[]{"--all", "-ffc", "THREE"},
+ "Sending close command for 1 pipelines...\n",
+ "by filter factor, opened"
+ ),
+ arguments(
+ new String[]{"--all", "-ffc", "ONE"},
+ "Sending close command for 0 pipelines...\n",
+ "by filter factor, closed"
+ ),
+ arguments(
+ new String[]{"--all", "-r", "rs-3-2-1024k", "-t", "EC"},
+ "Sending close command for 1 pipelines...\n",
+ "by replication and type, opened"
+ ),
+ arguments(
+ new String[]{"--all", "-r", "rs-6-3-1024k", "-t", "EC"},
+ "Sending close command for 0 pipelines...\n",
+ "by replication and type, closed"
+ ),
+ arguments(
+ new String[]{"--all", "-t", "EC"},
+ "Sending close command for 1 pipelines...\n",
+ "by type, opened"
+ ),
+ arguments(
+ new String[]{"--all", "-t", "RS"},
+ "Sending close command for 0 pipelines...\n",
+ "by type, closed"
+ )
+ );
+ }
+
+ @BeforeEach
+ public void setup() throws IOException {
+ cmd = new ClosePipelineSubcommand();
+ System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
+ System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
+
+ scmClient = mock(ScmClient.class);
+ when(scmClient.listPipelines()).thenAnswer(invocation ->
createPipelines());
+ }
+
+ @AfterEach
+ public void tearDown() {
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+ }
+
+ @ParameterizedTest(name = "{index}. {2}")
+ @MethodSource("values")
+ void testCloseAllPipelines(String[] commands, String expectedOutput, String
testName) throws IOException {
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs(commands);
+ cmd.execute(scmClient);
+ assertEquals(expectedOutput, outContent.toString(DEFAULT_ENCODING));
+ }
+
+ private List<Pipeline> createPipelines() {
+ List<Pipeline> pipelines = new ArrayList<>();
+ pipelines.add(createPipeline(StandaloneReplicationConfig.getInstance(ONE),
+ Pipeline.PipelineState.CLOSED));
+ pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
+ Pipeline.PipelineState.OPEN));
+ pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
+ Pipeline.PipelineState.CLOSED));
+
+ pipelines.add(createPipeline(
+ new ECReplicationConfig(3, 2), Pipeline.PipelineState.OPEN));
+ pipelines.add(createPipeline(
+ new ECReplicationConfig(3, 2), Pipeline.PipelineState.CLOSED));
+ pipelines.add(createPipeline(
+ new ECReplicationConfig(6, 3), Pipeline.PipelineState.CLOSED));
+ pipelines.add(createPipeline(
+ RatisReplicationConfig.getInstance(THREE),
Pipeline.PipelineState.CLOSED));
+ return pipelines;
+ }
+
+ private Pipeline createPipeline(ReplicationConfig repConfig,
+ Pipeline.PipelineState state) {
+ return new Pipeline.Builder()
+ .setId(PipelineID.randomId())
+ .setCreateTimestamp(System.currentTimeMillis())
+ .setState(state)
+ .setReplicationConfig(repConfig)
+ .setNodes(createDatanodeDetails(1))
+ .build();
+ }
+
+ private List<DatanodeDetails> createDatanodeDetails(int count) {
+ List<DatanodeDetails> dns = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ HddsProtos.DatanodeDetailsProto dnd =
+ HddsProtos.DatanodeDetailsProto.newBuilder()
+ .setHostName("host" + i)
+ .setIpAddress("1.2.3." + i + 1)
+ .setNetworkLocation("/default")
+ .setNetworkName("host" + i)
+ .addPorts(HddsProtos.Port.newBuilder()
+ .setName("ratis").setValue(5678).build())
+ .setUuid(UUID.randomUUID().toString())
+ .build();
+ dns.add(DatanodeDetails.getFromProtoBuf(dnd));
+ }
+ return dns;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]