HDDS-801. Quasi close the container when close is not executed via Ratis. Contributed by Nanda kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c4d96400 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c4d96400 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c4d96400 Branch: refs/heads/trunk Commit: c4d96400280d9a1595b82435297dbbed2668c485 Parents: ad5256e Author: Nanda kumar <[email protected]> Authored: Fri Nov 16 23:07:45 2018 +0530 Committer: Nanda kumar <[email protected]> Committed: Fri Nov 16 23:11:33 2018 +0530 ---------------------------------------------------------------------- .../main/proto/DatanodeContainerProtocol.proto | 4 +- .../container/common/impl/ContainerData.java | 17 +- .../container/common/impl/HddsDispatcher.java | 14 +- .../container/common/interfaces/Container.java | 15 +- .../container/common/interfaces/Handler.java | 68 +++++- .../common/report/ContainerReportPublisher.java | 3 +- .../statemachine/DatanodeStateMachine.java | 2 +- .../common/statemachine/StateContext.java | 20 -- .../CloseContainerCommandHandler.java | 138 ++++++------ .../states/endpoint/RegisterEndpointTask.java | 2 +- .../transport/server/XceiverServerGrpc.java | 5 + .../transport/server/XceiverServerSpi.java | 7 + .../server/ratis/XceiverServerRatis.java | 18 +- .../container/keyvalue/KeyValueContainer.java | 41 +++- .../container/keyvalue/KeyValueHandler.java | 119 +++++++---- .../ozoneimpl/ContainerController.java | 124 +++++++++++ .../container/ozoneimpl/OzoneContainer.java | 124 +++++------ .../DownloadAndImportReplicator.java | 16 +- .../OnDemandContainerReplicationSource.java | 10 +- .../commands/CloseContainerCommand.java | 12 +- .../StorageContainerDatanodeProtocol.proto | 8 +- .../common/impl/TestHddsDispatcher.java | 26 ++- .../common/interfaces/TestHandler.java | 16 +- .../TestCloseContainerCommandHandler.java | 210 +++++++++++++++++++ .../container/keyvalue/TestKeyValueHandler.java | 8 +- .../hadoop/hdds/scm/block/BlockManagerImpl.java | 2 +- .../container/CloseContainerEventHandler.java | 4 +- .../hadoop/hdds/scm/node/TestNodeManager.java | 3 +- .../ozone/container/common/TestEndPoint.java | 8 +- .../dist/src/main/smoketest/basic/basic.robot | 2 +- .../hadoop/ozone/TestMiniOzoneCluster.java | 18 +- .../rpc/TestCloseContainerHandlingByClient.java | 26 +-- .../TestCloseContainerByPipeline.java | 80 ++++--- .../TestCloseContainerHandler.java | 3 +- .../container/metrics/TestContainerMetrics.java | 22 +- .../container/server/TestContainerServer.java | 26 ++- .../genesis/BenchMarkDatanodeDispatcher.java | 19 +- .../freon/TestFreonWithDatanodeFastRestart.java | 3 +- 38 files changed, 875 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 5eecdcb..ac2d277 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -229,8 +229,8 @@ message ContainerDataProto { enum State { OPEN = 1; CLOSING = 2; - CLOSED = 3; - QUASI_CLOSED = 4; + QUASI_CLOSED =3; + CLOSED = 4; UNHEALTHY = 5; INVALID = 6; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index ad199f0..6f51517 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -237,11 +237,26 @@ public abstract class ContainerData { * checks if the container is closed. * @return - boolean */ - public synchronized boolean isClosed() { + public synchronized boolean isClosed() { return ContainerDataProto.State.CLOSED == state; } /** + * checks if the container is quasi closed. + * @return - boolean + */ + public synchronized boolean isQuasiClosed() { + return ContainerDataProto.State.QUASI_CLOSED == state; + } + + /** + * Marks this container as quasi closed. + */ + public synchronized void quasiCloseContainer() { + setState(ContainerDataProto.State.QUASI_CLOSED); + } + + /** * Marks this container as closed. */ public synchronized void closeContainer() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index c52d973..29f9b20 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.impl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -76,18 +75,14 @@ public class HddsDispatcher implements ContainerDispatcher { * XceiverServerHandler. */ public HddsDispatcher(Configuration config, ContainerSet contSet, - VolumeSet volumes, StateContext context) { + VolumeSet volumes, Map<ContainerType, Handler> handlers, + StateContext context, ContainerMetrics metrics) { this.conf = config; this.containerSet = contSet; this.volumeSet = volumes; this.context = context; - this.handlers = Maps.newHashMap(); - this.metrics = ContainerMetrics.create(conf); - for (ContainerType containerType : ContainerType.values()) { - handlers.put(containerType, - Handler.getHandlerForContainerType( - containerType, conf, containerSet, volumeSet, metrics)); - } + this.handlers = handlers; + this.metrics = metrics; this.containerCloseThreshold = conf.getFloat( HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD, HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT); @@ -347,6 +342,7 @@ public class HddsDispatcher implements ContainerDispatcher { } } + @VisibleForTesting public Container getContainer(long containerID) { return containerSet.getContainer(containerID); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java index 65147cc..2bda747 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java @@ -82,11 +82,24 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock { ContainerProtos.ContainerDataProto.State getContainerState(); /** - * Closes a open container, if it is already closed or does not exist a + * Marks the container for closing. Moves the container to CLOSING state. + */ + void markContainerForClose() throws StorageContainerException; + + /** + * Quasi Closes a open container, if it is already closed or does not exist a * StorageContainerException is thrown. * * @throws StorageContainerException */ + void quasiClose() throws StorageContainerException; + + /** + * Closes a open/quasi closed container, if it is already closed or does not + * exist a StorageContainerException is thrown. + * + * @throws StorageContainerException + */ void close() throws StorageContainerException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 53e1c68..19e61c4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -29,8 +29,12 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; @@ -47,26 +51,47 @@ public abstract class Handler { protected String scmID; protected final ContainerMetrics metrics; - protected Handler(Configuration config, ContainerSet contSet, - VolumeSet volumeSet, ContainerMetrics containerMetrics) { - conf = config; - containerSet = contSet; + private final StateContext context; + + protected Handler(Configuration config, StateContext context, + ContainerSet contSet, VolumeSet volumeSet, + ContainerMetrics containerMetrics) { + this.conf = config; + this.context = context; + this.containerSet = contSet; this.volumeSet = volumeSet; this.metrics = containerMetrics; } - public static Handler getHandlerForContainerType(ContainerType containerType, - Configuration config, ContainerSet contSet, VolumeSet volumeSet, - ContainerMetrics metrics) { + public static Handler getHandlerForContainerType( + final ContainerType containerType, final Configuration config, + final StateContext context, final ContainerSet contSet, + final VolumeSet volumeSet, final ContainerMetrics metrics) { switch (containerType) { case KeyValueContainer: - return new KeyValueHandler(config, contSet, volumeSet, metrics); + return new KeyValueHandler(config, context, contSet, volumeSet, metrics); default: throw new IllegalArgumentException("Handler for ContainerType: " + containerType + "doesn't exist."); } } + /** + * This should be called whenever there is state change. It will trigger + * an ICR to SCM. + * + * @param container Container for which ICR has to be sent + */ + protected void sendICR(final Container container) + throws StorageContainerException { + IncrementalContainerReportProto icr = IncrementalContainerReportProto + .newBuilder() + .addReport(container.getContainerReport()) + .build(); + context.addReport(icr); + context.getParent().triggerHeartbeat(); + } + public abstract ContainerCommandResponseProto handle( ContainerCommandRequestProto msg, Container container); @@ -80,6 +105,33 @@ public abstract class Handler { TarContainerPacker packer) throws IOException; + /** + * Marks the container for closing. Moves the container to CLOSING state. + * + * @param container container to update + * @throws IOException in case of exception + */ + public abstract void markContainerForClose(Container container) + throws IOException; + + /** + * Moves the Container to QUASI_CLOSED state. + * + * @param container container to be quasi closed + * @throws IOException + */ + public abstract void quasiCloseContainer(Container container) + throws IOException; + + /** + * Moves the Container to CLOSED state. + * + * @param container container to be closed + * @throws IOException + */ + public abstract void closeContainer(Container container) + throws IOException; + public void setScmID(String scmId) { this.scmID = scmId; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java index ccb9a9a..b92e3b0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java @@ -80,6 +80,7 @@ public class ContainerReportPublisher extends @Override protected ContainerReportsProto getReport() throws IOException { - return getContext().getParent().getContainer().getContainerReport(); + return getContext().getParent().getContainer() + .getController().getContainerReport(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 12c33ff..1d87071 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -99,7 +99,7 @@ public class DatanodeStateMachine implements Closeable { ContainerReplicator replicator = new DownloadAndImportReplicator(container.getContainerSet(), - container.getDispatcher(), + container.getController(), new SimpleContainerDownloader(conf), new TarContainerPacker()); supervisor = http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index e928824..953f730 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -59,8 +59,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; -import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; - /** * Current Context of State Machine. */ @@ -116,24 +114,6 @@ public class StateContext { } /** - * Get the container server port. - * @return The container server port if available, return -1 if otherwise - */ - public int getContainerPort() { - return parent == null ? - INVALID_PORT : parent.getContainer().getContainerServerPort(); - } - - /** - * Gets the Ratis Port. - * @return int , return -1 if not valid. - */ - public int getRatisPort() { - return parent == null ? - INVALID_PORT : parent.getContainer().getRatisContainerServerPort(); - } - - /** * Returns true if we are entering a new state. * * @return boolean http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 0838be2..9d25812 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -16,21 +16,20 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.protocol.datanode.proto + .ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. - ContainerDataProto.State; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.statemachine .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; @@ -38,17 +37,19 @@ import org.apache.ratis.protocol.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.UUID; /** * Handler for close container command received from SCM. */ public class CloseContainerCommandHandler implements CommandHandler { - static final Logger LOG = + + private static final Logger LOG = LoggerFactory.getLogger(CloseContainerCommandHandler.class); + private int invocationCount; private long totalTime; - private boolean cmdExecuted; /** * Constructs a ContainerReport handler. @@ -67,79 +68,70 @@ public class CloseContainerCommandHandler implements CommandHandler { @Override public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { - LOG.debug("Processing Close Container command."); - invocationCount++; - long startTime = Time.monotonicNow(); - // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA) - long containerID = -1; try { - CloseContainerCommandProto closeContainerProto = + LOG.debug("Processing Close Container command."); + invocationCount++; + final long startTime = Time.monotonicNow(); + final DatanodeDetails datanodeDetails = context.getParent() + .getDatanodeDetails(); + final CloseContainerCommandProto closeCommand = CloseContainerCommandProto.parseFrom(command.getProtoBufMessage()); - containerID = closeContainerProto.getContainerID(); - // CloseContainer operation is idempotent, if the container is already - // closed, then do nothing. - // TODO: Non-existent container should be handled properly - Container container = - ozoneContainer.getContainerSet().getContainer(containerID); - if (container == null) { - LOG.error("Container {} does not exist in datanode. " - + "Container close failed.", containerID); - cmdExecuted = false; - return; - } - ContainerData containerData = container.getContainerData(); - State containerState = container.getContainerData().getState(); - if (containerState != State.CLOSED) { - LOG.debug("Closing container {}.", containerID); - // when a closeContainerCommand arrives at a Datanode and if the - // container is open, each replica will be moved to closing state first. - if (containerState == State.OPEN) { - containerData.setState(State.CLOSING); + final ContainerController controller = ozoneContainer.getController(); + final long containerId = closeCommand.getContainerID(); + try { + // TODO: Closing of QUASI_CLOSED container. + + final Container container = controller.getContainer(containerId); + + if (container == null) { + LOG.error("Container #{} does not exist in datanode. " + + "Container close failed.", containerId); + return; } - // if the container is already closed, it will be just ignored. - // ICR will get triggered to change the replica state in SCM. - HddsProtos.PipelineID pipelineID = closeContainerProto.getPipelineID(); - HddsProtos.ReplicationType replicationType = - closeContainerProto.getReplicationType(); - - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.CloseContainer); - request.setContainerID(containerID); - request.setCloseContainer( - ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid( - context.getParent().getDatanodeDetails().getUuidString()); - // submit the close container request for the XceiverServer to handle - ozoneContainer.submitContainerRequest(request.build(), replicationType, - pipelineID); - // Since the container is closed, we trigger an ICR - IncrementalContainerReportProto icr = - IncrementalContainerReportProto.newBuilder().addReport( - ozoneContainer.getContainerSet().getContainer(containerID) - .getContainerReport()).build(); - context.addReport(icr); - context.getParent().triggerHeartbeat(); - } - } catch (Exception e) { - if (e instanceof NotLeaderException) { - // If the particular datanode is not the Ratis leader, the close - // container command will not be executed by the follower but will be - // executed by Ratis stateMachine transactions via leader to follower. - // There can also be case where the datanode is in candidate state. - // In these situations, NotLeaderException is thrown. - LOG.info("Follower cannot close the container {}.", containerID); - } else { - LOG.error("Can't close container " + containerID, e); + // Move the container to CLOSING state + controller.markContainerForClose(containerId); + + // If the container is part of open pipeline, close it via write channel + if (ozoneContainer.getWriteChannel() + .isExist(closeCommand.getPipelineID())) { + ContainerCommandRequestProto request = + getContainerCommandRequestProto(datanodeDetails, + closeCommand.getContainerID()); + ozoneContainer.getWriteChannel().submitRequest( + request, closeCommand.getPipelineID()); + return; + } + + // The container is not part of any open pipeline. + // QUASI_CLOSE the container using ContainerController. + controller.quasiCloseContainer(containerId); + } catch (NotLeaderException e) { + LOG.debug("Follower cannot close container #{}.", containerId); + } catch (IOException e) { + LOG.error("Can't close container #{}", containerId, e); + } finally { + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; } - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; + } catch (InvalidProtocolBufferException ex) { + LOG.error("Exception while closing container", ex); } } + private ContainerCommandRequestProto getContainerCommandRequestProto( + final DatanodeDetails datanodeDetails, final long containerId) { + final ContainerCommandRequestProto.Builder command = + ContainerCommandRequestProto.newBuilder(); + command.setCmdType(ContainerProtos.Type.CloseContainer); + command.setContainerID(containerId); + command.setCloseContainer( + ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); + command.setTraceID(UUID.randomUUID().toString()); + command.setDatanodeUuid(datanodeDetails.getUuidString()); + return command.build(); + } + /** * Returns the command type that this command handler handles. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index acd4af9..9918f9d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -111,7 +111,7 @@ public final class RegisterEndpointTask implements try { ContainerReportsProto containerReport = datanodeContainerManager - .getContainerReport(); + .getController().getContainerReport(); NodeReportProto nodeReport = datanodeContainerManager.getNodeReport(); PipelineReportsProto pipelineReportsProto = datanodeContainerManager.getPipelineReport(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index ab9f42f..992f3cb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -141,6 +141,11 @@ public final class XceiverServerGrpc implements XceiverServerSpi { } @Override + public boolean isExist(HddsProtos.PipelineID pipelineId) { + return PipelineID.valueOf(id).getProtobuf().equals(pipelineId); + } + + @Override public List<PipelineReport> getPipelineReport() { return Collections.singletonList( PipelineReport.newBuilder() http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index 8c3fa5c..4e0d343 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -54,6 +54,13 @@ public interface XceiverServerSpi { throws IOException; /** + * Returns true if the given pipeline exist. + * + * @return true if pipeline present, else false + */ + boolean isExist(HddsProtos.PipelineID pipelineId); + + /** * Get pipeline report for the XceiverServer instance. * @return list of report for each pipeline. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 7bf4da9..434d330 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -86,7 +86,8 @@ import java.util.concurrent.atomic.AtomicLong; * Ozone containers. */ public final class XceiverServerRatis implements XceiverServerSpi { - static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); + private static final Logger LOG = LoggerFactory + .getLogger(XceiverServerRatis.class); private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); private static long nextCallId() { @@ -458,6 +459,21 @@ public final class XceiverServerRatis implements XceiverServerSpi { } @Override + public boolean isExist(HddsProtos.PipelineID pipelineId) { + try { + for (RaftGroupId groupId : server.getGroupIds()) { + if (PipelineID.valueOf( + groupId.getUuid()).getProtobuf().equals(pipelineId)) { + return true; + } + } + return false; + } catch (IOException e) { + return false; + } + } + + @Override public List<PipelineReport> getPipelineReport() { try { Iterable<RaftGroupId> gids = server.getGroupIds(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index b82c12f..f725d22 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerDataProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerType; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; @@ -267,29 +269,47 @@ public class KeyValueContainer implements Container<KeyValueContainerData> { } @Override + public void markContainerForClose() throws StorageContainerException { + updateContainerData(() -> + containerData.setState(ContainerDataProto.State.CLOSING)); + } + + @Override + public void quasiClose() throws StorageContainerException { + updateContainerData(containerData::quasiCloseContainer); + } + + @Override public void close() throws StorageContainerException { + updateContainerData(containerData::closeContainer); + // It is ok if this operation takes a bit of time. + // Close container is not expected to be instantaneous. + compactDB(); + } - //TODO: writing .container file and compaction can be done - // asynchronously, otherwise rpc call for this will take a lot of time to - // complete this action + private void updateContainerData(Runnable update) + throws StorageContainerException { + ContainerDataProto.State oldState = null; try { writeLock(); - - containerData.closeContainer(); + oldState = containerData.getState(); + update.run(); File containerFile = getContainerFile(); // update the new container data to .container File updateContainerFile(containerFile); } catch (StorageContainerException ex) { - // Failed to update .container file. Reset the state to CLOSING - containerData.setState(ContainerProtos.ContainerDataProto.State.CLOSING); + if (oldState != null) { + // Failed to update .container file. Reset the state to CLOSING + containerData.setState(oldState); + } throw ex; } finally { writeUnlock(); } + } - // It is ok if this operation takes a bit of time. - // Close container is not expected to be instantaneous. + private void compactDB() throws StorageContainerException { try { MetadataStore db = BlockUtils.getDB(containerData, config); db.compactDB(); @@ -549,6 +569,9 @@ public class KeyValueContainer implements Container<KeyValueContainerData> { case CLOSING: state = ContainerReplicaProto.State.CLOSING; break; + case QUASI_CLOSED: + state = ContainerReplicaProto.State.QUASI_CLOSED; + break; case CLOSED: state = ContainerReplicaProto.State.CLOSED; break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index f970c72..4a5ef18 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -33,7 +33,7 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerDataProto; + .ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume .RoundRobinVolumeChoosingPolicy; @@ -109,9 +110,9 @@ public class KeyValueHandler extends Handler { private final long maxContainerSize; private final AutoCloseableLock handlerLock; - public KeyValueHandler(Configuration config, ContainerSet contSet, - VolumeSet volSet, ContainerMetrics metrics) { - super(config, contSet, volSet, metrics); + public KeyValueHandler(Configuration config, StateContext context, + ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) { + super(config, context, contSet, volSet, metrics); containerType = ContainerType.KeyValueContainer; blockManager = new BlockManagerImpl(config); chunkManager = new ChunkManagerImpl(); @@ -372,20 +373,10 @@ public class KeyValueHandler extends Handler { request.getTraceID()); return ContainerUtils.malformedRequest(request); } - - long containerID = kvContainer.getContainerData().getContainerID(); try { - checkContainerOpen(kvContainer); - // TODO : The close command should move the container to either quasi - // closed/closed depending upon how the closeContainer gets executed. - // If it arrives by Standalone, it will be moved to Quasi Closed or - // otherwise moved to Closed state if it gets executed via Ratis. - kvContainer.close(); + markContainerForClose(kvContainer); + closeContainer(kvContainer); } catch (StorageContainerException ex) { - if (ex.getResult() == CLOSED_CONTAINER_IO) { - LOG.debug("Container {} is already closed.", containerID); - return ContainerUtils.getSuccessResponse(request); - } return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { return ContainerUtils.logAndReturnError(LOG, @@ -745,38 +736,39 @@ public class KeyValueHandler extends Handler { private void checkContainerOpen(KeyValueContainer kvContainer) throws StorageContainerException { - ContainerDataProto.State containerState = kvContainer.getContainerState(); + final State containerState = kvContainer.getContainerState(); - /** + /* * In a closing state, follower will receive transactions from leader. * Once the leader is put to closing state, it will reject further requests * from clients. Only the transactions which happened before the container * in the leader goes to closing state, will arrive here even the container * might already be in closing state here. */ - if (containerState == ContainerDataProto.State.OPEN - || containerState == ContainerDataProto.State.CLOSING) { + if (containerState == State.OPEN || containerState == State.CLOSING) { return; - } else { - String msg = "Requested operation not allowed as ContainerState is " + - containerState; - ContainerProtos.Result result = null; - switch (containerState) { - case CLOSED: - result = CLOSED_CONTAINER_IO; - break; - case UNHEALTHY: - result = CONTAINER_UNHEALTHY; - break; - case INVALID: - result = INVALID_CONTAINER_STATE; - break; - default: - result = CONTAINER_INTERNAL_ERROR; - } + } - throw new StorageContainerException(msg, result); + final ContainerProtos.Result result; + switch (containerState) { + case QUASI_CLOSED: + result = CLOSED_CONTAINER_IO; + break; + case CLOSED: + result = CLOSED_CONTAINER_IO; + break; + case UNHEALTHY: + result = CONTAINER_UNHEALTHY; + break; + case INVALID: + result = INVALID_CONTAINER_STATE; + break; + default: + result = CONTAINER_INTERNAL_ERROR; } + String msg = "Requested operation not allowed as ContainerState is " + + containerState; + throw new StorageContainerException(msg, result); } public Container importContainer(long containerID, long maxSize, @@ -796,4 +788,55 @@ public class KeyValueHandler extends Handler { return container; } + + @Override + public void markContainerForClose(Container container) + throws IOException { + State currentState = container.getContainerState(); + // Move the container to CLOSING state only if it's OPEN + if (currentState == State.OPEN) { + container.markContainerForClose(); + sendICR(container); + } + } + + @Override + public void quasiCloseContainer(Container container) + throws IOException { + final State state = container.getContainerState(); + // Quasi close call is idempotent. + if (state == State.QUASI_CLOSED) { + return; + } + // The container has to be in CLOSING state. + if (state != State.CLOSING) { + ContainerProtos.Result error = state == State.INVALID ? + INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR; + throw new StorageContainerException("Cannot quasi close container #" + + container.getContainerData().getContainerID() + " while in " + + state + " state.", error); + } + container.quasiClose(); + sendICR(container); + } + + @Override + public void closeContainer(Container container) + throws IOException { + final State state = container.getContainerState(); + // Close call is idempotent. + if (state == State.CLOSED) { + return; + } + // The container has to be either in CLOSING or in QUASI_CLOSED state. + if (state != State.CLOSING && state != State.QUASI_CLOSED) { + ContainerProtos.Result error = state == State.INVALID ? + INVALID_CONTAINER_STATE : CONTAINER_INTERNAL_ERROR; + throw new StorageContainerException("Cannot close container #" + + container.getContainerData().getContainerID() + " while in " + + state + " state.", error); + } + container.close(); + sendICR(container); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java new file mode 100644 index 0000000..41b74e7 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.ozoneimpl; + +import org.apache.hadoop.hdds.protocol.datanode.proto + .ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerDataProto.State; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Map; + +/** + * Control plane for container management in datanode. + */ +public class ContainerController { + + private final ContainerSet containerSet; + private final Map<ContainerType, Handler> handlers; + + public ContainerController(final ContainerSet containerSet, + final Map<ContainerType, Handler> handlers) { + this.containerSet = containerSet; + this.handlers = handlers; + } + + /** + * Returns the Container given a container id. + * + * @param containerId ID of the container + * @return Container + */ + public Container getContainer(final long containerId) { + return containerSet.getContainer(containerId); + } + + /** + * Marks the container for closing. Moves the container to CLOSING state. + * + * @param containerId Id of the container to update + * @throws IOException in case of exception + */ + public void markContainerForClose(final long containerId) + throws IOException { + Container container = containerSet.getContainer(containerId); + + if (container.getContainerState() == State.OPEN) { + getHandler(container).markContainerForClose(container); + } + } + + /** + * Returns the container report. + * + * @return ContainerReportsProto + * @throws IOException in case of exception + */ + public ContainerReportsProto getContainerReport() + throws IOException { + return containerSet.getContainerReport(); + } + + /** + * Quasi closes a container given its id. + * + * @param containerId Id of the container to quasi close + * @throws IOException in case of exception + */ + public void quasiCloseContainer(final long containerId) throws IOException { + final Container container = containerSet.getContainer(containerId); + getHandler(container).quasiCloseContainer(container); + } + + /** + * Closes a container given its id. + * + * @param containerId Id of the container to close + * @throws IOException in case of exception + */ + public void closeContainer(final long containerId) throws IOException { + final Container container = containerSet.getContainer(containerId); + getHandler(container).closeContainer(container); + } + + public Container importContainer(final ContainerType type, + final long containerId, final long maxSize, + final FileInputStream rawContainerStream, final TarContainerPacker packer) + throws IOException { + return handlers.get(type).importContainer( + containerId, maxSize, rawContainerStream, packer); + } + + /** + * Given a container, returns its handler instance. + * + * @param container Container + * @return handler of the container + */ + private Handler getHandler(final Container container) { + return handlers.get(container.getContainerType()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 9fac3cb..a89b50a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -19,18 +19,20 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto + .ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; @@ -47,27 +49,26 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; - /** * Ozone main class sets up the network servers and initializes the container * layer. */ public class OzoneContainer { - public static final Logger LOG = LoggerFactory.getLogger( + private static final Logger LOG = LoggerFactory.getLogger( OzoneContainer.class); private final HddsDispatcher hddsDispatcher; - private final DatanodeDetails dnDetails; + private final Map<ContainerType, Handler> handlers; private final OzoneConfiguration config; private final VolumeSet volumeSet; private final ContainerSet containerSet; - private final Map<ReplicationType, XceiverServerSpi> servers; + private final XceiverServerSpi writeChannel; + private final XceiverServerSpi readChannel; + private final ContainerController controller; /** * Construct OzoneContainer object. @@ -78,31 +79,42 @@ public class OzoneContainer { */ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration conf, StateContext context) throws IOException { - this.dnDetails = datanodeDetails; this.config = conf; this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); this.containerSet = new ContainerSet(); buildContainerSet(); - hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet, - context); - servers = new HashMap<>(); - servers.put(ReplicationType.STAND_ALONE, - new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher, - createReplicationService())); - servers.put(ReplicationType.RATIS, XceiverServerRatis - .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher, - context)); + final ContainerMetrics metrics = ContainerMetrics.create(conf); + this.handlers = Maps.newHashMap(); + for (ContainerType containerType : ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, context, containerSet, volumeSet, metrics)); + } + this.hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet, + handlers, context, metrics); + + /* + * ContainerController is the control plane + * XceiverServerRatis is the write channel + * XceiverServerGrpc is the read channel + */ + this.controller = new ContainerController(containerSet, handlers); + this.writeChannel = XceiverServerRatis.newXceiverServerRatis( + datanodeDetails, config, hddsDispatcher, context); + this.readChannel = new XceiverServerGrpc( + datanodeDetails, config, hddsDispatcher, createReplicationService()); + } private GrpcReplicationService createReplicationService() { return new GrpcReplicationService( - new OnDemandContainerReplicationSource(containerSet)); + new OnDemandContainerReplicationSource(controller)); } /** * Build's container map. */ - public void buildContainerSet() { + private void buildContainerSet() { Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList() .iterator(); ArrayList<Thread> volumeThreads = new ArrayList<Thread>(); @@ -111,7 +123,6 @@ public class OzoneContainer { // And also handle disk failure tolerance need to be added while (volumeSetIterator.hasNext()) { HddsVolume volume = volumeSetIterator.next(); - File hddsVolumeRootDir = volume.getHddsRootDir(); Thread thread = new Thread(new ContainerReader(volumeSet, volume, containerSet, config)); thread.start(); @@ -135,9 +146,8 @@ public class OzoneContainer { */ public void start() throws IOException { LOG.info("Attempting to start container services."); - for (XceiverServerSpi serverinstance : servers.values()) { - serverinstance.start(); - } + writeChannel.start(); + readChannel.start(); hddsDispatcher.init(); } @@ -147,9 +157,8 @@ public class OzoneContainer { public void stop() { //TODO: at end of container IO integration work. LOG.info("Attempting to stop container services."); - for(XceiverServerSpi serverinstance: servers.values()) { - serverinstance.stop(); - } + writeChannel.stop(); + readChannel.stop(); hddsDispatcher.shutdown(); } @@ -163,58 +172,24 @@ public class OzoneContainer { * @return - container report. * @throws IOException */ - public StorageContainerDatanodeProtocolProtos.ContainerReportsProto - getContainerReport() throws IOException { - return this.containerSet.getContainerReport(); - } public PipelineReportsProto getPipelineReport() { PipelineReportsProto.Builder pipelineReportsProto = - PipelineReportsProto.newBuilder(); - for (XceiverServerSpi serverInstance : servers.values()) { - pipelineReportsProto - .addAllPipelineReport(serverInstance.getPipelineReport()); - } + PipelineReportsProto.newBuilder(); + pipelineReportsProto.addAllPipelineReport(writeChannel.getPipelineReport()); return pipelineReportsProto.build(); } - /** - * Submit ContainerRequest. - * @param request - * @param replicationType - * @param pipelineID - */ - public void submitContainerRequest( - ContainerProtos.ContainerCommandRequestProto request, - ReplicationType replicationType, - PipelineID pipelineID) throws IOException { - LOG.info("submitting {} request over {} server for container {}", - request.getCmdType(), replicationType, request.getContainerID()); - Preconditions.checkState(servers.containsKey(replicationType)); - servers.get(replicationType).submitRequest(request, pipelineID); - } - - private int getPortByType(ReplicationType replicationType) { - return servers.containsKey(replicationType) ? - servers.get(replicationType).getIPCPort() : INVALID_PORT; + public XceiverServerSpi getWriteChannel() { + return writeChannel; } - /** - * Returns the container servers IPC port. - * - * @return Container servers IPC port. - */ - public int getContainerServerPort() { - return getPortByType(ReplicationType.STAND_ALONE); + public XceiverServerSpi getReadChannel() { + return readChannel; } - /** - * Returns the Ratis container Server IPC port. - * - * @return Ratis port. - */ - public int getRatisContainerServerPort() { - return getPortByType(ReplicationType.RATIS); + public ContainerController getController() { + return controller; } /** @@ -230,11 +205,6 @@ public class OzoneContainer { return this.hddsDispatcher; } - @VisibleForTesting - public XceiverServerSpi getServer(ReplicationType replicationType) { - return servers.get(replicationType); - } - public VolumeSet getVolumeSet() { return volumeSet; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 5ef5841..e8f0d17 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -28,9 +28,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; import org.slf4j.Logger; @@ -49,7 +48,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator { private final ContainerSet containerSet; - private final ContainerDispatcher containerDispatcher; + private final ContainerController controller; private final ContainerDownloader downloader; @@ -57,11 +56,11 @@ public class DownloadAndImportReplicator implements ContainerReplicator { public DownloadAndImportReplicator( ContainerSet containerSet, - ContainerDispatcher containerDispatcher, + ContainerController controller, ContainerDownloader downloader, TarContainerPacker packer) { this.containerSet = containerSet; - this.containerDispatcher = containerDispatcher; + this.controller = controller; this.downloader = downloader; this.packer = packer; } @@ -80,10 +79,9 @@ public class DownloadAndImportReplicator implements ContainerReplicator { try (FileInputStream tempContainerTarStream = new FileInputStream( tarFilePath.toFile())) { - Handler handler = containerDispatcher.getHandler( - originalContainerData.getContainerType()); - - Container container = handler.importContainer(containerID, + Container container = controller.importContainer( + originalContainerData.getContainerType(), + containerID, originalContainerData.getMaxSize(), tempContainerTarStream, packer); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java index d557b54..28b8713 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java @@ -20,12 +20,12 @@ package org.apache.hadoop.ozone.container.replication; import java.io.IOException; import java.io.OutputStream; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import com.google.common.base.Preconditions; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,13 +39,13 @@ public class OnDemandContainerReplicationSource private static final Logger LOG = LoggerFactory.getLogger(ContainerReplicationSource.class); - private ContainerSet containerSet; + private ContainerController controller; private ContainerPacker packer = new TarContainerPacker(); public OnDemandContainerReplicationSource( - ContainerSet containerSet) { - this.containerSet = containerSet; + ContainerController controller) { + this.controller = controller; } @Override @@ -57,7 +57,7 @@ public class OnDemandContainerReplicationSource public void copyData(long containerId, OutputStream destination) throws IOException { - Container container = containerSet.getContainer(containerId); + Container container = controller.getContainer(containerId); Preconditions .checkNotNull(container, "Container is not found " + containerId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index 7849bcd..4fe6ae4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -17,7 +17,6 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto @@ -30,14 +29,11 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; public class CloseContainerCommand extends SCMCommand<CloseContainerCommandProto> { - private HddsProtos.ReplicationType replicationType; - private PipelineID pipelineID; + private final PipelineID pipelineID; - public CloseContainerCommand(long containerID, - HddsProtos.ReplicationType replicationType, - PipelineID pipelineID) { + public CloseContainerCommand(final long containerID, + final PipelineID pipelineID) { super(containerID); - this.replicationType = replicationType; this.pipelineID = pipelineID; } @@ -65,7 +61,6 @@ public class CloseContainerCommand return CloseContainerCommandProto.newBuilder() .setContainerID(getId()) .setCmdId(getId()) - .setReplicationType(replicationType) .setPipelineID(pipelineID.getProtobuf()) .build(); } @@ -74,7 +69,6 @@ public class CloseContainerCommand CloseContainerCommandProto closeContainerProto) { Preconditions.checkNotNull(closeContainerProto); return new CloseContainerCommand(closeContainerProto.getCmdId(), - closeContainerProto.getReplicationType(), PipelineID.getFromProtobuf(closeContainerProto.getPipelineID())); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 33ea307..9fdef7d 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -140,8 +140,8 @@ message ContainerReplicaProto { enum State { OPEN = 1; CLOSING = 2; - CLOSED = 3; - QUASI_CLOSED = 4; + QUASI_CLOSED = 3; + CLOSED = 4; UNHEALTHY = 5; INVALID = 6; } @@ -289,9 +289,9 @@ This command asks the datanode to close a specific container. */ message CloseContainerCommandProto { required int64 containerID = 1; - required hadoop.hdds.ReplicationType replicationType = 2; + required PipelineID pipelineID = 2; + // cmdId will be removed required int64 cmdId = 3; - required PipelineID pipelineID = 4; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java index 35cda00..085e6f9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.impl; +import com.google.common.collect.Maps; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.StorageUnit; @@ -26,6 +27,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto + .ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.protocol.datanode.proto .ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; @@ -33,7 +36,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; @@ -47,6 +52,7 @@ import org.mockito.Mockito; import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.UUID; import static java.nio.charset.StandardCharsets.UTF_8; @@ -77,8 +83,15 @@ public class TestHddsDispatcher { container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), scmId.toString()); containerSet.addContainer(container); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map<ContainerType, Handler> handlers = Maps.newHashMap(); + for (ContainerType containerType : ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, null, containerSet, volumeSet, metrics)); + } HddsDispatcher hddsDispatcher = new HddsDispatcher( - conf, containerSet, volumeSet, context); + conf, containerSet, volumeSet, handlers, context, metrics); hddsDispatcher.setScmId(scmId.toString()); ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch( getWriteChunkRequest(dd.getUuidString(), 1L, 1L)); @@ -113,8 +126,15 @@ public class TestHddsDispatcher { ContainerSet containerSet = new ContainerSet(); VolumeSet volumeSet = new VolumeSet(dd.getUuidString(), conf); StateContext context = Mockito.mock(StateContext.class); - HddsDispatcher hddsDispatcher = - new HddsDispatcher(conf, containerSet, volumeSet, context); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map<ContainerType, Handler> handlers = Maps.newHashMap(); + for (ContainerType containerType : ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, null, containerSet, volumeSet, metrics)); + } + HddsDispatcher hddsDispatcher = new HddsDispatcher( + conf, containerSet, volumeSet, handlers, context, metrics); hddsDispatcher.setScmId(scmId.toString()); ContainerCommandRequestProto writeChunkRequest = getWriteChunkRequest(dd.getUuidString(), 1L, 1L); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java index b658295..e763289 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java @@ -18,8 +18,10 @@ package org.apache.hadoop.ozone.container.common.interfaces; +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; @@ -32,6 +34,8 @@ import org.junit.rules.TestRule; import org.junit.rules.Timeout; import org.mockito.Mockito; +import java.util.Map; + /** * Tests Handler interface. */ @@ -50,8 +54,16 @@ public class TestHandler { this.conf = new Configuration(); this.containerSet = Mockito.mock(ContainerSet.class); this.volumeSet = Mockito.mock(VolumeSet.class); - - this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, null); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap(); + for (ContainerProtos.ContainerType containerType : + ContainerProtos.ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType( + containerType, conf, null, containerSet, volumeSet, metrics)); + } + this.dispatcher = new HddsDispatcher( + conf, containerSet, volumeSet, handlers, null, metrics); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java new file mode 100644 index 0000000..0cf95b7 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.retry.RetryPolicy; +import org.apache.ratis.rpc.SupportedRpcType; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.UUID; + +/** + * Test cases to verify CloseContainerCommandHandler in datanode. + */ +public class TestCloseContainerCommandHandler { + + private static final StateContext CONTEXT = Mockito.mock(StateContext.class); + private static File testDir; + + + private OzoneContainer getOzoneContainer(final OzoneConfiguration conf, + final DatanodeDetails datanodeDetails) throws IOException { + testDir = GenericTestUtils.getTestDir( + TestCloseContainerCommandHandler.class.getName() + UUID.randomUUID()); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath()); + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, testDir.getPath()); + + return new OzoneContainer(datanodeDetails, conf, CONTEXT); + } + + + @Test + public void testCloseContainerViaRatis() + throws IOException, InterruptedException { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer container = getOzoneContainer(conf, datanodeDetails); + container.getDispatcher().setScmId(UUID.randomUUID().toString()); + container.start(); + // Give some time for ratis for leader election. + final PipelineID pipelineID = PipelineID.randomId(); + final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId()); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); + final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails); + final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId, + Collections.singleton(datanodeDetails)); + final RaftClient client = RatisHelper.newRaftClient( + SupportedRpcType.GRPC, peer, retryPolicy); + System.out.println(client.groupAdd(group, peer.getId()).isSuccess()); + Thread.sleep(2000); + final ContainerID containerId = ContainerID.valueof(1); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CreateContainer); + request.setContainerID(containerId.getId()); + request.setCreateContainer( + ContainerProtos.CreateContainerRequestProto.getDefaultInstance()); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(datanodeDetails.getUuidString()); + container.getWriteChannel().submitRequest( + request.build(), pipelineID.getProtobuf()); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + // We have created a container via ratis. Now close the container on ratis. + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + final CloseContainerCommand command = new CloseContainerCommand( + containerId.getId(), pipelineID); + final DatanodeStateMachine datanodeStateMachine = Mockito.mock( + DatanodeStateMachine.class); + + Mockito.when(datanodeStateMachine.getDatanodeDetails()) + .thenReturn(datanodeDetails); + Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine); + + closeHandler.handle(command, container, CONTEXT, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat(); + container.stop(); + } + + @Test + public void testCloseContainerViaStandalone() + throws IOException, InterruptedException { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer container = getOzoneContainer(conf, datanodeDetails); + container.getDispatcher().setScmId(UUID.randomUUID().toString()); + container.start(); + // Give some time for ratis for leader election. + final PipelineID pipelineID = PipelineID.randomId(); + final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId()); + final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); + final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails); + final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId, + Collections.singleton(datanodeDetails)); + final RaftClient client = RatisHelper.newRaftClient( + SupportedRpcType.GRPC, peer, retryPolicy); + System.out.println(client.groupAdd(group, peer.getId()).isSuccess()); + Thread.sleep(2000); + final ContainerID containerId = ContainerID.valueof(2); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CreateContainer); + request.setContainerID(containerId.getId()); + request.setCreateContainer( + ContainerProtos.CreateContainerRequestProto.getDefaultInstance()); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid(datanodeDetails.getUuidString()); + container.getWriteChannel().submitRequest( + request.build(), pipelineID.getProtobuf()); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + // We have created a container via ratis. Now quasi close it + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + // Specify a pipeline which doesn't exist in the datanode. + final CloseContainerCommand command = new CloseContainerCommand( + containerId.getId(), PipelineID.randomId()); + final DatanodeStateMachine datanodeStateMachine = Mockito.mock( + DatanodeStateMachine.class); + + Mockito.when(datanodeStateMachine.getDatanodeDetails()) + .thenReturn(datanodeDetails); + Mockito.when(CONTEXT.getParent()).thenReturn(datanodeStateMachine); + + closeHandler.handle(command, container, CONTEXT, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED, + container.getContainerSet().getContainer( + containerId.getId()).getContainerState()); + + Mockito.verify(datanodeStateMachine, Mockito.times(2)).triggerHeartbeat(); + container.stop(); + } + + /** + * Creates a random DatanodeDetails. + * @return DatanodeDetails + */ + private static DatanodeDetails randomDatanodeDetails() { + String ipAddress = "127.0.0.1"; + DatanodeDetails.Port containerPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.STANDALONE, 0); + DatanodeDetails.Port ratisPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.RATIS, 0); + DatanodeDetails.Port restPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.REST, 0); + DatanodeDetails.Builder builder = DatanodeDetails.newBuilder(); + builder.setUuid(UUID.randomUUID().toString()) + .setHostName("localhost") + .setIpAddress(ipAddress) + .addPort(containerPort) + .addPort(ratisPort) + .addPort(restPort); + return builder.build(); + } + + @AfterClass + public static void teardown() throws IOException { + FileUtils.deleteDirectory(testDir); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 7fc065f..265fb18 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -54,6 +54,7 @@ import static org.mockito.Mockito.times; import java.io.File; +import java.io.IOException; import java.util.UUID; /** @@ -224,7 +225,7 @@ public class TestKeyValueHandler { interval[0] = 2; ContainerMetrics metrics = new ContainerMetrics(interval); VolumeSet volumeSet = new VolumeSet(UUID.randomUUID().toString(), conf); - KeyValueHandler keyValueHandler = new KeyValueHandler(conf, cset, + KeyValueHandler keyValueHandler = new KeyValueHandler(conf, null, cset, volumeSet, metrics); assertEquals("org.apache.hadoop.ozone.container.common" + ".volume.RoundRobinVolumeChoosingPolicy", @@ -235,7 +236,7 @@ public class TestKeyValueHandler { conf.set(HDDS_DATANODE_VOLUME_CHOOSING_POLICY, "org.apache.hadoop.ozone.container.common.impl.HddsDispatcher"); try { - new KeyValueHandler(conf, cset, volumeSet, metrics); + new KeyValueHandler(conf, null, cset, volumeSet, metrics); } catch (RuntimeException ex) { GenericTestUtils.assertExceptionContains("class org.apache.hadoop" + ".ozone.container.common.impl.HddsDispatcher not org.apache" + @@ -261,7 +262,7 @@ public class TestKeyValueHandler { @Test - public void testCloseInvalidContainer() { + public void testCloseInvalidContainer() throws IOException { long containerID = 1234L; Configuration conf = new Configuration(); KeyValueContainerData kvData = new KeyValueContainerData(containerID, @@ -282,6 +283,7 @@ public class TestKeyValueHandler { Mockito.when(handler.handleCloseContainer(any(), any())) .thenCallRealMethod(); + doCallRealMethod().when(handler).closeContainer(any()); // Closing invalid container should return error response. ContainerProtos.ContainerCommandResponseProto response = handler.handleCloseContainer(closeContainerRequest, container); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java index 6f96f4b..34030e8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -172,7 +172,7 @@ public class BlockManagerImpl implements EventHandler<Boolean>, LOG.warn("Unable to allocate container."); } } catch (IOException ex) { - LOG.warn("Unable to allocate container: {}", ex); + LOG.warn("Unable to allocate container.", ex); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4d96400/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index 719d763..fd73711 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -74,8 +74,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> { if (container.getState() == LifeCycleState.CLOSING) { final CloseContainerCommand closeContainerCommand = - new CloseContainerCommand(containerID.getId(), - container.getReplicationType(), container.getPipelineID()); + new CloseContainerCommand( + containerID.getId(), container.getPipelineID()); getNodes(container).forEach(node -> publisher.fireEvent( DATANODE_COMMAND, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
