HDDS-81. Moving ContainerReport inside Datanode heartbeat. Contributed by Nanda Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/201440b9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/201440b9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/201440b9 Branch: refs/heads/HDDS-48 Commit: 201440b987d5ef3910c2045b2411c213ed6eec1f Parents: 4827e9a Author: Anu Engineer <aengin...@apache.org> Authored: Tue May 29 12:40:27 2018 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Tue May 29 12:48:50 2018 -0700 ---------------------------------------------------------------------- .../common/impl/ContainerManagerImpl.java | 22 +- .../common/impl/StorageLocationReport.java | 8 +- .../common/interfaces/ContainerManager.java | 8 +- .../statemachine/DatanodeStateMachine.java | 7 +- .../common/statemachine/StateContext.java | 16 +- .../CloseContainerCommandHandler.java | 113 ++++++++ .../commandhandler/CloseContainerHandler.java | 113 -------- .../commandhandler/CommandDispatcher.java | 5 +- .../commandhandler/CommandHandler.java | 8 +- .../DeleteBlocksCommandHandler.java | 12 +- .../states/endpoint/HeartbeatEndpointTask.java | 30 +- .../states/endpoint/RegisterEndpointTask.java | 12 +- .../container/ozoneimpl/OzoneContainer.java | 10 +- .../StorageContainerDatanodeProtocol.java | 30 +- .../protocol/StorageContainerNodeProtocol.java | 15 +- .../commands/CloseContainerCommand.java | 18 +- .../protocol/commands/DeleteBlocksCommand.java | 18 +- .../protocol/commands/RegisteredCommand.java | 26 +- .../protocol/commands/ReregisterCommand.java | 16 +- .../ozone/protocol/commands/SCMCommand.java | 4 +- ...rDatanodeProtocolClientSideTranslatorPB.java | 50 +--- ...rDatanodeProtocolServerSideTranslatorPB.java | 53 ++-- .../StorageContainerDatanodeProtocol.proto | 256 ++++++++--------- .../ozone/container/common/ScmTestMock.java | 78 ++---- .../hdds/scm/container/ContainerMapping.java | 10 +- .../hadoop/hdds/scm/container/Mapping.java | 6 +- 
.../replication/ContainerSupervisor.java | 13 +- .../container/replication/InProgressPool.java | 15 +- .../hdds/scm/node/HeartbeatQueueItem.java | 14 +- .../hadoop/hdds/scm/node/SCMNodeManager.java | 58 ++-- .../hdds/scm/node/SCMNodeStorageStatMap.java | 14 +- .../scm/server/SCMDatanodeProtocolServer.java | 195 +++++++------ .../org/apache/hadoop/hdds/scm/TestUtils.java | 19 +- .../hdds/scm/container/MockNodeManager.java | 26 +- .../scm/container/TestContainerMapping.java | 24 +- .../container/closer/TestContainerCloser.java | 12 +- .../hdds/scm/node/TestContainerPlacement.java | 6 +- .../hadoop/hdds/scm/node/TestNodeManager.java | 83 +++--- .../scm/node/TestSCMNodeStorageStatMap.java | 16 +- .../ozone/container/common/TestEndPoint.java | 113 ++------ .../replication/TestContainerSupervisor.java | 275 ------------------- .../ReplicationDatanodeStateManager.java | 101 ------- .../testutils/ReplicationNodeManagerMock.java | 14 +- .../ozone/TestStorageContainerManager.java | 11 +- .../apache/hadoop/ozone/scm/TestSCMMetrics.java | 68 ++--- 45 files changed, 706 insertions(+), 1315 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java index 9355364..af47015 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java @@ -35,11 +35,11 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -854,11 +854,11 @@ public class ContainerManagerImpl implements ContainerManager { * @return node report. */ @Override - public SCMNodeReport getNodeReport() throws IOException { + public NodeReportProto getNodeReport() throws IOException { StorageLocationReport[] reports = locationManager.getLocationReport(); - SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); + NodeReportProto.Builder nrb = NodeReportProto.newBuilder(); for (int i = 0; i < reports.length; i++) { - SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + StorageReportProto.Builder srb = StorageReportProto.newBuilder(); nrb.addStorageReport(reports[i].getProtoBufMessage()); } return nrb.build(); @@ -891,7 +891,7 @@ public class ContainerManagerImpl implements ContainerManager { * @throws IOException */ @Override - public ContainerReportsRequestProto getContainerReport() throws IOException { + public ContainerReportsProto getContainerReport() throws IOException { LOG.debug("Starting container report iteration."); // No need for locking since containerMap is a ConcurrentSkipListMap // And we can never get the exact state since close might happen @@ -899,12 +899,8 @@ public class ContainerManagerImpl implements 
ContainerManager { List<ContainerData> containers = containerMap.values().stream() .collect(Collectors.toList()); - ContainerReportsRequestProto.Builder crBuilder = - ContainerReportsRequestProto.newBuilder(); - - // TODO: support delta based container report - crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setType(ContainerReportsRequestProto.reportType.fullReport); + ContainerReportsProto.Builder crBuilder = + ContainerReportsProto.newBuilder(); for (ContainerData container: containers) { long containerId = container.getContainerID(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java index a5ad6c2..87b9656 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/StorageLocationReport.java @@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.protocol.proto. - StorageContainerDatanodeProtocolProtos.SCMStorageReport; + StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto. StorageContainerDatanodeProtocolProtos.StorageTypeProto; @@ -137,8 +137,8 @@ public class StorageLocationReport { * @return SCMStorageReport * @throws IOException In case, the storage type specified is invalid. 
*/ - public SCMStorageReport getProtoBufMessage() throws IOException{ - SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); + public StorageReportProto getProtoBufMessage() throws IOException{ + StorageReportProto.Builder srb = StorageReportProto.newBuilder(); return srb.setStorageUuid(getId()) .setCapacity(getCapacity()) .setScmUsed(getScmUsed()) @@ -156,7 +156,7 @@ public class StorageLocationReport { * @throws IOException in case of invalid storage type */ - public static StorageLocationReport getFromProtobuf(SCMStorageReport report) + public static StorageLocationReport getFromProtobuf(StorageReportProto report) throws IOException { StorageLocationReport.Builder builder = StorageLocationReport.newBuilder(); builder.setId(report.getStorageUuid()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java index ba70953..49b68dc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java @@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.util.RwLock; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - 
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import java.io.IOException; @@ -171,14 +171,14 @@ public interface ContainerManager extends RwLock { * Get the Node Report of container storage usage. * @return node report. */ - SCMNodeReport getNodeReport() throws IOException; + NodeReportProto getNodeReport() throws IOException; /** * Gets container report. * @return container report. * @throws IOException */ - ContainerReportsRequestProto getContainerReport() throws IOException; + ContainerReportsProto getContainerReport() throws IOException; /** * Gets container reports. http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index a8fe494..d0a4217 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -21,8 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler - .CloseContainerHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler .CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler @@ -86,7 +85,7 @@ public class DatanodeStateMachine implements Closeable { // When we add new handlers just adding a new handler here should do the // trick. commandDispatcher = CommandDispatcher.newBuilder() - .addHandler(new CloseContainerHandler()) + .addHandler(new CloseContainerCommandHandler()) .addHandler(new DeleteBlocksCommandHandler( container.getContainerManager(), conf)) .setConnectionManager(connectionManager) @@ -131,7 +130,7 @@ public class DatanodeStateMachine implements Closeable { try { LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); nextHB.set(Time.monotonicNow() + heartbeatFrequency); - context.setReportState(container.getNodeReport()); + context.setNodeReport(container.getNodeReport()); context.execute(executorService, heartbeatFrequency, TimeUnit.MILLISECONDS); now = Time.monotonicNow(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 27eb57e..4e3c610 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.container.common.statemachine; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.proto - 
.StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .InitDatanodeState; @@ -52,7 +52,7 @@ public class StateContext { private final AtomicLong stateExecutionCount; private final Configuration conf; private DatanodeStateMachine.DatanodeStates state; - private SCMNodeReport nrState; + private NodeReportProto dnReport; /** * Constructs a StateContext. @@ -69,7 +69,7 @@ public class StateContext { commandQueue = new LinkedList<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); - nrState = SCMNodeReport.getDefaultInstance(); + dnReport = NodeReportProto.getDefaultInstance(); } /** @@ -144,16 +144,16 @@ public class StateContext { * Returns the node report of the datanode state context. * @return the node report. */ - public SCMNodeReport getNodeReport() { - return nrState; + public NodeReportProto getNodeReport() { + return dnReport; } /** * Sets the storage location report of the datanode state context. 
- * @param nrReport - node report + * @param nodeReport node report */ - public void setReportState(SCMNodeReport nrReport) { - this.nrState = nrReport; + public void setNodeReport(NodeReportProto nodeReport) { + this.dnReport = nodeReport; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java new file mode 100644 index 0000000..e8c602d --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handler for close container command received from SCM. + */ +public class CloseContainerCommandHandler implements CommandHandler { + static final Logger LOG = + LoggerFactory.getLogger(CloseContainerCommandHandler.class); + private int invocationCount; + private long totalTime; + + /** + * Constructs a ContainerReport handler. + */ + public CloseContainerCommandHandler() { + } + + /** + * Handles a given SCM command. + * + * @param command - SCM Command + * @param container - Ozone Container. + * @param context - Current Context. + * @param connectionManager - The SCMs that we are talking to. 
+ */ + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + LOG.debug("Processing Close Container command."); + invocationCount++; + long startTime = Time.monotonicNow(); + // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA) + long containerID = -1; + try { + + CloseContainerCommandProto + closeContainerProto = + CloseContainerCommandProto + .parseFrom(command.getProtoBufMessage()); + containerID = closeContainerProto.getContainerID(); + + container.getContainerManager().closeContainer(containerID); + + } catch (Exception e) { + LOG.error("Can't close container " + containerID, e); + } finally { + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } + } + + /** + * Returns the command type that this command handler handles. + * + * @return Type + */ + @Override + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.closeContainerCommand; + } + + /** + * Returns number of times this handler has been invoked. + * + * @return int + */ + @Override + public int getInvocationCount() { + return invocationCount; + } + + /** + * Returns the average time this function takes to run. 
+ * + * @return long + */ + @Override + public long getAverageRunTime() { + if (invocationCount > 0) { + return totalTime / invocationCount; + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java deleted file mode 100644 index d8adc7d..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Container Report handler. - */ -public class CloseContainerHandler implements CommandHandler { - static final Logger LOG = - LoggerFactory.getLogger(CloseContainerHandler.class); - private int invocationCount; - private long totalTime; - - /** - * Constructs a ContainerReport handler. - */ - public CloseContainerHandler() { - } - - /** - * Handles a given SCM command. - * - * @param command - SCM Command - * @param container - Ozone Container. - * @param context - Current Context. - * @param connectionManager - The SCMs that we are talking to. 
- */ - @Override - public void handle(SCMCommand command, OzoneContainer container, - StateContext context, SCMConnectionManager connectionManager) { - LOG.debug("Processing Close Container command."); - invocationCount++; - long startTime = Time.monotonicNow(); - // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA) - long containerID = -1; - try { - - SCMCloseContainerCmdResponseProto - closeContainerProto = - SCMCloseContainerCmdResponseProto - .parseFrom(command.getProtoBufMessage()); - containerID = closeContainerProto.getContainerID(); - - container.getContainerManager().closeContainer(containerID); - - } catch (Exception e) { - LOG.error("Can't close container " + containerID, e); - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; - } - } - - /** - * Returns the command type that this command handler handles. - * - * @return Type - */ - @Override - public SCMCmdType getCommandType() { - return SCMCmdType.closeContainerCommand; - } - - /** - * Returns number of times this handler has been invoked. - * - * @return int - */ - @Override - public int getInvocationCount() { - return invocationCount; - } - - /** - * Returns the average time this function takes to run. 
- * - * @return long - */ - @Override - public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 40feca3..aedd78f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -18,7 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -38,7 +39,7 @@ public final class CommandDispatcher { static final Logger LOG = LoggerFactory.getLogger(CommandDispatcher.class); private final StateContext context; - private final Map<SCMCmdType, CommandHandler> handlerMap; + private final Map<Type, CommandHandler> handlerMap; private final OzoneContainer container; private final SCMConnectionManager connectionManager; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java index 13d9f72..60e2dc4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java @@ -17,8 +17,10 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType; -import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -42,7 +44,7 @@ public interface CommandHandler { * Returns the command type that this command handler handles. * @return Type */ - SCMCmdType getCommandType(); + SCMCommandProto.Type getCommandType(); /** * Returns number of times this handler has been invoked. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 5231660..ab69bdc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import com.google.common.primitives.Longs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; @@ -26,8 +28,6 @@ import org.apache.hadoop.hdds.protocol.proto .DeleteBlockTransactionResult; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.helpers @@ -73,10 +73,10 @@ public class DeleteBlocksCommandHandler implements CommandHandler { @Override public void handle(SCMCommand command, OzoneContainer container, 
StateContext context, SCMConnectionManager connectionManager) { - if (command.getType() != SCMCmdType.deleteBlocksCommand) { + if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) { LOG.warn("Skipping handling command, expected command " + "type {} but found {}", - SCMCmdType.deleteBlocksCommand, command.getType()); + SCMCommandProto.Type.deleteBlocksCommand, command.getType()); return; } LOG.debug("Processing block deletion command."); @@ -193,8 +193,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler { } @Override - public SCMCmdType getCommandType() { - return SCMCmdType.deleteBlocksCommand; + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.deleteBlocksCommand; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 01b4c72..337cdfb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto + 
.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.container.common.helpers @@ -97,8 +99,13 @@ public class HeartbeatEndpointTask try { Preconditions.checkState(this.datanodeDetailsProto != null); + SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetailsProto) + .setNodeReport(context.getNodeReport()) + .build(); + SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() - .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport()); + .sendHeartbeat(request); processResponse(reponse, datanodeDetailsProto); rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); rpcEndpoint.zeroMissedCount(); @@ -125,13 +132,13 @@ public class HeartbeatEndpointTask */ private void processResponse(SCMHeartbeatResponseProto response, final DatanodeDetailsProto datanodeDetails) { - for (SCMCommandResponseProto commandResponseProto : response + Preconditions.checkState(response.getDatanodeUUID() + .equalsIgnoreCase(datanodeDetails.getUuid()), + "Unexpected datanode ID in the response."); + // Verify the response is indeed for this datanode. + for (SCMCommandProto commandResponseProto : response .getCommandsList()) { - // Verify the response is indeed for this datanode. 
- Preconditions.checkState(commandResponseProto.getDatanodeUUID() - .equalsIgnoreCase(datanodeDetails.getUuid()), - "Unexpected datanode ID in the response."); - switch (commandResponseProto.getCmdType()) { + switch (commandResponseProto.getCommandType()) { case reregisterCommand: if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) { if (LOG.isDebugEnabled()) { @@ -148,7 +155,8 @@ public class HeartbeatEndpointTask break; case deleteBlocksCommand: DeleteBlocksCommand db = DeleteBlocksCommand - .getFromProtobuf(commandResponseProto.getDeleteBlocksProto()); + .getFromProtobuf( + commandResponseProto.getDeleteBlocksCommandProto()); if (!db.blocksTobeDeleted().isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug(DeletedContainerBlocksSummary @@ -161,7 +169,7 @@ public class HeartbeatEndpointTask case closeContainerCommand: CloseContainerCommand closeContainer = CloseContainerCommand.getFromProtobuf( - commandResponseProto.getCloseContainerProto()); + commandResponseProto.getCloseContainerCommandProto()); if (LOG.isDebugEnabled()) { LOG.debug("Received SCM container close request for container {}", closeContainer.getContainerID()); @@ -170,7 +178,7 @@ public class HeartbeatEndpointTask break; default: throw new IllegalArgumentException("Unknown response : " - + commandResponseProto.getCmdType().name()); + + commandResponseProto.getCommandType().name()); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index 77a7084..12b48ab 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,11 +104,11 @@ public final class RegisterEndpointTask implements rpcEndPoint.lock(); try { - ContainerReportsRequestProto contianerReport = datanodeContainerManager + ContainerReportsProto contianerReport = datanodeContainerManager .getContainerReport(); - SCMNodeReport nodeReport = datanodeContainerManager.getNodeReport(); + NodeReportProto nodeReport = datanodeContainerManager.getNodeReport(); // TODO : Add responses to the command Queue. 
- SCMRegisteredCmdResponseProto response = rpcEndPoint.getEndPoint() + SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint() .register(datanodeDetails.getProtoBufMessage(), nodeReport, contianerReport); Preconditions.checkState(UUID.fromString(response.getDatanodeUUID()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 6758479..b357fef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -19,14 +19,14 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; @@ -219,7 
+219,7 @@ public class OzoneContainer { /** * Returns node report of container storage usage. */ - public SCMNodeReport getNodeReport() throws IOException { + public NodeReportProto getNodeReport() throws IOException { return this.manager.getNodeReport(); } @@ -255,7 +255,7 @@ public class OzoneContainer { * @return - container report. * @throws IOException */ - public ContainerReportsRequestProto getContainerReport() throws IOException { + public ContainerReportsProto getContainerReport() throws IOException { return this.manager.getContainerReport(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index e2a3bf5..a950a31 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -19,20 +19,20 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos .ContainerBlocksDeletionACKResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; 
-import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -55,13 +55,12 @@ public interface StorageContainerDatanodeProtocol { /** * Used by data node to send a Heartbeat. - * @param datanodeDetails - Datanode Details. - * @param nodeReport - node report state + * @param heartbeat Heartbeat * @return - SCMHeartbeatResponseProto * @throws IOException */ - SCMHeartbeatResponseProto sendHeartbeat(DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport) throws IOException; + SCMHeartbeatResponseProto sendHeartbeat(SCMHeartbeatRequestProto heartbeat) + throws IOException; /** * Register Datanode. @@ -70,20 +69,11 @@ public interface StorageContainerDatanodeProtocol { * @param containerReportsRequestProto - Container Reports. * @return SCM Command. */ - SCMRegisteredCmdResponseProto register(DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport, ContainerReportsRequestProto + SCMRegisteredResponseProto register(DatanodeDetailsProto datanodeDetails, + NodeReportProto nodeReport, ContainerReportsProto containerReportsRequestProto) throws IOException; /** - * Send a container report. - * @param reports -- Container report. - * @return container reports response. 
- * @throws IOException - */ - ContainerReportsResponseProto sendContainerReport( - ContainerReportsRequestProto reports) throws IOException; - - /** * Used by datanode to send block deletion ACK to SCM. * @param request block deletion transactions. * @return block deletion transaction response. http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java index 14038fb..790f58a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java @@ -18,11 +18,12 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import java.util.List; @@ -49,11 +50,11 @@ public interface StorageContainerNodeProtocol { /** * Register the node if the node finds that it is not registered with any SCM. 
* @param datanodeDetails DatanodeDetails - * @param nodeReport SCMNodeReport + * @param nodeReport NodeReportProto * @return SCMHeartbeatResponseProto */ - SCMCommand register(DatanodeDetailsProto datanodeDetails, SCMNodeReport - nodeReport); + RegisteredCommand register(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport); /** * Send heartbeat to indicate the datanode is alive and doing well. @@ -61,7 +62,7 @@ public interface StorageContainerNodeProtocol { * @param nodeReport - node report. * @return SCMheartbeat response list */ - List<SCMCommand> sendHeartbeat(DatanodeDetailsProto datanodeDetails, - SCMNodeReport nodeReport); + List<SCMCommand> sendHeartbeat(DatanodeDetails datanodeDetails, + NodeReportProto nodeReport); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index d1d6488..4f4f82b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -19,18 +19,16 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; -import static 
org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType.closeContainerCommand; /** * Asks datanode to close a container. */ public class CloseContainerCommand - extends SCMCommand<SCMCloseContainerCmdResponseProto> { + extends SCMCommand<CloseContainerCommandProto> { private long containerID; @@ -44,8 +42,8 @@ public class CloseContainerCommand * @return Type */ @Override - public SCMCmdType getType() { - return closeContainerCommand; + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.closeContainerCommand; } /** @@ -58,13 +56,13 @@ public class CloseContainerCommand return getProto().toByteArray(); } - public SCMCloseContainerCmdResponseProto getProto() { - return SCMCloseContainerCmdResponseProto.newBuilder() + public CloseContainerCommandProto getProto() { + return CloseContainerCommandProto.newBuilder() .setContainerID(containerID).build(); } public static CloseContainerCommand getFromProtobuf( - SCMCloseContainerCmdResponseProto closeContainerProto) { + CloseContainerCommandProto closeContainerProto) { Preconditions.checkNotNull(closeContainerProto); return new CloseContainerCommand(closeContainerProto.getContainerID()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java index a11ca25..4fa33f6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java @@ -18,11 +18,11 @@ package 
org.apache.hadoop.ozone.protocol.commands; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto; import java.util.List; @@ -30,7 +30,7 @@ import java.util.List; * A SCM command asks a datanode to delete a number of blocks. */ public class DeleteBlocksCommand extends - SCMCommand<SCMDeleteBlocksCmdResponseProto> { + SCMCommand<DeleteBlocksCommandProto> { private List<DeletedBlocksTransaction> blocksTobeDeleted; @@ -44,8 +44,8 @@ public class DeleteBlocksCommand extends } @Override - public SCMCmdType getType() { - return SCMCmdType.deleteBlocksCommand; + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.deleteBlocksCommand; } @Override @@ -54,13 +54,13 @@ public class DeleteBlocksCommand extends } public static DeleteBlocksCommand getFromProtobuf( - SCMDeleteBlocksCmdResponseProto deleteBlocksProto) { + DeleteBlocksCommandProto deleteBlocksProto) { return new DeleteBlocksCommand(deleteBlocksProto .getDeletedBlocksTransactionsList()); } - public SCMDeleteBlocksCmdResponseProto getProto() { - return SCMDeleteBlocksCmdResponseProto.newBuilder() + public DeleteBlocksCommandProto getProto() { + return DeleteBlocksCommandProto.newBuilder() .addAllDeletedBlocksTransactions(blocksTobeDeleted).build(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java index 69f2c18..3a5da72 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java @@ -19,18 +19,15 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto .ErrorCode; /** * Response to Datanode Register call. */ -public class RegisteredCommand extends - SCMCommand<SCMRegisteredCmdResponseProto> { +public class RegisteredCommand { private String datanodeUUID; private String clusterID; private ErrorCode error; @@ -60,16 +57,6 @@ public class RegisteredCommand extends } /** - * Returns the type of this command. - * - * @return Type - */ - @Override - public SCMCmdType getType() { - return SCMCmdType.registeredCommand; - } - - /** * Returns datanode UUID. * * @return - Datanode ID. @@ -117,10 +104,9 @@ public class RegisteredCommand extends * * @return A protobuf message. 
*/ - @Override public byte[] getProtoBufMessage() { - SCMRegisteredCmdResponseProto.Builder builder = - SCMRegisteredCmdResponseProto.newBuilder() + SCMRegisteredResponseProto.Builder builder = + SCMRegisteredResponseProto.newBuilder() .setClusterID(this.clusterID) .setDatanodeUUID(this.datanodeUUID) .setErrorCode(this.error); @@ -157,7 +143,7 @@ public class RegisteredCommand extends * @param response - RegisteredCmdResponseProto * @return RegisteredCommand */ - public RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto + public RegisteredCommand getFromProtobuf(SCMRegisteredResponseProto response) { Preconditions.checkNotNull(response); if (response.hasHostname() && response.hasIpAddress()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java index c167d59..953e31a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java @@ -18,18 +18,16 @@ package org.apache.hadoop.ozone.protocol.commands; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType.reregisterCommand; -import static org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto; /** * 
Informs a datanode to register itself with SCM again. */ public class ReregisterCommand extends - SCMCommand<SCMReregisterCmdResponseProto>{ + SCMCommand<ReregisterCommandProto>{ /** * Returns the type of this command. @@ -37,8 +35,8 @@ public class ReregisterCommand extends * @return Type */ @Override - public SCMCmdType getType() { - return reregisterCommand; + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.reregisterCommand; } /** @@ -51,8 +49,8 @@ public class ReregisterCommand extends return getProto().toByteArray(); } - public SCMReregisterCmdResponseProto getProto() { - return SCMReregisterCmdResponseProto + public ReregisterCommandProto getProto() { + return ReregisterCommandProto .newBuilder() .build(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index 73e4194..35ca802 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.protobuf.GeneratedMessage; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCmdType; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; /** * A class that acts as the base class to convert between Java and SCM @@ -31,7 +31,7 @@ public abstract class SCMCommand<T extends GeneratedMessage> { * Returns the type of this command. 
* @return Type */ - public abstract SCMCmdType getType(); + public abstract SCMCommandProto.Type getType(); /** * Gets the protobuf message of this object. http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index a56c57a..40fe189 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -20,24 +20,23 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos .ContainerBlocksDeletionACKResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; + import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto 
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -123,22 +122,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB /** * Send by datanode to SCM. * - * @param datanodeDetailsProto - Datanode Details - * @param nodeReport - node report + * @param heartbeat node heartbeat * @throws IOException */ @Override public SCMHeartbeatResponseProto sendHeartbeat( - DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) throws IOException { - SCMHeartbeatRequestProto.Builder req = SCMHeartbeatRequestProto - .newBuilder(); - req.setDatanodeDetails(datanodeDetailsProto); - req.setNodeReport(nodeReport); + SCMHeartbeatRequestProto heartbeat) throws IOException { final SCMHeartbeatResponseProto resp; try { - resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); + resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -154,16 +147,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB * @return SCM Command. 
*/ @Override - public SCMRegisteredCmdResponseProto register( - DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, - ContainerReportsRequestProto containerReportsRequestProto) + public SCMRegisteredResponseProto register( + DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, + ContainerReportsProto containerReportsRequestProto) throws IOException { SCMRegisterRequestProto.Builder req = SCMRegisterRequestProto.newBuilder(); req.setDatanodeDetails(datanodeDetailsProto); req.setContainerReport(containerReportsRequestProto); req.setNodeReport(nodeReport); - final SCMRegisteredCmdResponseProto response; + final SCMRegisteredResponseProto response; try { response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build()); } catch (ServiceException e) { @@ -172,25 +165,6 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB return response; } - /** - * Send a container report. - * - * @param reports -- Container report - * @return HeartbeatRespose.nullcommand. 
- * @throws IOException - */ - @Override - public ContainerReportsResponseProto sendContainerReport( - ContainerReportsRequestProto reports) throws IOException { - final ContainerReportsResponseProto resp; - try { - resp = rpcProxy.sendContainerReport(NULL_RPC_CONTROLLER, reports); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - return resp; - } - @Override public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( ContainerBlocksDeletionACKProto deletedBlocks) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 07dba57..7e8bd8a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -19,18 +19,22 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos 
- .ContainerBlocksDeletionACKResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKResponseProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -55,9 +59,8 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB } @Override - public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto - getVersion(RpcController controller, - StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request) + public SCMVersionResponseProto getVersion(RpcController controller, + SCMVersionRequestProto request) throws ServiceException { try { return impl.getVersion(request); @@ -67,15 +70,13 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB } @Override - public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto - register(RpcController controller, StorageContainerDatanodeProtocolProtos - .SCMRegisterRequestProto request) throws ServiceException { + public SCMRegisteredResponseProto register(RpcController controller, + SCMRegisterRequestProto request) throws ServiceException { try { - ContainerReportsRequestProto 
containerRequestProto = null; - SCMNodeReport scmNodeReport = null; - containerRequestProto = request.getContainerReport(); - scmNodeReport = request.getNodeReport(); - return impl.register(request.getDatanodeDetails(), scmNodeReport, + ContainerReportsProto containerRequestProto = request + .getContainerReport(); + NodeReportProto dnNodeReport = request.getNodeReport(); + return impl.register(request.getDatanodeDetails(), dnNodeReport, containerRequestProto); } catch (IOException e) { throw new ServiceException(e); @@ -83,27 +84,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB } @Override - public SCMHeartbeatResponseProto - sendHeartbeat(RpcController controller, + public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller, SCMHeartbeatRequestProto request) throws ServiceException { try { - return impl.sendHeartbeat(request.getDatanodeDetails(), - request.getNodeReport()); + return impl.sendHeartbeat(request); } catch (IOException e) { throw new ServiceException(e); } } - @Override - public ContainerReportsResponseProto sendContainerReport( - RpcController controller, ContainerReportsRequestProto request) - throws ServiceException { - try { - return impl.sendContainerReport(request); - } catch (IOException e) { - throw new ServiceException(e); - } - } @Override public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 20e6af8..cc131e0 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ 
b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -34,81 +34,74 @@ package hadoop.hdds; import "hdds.proto"; +/** + * Request for version info of the software stack on the server. + */ +message SCMVersionRequestProto {} /** -* This message is send by data node to indicate that it is alive or it is -* registering with the node manager. +* Generic response that is send to a version request. This allows keys to be +* added on the fly and protocol to remain stable. */ -message SCMHeartbeatRequestProto { - required DatanodeDetailsProto datanodeDetails = 1; - optional SCMNodeReport nodeReport = 2; +message SCMVersionResponseProto { + required uint32 softwareVersion = 1; + repeated hadoop.hdds.KeyValue keys = 2; } -/** -A container report contains the following information. -*/ -message ContainerInfo { - optional string finalhash = 2; - optional int64 size = 3; - optional int64 used = 4; - optional int64 keyCount = 5; - // TODO: move the io count to separate message - optional int64 readCount = 6; - optional int64 writeCount = 7; - optional int64 readBytes = 8; - optional int64 writeBytes = 9; - required int64 containerID = 10; - optional hadoop.hdds.LifeCycleState state = 11; +message SCMRegisterRequestProto { + required DatanodeDetailsProto datanodeDetails = 1; + required NodeReportProto nodeReport = 2; + required ContainerReportsProto containerReport = 3; } -// The deleted blocks which are stored in deletedBlock.db of scm. -// We don't use BlockID because this only contians multiple localIDs -// of the same containerID. -message DeletedBlocksTransaction { - required int64 txID = 1; - required int64 containerID = 2; - repeated int64 localID = 3; - // the retry time of sending deleting command to datanode. - required int32 count = 4; +/** + * Datanode ID returned by the SCM. This is similar to name node + * registeration of a datanode. 
+ */ +message SCMRegisteredResponseProto { + enum ErrorCode { + success = 1; + errorNodeNotPermitted = 2; + } + required ErrorCode errorCode = 1; + required string datanodeUUID = 2; + required string clusterID = 3; + optional SCMNodeAddressList addressList = 4; + optional string hostname = 5; + optional string ipAddress = 6; } /** -A set of container reports, max count is generally set to -8192 since that keeps the size of the reports under 1 MB. +* This message is send by data node to indicate that it is alive or it is +* registering with the node manager. */ -message ContainerReportsRequestProto { - enum reportType { - fullReport = 0; - deltaReport = 1; - } +message SCMHeartbeatRequestProto { required DatanodeDetailsProto datanodeDetails = 1; - repeated ContainerInfo reports = 2; - required reportType type = 3; + optional NodeReportProto nodeReport = 2; + optional ContainerReportsProto containerReport = 3; } -message ContainerReportsResponseProto { +/* + * A group of commands for the datanode to execute + */ +message SCMHeartbeatResponseProto { + required string datanodeUUID = 1; + repeated SCMCommandProto commands = 2; } -/** -* This message is send along with the heart beat to report datanode -* storage utilization by SCM. -*/ -message SCMNodeReport { - repeated SCMStorageReport storageReport = 1; +message SCMNodeAddressList { + repeated string addressList = 1; } /** - * Types of recognized storage media. - */ -enum StorageTypeProto { - DISK = 1; - SSD = 2; - ARCHIVE = 3; - RAM_DISK = 4; - PROVIDED = 5; +* This message is send along with the heart beat to report datanode +* storage utilization to SCM. 
+*/ +message NodeReportProto { + repeated StorageReportProto storageReport = 1; } -message SCMStorageReport { +message StorageReportProto { required string storageUuid = 1; required string storageLocation = 2; optional uint64 capacity = 3 [default = 0]; @@ -118,107 +111,82 @@ message SCMStorageReport { optional bool failed = 7 [default = false]; } -message SCMRegisterRequestProto { - required DatanodeDetailsProto datanodeDetails = 1; - required SCMNodeReport nodeReport = 2; - required ContainerReportsRequestProto containerReport = 3; -} - -/** - * Request for version info of the software stack on the server. - */ -message SCMVersionRequestProto { - -} - -/** -* Generic response that is send to a version request. This allows keys to be -* added on the fly and protocol to remain stable. -*/ -message SCMVersionResponseProto { - required uint32 softwareVersion = 1; - repeated hadoop.hdds.KeyValue keys = 2; -} - -message SCMNodeAddressList { - repeated string addressList = 1; -} - /** - * Datanode ID returned by the SCM. This is similar to name node - * registeration of a datanode. + * Types of recognized storage media. */ -message SCMRegisteredCmdResponseProto { - enum ErrorCode { - success = 1; - errorNodeNotPermitted = 2; - } - required ErrorCode errorCode = 2; - required string datanodeUUID = 3; - required string clusterID = 4; - optional SCMNodeAddressList addressList = 5; - optional string hostname = 6; - optional string ipAddress = 7; +enum StorageTypeProto { + DISK = 1; + SSD = 2; + ARCHIVE = 3; + RAM_DISK = 4; + PROVIDED = 5; } /** - * SCM informs a datanode to register itself again. - * With recieving this command, datanode will transit to REGISTER state. - */ -message SCMReregisterCmdResponseProto {} - -/** -This command tells the data node to send in the container report when possible +A set of container reports, max count is generally set to +8192 since that keeps the size of the reports under 1 MB. 
*/ -message SendContainerReportProto { +message ContainerReportsProto { + repeated ContainerInfo reports = 2; } -/** -This command asks the datanode to close a specific container. -*/ -message SCMCloseContainerCmdResponseProto { - required int64 containerID = 1; -} /** -Type of commands supported by SCM to datanode protocol. +A container report contains the following information. */ -enum SCMCmdType { - versionCommand = 2; - registeredCommand = 3; - reregisterCommand = 4; - deleteBlocksCommand = 5; - closeContainerCommand = 6; +message ContainerInfo { + optional string finalhash = 1; + optional int64 size = 2; + optional int64 used = 3; + optional int64 keyCount = 4; + // TODO: move the io count to separate message + optional int64 readCount = 5; + optional int64 writeCount = 6; + optional int64 readBytes = 7; + optional int64 writeBytes = 8; + required int64 containerID = 9; + optional hadoop.hdds.LifeCycleState state = 10; } /* * These are commands returned by SCM for the datanode to execute.
*/ -message SCMCommandResponseProto { - required SCMCmdType cmdType = 2; // Type of the command - optional SCMRegisteredCmdResponseProto registeredProto = 3; - optional SCMVersionResponseProto versionProto = 4; - optional SCMReregisterCmdResponseProto reregisterProto = 5; - optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 6; - required string datanodeUUID = 7; - optional SCMCloseContainerCmdResponseProto closeContainerProto = 8; +message SCMCommandProto { + enum Type { + reregisterCommand = 1; + deleteBlocksCommand = 2; + closeContainerCommand = 3; + deleteContainerCommand = 4; + } + // TODO: once we start using protoc 3.x, refactor this message using "oneof" + required Type commandType = 1; + optional ReregisterCommandProto reregisterCommandProto = 2; + optional DeleteBlocksCommandProto deleteBlocksCommandProto = 3; + optional CloseContainerCommandProto closeContainerCommandProto = 4; + optional DeleteContainerCommandProto deleteContainerCommandProto = 5; } - -/* - * A group of commands for the datanode to execute +/** + * SCM informs a datanode to register itself again. + * Upon receiving this command, the datanode will transition to REGISTER state. */ -message SCMHeartbeatResponseProto { - repeated SCMCommandResponseProto commands = 1; -} +message ReregisterCommandProto {} + // HB response from SCM, contains a list of block deletion transactions. -message SCMDeleteBlocksCmdResponseProto { +message DeleteBlocksCommandProto { repeated DeletedBlocksTransaction deletedBlocksTransactions = 1; } -// SendACK response returned by datanode to SCM, currently empty. -message ContainerBlocksDeletionACKResponseProto { +// The deleted blocks which are stored in deletedBlock.db of scm. +// We don't use BlockID because this only contains multiple localIDs +// of the same containerID. +message DeletedBlocksTransaction { + required int64 txID = 1; + required int64 containerID = 2; + repeated int64 localID = 3; + // the retry time of sending deleting command to datanode.
+ required int32 count = 4; } // ACK message datanode sent to SCM, contains the result of @@ -231,6 +199,24 @@ message ContainerBlocksDeletionACKProto { repeated DeleteBlockTransactionResult results = 1; } +// SendACK response returned by datanode to SCM, currently empty. +message ContainerBlocksDeletionACKResponseProto { +} + +/** +This command asks the datanode to close a specific container. +*/ +message CloseContainerCommandProto { + required int64 containerID = 1; +} + +/** +This command asks the datanode to delete a specific container. +*/ +message DeleteContainerCommandProto { + required int64 containerID = 1; +} + /** * Protocol used from a datanode to StorageContainerManager. * @@ -305,7 +291,7 @@ service StorageContainerDatanodeProtocolService { /** * Registers a data node with SCM. */ - rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto); + rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto); /** * Send heartbeat from datanode to SCM. HB's under SCM looks more @@ -315,12 +301,6 @@ service StorageContainerDatanodeProtocolService { rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto); /** - send container reports sends the container report to SCM. This will - return a null command as response. - */ - rpc sendContainerReport(ContainerReportsRequestProto) returns (ContainerReportsResponseProto); - - /** * Sends the block deletion ACK to SCM.
*/ rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto); http://git-wip-us.apache.org/repos/asf/hadoop/blob/201440b9/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java index c57a366..0ee6321 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java @@ -16,12 +16,12 @@ */ package org.apache.hadoop.ozone.container.common; -import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import org.apache.hadoop.hdds.protocol.proto @@ -30,13 +30,13 @@ import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import 
org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMNodeReport; + .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; + .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; @@ -56,7 +56,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { // Map of datanode to containers private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers = new HashMap(); - private Map<DatanodeDetails, SCMNodeReport> nodeReports = new HashMap<>(); + private Map<DatanodeDetails, NodeReportProto> nodeReports = new HashMap<>(); /** * Returns the number of heartbeats made to this class. * @@ -166,20 +166,17 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { /** * Used by data node to send a Heartbeat. * - * @param datanodeDetailsProto - DatanodeDetailsProto. - * @param nodeReport - node report. + * @param heartbeat - node heartbeat. 
* @return - SCMHeartbeatResponseProto * @throws IOException */ @Override public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto - sendHeartbeat(DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) - throws IOException { + sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException { rpcCount.incrementAndGet(); heartbeatCount.incrementAndGet(); sleepIfNeeded(); - List<SCMCommandResponseProto> + List<SCMCommandProto> cmdResponses = new LinkedList<>(); return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses) .build(); @@ -193,21 +190,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { */ @Override public StorageContainerDatanodeProtocolProtos - .SCMRegisteredCmdResponseProto register( - DatanodeDetailsProto datanodeDetailsProto, SCMNodeReport nodeReport, - StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto + .SCMRegisteredResponseProto register( + DatanodeDetailsProto datanodeDetailsProto, NodeReportProto nodeReport, + StorageContainerDatanodeProtocolProtos.ContainerReportsProto containerReportsRequestProto) throws IOException { rpcCount.incrementAndGet(); - sendContainerReport(containerReportsRequestProto); updateNodeReport(datanodeDetailsProto, nodeReport); sleepIfNeeded(); - return StorageContainerDatanodeProtocolProtos - .SCMRegisteredCmdResponseProto + return StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto .newBuilder().setClusterID(UUID.randomUUID().toString()) .setDatanodeUUID(datanodeDetailsProto.getUuid()).setErrorCode( StorageContainerDatanodeProtocolProtos - .SCMRegisteredCmdResponseProto.ErrorCode.success).build(); + .SCMRegisteredResponseProto.ErrorCode.success).build(); } /** @@ -216,19 +211,19 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { * @param nodeReport */ public void updateNodeReport(DatanodeDetailsProto datanodeDetailsProto, - SCMNodeReport nodeReport) { + NodeReportProto nodeReport) { 
DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf( datanodeDetailsProto); - SCMNodeReport.Builder datanodeReport = SCMNodeReport.newBuilder(); + NodeReportProto.Builder nodeReportProto = NodeReportProto.newBuilder(); - List<SCMStorageReport> storageReports = + List<StorageReportProto> storageReports = nodeReport.getStorageReportList(); - for(SCMStorageReport report : storageReports) { - datanodeReport.addStorageReport(report); + for(StorageReportProto report : storageReports) { + nodeReportProto.addStorageReport(report); } - nodeReports.put(datanode, datanodeReport.build()); + nodeReports.put(datanode, nodeReportProto.build()); } @@ -254,39 +249,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { return 0; } - /** - * Send a container report. - * - * @param reports -- Container report - * @return HeartbeatResponse.nullcommand. - * @throws IOException - */ - @Override - public StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto - sendContainerReport(StorageContainerDatanodeProtocolProtos - .ContainerReportsRequestProto reports) throws IOException { - Preconditions.checkNotNull(reports); - containerReportsCount.incrementAndGet(); - - DatanodeDetails datanode = DatanodeDetails.getFromProtoBuf( - reports.getDatanodeDetails()); - if (reports.getReportsCount() > 0) { - Map containers = nodeContainers.get(datanode); - if (containers == null) { - containers = new LinkedHashMap(); - nodeContainers.put(datanode, containers); - } - - for (StorageContainerDatanodeProtocolProtos.ContainerInfo report: - reports.getReportsList()) { - containers.put(report.getContainerID(), report); - } - } - - return StorageContainerDatanodeProtocolProtos - .ContainerReportsResponseProto.newBuilder().build(); - } - @Override public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( ContainerBlocksDeletionACKProto request) throws IOException { --------------------------------------------------------------------- To unsubscribe, 
e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org