HDDS-191. Queue SCMCommands via EventQueue in SCM. Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a55d6bba Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a55d6bba Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a55d6bba Branch: refs/heads/HDFS-12943 Commit: a55d6bba71c81c1c4e9d8cd11f55c78f10a548b0 Parents: 4ffe68a Author: Anu Engineer <aengin...@apache.org> Authored: Mon Jun 25 13:05:22 2018 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Mon Jun 25 13:05:22 2018 -0700 ---------------------------------------------------------------------- .../protocol/commands/CommandForDatanode.java | 45 ++++++++++++++++++++ .../hadoop/hdds/scm/node/SCMNodeManager.java | 20 ++++++++- .../scm/server/StorageContainerManager.java | 8 +++- .../hadoop/hdds/scm/node/TestNodeManager.java | 39 +++++++++++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a55d6bba/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java new file mode 100644 index 0000000..0c4964a --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import java.util.UUID; + +import com.google.protobuf.GeneratedMessage; + +/** + * Command for the datanode with the destination address. + */ +public class CommandForDatanode<T extends GeneratedMessage> { + + private final UUID datanodeId; + + private final SCMCommand<T> command; + + public CommandForDatanode(UUID datanodeId, SCMCommand<T> command) { + this.datanodeId = datanodeId; + this.command = command; + } + + public UUID getDatanodeId() { + return datanodeId; + } + + public SCMCommand<T> getCommand() { + return command; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a55d6bba/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index b339fb7..fc8b013 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -25,6 +25,10 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.TypedEvent; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -42,11 +46,14 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import com.google.protobuf.GeneratedMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +107,8 @@ import static org.apache.hadoop.util.Time.monotonicNow; * as soon as you read it. */ public class SCMNodeManager - implements NodeManager, StorageContainerNodeProtocol { + implements NodeManager, StorageContainerNodeProtocol, + EventHandler<CommandForDatanode> { @VisibleForTesting static final Logger LOG = @@ -154,6 +162,9 @@ public class SCMNodeManager private final SCMNodePoolManager nodePoolManager; private final StorageContainerManager scmManager; + public static final Event<CommandForDatanode> DATANODE_COMMAND = + new TypedEvent<>(CommandForDatanode.class, "DATANODE_COMMAND"); + /** * Constructs SCM machine Manager. */ @@ -871,4 +882,11 @@ public class SCMNodeManager public void setStaleNodeIntervalMs(long interval) { this.staleNodeIntervalMs = interval; } + + @Override + public void onMessage(CommandForDatanode commandForDatanode, + EventPublisher publisher) { + addDatanodeCommand(commandForDatanode.getDatanodeId(), + commandForDatanode.getCommand()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a55d6bba/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 78f13cb..5725d23 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; @@ -51,6 +52,7 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.common.StorageInfo; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; @@ -161,8 +163,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl throw new SCMException("SCM not initialized.", ResultCodes .SCM_NOT_INITIALIZED); } + EventQueue eventQueue = new EventQueue(); + + SCMNodeManager nm = new SCMNodeManager(conf, scmStorage.getClusterID(), this); + scmNodeManager = nm; + eventQueue.addHandler(SCMNodeManager.DATANODE_COMMAND, nm); - scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this); scmContainerManager = new ContainerMapping(conf, getScmNodeManager(), cacheSize); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a55d6bba/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index 2b04d6b..824a135 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -29,7 +30,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; @@ -1165,4 +1169,39 @@ public class TestNodeManager { assertEquals(expectedRemaining, foundRemaining); } } + + @Test + public void testHandlingSCMCommandEvent() { + OzoneConfiguration conf = getConf(); + conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, TimeUnit.MILLISECONDS); + + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + String dnId = datanodeDetails.getUuidString(); + String storagePath = testDir.getAbsolutePath() + "/" + dnId; + List<StorageReportProto> reports = + TestUtils.createStorageReport(100, 10, 90, + storagePath, null, dnId, 1); + + EventQueue eq = new EventQueue(); + try (SCMNodeManager nodemanager = createNodeManager(conf)) { + eq.addHandler(SCMNodeManager.DATANODE_COMMAND, nodemanager); + + nodemanager + .register(datanodeDetails, TestUtils.createNodeReport(reports)); + eq.fireEvent(SCMNodeManager.DATANODE_COMMAND, + new CommandForDatanode(datanodeDetails.getUuid(), + new CloseContainerCommand(1L, ReplicationType.STAND_ALONE))); + + eq.processAll(1000L); + List<SCMCommand> command = + nodemanager.sendHeartbeat(datanodeDetails, null); + Assert.assertEquals(1, command.size()); + Assert + .assertEquals(command.get(0).getClass(), CloseContainerCommand.class); + } catch (IOException e) { + e.printStackTrace(); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org