Repository: hadoop Updated Branches: refs/heads/ozone-0.3 0674f11fc -> 346afb0a5
HDDS-695. Introduce new SCM Commands to list and close Pipelines. Contributed by Nanda kumar. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/346afb0a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/346afb0a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/346afb0a Branch: refs/heads/ozone-0.3 Commit: 346afb0a5ce681642e1ba0a27b19944afd00df54 Parents: 0674f11 Author: Nanda kumar <[email protected]> Authored: Tue Oct 23 18:41:30 2018 +0530 Committer: Nanda kumar <[email protected]> Committed: Tue Oct 23 18:41:30 2018 +0530 ---------------------------------------------------------------------- .../scm/client/ContainerOperationClient.java | 11 ++ .../hadoop/hdds/scm/client/ScmClient.java | 18 ++ .../scm/container/common/helpers/Pipeline.java | 14 +- .../StorageContainerLocationProtocol.java | 17 ++ ...rLocationProtocolClientSideTranslatorPB.java | 33 ++++ ...rLocationProtocolServerSideTranslatorPB.java | 37 ++++ .../StorageContainerLocationProtocol.proto | 25 +++ .../hdds/scm/pipelines/PipelineSelector.java | 39 +++- .../scm/server/SCMClientProtocolServer.java | 14 ++ .../scm/pipelines/TestPipelineSelector.java | 186 +++++++++++++++++++ .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java | 6 +- .../cli/pipeline/ClosePipelineSubcommand.java | 53 ++++++ .../cli/pipeline/ListPipelinesSubcommand.java | 48 +++++ .../hdds/scm/cli/pipeline/package-info.java | 22 +++ 14 files changed, 513 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index c2bfb42..1911e42 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -255,6 +255,17 @@ public class ContainerOperationClient implements ScmClient { } @Override + public List<Pipeline> listPipelines() throws IOException { + return storageContainerLocationClient.listPipelines(); + } + + @Override + public void closePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + storageContainerLocationClient.closePipeline(pipelineID); + } + + @Override public void close() { try { xceiverClientManager.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 184c547..2bd1119 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -171,4 +171,22 @@ public interface ScmClient extends Closeable { Pipeline createReplicationPipeline(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) throws IOException; + + /** + * Returns the list of active PipelineIDs. + * + * @return list of PipelineID + * + * @throws IOException in case of any exception + */ + List<Pipeline> listPipelines() throws IOException; + + /** + * Closes the pipeline given a pipeline ID. + * + * @param pipelineID PipelineID to close. 
+ * + * @throws IOException In case of exception while closing the pipeline + */ + void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java index c36ca1f..49c00a8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -284,18 +284,20 @@ public class Pipeline { public String toString() { final StringBuilder b = new StringBuilder(getClass().getSimpleName()) .append("["); - getDatanodes().keySet().forEach( - node -> b.append(node.endsWith(getLeaderID()) ? 
"*" + id : id)); - b.append(" id:").append(id); + b.append(" Id: ").append(id.getId()); + b.append(", Nodes: "); + getDatanodes().values().forEach(b::append); + if (getType() != null) { - b.append(" type:").append(getType().toString()); + b.append(", Type:").append(getType().toString()); } if (getFactor() != null) { - b.append(" factor:").append(getFactor().toString()); + b.append(", Factor:").append(getFactor().toString()); } if (getLifeCycleState() != null) { - b.append(" State:").append(getLifeCycleState().toString()); + b.append(", State:").append(getLifeCycleState().toString()); } + b.append("]"); return b.toString(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index e38077f..87a9245 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -127,6 +127,23 @@ public interface StorageContainerLocationProtocol { throws IOException; /** + * Returns the list of active PipelineIDs. + * + * @return list of PipelineID + * + * @throws IOException in case of any exception + */ + List<Pipeline> listPipelines() throws IOException; + + /** + * Closes a pipeline given the pipelineID. + * + * @param pipelineID ID of the pipeline to demolish + * @throws IOException + */ + void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException; + + /** * Returns information about SCM. 
* * @return {@link ScmInfo} http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 16819e9..bbb12b8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -20,6 +20,9 @@ import com.google.common.base.Preconditions; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto; @@ -64,6 +67,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * This class is the client-side translator to translate the 
requests made on @@ -305,6 +309,35 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB } @Override + public List<Pipeline> listPipelines() throws IOException { + try { + ListPipelineRequestProto request = ListPipelineRequestProto + .newBuilder().build(); + ListPipelineResponseProto response = rpcProxy.listPipelines( + NULL_RPC_CONTROLLER, request); + return response.getPipelinesList().stream() + .map(Pipeline::getFromProtoBuf) + .collect(Collectors.toList()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void closePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + try { + ClosePipelineRequestProto request = + ClosePipelineRequestProto.newBuilder() + .setPipelineID(pipelineID) + .build(); + rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override public ScmInfo getScmInfo() throws IOException { HddsProtos.GetScmInfoRequestProto request = HddsProtos.GetScmInfoRequestProto.getDefaultInstance(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index d2723f0..7376ebc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -34,6 +34,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -44,6 +45,14 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.ContainerResponseProto; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ClosePipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ListPipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ListPipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.GetContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.GetContainerResponseProto; @@ -212,6 +221,34 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB } @Override + public ListPipelineResponseProto listPipelines( + RpcController controller, ListPipelineRequestProto request) + throws ServiceException { + try { + ListPipelineResponseProto.Builder builder = ListPipelineResponseProto + .newBuilder(); + List<Pipeline> pipelineIDs = impl.listPipelines(); + pipelineIDs.stream().map(Pipeline::getProtobufMessage) + .forEach(builder::addPipelines); + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + 
+ @Override + public ClosePipelineResponseProto closePipeline( + RpcController controller, ClosePipelineRequestProto request) + throws ServiceException { + try { + impl.closePipeline(request.getPipelineID()); + return ClosePipelineResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override public HddsProtos.GetScmInfoRespsonseProto getScmInfo( RpcController controller, HddsProtos.GetScmInfoRequestProto req) throws ServiceException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index 49d1975..9396ccd 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -149,6 +149,19 @@ message PipelineResponseProto { optional string errorMessage = 3; } +message ListPipelineRequestProto { +} + +message ListPipelineResponseProto { + repeated Pipeline pipelines = 1; +} + +message ClosePipelineRequestProto { + required PipelineID pipelineID = 1; +} + +message ClosePipelineResponseProto { +} message InChillModeRequestProto { } @@ -219,6 +232,18 @@ service StorageContainerLocationProtocolService { returns (PipelineResponseProto); /** + * Returns the list of Pipelines managed by SCM. + */ + rpc listPipelines(ListPipelineRequestProto) + returns (ListPipelineResponseProto); + + /** + * Closes a pipeline. + */ + rpc closePipeline(ClosePipelineRequestProto) + returns (ClosePipelineResponseProto); + + /** * Returns information about SCM. 
*/ rpc getScmInfo(GetScmInfoRequestProto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index c8d22ff..7a54047 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -54,6 +54,8 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.HashMap; @@ -167,9 +169,15 @@ public class PipelineSelector { public void removeContainerFromPipeline(PipelineID pipelineID, long containerID) throws IOException { - pipeline2ContainerMap.get(pipelineID) - .remove(ContainerID.valueof(containerID)); - closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID)); + if (pipeline2ContainerMap.containsKey(pipelineID)) { + pipeline2ContainerMap.get(pipelineID) + .remove(ContainerID.valueof(containerID)); + closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID)); + } else { + LOG.warn("Cannot remove container #{} from pipeline." + + " Pipeline #{} not found.", + containerID, pipelineID.getId()); + } } /** @@ -341,6 +349,31 @@ public class PipelineSelector { manager.closePipeline(pipeline); } + public List<Pipeline> listPipelines() { + return Collections.unmodifiableList(new ArrayList<>(pipelineMap.values())); + } + + /** + * Closes the given pipeline. 
+ */ + public void closePipeline(PipelineID pipelineID) throws IOException { + final Pipeline pipeline = pipelineMap.get(pipelineID); + if (pipeline == null) { + // pipeline not found; + LOG.warn("Cannot close the pipeline. {} not found!", pipelineID); + return; + } + LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId()); + finalizePipeline(pipeline); + if (pipeline.getLifeCycleState() != LifeCycleState.CLOSED) { + pipelineManagerMap.get(pipeline.getType()).closePipeline(pipeline); + pipeline2ContainerMap.remove(pipeline.getId()); + nodeManager.removePipeline(pipeline); + pipelineMap.remove(pipeline.getId()); + } + pipelineStore.delete(pipelineID.getProtobuf().toByteArray()); + } + /** * Add to a given pipeline. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 3523499..3d228fa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; @@ -58,6 
+59,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.UUID; import static org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos @@ -293,6 +295,18 @@ public class SCMClientProtocolServer implements } @Override + public List<Pipeline> listPipelines() { + return scm.getContainerManager().getPipelineSelector().listPipelines(); + } + + @Override + public void closePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + PipelineID id = PipelineID.valueOf(UUID.fromString(pipelineID.getId())); + scm.getContainerManager().getPipelineSelector().closePipeline(id); + } + + @Override public ScmInfo getScmInfo() throws IOException { ScmInfo.Builder builder = new ScmInfo.Builder() http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java new file mode 100644 index 0000000..aa9b382 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipelines/TestPipelineSelector.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipelines; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +/** + * Tests the functionality of PipelineSelector. 
+ */ +public class TestPipelineSelector { + + @Test + public void testListPipelinesWithNoPipeline() throws IOException { + String storageDir = GenericTestUtils.getTempPath( + TestPipelineSelector.class.getName() + UUID.randomUUID()); + try { + Configuration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir); + PipelineSelector selector = new PipelineSelector( + Mockito.mock(NodeManager.class), conf, + Mockito.mock(EventPublisher.class), + ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + Assert.assertTrue(selector.listPipelines().isEmpty()); + } finally { + FileUtil.fullyDelete(new File(storageDir)); + } + } + + @Test + public void testListPipelines() throws IOException { + String storageDir = GenericTestUtils.getTempPath( + TestPipelineSelector.class.getName() + UUID.randomUUID()); + try { + Configuration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir); + PipelineSelector selector = new PipelineSelector( + Mockito.mock(NodeManager.class), conf, + Mockito.mock(EventPublisher.class), + ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + getRandomPipeline(selector); + getRandomPipeline(selector); + getRandomPipeline(selector); + getRandomPipeline(selector); + getRandomPipeline(selector); + Assert.assertEquals(5, selector.listPipelines().size()); + } finally { + FileUtil.fullyDelete(new File(storageDir)); + } + } + + @Test + public void testCloseEmptyPipeline() throws IOException { + String storageDir = GenericTestUtils.getTempPath( + TestPipelineSelector.class.getName() + UUID.randomUUID()); + try { + Configuration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir); + PipelineSelector selector = new PipelineSelector( + Mockito.mock(NodeManager.class), conf, + Mockito.mock(EventPublisher.class), + ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + + // Create and add pipeline to selector. 
+ Pipeline pipelineOne = getRandomPipeline(selector); + Pipeline pipelineTwo = getRandomPipeline(selector); + + Assert.assertNotNull(selector.getPipeline(pipelineOne.getId())); + Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId())); + + selector.closePipeline(pipelineOne.getId()); + + Assert.assertNull(selector.getPipeline(pipelineOne.getId())); + Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId())); + } finally { + FileUtil.fullyDelete(new File(storageDir)); + } + } + + @Test + public void testClosePipelineWithContainer() throws IOException { + String storageDir = GenericTestUtils.getTempPath( + TestPipelineSelector.class.getName() + UUID.randomUUID()); + try { + Configuration conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir); + PipelineSelector selector = new PipelineSelector( + Mockito.mock(NodeManager.class), conf, + Mockito.mock(EventPublisher.class), + ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + + // Create and add pipeline to selector. + Pipeline pipelineOne = getRandomPipeline(selector); + Pipeline pipelineTwo = getRandomPipeline(selector); + + selector.addContainerToPipeline(pipelineOne.getId(), 1L); + selector.addContainerToPipeline(pipelineOne.getId(), 2L); + selector.addContainerToPipeline(pipelineOne.getId(), 3L); + selector.addContainerToPipeline(pipelineOne.getId(), 4L); + selector.addContainerToPipeline(pipelineOne.getId(), 5L); + + Assert.assertNotNull(selector.getPipeline(pipelineOne.getId())); + Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId())); + + Assert.assertEquals(5, + selector.getOpenContainerIDsByPipeline(pipelineOne.getId()).size()); + + selector.closePipeline(pipelineOne.getId()); + + Assert.assertNull(selector.getPipeline(pipelineOne.getId())); + Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId())); + } finally { + FileUtil.fullyDelete(new File(storageDir)); + } + } + + /** + * Creates a random pipeline and registers with PipelineSelector. 
+ * + * @param selector PipelineSelector + * @return Pipeline + * @throws IOException + */ + private Pipeline getRandomPipeline(PipelineSelector selector) + throws IOException{ + DatanodeDetails ddOne = TestUtils.randomDatanodeDetails(); + DatanodeDetails ddTwo = TestUtils.randomDatanodeDetails(); + DatanodeDetails ddThree = TestUtils.randomDatanodeDetails(); + Pipeline pipeline = new Pipeline(ddOne.getUuidString(), + LifeCycleState.ALLOCATED, ReplicationType.RATIS, + ReplicationFactor.THREE, PipelineID.randomId()); + pipeline.addMember(ddOne); + pipeline.addMember(ddTwo); + pipeline.addMember(ddThree); + selector.updatePipelineState(pipeline, LifeCycleEvent.CREATE); + selector.updatePipelineState(pipeline, LifeCycleEvent.CREATED); + PipelineReport reportOne = PipelineReport.newBuilder() + .setPipelineID(pipeline.getId().getProtobuf()).build(); + PipelineReportsProto reportsOne = PipelineReportsProto.newBuilder() + .addPipelineReport(reportOne).build(); + pipeline.getDatanodes().values().forEach( + dd -> selector.processPipelineReport(dd, reportsOne)); + return pipeline; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java index 59cd0ba..24eab9e 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.cli.container.CreateSubcommand; import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand; import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand; import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand; +import 
org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand; +import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand; import org.apache.hadoop.hdds.scm.client.ContainerOperationClient; import org.apache.hadoop.hdds.scm.client.ScmClient; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; @@ -77,7 +79,9 @@ import picocli.CommandLine.Option; InfoSubcommand.class, DeleteSubcommand.class, CreateSubcommand.class, - CloseSubcommand.class + CloseSubcommand.class, + ListPipelinesSubcommand.class, + ClosePipelineSubcommand.class }, mixinStandardHelpOptions = true) public class SCMCLI extends GenericCli { http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java new file mode 100644 index 0000000..d99823b --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.pipeline; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.cli.SCMCLI; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine; + +import java.util.concurrent.Callable; + +/** + * Handler of closePipeline command. + */ +@CommandLine.Command( + name = "closePipeline", + description = "Close pipeline", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ClosePipelineSubcommand implements Callable<Void> { + + @CommandLine.ParentCommand + private SCMCLI parent; + + @CommandLine.Parameters(description = "ID of the pipeline to close") + private String pipelineId; + + @Override + public Void call() throws Exception { + try (ScmClient scmClient = parent.createScmClient()) { + scmClient.closePipeline( + HddsProtos.PipelineID.newBuilder().setId(pipelineId).build()); + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java new file mode 100644 index 0000000..0f8cf28 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.pipeline; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.scm.cli.SCMCLI; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine; + +import java.util.concurrent.Callable; + +/** + * Handler of listPipelines command. + */ +@CommandLine.Command( + name = "listPipelines", + description = "List all active pipelines", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ListPipelinesSubcommand implements Callable<Void> { + + @CommandLine.ParentCommand + private SCMCLI parent; + + @Override + public Void call() throws Exception { + try (ScmClient scmClient = parent.createScmClient()) { + scmClient.listPipelines().forEach(System.out::println); + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/346afb0a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java new file mode 100644 index 0000000..64924d1 --- /dev/null +++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains all of the pipeline related scm commands. + */ +package org.apache.hadoop.hdds.scm.cli.pipeline; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
