HDDS-576. Move ContainerWithPipeline creation to RPC endpoint. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/18fe65d7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/18fe65d7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/18fe65d7 Branch: refs/heads/HDFS-13891 Commit: 18fe65d7560b0bd61e3cf3ffbbaf98e87d82120f Parents: 42f3a70 Author: Nanda kumar <na...@apache.org> Authored: Mon Nov 12 23:32:31 2018 +0530 Committer: Nanda kumar <na...@apache.org> Committed: Mon Nov 12 23:33:11 2018 +0530 ---------------------------------------------------------------------- .../hadoop/hdds/scm/container/ContainerID.java | 2 +- hadoop-hdds/common/src/main/proto/hdds.proto | 24 ++-- .../CloseContainerCommandHandler.java | 2 +- .../protocol/commands/CommandForDatanode.java | 1 + .../hadoop/hdds/scm/block/BlockManagerImpl.java | 77 ++++++------ .../block/DatanodeDeletedBlockTransactions.java | 40 +++---- .../hdds/scm/block/DeletedBlockLogImpl.java | 24 ++-- .../container/CloseContainerEventHandler.java | 116 +++++++++---------- .../hdds/scm/container/ContainerManager.java | 20 +--- .../scm/container/ContainerStateManager.java | 4 +- .../hdds/scm/container/SCMContainerManager.java | 63 +--------- .../scm/server/SCMClientProtocolServer.java | 32 ++++- .../scm/server/StorageContainerManager.java | 6 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 2 +- .../hadoop/hdds/scm/block/TestBlockManager.java | 2 +- .../hdds/scm/block/TestDeletedBlockLog.java | 55 +++++---- .../TestCloseContainerEventHandler.java | 35 +++--- .../container/TestContainerReportHandler.java | 6 +- .../scm/container/TestSCMContainerManager.java | 65 +++++------ .../hdds/scm/node/TestContainerPlacement.java | 7 +- .../hdds/scm/pipeline/TestNode2PipelineMap.java | 6 +- .../hdds/scm/pipeline/TestNodeFailure.java | 37 +++--- .../hdds/scm/pipeline/TestPipelineClose.java | 10 +- .../hdds/scm/pipeline/TestSCMRestart.java | 19 +-- .../org/apache/hadoop/ozone/OzoneTestUtils.java | 5 +- .../ozone/client/rest/TestOzoneRestClient.java | 7 +- .../rpc/TestCloseContainerHandlingByClient.java | 31 +++-- .../ozone/client/rpc/TestOzoneRpcClient.java | 21 ++-- .../TestCloseContainerByPipeline.java | 19 +-- .../TestCloseContainerHandler.java | 19 +-- .../freon/TestFreonWithDatanodeFastRestart.java | 8 ++ .../freon/TestFreonWithDatanodeRestart.java | 5 + .../hadoop/ozone/scm/TestContainerSQLCli.java | 2 +- 33 files changed, 381 insertions(+), 391 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java index e7ac350..bb44da4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java @@ -102,6 +102,6 @@ public final class ContainerID implements Comparable<ContainerID> { @Override public String toString() { - return "id=" + id; + return "#" + id; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/common/src/main/proto/hdds.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index a0c6f16..c37683a 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -111,24 +111,18 @@ message NodePool { */ enum LifeCycleState { - ALLOCATED = 1; - CREATING = 2; // Used for container allocated/created by different client. - OPEN =3; // Mostly an update to SCM via HB or client call. - CLOSING = 4; - CLOSED = 5; // !!State after this has not been used yet. - DELETING = 6; - DELETED = 7; // object is deleted. + OPEN = 1; + CLOSING = 2; + CLOSED = 3; + DELETING = 4; + DELETED = 5; // object is deleted. } enum LifeCycleEvent { - CREATE = 1; // A request to client to create this object - CREATED = 2; - FINALIZE = 3; - CLOSE = 4; // !!Event after this has not been used yet. - UPDATE = 5; - TIMEOUT = 6; // creation has timed out from SCM's View. - DELETE = 7; - CLEANUP = 8; + FINALIZE = 1; + CLOSE = 2; // !!Event after this has not been used yet. + DELETE = 3; + CLEANUP = 4; } message ContainerInfoProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 22488d9..a7d855b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -84,7 +84,7 @@ public class CloseContainerCommandHandler implements CommandHandler { cmdExecuted = false; return; } - if (container.getContainerData().isClosed()) { + if (!container.getContainerData().isClosed()) { LOG.debug("Closing container {}.", containerID); HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID(); HddsProtos.ReplicationType replicationType = http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java index 69337fb..66bf623 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java @@ -32,6 +32,7 @@ public class CommandForDatanode<T extends GeneratedMessage> implements private final SCMCommand<T> command; + // TODO: Command for datanode should take DatanodeDetails as parameter. public CommandForDatanode(UUID datanodeId, SCMCommand<T> command) { this.datanodeId = datanodeId; this.command = command; http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index abbe9f1..6f96f4b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -26,13 +26,15 @@ import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.metrics2.util.MBeans; @@ -70,6 +72,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>, // Currently only user of the block service is Ozone, CBlock manages blocks // by itself and does not rely on the Block service offered by SCM. + private final PipelineManager pipelineManager; private final ContainerManager containerManager; private final long containerSize; @@ -87,14 +90,16 @@ public class BlockManagerImpl implements EventHandler<Boolean>, * * @param conf - configuration. * @param nodeManager - node manager. + * @param pipelineManager - pipeline manager. * @param containerManager - container manager. * @param eventPublisher - event publisher. * @throws IOException */ public BlockManagerImpl(final Configuration conf, - final NodeManager nodeManager, final ContainerManager containerManager, - EventPublisher eventPublisher) + final NodeManager nodeManager, final PipelineManager pipelineManager, + final ContainerManager containerManager, EventPublisher eventPublisher) throws IOException { + this.pipelineManager = pipelineManager; this.containerManager = containerManager; this.containerSize = (long)conf.getStorageSize( @@ -155,16 +160,15 @@ public class BlockManagerImpl implements EventHandler<Boolean>, * @throws IOException */ private synchronized void preAllocateContainers(int count, - ReplicationType type, ReplicationFactor factor, String owner) - throws IOException { + ReplicationType type, ReplicationFactor factor, String owner) { for (int i = 0; i < count; i++) { - ContainerWithPipeline containerWithPipeline; + ContainerInfo containerInfo; try { // TODO: Fix this later when Ratis is made the Default. - containerWithPipeline = containerManager.allocateContainer( + containerInfo = containerManager.allocateContainer( type, factor, owner); - if (containerWithPipeline == null) { + if (containerInfo == null) { LOG.warn("Unable to allocate container."); } } catch (IOException ex) { @@ -206,11 +210,11 @@ public class BlockManagerImpl implements EventHandler<Boolean>, use different kind of policies. */ - ContainerWithPipeline containerWithPipeline; + ContainerInfo containerInfo; // look for OPEN containers that match the criteria. - containerWithPipeline = containerManager - .getMatchingContainerWithPipeline(size, owner, type, factor, + containerInfo = containerManager + .getMatchingContainer(size, owner, type, factor, HddsProtos.LifeCycleState.OPEN); // We did not find OPEN Containers. This generally means @@ -221,27 +225,27 @@ public class BlockManagerImpl implements EventHandler<Boolean>, // Even though we have already checked the containers in OPEN // state, we have to check again as we only hold a read lock. // Some other thread might have pre-allocated container in meantime. - if (containerWithPipeline == null) { + if (containerInfo == null) { synchronized (this) { if (!containerManager.getContainers(HddsProtos.LifeCycleState.OPEN) .isEmpty()) { - containerWithPipeline = containerManager - .getMatchingContainerWithPipeline(size, owner, type, factor, + containerInfo = containerManager + .getMatchingContainer(size, owner, type, factor, HddsProtos.LifeCycleState.OPEN); } - if (containerWithPipeline == null) { + if (containerInfo == null) { preAllocateContainers(containerProvisionBatchSize, type, factor, owner); - containerWithPipeline = containerManager - .getMatchingContainerWithPipeline(size, owner, type, factor, + containerInfo = containerManager + .getMatchingContainer(size, owner, type, factor, HddsProtos.LifeCycleState.OPEN); } } } - if (containerWithPipeline != null) { - return newBlock(containerWithPipeline, HddsProtos.LifeCycleState.OPEN); + if (containerInfo != null) { + return newBlock(containerInfo); } // we have tried all strategies we know and but somehow we are not able @@ -255,29 +259,26 @@ public class BlockManagerImpl implements EventHandler<Boolean>, /** * newBlock - returns a new block assigned to a container. * - * @param containerWithPipeline - Container Info. - * @param state - Current state of the container. + * @param containerInfo - Container Info. * @return AllocatedBlock */ - private AllocatedBlock newBlock(ContainerWithPipeline containerWithPipeline, - HddsProtos.LifeCycleState state) throws IOException { - ContainerInfo containerInfo = containerWithPipeline.getContainerInfo(); - if (containerWithPipeline.getPipeline().getNodes().size() == 0) { - LOG.error("Pipeline Machine count is zero."); + private AllocatedBlock newBlock(ContainerInfo containerInfo) { + try { + final Pipeline pipeline = pipelineManager + .getPipeline(containerInfo.getPipelineID()); + // TODO : Revisit this local ID allocation when HA is added. + long localID = UniqueId.next(); + long containerID = containerInfo.getContainerID(); + AllocatedBlock.Builder abb = new AllocatedBlock.Builder() + .setContainerBlockID(new ContainerBlockID(containerID, localID)) + .setPipeline(pipeline); + LOG.trace("New block allocated : {} Container ID: {}", localID, + containerID); + return abb.build(); + } catch (PipelineNotFoundException ex) { + LOG.error("Pipeline Machine count is zero.", ex); return null; } - - // TODO : Revisit this local ID allocation when HA is added. - long localID = UniqueId.next(); - long containerID = containerInfo.getContainerID(); - - AllocatedBlock.Builder abb = - new AllocatedBlock.Builder() - .setContainerBlockID(new ContainerBlockID(containerID, localID)) - .setPipeline(containerWithPipeline.getPipeline()); - LOG.trace("New block allocated : {} Container ID: {}", localID, - containerID); - return abb.build(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java index 70e9b5d..ce65a70 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hdds.scm.block; import com.google.common.collect.ArrayListMultimap; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; @@ -30,8 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; /** * A wrapper class to hold info about datanode and all deleted block @@ -58,31 +57,28 @@ public class DatanodeDeletedBlockTransactions { public boolean addTransaction(DeletedBlocksTransaction tx, Set<UUID> dnsWithTransactionCommitted) { - Pipeline pipeline = null; try { - ContainerWithPipeline containerWithPipeline = - containerManager.getContainerWithPipeline( - ContainerID.valueof(tx.getContainerID())); - if (containerWithPipeline.getContainerInfo().isOpen() - || containerWithPipeline.getPipeline().isEmpty()) { - return false; + boolean success = false; + final ContainerID id = ContainerID.valueof(tx.getContainerID()); + final ContainerInfo container = containerManager.getContainer(id); + final Set<ContainerReplica> replicas = containerManager + .getContainerReplicas(id); + if (!container.isOpen()) { + for (ContainerReplica replica : replicas) { + UUID dnID = replica.getDatanodeDetails().getUuid(); + if (dnsWithTransactionCommitted == null || + !dnsWithTransactionCommitted.contains(dnID)) { + // Transaction need not be sent to dns which have + // already committed it + success = addTransactionToDN(dnID, tx); + } + } } - pipeline = containerWithPipeline.getPipeline(); + return success; } catch (IOException e) { SCMBlockDeletingService.LOG.warn("Got container info error.", e); return false; } - - boolean success = false; - for (DatanodeDetails dd : pipeline.getNodes()) { - UUID dnID = dd.getUuid(); - if (dnsWithTransactionCommitted == null || - !dnsWithTransactionCommitted.contains(dnID)) { - // Transaction need not be sent to dns which have already committed it - success = addTransactionToDN(dnID, tx); - } - } - return success; } private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index a5ee130..766d428 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -30,8 +30,9 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.scm.command .CommandStatusReportHandler.DeleteBlockStatus; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -49,7 +50,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -101,7 +101,7 @@ public class DeletedBlockLogImpl private Map<Long, Set<UUID>> transactionToDNsCommitMap; public DeletedBlockLogImpl(Configuration conf, - ContainerManager containerManager) throws IOException { + ContainerManager containerManager) throws IOException { maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); @@ -249,7 +249,8 @@ public class DeletedBlockLogImpl long txID = transactionResult.getTxID(); // set of dns which have successfully committed transaction txId. dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID); - Long containerId = transactionResult.getContainerID(); + final ContainerID containerId = ContainerID.valueof( + transactionResult.getContainerID()); if (dnsWithCommittedTxn == null) { LOG.warn("Transaction txId={} commit by dnId={} for containerID={} " + "failed. Corresponding entry not found.", txID, dnID, @@ -258,16 +259,17 @@ public class DeletedBlockLogImpl } dnsWithCommittedTxn.add(dnID); - Pipeline pipeline = - containerManager.getContainerWithPipeline( - ContainerID.valueof(containerId)).getPipeline(); - Collection<DatanodeDetails> containerDnsDetails = pipeline.getNodes(); + final ContainerInfo container = containerManager + .getContainer(containerId); + final Set<ContainerReplica> replicas = + containerManager.getContainerReplicas(containerId); // The delete entry can be safely removed from the log if all the // corresponding nodes commit the txn. It is required to check that // the nodes returned in the pipeline match the replication factor. - if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size()) - >= pipeline.getFactor().getNumber()) { - List<UUID> containerDns = containerDnsDetails.stream() + if (min(replicas.size(), dnsWithCommittedTxn.size()) + >= container.getReplicationFactor().getNumber()) { + List<UUID> containerDns = replicas.stream() + .map(ContainerReplica::getDatanodeDetails) .map(DatanodeDetails::getUuid) .collect(Collectors.toList()); if (dnsWithCommittedTxn.containsAll(containerDns)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 9b41455..719d763 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -17,9 +17,14 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; @@ -42,78 +47,67 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { public static final Logger LOG = LoggerFactory.getLogger(CloseContainerEventHandler.class); - + private final PipelineManager pipelineManager; private final ContainerManager containerManager; - public CloseContainerEventHandler(ContainerManager containerManager) { + public CloseContainerEventHandler(final PipelineManager pipelineManager, + final ContainerManager containerManager) { + this.pipelineManager = pipelineManager; this.containerManager = containerManager; } @Override public void onMessage(ContainerID containerID, EventPublisher publisher) { - - LOG.info("Close container Event triggered for container : {}", - containerID.getId()); - ContainerWithPipeline containerWithPipeline; - ContainerInfo info; + LOG.info("Close container Event triggered for container : {}", containerID); try { - containerWithPipeline = - containerManager.getContainerWithPipeline(containerID); - info = containerWithPipeline.getContainerInfo(); - if (info == null) { - LOG.error("Failed to update the container state. Container with id : {}" - + " does not exist", containerID.getId()); - return; + // If the container is in OPEN state, FINALIZE it. + if (containerManager.getContainer(containerID).getState() + == LifeCycleState.OPEN) { + containerManager.updateContainerState( + containerID, LifeCycleEvent.FINALIZE); } - } catch (IOException e) { - LOG.error("Failed to update the container state. Container with id : {} " - + "does not exist", containerID.getId(), e); - return; - } - HddsProtos.LifeCycleState state = info.getState(); - try { - switch (state) { - case OPEN: - containerManager.updateContainerState(containerID, - HddsProtos.LifeCycleEvent.FINALIZE); - fireCloseContainerEvents(containerWithPipeline, info, publisher); - break; - case CLOSING: - fireCloseContainerEvents(containerWithPipeline, info, publisher); - break; - case CLOSED: - case DELETING: - case DELETED: - LOG.info("Cannot close container #{}, it is already in {} state.", - containerID.getId(), state); - break; - default: - throw new IOException("Invalid container state for container #" - + containerID); + // ContainerInfo has to read again after the above state change. + final ContainerInfo container = containerManager + .getContainer(containerID); + // Send close command to datanodes, if the container is in CLOSING state + if (container.getState() == LifeCycleState.CLOSING) { + + final CloseContainerCommand closeContainerCommand = + new CloseContainerCommand(containerID.getId(), + container.getReplicationType(), container.getPipelineID()); + + getNodes(container).forEach(node -> publisher.fireEvent( + DATANODE_COMMAND, + new CommandForDatanode<>(node.getUuid(), closeContainerCommand))); + } else { + LOG.warn("Cannot close container {}, which is in {} state.", + containerID, container.getState()); } + } catch (IOException ex) { - LOG.error("Failed to update the container state for container #{}" - + containerID, ex); + LOG.error("Failed to close the container {}.", containerID, ex); } } - private void fireCloseContainerEvents( - ContainerWithPipeline containerWithPipeline, ContainerInfo info, - EventPublisher publisher) { - ContainerID containerID = info.containerID(); - // fire events. - CloseContainerCommand closeContainerCommand = - new CloseContainerCommand(containerID.getId(), - info.getReplicationType(), info.getPipelineID()); - - Pipeline pipeline = containerWithPipeline.getPipeline(); - pipeline.getNodes().stream() - .map(node -> - new CommandForDatanode<>(node.getUuid(), closeContainerCommand)) - .forEach(command -> publisher.fireEvent(DATANODE_COMMAND, command)); - - LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand, - pipeline, containerID); + /** + * Returns the list of Datanodes where this container lives. + * + * @param container ContainerInfo + * @return list of DatanodeDetails + * @throws ContainerNotFoundException + */ + private List<DatanodeDetails> getNodes(final ContainerInfo container) + throws ContainerNotFoundException { + try { + return pipelineManager.getPipeline(container.getPipelineID()).getNodes(); + } catch (PipelineNotFoundException ex) { + // Use container replica if the pipeline is not available. + return containerManager.getContainerReplicas(container.containerID()) + .stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + } } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 0906ca8..2afd645 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -19,9 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import java.io.Closeable; import java.io.IOException; @@ -63,16 +61,6 @@ public interface ContainerManager extends Closeable { throws ContainerNotFoundException; /** - * Returns the ContainerInfo from the container ID. - * - * @param containerID - ID of container. - * @return - ContainerWithPipeline such as creation state and the pipeline. - * @throws IOException - */ - ContainerWithPipeline getContainerWithPipeline(ContainerID containerID) - throws ContainerNotFoundException, PipelineNotFoundException; - - /** * Returns containers under certain conditions. * Search container IDs from start ID(exclusive), * The max size of the searching range cannot exceed the @@ -94,10 +82,10 @@ public interface ContainerManager extends Closeable { * * @param replicationFactor - replication factor of the container. * @param owner - * @return - ContainerWithPipeline. + * @return - ContainerInfo. * @throws IOException */ - ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType type, + ContainerInfo allocateContainer(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor replicationFactor, String owner) throws IOException; @@ -158,10 +146,10 @@ public interface ContainerManager extends Closeable { throws IOException; /** - * Returns the ContainerWithPipeline. + * Returns the ContainerInfo. * @return NodeManager */ - ContainerWithPipeline getMatchingContainerWithPipeline(long size, + ContainerInfo getMatchingContainer(long size, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 1d71d4e..f4bb082 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -227,7 +227,9 @@ public class ContainerStateManager { final List<Pipeline> pipelines = pipelineManager .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN); if (pipelines.isEmpty()) { - throw new IOException("Could not allocate container"); + throw new IOException("Could not allocate container. Cannot get any" + + " matching pipeline for Type:" + type + + ", Factor:" + replicationFactor + ", State:PipelineState.OPEN"); } pipeline = pipelines.get((int) containerCount.get() % pipelines.size()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 9d0ce7a..6c7031d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -21,15 +21,11 @@ import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -185,42 +181,6 @@ public class SCMContainerManager implements ContainerManager { } /** - * Returns the ContainerInfo and pipeline from the containerID. If container - * has no available replicas in datanodes it returns pipeline with no - * datanodes and empty leaderID . Pipeline#isEmpty can be used to check for - * an empty pipeline. - * - * @param containerID - ID of container. - * @return - ContainerWithPipeline such as creation state and the pipeline. - * @throws IOException - */ - @Override - public ContainerWithPipeline getContainerWithPipeline(ContainerID containerID) - throws ContainerNotFoundException, PipelineNotFoundException { - lock.lock(); - try { - final ContainerInfo contInfo = getContainer(containerID); - Pipeline pipeline; - if (contInfo.isOpen()) { - // If pipeline with given pipeline Id already exist return it - pipeline = pipelineManager.getPipeline(contInfo.getPipelineID()); - } else { - // For close containers create pipeline from datanodes with replicas - Set<ContainerReplica> dnWithReplicas = containerStateManager - .getContainerReplicas(contInfo.containerID()); - List<DatanodeDetails> dns = - dnWithReplicas.stream().map(ContainerReplica::getDatanodeDetails) - .collect(Collectors.toList()); - pipeline = pipelineManager.createPipeline(ReplicationType.STAND_ALONE, - contInfo.getReplicationFactor(), dns); - } - return new ContainerWithPipeline(contInfo, pipeline); - } finally { - lock.unlock(); - } - } - - /** * {@inheritDoc} */ @Override @@ -261,16 +221,13 @@ public class SCMContainerManager implements ContainerManager { * @throws IOException - Exception */ @Override - public ContainerWithPipeline allocateContainer(final ReplicationType type, + public ContainerInfo allocateContainer(final ReplicationType type, final ReplicationFactor replicationFactor, final String owner) throws IOException { lock.lock(); try { final ContainerInfo containerInfo; containerInfo = containerStateManager .allocateContainer(pipelineManager, type, replicationFactor, owner); - final Pipeline pipeline = pipelineManager.getPipeline( - containerInfo.getPipelineID()); - try { final byte[] containerIDBytes = Longs.toByteArray( containerInfo.getContainerID()); @@ -286,7 +243,7 @@ public class SCMContainerManager implements ContainerManager { } throw ex; } - return new ContainerWithPipeline(containerInfo, pipeline); + return containerInfo; } finally { lock.unlock(); } @@ -366,12 +323,8 @@ public class SCMContainerManager implements ContainerManager { break; case CLOSE: break; - case UPDATE: - break; case DELETE: break; - case TIMEOUT: - break; case CLEANUP: break; default: @@ -434,17 +387,11 @@ public class SCMContainerManager implements ContainerManager { * @param state - State of the Container-- {Open, Allocated etc.} * @return ContainerInfo, null if there is no match found. */ - public ContainerWithPipeline getMatchingContainerWithPipeline( + public ContainerInfo getMatchingContainer( final long sizeRequired, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) throws IOException { - ContainerInfo containerInfo = containerStateManager - .getMatchingContainer(sizeRequired, owner, type, factor, state); - if (containerInfo == null) { - return null; - } - Pipeline pipeline = pipelineManager - .getPipeline(containerInfo.getPipelineID()); - return new ContainerWithPipeline(containerInfo, pipeline); + return containerStateManager.getMatchingContainer( + sizeRequired, owner, type, factor, state); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 0c9b865..f141ae5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils; import org.apache.hadoop.hdds.scm.chillmode.ChillModePrecheck; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -61,6 +62,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Collectors; import static org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos @@ -161,11 +163,13 @@ public class SCMClientProtocolServer implements replicationType, HddsProtos.ReplicationFactor factor, String owner) throws IOException { ScmUtils.preCheck(ScmOps.allocateContainer, chillModePrecheck); - String remoteUser = getRpcRemoteUsername(); - getScm().checkAdminAccess(remoteUser); + getScm().checkAdminAccess(getRpcRemoteUsername()); - return scm.getContainerManager() + final ContainerInfo container = scm.getContainerManager() .allocateContainer(replicationType, factor, owner); + final Pipeline pipeline = scm.getPipelineManager() + .getPipeline(container.getPipelineID()); + return new ContainerWithPipeline(container, pipeline); } @Override @@ -191,8 +195,26 @@ public class SCMClientProtocolServer implements } } getScm().checkAdminAccess(null); - return scm.getContainerManager() - .getContainerWithPipeline(ContainerID.valueof(containerID)); + + final ContainerID id = ContainerID.valueof(containerID); + final ContainerInfo container = scm.getContainerManager().getContainer(id); + final Pipeline pipeline; + + if (container.isOpen()) { + // Ratis pipeline + pipeline = scm.getPipelineManager() + .getPipeline(container.getPipelineID()); + } else { + pipeline = scm.getPipelineManager().createPipeline( + HddsProtos.ReplicationType.STAND_ALONE, + container.getReplicationFactor(), + scm.getContainerManager() + .getContainerReplicas(id).stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList())); + } + + return new ContainerWithPipeline(container, pipeline); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 5f419d3..08ee382 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -207,13 +207,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl pipelineManager = new SCMPipelineManager(conf, scmNodeManager, eventQueue); containerManager = new SCMContainerManager( conf, scmNodeManager, pipelineManager, eventQueue); - scmBlockManager = new BlockManagerImpl( - conf, scmNodeManager, containerManager, eventQueue); + scmBlockManager = new BlockManagerImpl(conf, scmNodeManager, + pipelineManager, containerManager, eventQueue); replicationStatus = new ReplicationActivityStatus(); CloseContainerEventHandler closeContainerHandler = - new CloseContainerEventHandler(containerManager); + new CloseContainerEventHandler(pipelineManager, containerManager); NodeReportHandler nodeReportHandler = new NodeReportHandler(scmNodeManager); PipelineReportHandler pipelineReportHandler = http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 66ae682..8fbe7fb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -413,7 +413,7 @@ public final class TestUtils { throws IOException { return containerManager .allocateContainer(HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.THREE, "root").getContainerInfo(); + HddsProtos.ReplicationFactor.THREE, "root"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index aa940df..ce7fa2f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -91,7 +91,7 @@ public class TestBlockManager implements EventHandler<Boolean> { mapping = new SCMContainerManager(conf, nodeManager, pipelineManager, eventQueue); blockManager = new BlockManagerImpl(conf, - nodeManager, mapping, eventQueue); + nodeManager, pipelineManager, mapping, eventQueue); eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager); eventQueue.addHandler(SCMEvents.START_REPLICATION, this); if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index 268cf8b..48949be 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hdds.scm.block; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.SCMContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdfs.DFSUtil; @@ -51,11 +54,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -99,21 +104,23 @@ public class TestDeletedBlockLog { DatanodeDetails.newBuilder().setUuid(UUID.randomUUID().toString()) .build()); - ContainerInfo containerInfo = - new ContainerInfo.Builder().setContainerID(1).build(); - Pipeline pipeline = Pipeline.newBuilder() - .setType(ReplicationType.RATIS) - .setFactor(ReplicationFactor.THREE) - .setState(Pipeline.PipelineState.CLOSED) - .setId(PipelineID.randomId()) - .setNodes(dnList) - .build(); - ContainerWithPipeline containerWithPipeline = - new ContainerWithPipeline(containerInfo, pipeline); - when(containerManager.getContainerWithPipeline(anyObject())) - .thenReturn(containerWithPipeline); + final ContainerInfo container = + new ContainerInfo.Builder().setContainerID(1) + .setReplicationFactor(ReplicationFactor.THREE) + .setState(HddsProtos.LifeCycleState.CLOSED) + .build(); + final Set<ContainerReplica> replicaSet = dnList.stream() + .map(datanodeDetails -> ContainerReplica.newBuilder() + .setContainerID(container.containerID()) + .setContainerState(ContainerReplicaProto.State.OPEN) + .setDatanodeDetails(datanodeDetails) + .build()) + .collect(Collectors.toSet()); + + when(containerManager.getContainerReplicas(anyObject())) + .thenReturn(replicaSet); when(containerManager.getContainer(anyObject())) - .thenReturn(containerInfo); + .thenReturn(container); } @After @@ -383,8 +390,7 @@ public class TestDeletedBlockLog { private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException { - List<DatanodeDetails> dns = new ArrayList<>(); - dns.add(dd); + List<DatanodeDetails> dns = Collections.singletonList(dd); Pipeline pipeline = Pipeline.newBuilder() .setType(ReplicationType.STAND_ALONE) .setFactor(ReplicationFactor.ONE) @@ -399,11 +405,18 @@ public class TestDeletedBlockLog { .setReplicationFactor(pipeline.getFactor()); ContainerInfo containerInfo = builder.build(); - ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline( - containerInfo, pipeline); Mockito.doReturn(containerInfo).when(containerManager) .getContainer(ContainerID.valueof(containerID)); - Mockito.doReturn(containerWithPipeline).when(containerManager) - .getContainerWithPipeline(ContainerID.valueof(containerID)); + + final Set<ContainerReplica> replicaSet = dns.stream() + .map(datanodeDetails -> ContainerReplica.newBuilder() + .setContainerID(containerInfo.containerID()) + .setContainerState(ContainerReplicaProto.State.OPEN) + .setDatanodeDetails(datanodeDetails) + .build()) + .collect(Collectors.toSet()); + when(containerManager.getContainerReplicas( + ContainerID.valueof(containerID))) + .thenReturn(replicaSet); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index fd2dadf..fec3f84 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -24,8 +24,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers - .ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -51,6 +49,7 @@ public class TestCloseContainerEventHandler { private static Configuration configuration; private static MockNodeManager nodeManager; + private static PipelineManager pipelineManager; private static SCMContainerManager containerManager; private static long size; private static File testDir; @@ -66,14 +65,14 @@ public class TestCloseContainerEventHandler { configuration .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); nodeManager = new MockNodeManager(true, 10); - PipelineManager pipelineManager = + pipelineManager = new SCMPipelineManager(configuration, nodeManager, eventQueue); containerManager = new SCMContainerManager(configuration, nodeManager, pipelineManager, new EventQueue()); eventQueue = new EventQueue(); eventQueue.addHandler(CLOSE_CONTAINER, - new CloseContainerEventHandler(containerManager)); + new CloseContainerEventHandler(pipelineManager, containerManager)); eventQueue.addHandler(DATANODE_COMMAND, nodeManager); } @@ -105,19 +104,18 @@ public class TestCloseContainerEventHandler { new ContainerID(id)); eventQueue.processAll(1000); Assert.assertTrue(logCapturer.getOutput() - .contains("Failed to update the container state")); + .contains("Failed to close the container")); } @Test public void testCloseContainerEventWithValidContainers() throws IOException { - ContainerWithPipeline containerWithPipeline = containerManager + ContainerInfo container = containerManager .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE, "ozone"); - ContainerID id = new ContainerID( - containerWithPipeline.getContainerInfo().getContainerID()); - DatanodeDetails datanode = - containerWithPipeline.getPipeline().getFirstNode(); + ContainerID id = container.containerID(); + DatanodeDetails datanode = pipelineManager + .getPipeline(container.getPipelineID()).getFirstNode(); int closeCount = nodeManager.getCommandCount(datanode); eventQueue.fireEvent(CLOSE_CONTAINER, id); eventQueue.processAll(1000); @@ -132,23 +130,22 @@ public class TestCloseContainerEventHandler { GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer .captureLogs(CloseContainerEventHandler.LOG); - ContainerWithPipeline containerWithPipeline = containerManager + ContainerInfo container = containerManager .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, "ozone"); - ContainerID id = new ContainerID( - containerWithPipeline.getContainerInfo().getContainerID()); + ContainerID id = container.containerID(); int[] closeCount = new int[3]; eventQueue.fireEvent(CLOSE_CONTAINER, id); eventQueue.processAll(1000); int i = 0; - for (DatanodeDetails details : containerWithPipeline.getPipeline() - .getNodes()) { + for (DatanodeDetails details : pipelineManager + .getPipeline(container.getPipelineID()).getNodes()) { closeCount[i] = nodeManager.getCommandCount(details); i++; } i = 0; - for (DatanodeDetails details : containerWithPipeline.getPipeline() - .getNodes()) { + for (DatanodeDetails details : pipelineManager + .getPipeline(container.getPipelineID()).getNodes()) { Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details)); i++; } @@ -156,8 +153,8 @@ public class TestCloseContainerEventHandler { eventQueue.processAll(1000); i = 0; // Make sure close is queued for each datanode on the pipeline - for (DatanodeDetails details : containerWithPipeline.getPipeline() - .getNodes()) { + for (DatanodeDetails details : pipelineManager + .getPipeline(container.getPipelineID()).getNodes()) { Assert.assertEquals(closeCount[i] + 1, nodeManager.getCommandCount(details)); Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING, http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index a2eba92..14f516d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -100,14 +100,14 @@ public class TestContainerReportHandler implements EventPublisher { ContainerInfo cont1 = containerManager .allocateContainer(ReplicationType.STAND_ALONE, - ReplicationFactor.THREE, "root").getContainerInfo(); + ReplicationFactor.THREE, "root"); ContainerInfo cont2 = containerManager .allocateContainer(ReplicationType.STAND_ALONE, - ReplicationFactor.THREE, "root").getContainerInfo(); + ReplicationFactor.THREE, "root"); // Open Container ContainerInfo cont3 = containerManager .allocateContainer(ReplicationType.STAND_ALONE, - ReplicationFactor.THREE, "root").getContainerInfo(); + ReplicationFactor.THREE, "root"); long c1 = cont1.getContainerID(); long c2 = cont2.getContainerID(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java index 6e0d85b..f37b447 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -44,11 +43,13 @@ import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; +import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Tests for Container ContainerManager. @@ -109,7 +110,7 @@ public class TestSCMContainerManager { @Test public void testallocateContainer() throws Exception { - ContainerWithPipeline containerInfo = containerManager.allocateContainer( + ContainerInfo containerInfo = containerManager.allocateContainer( xceiverClientManager.getType(), xceiverClientManager.getFactor(), containerOwner); @@ -126,14 +127,15 @@ public class TestSCMContainerManager { */ Set<UUID> pipelineList = new TreeSet<>(); for (int x = 0; x < 30; x++) { - ContainerWithPipeline containerInfo = containerManager.allocateContainer( + ContainerInfo containerInfo = containerManager.allocateContainer( xceiverClientManager.getType(), xceiverClientManager.getFactor(), containerOwner); Assert.assertNotNull(containerInfo); - Assert.assertNotNull(containerInfo.getPipeline()); - pipelineList.add(containerInfo.getPipeline().getFirstNode() + Assert.assertNotNull(containerInfo.getPipelineID()); + pipelineList.add(pipelineManager.getPipeline( + containerInfo.getPipelineID()).getFirstNode() .getUuid()); } Assert.assertTrue(pipelineList.size() > 5); @@ -141,32 +143,27 @@ public class TestSCMContainerManager { @Test public void testGetContainer() throws IOException { - ContainerWithPipeline containerInfo = containerManager.allocateContainer( + ContainerInfo containerInfo = containerManager.allocateContainer( xceiverClientManager.getType(), xceiverClientManager.getFactor(), containerOwner); - Pipeline pipeline = containerInfo.getPipeline(); + Assert.assertNotNull(containerInfo); + Pipeline pipeline = pipelineManager + .getPipeline(containerInfo.getPipelineID()); Assert.assertNotNull(pipeline); - Pipeline newPipeline = containerInfo.getPipeline(); - Assert.assertEquals(pipeline.getFirstNode().getUuid(), - newPipeline.getFirstNode().getUuid()); + Assert.assertEquals(containerInfo, + containerManager.getContainer(containerInfo.containerID())); } @Test public void testGetContainerWithPipeline() throws Exception { - ContainerWithPipeline containerWithPipeline = containerManager + ContainerInfo contInfo = containerManager .allocateContainer(xceiverClientManager.getType(), xceiverClientManager.getFactor(), containerOwner); - ContainerInfo contInfo = containerWithPipeline.getContainerInfo(); // Add dummy replicas for container. - DatanodeDetails dn1 = DatanodeDetails.newBuilder() - .setHostName("host1") - .setIpAddress("1.1.1.1") - .setUuid(UUID.randomUUID().toString()).build(); - DatanodeDetails dn2 = DatanodeDetails.newBuilder() - .setHostName("host2") - .setIpAddress("2.2.2.2") - .setUuid(UUID.randomUUID().toString()).build(); + Iterator<DatanodeDetails> nodes = pipelineManager + .getPipeline(contInfo.getPipelineID()).getNodes().iterator(); + DatanodeDetails dn1 = nodes.next(); containerManager.updateContainerState(contInfo.containerID(), LifeCycleEvent.FINALIZE); containerManager @@ -180,27 +177,21 @@ public class TestSCMContainerManager { ContainerReplica.newBuilder().setContainerID(contInfo.containerID()) .setContainerState(ContainerReplicaProto.State.CLOSED) .setDatanodeDetails(dn1).build()); - containerManager.updateContainerReplica(contInfo.containerID(), - ContainerReplica.newBuilder().setContainerID(contInfo.containerID()) - .setContainerState(ContainerReplicaProto.State.CLOSED) - .setDatanodeDetails(dn2).build()); - Assert.assertEquals(2, + Assert.assertEquals(1, containerManager.getContainerReplicas( finalContInfo.containerID()).size()); contInfo = containerManager.getContainer(contInfo.containerID()); Assert.assertEquals(contInfo.getState(), LifeCycleState.CLOSED); - Pipeline pipeline = containerWithPipeline.getPipeline(); - pipelineManager.finalizePipeline(pipeline.getId()); - - ContainerWithPipeline containerWithPipeline2 = containerManager - .getContainerWithPipeline(contInfo.containerID()); - pipeline = containerWithPipeline2.getPipeline(); - Assert.assertNotEquals(containerWithPipeline, containerWithPipeline2); - Assert.assertNotNull("Pipeline should not be null", pipeline); - Assert.assertTrue(pipeline.getNodes().contains(dn1)); - Assert.assertTrue(pipeline.getNodes().contains(dn2)); + // After closing the container, we should get the replica and construct + // standalone pipeline. No more ratis pipeline. + + Set<DatanodeDetails> replicaNodes = containerManager + .getContainerReplicas(contInfo.containerID()) + .stream().map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toSet()); + Assert.assertTrue(replicaNodes.contains(dn1)); } @Test @@ -232,11 +223,9 @@ public class TestSCMContainerManager { private ContainerInfo createContainer() throws IOException { nodeManager.setChillmode(false); - ContainerWithPipeline containerWithPipeline = containerManager + return containerManager .allocateContainer(xceiverClientManager.getType(), xceiverClientManager.getFactor(), containerOwner); - ContainerInfo containerInfo = containerWithPipeline.getContainerInfo(); - return containerInfo; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 7a9dbad..ad24a10 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.SCMContainerManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms @@ -155,12 +155,13 @@ public class TestContainerPlacement { assertEquals(remaining * nodeCount, (long) nodeManager.getStats().getRemaining().get()); - ContainerWithPipeline containerWithPipeline = containerManager + ContainerInfo container = containerManager .allocateContainer( xceiverClientManager.getType(), xceiverClientManager.getFactor(), "OZONE"); assertEquals(xceiverClientManager.getFactor().getNumber(), - containerWithPipeline.getPipeline().getNodes().size()); + containerManager.getContainerReplicas( + container.containerID()).size()); } finally { IOUtils.closeQuietly(containerManager); IOUtils.closeQuietly(nodeManager); http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index fd2c973..8d23afd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; @@ -61,8 +62,11 @@ public class TestNode2PipelineMap { cluster.waitForClusterToBeReady(); scm = cluster.getStorageContainerManager(); containerManager = scm.getContainerManager(); - ratisContainer = containerManager.allocateContainer( + pipelineManager = scm.getPipelineManager(); + ContainerInfo containerInfo = containerManager.allocateContainer( RATIS, THREE, "testOwner"); + ratisContainer = new ContainerWithPipeline(containerInfo, + pipelineManager.getPipeline(containerInfo.getPipelineID())); pipelineManager = scm.getPipelineManager(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java index 618cd8e..3207878 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java @@ -22,14 +22,13 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.container.common.helpers - .ContainerWithPipeline; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; @@ -48,8 +47,8 @@ public class TestNodeFailure { private static MiniOzoneCluster cluster; private static OzoneConfiguration conf; - private static ContainerWithPipeline ratisContainer1; - private static ContainerWithPipeline ratisContainer2; + private static Pipeline ratisPipelineOne; + private static Pipeline ratisPipelineTwo; private static ContainerManager containerManager; private static PipelineManager pipelineManager; private static long timeForFailure; @@ -76,10 +75,12 @@ public class TestNodeFailure { StorageContainerManager scm = cluster.getStorageContainerManager(); containerManager = scm.getContainerManager(); pipelineManager = scm.getPipelineManager(); - ratisContainer1 = containerManager.allocateContainer( - RATIS, THREE, "testOwner"); - ratisContainer2 = containerManager.allocateContainer( - RATIS, THREE, "testOwner"); + ratisPipelineOne = pipelineManager.getPipeline( + containerManager.allocateContainer( + RATIS, THREE, "testOwner").getPipelineID()); + ratisPipelineTwo = pipelineManager.getPipeline( + containerManager.allocateContainer( + RATIS, THREE, "testOwner").getPipelineID()); // At this stage, there should be 2 pipeline one with 1 open container each. // Try closing the both the pipelines, one with a closed container and // the other with an open container. @@ -99,12 +100,15 @@ public class TestNodeFailure { } } + @Ignore + // Enable this after we implement teardown pipeline logic once a datanode + // dies. @Test(timeout = 300_000L) public void testPipelineFail() throws InterruptedException, IOException, TimeoutException { - Assert.assertEquals(ratisContainer1.getPipeline().getPipelineState(), + Assert.assertEquals(ratisPipelineOne.getPipelineState(), Pipeline.PipelineState.OPEN); - Pipeline pipelineToFail = ratisContainer1.getPipeline(); + Pipeline pipelineToFail = ratisPipelineOne; DatanodeDetails dnToFail = pipelineToFail.getFirstNode(); cluster.shutdownHddsDatanode(dnToFail); @@ -112,18 +116,19 @@ public class TestNodeFailure { Thread.sleep(3 * timeForFailure); Assert.assertEquals(Pipeline.PipelineState.CLOSED, - pipelineManager.getPipeline(ratisContainer1.getPipeline().getId()) + pipelineManager.getPipeline(ratisPipelineOne.getId()) .getPipelineState()); Assert.assertEquals(Pipeline.PipelineState.OPEN, - pipelineManager.getPipeline(ratisContainer2.getPipeline().getId()) + pipelineManager.getPipeline(ratisPipelineTwo.getId()) .getPipelineState()); // Now restart the datanode and make sure that a new pipeline is created. cluster.setWaitForClusterToBeReadyTimeout(300000); cluster.restartHddsDatanode(dnToFail, true); - ContainerWithPipeline ratisContainer3 = - containerManager.allocateContainer(RATIS, THREE, "testOwner"); + Pipeline ratisPipelineThree = pipelineManager.getPipeline( + containerManager.allocateContainer( + RATIS, THREE, "testOwner").getPipelineID()); //Assert that new container is not created from the ratis 2 pipeline - Assert.assertNotEquals(ratisContainer3.getPipeline().getId(), - ratisContainer2.getPipeline().getId()); + Assert.assertNotEquals(ratisPipelineThree.getId(), + ratisPipelineTwo.getId()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 211782b..66bdb5b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; @@ -62,10 +63,15 @@ public class TestPipelineClose { cluster.waitForClusterToBeReady(); scm = cluster.getStorageContainerManager(); containerManager = scm.getContainerManager(); - ratisContainer1 = containerManager + pipelineManager = scm.getPipelineManager(); + ContainerInfo containerInfo1 = containerManager .allocateContainer(RATIS, THREE, "testOwner"); - ratisContainer2 = containerManager + ratisContainer1 = new ContainerWithPipeline(containerInfo1, + pipelineManager.getPipeline(containerInfo1.getPipelineID())); + ContainerInfo containerInfo2 = containerManager .allocateContainer(RATIS, THREE, "testOwner"); + ratisContainer2 = new ContainerWithPipeline(containerInfo2, + pipelineManager.getPipeline(containerInfo2.getPipelineID())); pipelineManager = scm.getPipelineManager(); // At this stage, there should be 2 pipeline one with 1 open container each. // Try closing the both the pipelines, one with a closed container and http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java index 0fa8649..ab70ed1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -65,10 +66,12 @@ public class TestSCMRestart { StorageContainerManager scm = cluster.getStorageContainerManager(); containerManager = scm.getContainerManager(); pipelineManager = scm.getPipelineManager(); - ratisPipeline1 = containerManager.allocateContainer( - RATIS, THREE, "Owner1").getPipeline(); - ratisPipeline2 = containerManager.allocateContainer( - RATIS, ONE, "Owner2").getPipeline(); + ratisPipeline1 = pipelineManager.getPipeline( + containerManager.allocateContainer( + RATIS, THREE, "Owner1").getPipelineID()); + ratisPipeline2 = pipelineManager.getPipeline( + containerManager.allocateContainer( + RATIS, ONE, "Owner2").getPipelineID()); // At this stage, there should be 2 pipeline one with 1 open container // each. Try restarting the SCM and then discover that pipeline are in // correct state. @@ -100,10 +103,10 @@ public class TestSCMRestart { Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1); Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2); - // Try creating a new ratis pipeline, it should be from the same pipeline + // Try creating a new container, it should be from the same pipeline // as was before restart - Pipeline newRatisPipeline = newContainerManager - .allocateContainer(RATIS, THREE, "Owner1").getPipeline(); - Assert.assertEquals(newRatisPipeline.getId(), ratisPipeline1.getId()); + ContainerInfo containerInfo = newContainerManager + .allocateContainer(RATIS, THREE, "Owner1"); + Assert.assertEquals(containerInfo.getPipelineID(), ratisPipeline1.getId()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java index d7e5360..9982da4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java @@ -52,9 +52,8 @@ public class OzoneTestUtils { .updateContainerState(ContainerID.valueof(blockID.getContainerID()), HddsProtos.LifeCycleEvent.CLOSE); Assert.assertFalse(scm.getContainerManager() - .getContainerWithPipeline(ContainerID.valueof( - blockID.getContainerID())) - .getContainerInfo().isOpen()); + .getContainer(ContainerID.valueof( + blockID.getContainerID())).isOpen()); } catch (IOException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java index fe060a6..db3a197 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java @@ -446,9 +446,10 @@ public class TestOzoneRestClient { // Sum the data size from chunks in Container via containerID // and localID, make sure the size equals to the actually value size. Pipeline pipeline = cluster.getStorageContainerManager() - .getContainerManager().getContainerWithPipeline( - ContainerID.valueof(containerID)) - .getPipeline(); + .getPipelineManager().getPipeline( + cluster.getStorageContainerManager() + .getContainerManager().getContainer( + ContainerID.valueof(containerID)).getPipelineID()); List<DatanodeDetails> datanodes = pipeline.getNodes(); Assert.assertEquals(datanodes.size(), 1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index df2fd1f..576801d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; @@ -377,10 +378,12 @@ public class TestCloseContainerHandlingByClient { cluster.getStorageContainerManager().getEventQueue() .fireEvent(SCMEvents.CLOSE_CONTAINER, ContainerID.valueof(containerID)); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager() + .getContainer(ContainerID.valueof(containerID)); Pipeline pipeline = - cluster.getStorageContainerManager().getContainerManager() - .getContainerWithPipeline(ContainerID.valueof(containerID)) - .getPipeline(); + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); pipelineList.add(pipeline); List<DatanodeDetails> datanodes = pipeline.getNodes(); for (DatanodeDetails details : datanodes) { @@ -435,10 +438,13 @@ public class TestCloseContainerHandlingByClient { List<OmKeyLocationInfo> locationInfos = new ArrayList<>(groupOutputStream.getLocationInfoList()); long containerID = locationInfos.get(0).getContainerID(); - List<DatanodeDetails> datanodes = - cluster.getStorageContainerManager().getContainerManager() - .getContainerWithPipeline(ContainerID.valueof(containerID)) - .getPipeline().getNodes(); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager() + .getContainer(ContainerID.valueof(containerID)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List<DatanodeDetails> datanodes = pipeline.getNodes(); Assert.assertEquals(1, datanodes.size()); waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE); dataString = fixedLengthString(keyString, (1 * blockSize)); @@ -538,10 +544,13 @@ public class TestCloseContainerHandlingByClient { List<OmKeyLocationInfo> locationInfos = groupOutputStream.getLocationInfoList(); long containerID = locationInfos.get(0).getContainerID(); - List<DatanodeDetails> datanodes = - cluster.getStorageContainerManager().getContainerManager() - .getContainerWithPipeline(ContainerID.valueof(containerID)) - .getPipeline().getNodes(); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager() + .getContainer(ContainerID.valueof(containerID)); + Pipeline pipeline = + cluster.getStorageContainerManager().getPipelineManager() + .getPipeline(container.getPipelineID()); + List<DatanodeDetails> datanodes = pipeline.getNodes(); Assert.assertEquals(1, datanodes.size()); // move the container on the datanode to Closing state, this will ensure // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying http://git-wip-us.apache.org/repos/asf/hadoop/blob/18fe65d7/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 0b51bb3..555b895 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.XceiverClientSpi; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.*; @@ -645,10 +644,10 @@ public class TestOzoneRpcClient { Assert .assertEquals(value.getBytes().length, keyLocations.get(0).getLength()); - ContainerWithPipeline container = - cluster.getStorageContainerManager().getContainerManager() - .getContainerWithPipeline(new ContainerID(containerID)); - Pipeline pipeline = container.getPipeline(); + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager().getContainer(ContainerID.valueof(containerID)); + Pipeline pipeline = cluster.getStorageContainerManager() + .getPipelineManager().getPipeline(container.getPipelineID()); List<DatanodeDetails> datanodes = pipeline.getNodes(); DatanodeDetails datanodeDetails = datanodes.get(0); @@ -662,17 +661,17 @@ public class TestOzoneRpcClient { // shutdown the datanode cluster.shutdownHddsDatanode(datanodeDetails); - Assert.assertTrue(container.getContainerInfo().getState() + Assert.assertTrue(container.getState() == HddsProtos.LifeCycleState.OPEN); // try to read, this shouls be successful readKey(bucket, keyName, value); - Assert.assertTrue(container.getContainerInfo().getState() + Assert.assertTrue(container.getState() == HddsProtos.LifeCycleState.OPEN); // shutdown the second datanode datanodeDetails = datanodes.get(1); cluster.shutdownHddsDatanode(datanodeDetails); - Assert.assertTrue(container.getContainerInfo().getState() + Assert.assertTrue(container.getState() == HddsProtos.LifeCycleState.OPEN); // the container is open and with loss of 2 nodes we still should be able @@ -750,10 +749,10 @@ public class TestOzoneRpcClient { // Second, sum the data size from chunks in Container via containerID // and localID, make sure the size equals to the size from keyDetails. + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager().getContainer(ContainerID.valueof(containerID)); Pipeline pipeline = cluster.getStorageContainerManager() - .getContainerManager().getContainerWithPipeline( - ContainerID.valueof(containerID)) - .getPipeline(); + .getPipelineManager().getPipeline(container.getPipelineID()); List<DatanodeDetails> datanodes = pipeline.getNodes(); Assert.assertEquals(datanodes.size(), 1); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org