HDDS-861. SCMNodeManager unit tests are broken. Contributed by Xiaoyu Yao.
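In short, the patch below does two things: it renames the unit test class TestNodeManager to TestSCMNodeManager, and it guards SCMNodeManager so a heartbeat that carries no node report does not dereference null. What follows is a minimal, self-contained sketch of that null-guard pattern only; NodeReport, DatanodeInfo, NodeReportGuardSketch and processNodeReport here are simplified stand-ins for illustration, not the real HDDS types or methods (the actual change is the hunk in SCMNodeManager.java further down).

// Minimal, self-contained sketch of the null-guard pattern used by this
// commit. The nested NodeReport and DatanodeInfo classes are simplified
// stand-ins, not the real Hadoop/HDDS classes; only the control flow
// mirrors the hunk in SCMNodeManager.java below.
import java.util.Collections;
import java.util.List;

public final class NodeReportGuardSketch {

  static final class NodeReport {
    List<String> getStorageReportList() {
      return Collections.singletonList("storage-report");
    }
  }

  static final class DatanodeInfo {
    void updateStorageReports(List<String> reports) {
      System.out.println("updated with " + reports.size() + " report(s)");
    }
  }

  static void processNodeReport(DatanodeInfo datanodeInfo,
      NodeReport nodeReport) {
    // A heartbeat may arrive without a node report; only touch the storage
    // state when a report is actually present, instead of dereferencing null.
    if (nodeReport != null) {
      datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
    }
  }

  public static void main(String[] args) {
    DatanodeInfo info = new DatanodeInfo();
    processNodeReport(info, new NodeReport()); // storage reports updated
    processNodeReport(info, null);             // safely skipped, no NPE
  }
}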
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/919a6e43
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/919a6e43
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/919a6e43

Branch: refs/heads/HDDS-4
Commit: 919a6e431044ecc2b9648b20baddca7b1a947e75
Parents: ebb9245
Author: Xiaoyu Yao <[email protected]>
Authored: Wed Nov 21 11:46:53 2018 -0800
Committer: Xiaoyu Yao <[email protected]>
Committed: Wed Nov 21 11:46:53 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hdds/scm/node/SCMNodeManager.java    |   4 +-
 .../hadoop/hdds/scm/node/TestNodeManager.java   | 977 -------------------
 .../hdds/scm/node/TestSCMNodeManager.java       | 977 +++++++++++++++++++
 3 files changed, 980 insertions(+), 978 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/919a6e43/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 374ff90..61ddb5b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -192,7 +192,9 @@ public class SCMNodeManager
       // registration we call this, after adding to nodeStateMap. And also
       // from eventhandler it is called only if it has node Report.
       DatanodeInfo datanodeInfo = nodeStateManager.getNode(dnId);
-      datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
+      if (nodeReport != null) {
+        datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
+      }
     } catch (NodeNotFoundException e) {
       LOG.debug("SCM updateNodeStat based on heartbeat from previous " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/919a6e43/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
deleted file mode 100644
index 1ff6655..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java
+++ /dev/null
@@ -1,977 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hdds.scm.node; - -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.StorageReportProto; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.server.events.EventQueue; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.PathUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mockito; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_DEADNODE_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_STALENODE_INTERVAL; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState - .HEALTHY; -import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; -import static org.hamcrest.core.StringStartsWith.startsWith; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Test the Node Manager class. - */ -public class TestNodeManager { - - private File testDir; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @BeforeClass - public static void init() throws IOException { - } - - @Before - public void setup() { - testDir = PathUtils.getTestDir( - TestNodeManager.class); - } - - @After - public void cleanup() { - FileUtil.fullyDelete(testDir); - } - - /** - * Returns a new copy of Configuration. - * - * @return Config - */ - OzoneConfiguration getConf() { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, - testDir.getAbsolutePath()); - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, - TimeUnit.MILLISECONDS); - return conf; - } - - /** - * Creates a NodeManager. - * - * @param config - Config for the node manager. 
- * @return SCNNodeManager - * @throws IOException - */ - - SCMNodeManager createNodeManager(OzoneConfiguration config) - throws IOException { - EventQueue eventQueue = new EventQueue(); - eventQueue.addHandler(SCMEvents.NEW_NODE, - Mockito.mock(NewNodeHandler.class)); - eventQueue.addHandler(SCMEvents.STALE_NODE, - Mockito.mock(StaleNodeHandler.class)); - eventQueue.addHandler(SCMEvents.DEAD_NODE, - Mockito.mock(DeadNodeHandler.class)); - SCMNodeManager nodeManager = new SCMNodeManager(config, - UUID.randomUUID().toString(), null, eventQueue); - return nodeManager; - } - - /** - * Tests that Node manager handles heartbeats correctly, and comes out of - * chill Mode. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmHeartbeat() throws IOException, - InterruptedException, TimeoutException { - - try (SCMNodeManager nodeManager = createNodeManager(getConf())) { - int registeredNodes = 5; - // Send some heartbeats from different nodes. - for (int x = 0; x < registeredNodes; x++) { - DatanodeDetails datanodeDetails = TestUtils - .createRandomDatanodeAndRegister(nodeManager); - nodeManager.processHeartbeat(datanodeDetails); - } - - //TODO: wait for heartbeat to be processed - Thread.sleep(4 * 1000); - assertTrue("Heartbeat thread should have picked up the" + - "scheduled heartbeats.", - nodeManager.getAllNodes().size() == registeredNodes); - } - } - - /** - * asserts that if we send no heartbeats node manager stays in chillmode. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmNoHeartbeats() throws IOException, - InterruptedException, TimeoutException { - - try (SCMNodeManager nodeManager = createNodeManager(getConf())) { - //TODO: wait for heartbeat to be processed - Thread.sleep(4 * 1000); - assertTrue("No heartbeats, 0 nodes should be registered", - nodeManager.getAllNodes().size() == 0); - } - } - - /** - * Asserts that adding heartbeats after shutdown does not work. This implies - * that heartbeat thread has been shutdown safely by closing the node - * manager. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmShutdown() throws IOException, InterruptedException, - TimeoutException { - OzoneConfiguration conf = getConf(); - conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, - 100, TimeUnit.MILLISECONDS); - SCMNodeManager nodeManager = createNodeManager(conf); - DatanodeDetails datanodeDetails = TestUtils - .createRandomDatanodeAndRegister(nodeManager); - nodeManager.close(); - - // These should never be processed. - nodeManager.processHeartbeat(datanodeDetails); - - // Let us just wait for 2 seconds to prove that HBs are not processed. - Thread.sleep(2 * 1000); - - //TODO: add assertion - } - - /** - * Asserts that we detect as many healthy nodes as we have generated heartbeat - * for. 
- * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmHealthyNodeCount() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration conf = getConf(); - final int count = 10; - - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - - for (int x = 0; x < count; x++) { - DatanodeDetails datanodeDetails = TestUtils - .createRandomDatanodeAndRegister(nodeManager); - nodeManager.processHeartbeat(datanodeDetails); - } - //TODO: wait for heartbeat to be processed - Thread.sleep(4 * 1000); - assertEquals(count, nodeManager.getNodeCount(HEALTHY)); - } - } - - /** - * Asserts that if user provides a value less than 5 times the heartbeat - * interval as the StaleNode Value, we throw since that is a QoS that we - * cannot maintain. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - - @Test - public void testScmSanityOfUserConfig1() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration conf = getConf(); - final int interval = 100; - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, - MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); - - // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, interval, MILLISECONDS); - - thrown.expect(IllegalArgumentException.class); - - // This string is a multiple of the interval value - thrown.expectMessage( - startsWith("100 is not within min = 500 or max = 100000")); - createNodeManager(conf); - } - - /** - * Asserts that if Stale Interval value is more than 5 times the value of HB - * processing thread it is a sane value. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmSanityOfUserConfig2() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration conf = getConf(); - final int interval = 100; - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, - TimeUnit.MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, TimeUnit.SECONDS); - - // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, MILLISECONDS); - createNodeManager(conf).close(); - } - - /** - * Asserts that a single node moves from Healthy to stale node, then from - * stale node to dead node if it misses enough heartbeats. 
- * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmDetectStaleAndDeadNode() throws IOException, - InterruptedException, TimeoutException { - final int interval = 100; - final int nodeCount = 10; - - OzoneConfiguration conf = getConf(); - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, - MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); - - - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount); - - - DatanodeDetails staleNode = TestUtils.createRandomDatanodeAndRegister( - nodeManager); - - // Heartbeat once - nodeManager.processHeartbeat(staleNode); - - // Heartbeat all other nodes. - for (DatanodeDetails dn : nodeList) { - nodeManager.processHeartbeat(dn); - } - - // Wait for 2 seconds .. and heartbeat good nodes again. - Thread.sleep(2 * 1000); - - for (DatanodeDetails dn : nodeList) { - nodeManager.processHeartbeat(dn); - } - - // Wait for 2 seconds, wait a total of 4 seconds to make sure that the - // node moves into stale state. - Thread.sleep(2 * 1000); - List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE); - assertEquals("Expected to find 1 stale node", - 1, nodeManager.getNodeCount(STALE)); - assertEquals("Expected to find 1 stale node", - 1, staleNodeList.size()); - assertEquals("Stale node is not the expected ID", staleNode - .getUuid(), staleNodeList.get(0).getUuid()); - Thread.sleep(1000); - - // heartbeat good nodes again. - for (DatanodeDetails dn : nodeList) { - nodeManager.processHeartbeat(dn); - } - - // 6 seconds is the dead window for this test , so we wait a total of - // 7 seconds to make sure that the node moves into dead state. - Thread.sleep(2 * 1000); - - // the stale node has been removed - staleNodeList = nodeManager.getNodes(STALE); - assertEquals("Expected to find 1 stale node", - 0, nodeManager.getNodeCount(STALE)); - assertEquals("Expected to find 1 stale node", - 0, staleNodeList.size()); - - // Check for the dead node now. - List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD); - assertEquals("Expected to find 1 dead node", 1, - nodeManager.getNodeCount(DEAD)); - assertEquals("Expected to find 1 dead node", - 1, deadNodeList.size()); - assertEquals("Dead node is not the expected ID", staleNode - .getUuid(), deadNodeList.get(0).getUuid()); - } - } - - /** - * Check for NPE when datanodeDetails is passed null for sendHeartbeat. - * - * @throws IOException - */ - @Test - public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException { - try (SCMNodeManager nodeManager = createNodeManager(getConf())) { - nodeManager.processHeartbeat(null); - } catch (NullPointerException npe) { - GenericTestUtils.assertExceptionContains("Heartbeat is missing " + - "DatanodeDetails.", npe); - } - } - - /** - * Asserts that a dead node, stale node and healthy nodes co-exist. The counts - * , lists and node ID match the expected node state. - * <p/> - * This test is pretty complicated because it explores all states of Node - * manager in a single test. Please read thru the comments to get an idea of - * the current state of the node Manager. - * <p/> - * This test is written like a state machine to avoid threads and concurrency - * issues. This test is replicated below with the use of threads. 
Avoiding - * threads make it easy to debug the state machine. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - /** - * These values are very important. Here is what it means so you don't - * have to look it up while reading this code. - * - * OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - This the frequency of the - * HB processing thread that is running in the SCM. This thread must run - * for the SCM to process the Heartbeats. - * - * OZONE_SCM_HEARTBEAT_INTERVAL - This is the frequency at which - * datanodes will send heartbeats to SCM. Please note: This is the only - * config value for node manager that is specified in seconds. We don't - * want SCM heartbeat resolution to be more than in seconds. - * In this test it is not used, but we are forced to set it because we - * have validation code that checks Stale Node interval and Dead Node - * interval is larger than the value of - * OZONE_SCM_HEARTBEAT_INTERVAL. - * - * OZONE_SCM_STALENODE_INTERVAL - This is the time that must elapse - * from the last heartbeat for us to mark a node as stale. In this test - * we set that to 3. That is if a node has not heartbeat SCM for last 3 - * seconds we will mark it as stale. - * - * OZONE_SCM_DEADNODE_INTERVAL - This is the time that must elapse - * from the last heartbeat for a node to be marked dead. We have an - * additional constraint that this must be at least 2 times bigger than - * Stale node Interval. - * - * With these we are trying to explore the state of this cluster with - * various timeouts. Each section is commented so that you can keep - * track of the state of the cluster nodes. - * - */ - - @Test - public void testScmClusterIsInExpectedState1() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration conf = getConf(); - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, - MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); - - - /** - * Cluster state: Healthy: All nodes are heartbeat-ing like normal. - */ - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - DatanodeDetails healthyNode = - TestUtils.createRandomDatanodeAndRegister(nodeManager); - DatanodeDetails staleNode = - TestUtils.createRandomDatanodeAndRegister(nodeManager); - DatanodeDetails deadNode = - TestUtils.createRandomDatanodeAndRegister(nodeManager); - nodeManager.processHeartbeat(healthyNode); - nodeManager.processHeartbeat(staleNode); - nodeManager.processHeartbeat(deadNode); - - // Sleep so that heartbeat processing thread gets to run. - Thread.sleep(500); - - //Assert all nodes are healthy. - assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(3, nodeManager.getNodeCount(HEALTHY)); - - /** - * Cluster state: Quiesced: We are going to sleep for 3 seconds. Which - * means that no node is heartbeating. All nodes should move to Stale. - */ - Thread.sleep(3 * 1000); - assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(3, nodeManager.getNodeCount(STALE)); - - - /** - * Cluster State : Move healthy node back to healthy state, move other 2 - * nodes to Stale State. - * - * We heartbeat healthy node after 1 second and let other 2 nodes elapse - * the 3 second windows. 
- */ - - nodeManager.processHeartbeat(healthyNode); - nodeManager.processHeartbeat(staleNode); - nodeManager.processHeartbeat(deadNode); - - Thread.sleep(1500); - nodeManager.processHeartbeat(healthyNode); - Thread.sleep(2 * 1000); - assertEquals(1, nodeManager.getNodeCount(HEALTHY)); - - - // 3.5 seconds from last heartbeat for the stale and deadNode. So those - // 2 nodes must move to Stale state and the healthy node must - // remain in the healthy State. - List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY); - assertEquals("Expected one healthy node", 1, healthyList.size()); - assertEquals("Healthy node is not the expected ID", healthyNode - .getUuid(), healthyList.get(0).getUuid()); - - assertEquals(2, nodeManager.getNodeCount(STALE)); - - /** - * Cluster State: Allow healthyNode to remain in healthy state and - * staleNode to move to stale state and deadNode to move to dead state. - */ - - nodeManager.processHeartbeat(healthyNode); - nodeManager.processHeartbeat(staleNode); - Thread.sleep(1500); - nodeManager.processHeartbeat(healthyNode); - Thread.sleep(2 * 1000); - - // 3.5 seconds have elapsed for stale node, so it moves into Stale. - // 7 seconds have elapsed for dead node, so it moves into dead. - // 2 Seconds have elapsed for healthy node, so it stays in healhty state. - healthyList = nodeManager.getNodes(HEALTHY); - List<DatanodeDetails> staleList = nodeManager.getNodes(STALE); - List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); - - assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(1, nodeManager.getNodeCount(HEALTHY)); - assertEquals(1, nodeManager.getNodeCount(STALE)); - assertEquals(1, nodeManager.getNodeCount(DEAD)); - - assertEquals("Expected one healthy node", - 1, healthyList.size()); - assertEquals("Healthy node is not the expected ID", healthyNode - .getUuid(), healthyList.get(0).getUuid()); - - assertEquals("Expected one stale node", - 1, staleList.size()); - assertEquals("Stale node is not the expected ID", staleNode - .getUuid(), staleList.get(0).getUuid()); - - assertEquals("Expected one dead node", - 1, deadList.size()); - assertEquals("Dead node is not the expected ID", deadNode - .getUuid(), deadList.get(0).getUuid()); - /** - * Cluster State : let us heartbeat all the nodes and verify that we get - * back all the nodes in healthy state. - */ - nodeManager.processHeartbeat(healthyNode); - nodeManager.processHeartbeat(staleNode); - nodeManager.processHeartbeat(deadNode); - Thread.sleep(500); - //Assert all nodes are healthy. - assertEquals(3, nodeManager.getAllNodes().size()); - assertEquals(3, nodeManager.getNodeCount(HEALTHY)); - } - } - - /** - * Heartbeat a given set of nodes at a specified frequency. - * - * @param manager - Node Manager - * @param list - List of datanodeIDs - * @param sleepDuration - Duration to sleep between heartbeats. - * @throws InterruptedException - */ - private void heartbeatNodeSet(SCMNodeManager manager, - List<DatanodeDetails> list, - int sleepDuration) throws InterruptedException { - while (!Thread.currentThread().isInterrupted()) { - for (DatanodeDetails dn : list) { - manager.processHeartbeat(dn); - } - Thread.sleep(sleepDuration); - } - } - - /** - * Create a set of Nodes with a given prefix. - * - * @param count - number of nodes. - * @return List of Nodes. 
- */ - private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager, int - count) { - List<DatanodeDetails> list = new ArrayList<>(); - for (int x = 0; x < count; x++) { - DatanodeDetails datanodeDetails = TestUtils - .createRandomDatanodeAndRegister(nodeManager); - list.add(datanodeDetails); - } - return list; - } - - /** - * Function that tells us if we found the right number of stale nodes. - * - * @param nodeManager - node manager - * @param count - number of stale nodes to look for. - * @return true if we found the expected number. - */ - private boolean findNodes(NodeManager nodeManager, int count, - HddsProtos.NodeState state) { - return count == nodeManager.getNodeCount(state); - } - - /** - * Asserts that we can create a set of nodes that send its heartbeats from - * different threads and NodeManager behaves as expected. - * - * @throws IOException - * @throws InterruptedException - */ - @Test - public void testScmClusterIsInExpectedState2() throws IOException, - InterruptedException, TimeoutException { - final int healthyCount = 5000; - final int staleCount = 100; - final int deadCount = 10; - - OzoneConfiguration conf = getConf(); - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, - MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); - - - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager, - healthyCount); - List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager, - staleCount); - List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager, - deadCount); - - Runnable healthyNodeTask = () -> { - try { - // 2 second heartbeat makes these nodes stay healthy. - heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000); - } catch (InterruptedException ignored) { - } - }; - - Runnable staleNodeTask = () -> { - try { - // 4 second heartbeat makes these nodes go to stale and back to - // healthy again. - heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000); - } catch (InterruptedException ignored) { - } - }; - - - // No Thread just one time HBs the node manager, so that these will be - // marked as dead nodes eventually. - for (DatanodeDetails dn : deadNodeList) { - nodeManager.processHeartbeat(dn); - } - - - Thread thread1 = new Thread(healthyNodeTask); - thread1.setDaemon(true); - thread1.start(); - - - Thread thread2 = new Thread(staleNodeTask); - thread2.setDaemon(true); - thread2.start(); - - Thread.sleep(10 * 1000); - - // Assert all healthy nodes are healthy now, this has to be a greater - // than check since Stale nodes can be healthy when we check the state. - - assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount); - - assertEquals(deadCount, nodeManager.getNodeCount(DEAD)); - - List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); - - for (DatanodeDetails node : deadList) { - assertTrue(deadNodeList.contains(node)); - } - - - - // Checking stale nodes is tricky since they have to move between - // healthy and stale to avoid becoming dead nodes. So we search for - // that state for a while, if we don't find that state waitfor will - // throw. - GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), - 500, 4 * 1000); - - thread1.interrupt(); - thread2.interrupt(); - } - } - - /** - * Asserts that we can handle 6000+ nodes heartbeating SCM. 
- * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmCanHandleScale() throws IOException, - InterruptedException, TimeoutException { - final int healthyCount = 3000; - final int staleCount = 3000; - OzoneConfiguration conf = getConf(); - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, - MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, - SECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, - MILLISECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6 * 1000, - MILLISECONDS); - - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - List<DatanodeDetails> healthyList = createNodeSet(nodeManager, - healthyCount); - List<DatanodeDetails> staleList = createNodeSet(nodeManager, - staleCount); - - Runnable healthyNodeTask = () -> { - try { - heartbeatNodeSet(nodeManager, healthyList, 2 * 1000); - } catch (InterruptedException ignored) { - - } - }; - - Runnable staleNodeTask = () -> { - try { - heartbeatNodeSet(nodeManager, staleList, 4 * 1000); - } catch (InterruptedException ignored) { - } - }; - - Thread thread1 = new Thread(healthyNodeTask); - thread1.setDaemon(true); - thread1.start(); - - Thread thread2 = new Thread(staleNodeTask); - thread2.setDaemon(true); - thread2.start(); - Thread.sleep(3 * 1000); - - GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), - 500, 20 * 1000); - assertEquals("Node count mismatch", - healthyCount + staleCount, nodeManager.getAllNodes().size()); - - thread1.interrupt(); - thread2.interrupt(); - } - } - - /** - * Test multiple nodes sending initial heartbeat with their node report. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - @Ignore - // TODO: Enable this after we implement NodeReportEvent handler. - public void testScmStatsFromNodeReport() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration conf = getConf(); - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, - MILLISECONDS); - final int nodeCount = 10; - final long capacity = 2000; - final long used = 100; - final long remaining = capacity - used; - - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - for (int x = 0; x < nodeCount; x++) { - DatanodeDetails datanodeDetails = TestUtils - .createRandomDatanodeAndRegister(nodeManager); - UUID dnId = datanodeDetails.getUuid(); - long free = capacity - used; - String storagePath = testDir.getAbsolutePath() + "/" + dnId; - StorageReportProto report = TestUtils - .createStorageReport(dnId, storagePath, capacity, used, free, null); - nodeManager.processHeartbeat(datanodeDetails); - } - //TODO: wait for heartbeat to be processed - Thread.sleep(4 * 1000); - assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); - assertEquals(capacity * nodeCount, (long) nodeManager.getStats() - .getCapacity().get()); - assertEquals(used * nodeCount, (long) nodeManager.getStats() - .getScmUsed().get()); - assertEquals(remaining * nodeCount, (long) nodeManager.getStats() - .getRemaining().get()); - } - } - - /** - * Test single node stat update based on nodereport from different heartbeat - * status (healthy, stale and dead). - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - @Ignore - // TODO: Enable this after we implement NodeReportEvent handler. 
- public void testScmNodeReportUpdate() throws IOException, - InterruptedException, TimeoutException { - OzoneConfiguration conf = getConf(); - final int heartbeatCount = 5; - final int nodeCount = 1; - final int interval = 100; - - conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, - MILLISECONDS); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); - conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); - - try (SCMNodeManager nodeManager = createNodeManager(conf)) { - DatanodeDetails datanodeDetails = - TestUtils.createRandomDatanodeAndRegister(nodeManager); - final long capacity = 2000; - final long usedPerHeartbeat = 100; - UUID dnId = datanodeDetails.getUuid(); - for (int x = 0; x < heartbeatCount; x++) { - long scmUsed = x * usedPerHeartbeat; - long remaining = capacity - scmUsed; - String storagePath = testDir.getAbsolutePath() + "/" + dnId; - StorageReportProto report = TestUtils - .createStorageReport(dnId, storagePath, capacity, scmUsed, - remaining, null); - - nodeManager.processHeartbeat(datanodeDetails); - Thread.sleep(100); - } - - final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1); - final long expectedRemaining = capacity - expectedScmUsed; - - GenericTestUtils.waitFor( - () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, - 100, 4 * 1000); - - long foundCapacity = nodeManager.getStats().getCapacity().get(); - assertEquals(capacity, foundCapacity); - - long foundScmUsed = nodeManager.getStats().getScmUsed().get(); - assertEquals(expectedScmUsed, foundScmUsed); - - long foundRemaining = nodeManager.getStats().getRemaining().get(); - assertEquals(expectedRemaining, foundRemaining); - - // Test NodeManager#getNodeStats - assertEquals(nodeCount, nodeManager.getNodeStats().size()); - long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get() - .getCapacity().get(); - assertEquals(capacity, nodeCapacity); - - foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() - .get(); - assertEquals(expectedScmUsed, foundScmUsed); - - foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() - .getRemaining().get(); - assertEquals(expectedRemaining, foundRemaining); - - // Compare the result from - // NodeManager#getNodeStats and NodeManager#getNodeStat - SCMNodeStat stat1 = nodeManager.getNodeStats(). - get(datanodeDetails.getUuid()); - SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get(); - assertEquals(stat1, stat2); - - // Wait up to 4s so that the node becomes stale - // Verify the usage info should be unchanged. - GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(STALE) == 1, 100, - 4 * 1000); - assertEquals(nodeCount, nodeManager.getNodeStats().size()); - - foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() - .getCapacity().get(); - assertEquals(capacity, foundCapacity); - foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get() - .getScmUsed().get(); - assertEquals(expectedScmUsed, foundScmUsed); - - foundRemaining = nodeManager.getNodeStat(datanodeDetails).get(). - getRemaining().get(); - assertEquals(expectedRemaining, foundRemaining); - - // Wait up to 4 more seconds so the node becomes dead - // Verify usage info should be updated. 
- GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(DEAD) == 1, 100, - 4 * 1000); - - assertEquals(0, nodeManager.getNodeStats().size()); - foundCapacity = nodeManager.getStats().getCapacity().get(); - assertEquals(0, foundCapacity); - - foundScmUsed = nodeManager.getStats().getScmUsed().get(); - assertEquals(0, foundScmUsed); - - foundRemaining = nodeManager.getStats().getRemaining().get(); - assertEquals(0, foundRemaining); - - nodeManager.processHeartbeat(datanodeDetails); - - // Wait up to 5 seconds so that the dead node becomes healthy - // Verify usage info should be updated. - GenericTestUtils.waitFor( - () -> nodeManager.getNodeCount(HEALTHY) == 1, - 100, 5 * 1000); - GenericTestUtils.waitFor( - () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, - 100, 4 * 1000); - assertEquals(nodeCount, nodeManager.getNodeStats().size()); - foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() - .getCapacity().get(); - assertEquals(capacity, foundCapacity); - foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() - .get(); - assertEquals(expectedScmUsed, foundScmUsed); - foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() - .getRemaining().get(); - assertEquals(expectedRemaining, foundRemaining); - } - } - - @Test - public void testHandlingSCMCommandEvent() throws IOException { - OzoneConfiguration conf = getConf(); - conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, - 100, TimeUnit.MILLISECONDS); - - DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); - UUID dnId = datanodeDetails.getUuid(); - String storagePath = testDir.getAbsolutePath() + "/" + dnId; - StorageReportProto report = - TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null); - - EventQueue eq = new EventQueue(); - try (SCMNodeManager nodemanager = createNodeManager(conf)) { - eq.addHandler(DATANODE_COMMAND, nodemanager); - - nodemanager - .register(datanodeDetails, TestUtils.createNodeReport(report), - TestUtils.getRandomPipelineReports()); - eq.fireEvent(DATANODE_COMMAND, - new CommandForDatanode<>(datanodeDetails.getUuid(), - new CloseContainerCommand(1L, - PipelineID.randomId()))); - - eq.processAll(1000L); - List<SCMCommand> command = - nodemanager.processHeartbeat(datanodeDetails); - Assert.assertEquals(1, command.size()); - Assert - .assertEquals(command.get(0).getClass(), CloseContainerCommand.class); - } catch (IOException e) { - e.printStackTrace(); - throw e; - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/919a6e43/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java new file mode 100644 index 0000000..9f78d9a --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -0,0 +1,977 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState + .HEALTHY; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; +import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test the SCM Node Manager class. + */ +public class TestSCMNodeManager { + + private File testDir; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void init() throws IOException { + } + + @Before + public void setup() { + testDir = PathUtils.getTestDir( + TestSCMNodeManager.class); + } + + @After + public void cleanup() { + FileUtil.fullyDelete(testDir); + } + + /** + * Returns a new copy of Configuration. 
+ * + * @return Config + */ + OzoneConfiguration getConf() { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + TimeUnit.MILLISECONDS); + return conf; + } + + /** + * Creates a NodeManager. + * + * @param config - Config for the node manager. + * @return SCNNodeManager + * @throws IOException + */ + + SCMNodeManager createNodeManager(OzoneConfiguration config) + throws IOException { + EventQueue eventQueue = new EventQueue(); + eventQueue.addHandler(SCMEvents.NEW_NODE, + Mockito.mock(NewNodeHandler.class)); + eventQueue.addHandler(SCMEvents.STALE_NODE, + Mockito.mock(StaleNodeHandler.class)); + eventQueue.addHandler(SCMEvents.DEAD_NODE, + Mockito.mock(DeadNodeHandler.class)); + SCMNodeManager nodeManager = new SCMNodeManager(config, + UUID.randomUUID().toString(), null, eventQueue); + return nodeManager; + } + + /** + * Tests that Node manager handles heartbeats correctly, and comes out of + * chill Mode. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmHeartbeat() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + int registeredNodes = 5; + // Send some heartbeats from different nodes. + for (int x = 0; x < registeredNodes; x++) { + DatanodeDetails datanodeDetails = TestUtils + .createRandomDatanodeAndRegister(nodeManager); + nodeManager.processHeartbeat(datanodeDetails); + } + + //TODO: wait for heartbeat to be processed + Thread.sleep(4 * 1000); + assertTrue("Heartbeat thread should have picked up the" + + "scheduled heartbeats.", + nodeManager.getAllNodes().size() == registeredNodes); + } + } + + /** + * asserts that if we send no heartbeats node manager stays in chillmode. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmNoHeartbeats() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + //TODO: wait for heartbeat to be processed + Thread.sleep(4 * 1000); + assertTrue("No heartbeats, 0 nodes should be registered", + nodeManager.getAllNodes().size() == 0); + } + } + + /** + * Asserts that adding heartbeats after shutdown does not work. This implies + * that heartbeat thread has been shutdown safely by closing the node + * manager. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmShutdown() throws IOException, InterruptedException, + TimeoutException { + OzoneConfiguration conf = getConf(); + conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, TimeUnit.MILLISECONDS); + SCMNodeManager nodeManager = createNodeManager(conf); + DatanodeDetails datanodeDetails = TestUtils + .createRandomDatanodeAndRegister(nodeManager); + nodeManager.close(); + + // These should never be processed. + nodeManager.processHeartbeat(datanodeDetails); + + // Let us just wait for 2 seconds to prove that HBs are not processed. + Thread.sleep(2 * 1000); + + //TODO: add assertion + } + + /** + * Asserts that we detect as many healthy nodes as we have generated heartbeat + * for. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmHealthyNodeCount() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int count = 10; + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + + for (int x = 0; x < count; x++) { + DatanodeDetails datanodeDetails = TestUtils + .createRandomDatanodeAndRegister(nodeManager); + nodeManager.processHeartbeat(datanodeDetails); + } + //TODO: wait for heartbeat to be processed + Thread.sleep(4 * 1000); + assertEquals(count, nodeManager.getNodeCount(HEALTHY)); + } + } + + /** + * Asserts that if user provides a value less than 5 times the heartbeat + * interval as the StaleNode Value, we throw since that is a QoS that we + * cannot maintain. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + + @Test + public void testScmSanityOfUserConfig1() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int interval = 100; + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + + // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, interval, MILLISECONDS); + + thrown.expect(IllegalArgumentException.class); + + // This string is a multiple of the interval value + thrown.expectMessage( + startsWith("100 is not within min = 500 or max = 100000")); + createNodeManager(conf); + } + + /** + * Asserts that if Stale Interval value is more than 5 times the value of HB + * processing thread it is a sane value. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmSanityOfUserConfig2() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int interval = 100; + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, TimeUnit.SECONDS); + + // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL + // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, MILLISECONDS); + createNodeManager(conf).close(); + } + + /** + * Asserts that a single node moves from Healthy to stale node, then from + * stale node to dead node if it misses enough heartbeats. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmDetectStaleAndDeadNode() throws IOException, + InterruptedException, TimeoutException { + final int interval = 100; + final int nodeCount = 10; + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount); + + + DatanodeDetails staleNode = TestUtils.createRandomDatanodeAndRegister( + nodeManager); + + // Heartbeat once + nodeManager.processHeartbeat(staleNode); + + // Heartbeat all other nodes. + for (DatanodeDetails dn : nodeList) { + nodeManager.processHeartbeat(dn); + } + + // Wait for 2 seconds .. and heartbeat good nodes again. + Thread.sleep(2 * 1000); + + for (DatanodeDetails dn : nodeList) { + nodeManager.processHeartbeat(dn); + } + + // Wait for 2 seconds, wait a total of 4 seconds to make sure that the + // node moves into stale state. + Thread.sleep(2 * 1000); + List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE); + assertEquals("Expected to find 1 stale node", + 1, nodeManager.getNodeCount(STALE)); + assertEquals("Expected to find 1 stale node", + 1, staleNodeList.size()); + assertEquals("Stale node is not the expected ID", staleNode + .getUuid(), staleNodeList.get(0).getUuid()); + Thread.sleep(1000); + + // heartbeat good nodes again. + for (DatanodeDetails dn : nodeList) { + nodeManager.processHeartbeat(dn); + } + + // 6 seconds is the dead window for this test , so we wait a total of + // 7 seconds to make sure that the node moves into dead state. + Thread.sleep(2 * 1000); + + // the stale node has been removed + staleNodeList = nodeManager.getNodes(STALE); + assertEquals("Expected to find 1 stale node", + 0, nodeManager.getNodeCount(STALE)); + assertEquals("Expected to find 1 stale node", + 0, staleNodeList.size()); + + // Check for the dead node now. + List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD); + assertEquals("Expected to find 1 dead node", 1, + nodeManager.getNodeCount(DEAD)); + assertEquals("Expected to find 1 dead node", + 1, deadNodeList.size()); + assertEquals("Dead node is not the expected ID", staleNode + .getUuid(), deadNodeList.get(0).getUuid()); + } + } + + /** + * Check for NPE when datanodeDetails is passed null for sendHeartbeat. + * + * @throws IOException + */ + @Test + public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + nodeManager.processHeartbeat(null); + } catch (NullPointerException npe) { + GenericTestUtils.assertExceptionContains("Heartbeat is missing " + + "DatanodeDetails.", npe); + } + } + + /** + * Asserts that a dead node, stale node and healthy nodes co-exist. The counts + * , lists and node ID match the expected node state. + * <p/> + * This test is pretty complicated because it explores all states of Node + * manager in a single test. Please read thru the comments to get an idea of + * the current state of the node Manager. + * <p/> + * This test is written like a state machine to avoid threads and concurrency + * issues. This test is replicated below with the use of threads. 
Avoiding + * threads make it easy to debug the state machine. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + /** + * These values are very important. Here is what it means so you don't + * have to look it up while reading this code. + * + * OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL - This the frequency of the + * HB processing thread that is running in the SCM. This thread must run + * for the SCM to process the Heartbeats. + * + * OZONE_SCM_HEARTBEAT_INTERVAL - This is the frequency at which + * datanodes will send heartbeats to SCM. Please note: This is the only + * config value for node manager that is specified in seconds. We don't + * want SCM heartbeat resolution to be more than in seconds. + * In this test it is not used, but we are forced to set it because we + * have validation code that checks Stale Node interval and Dead Node + * interval is larger than the value of + * OZONE_SCM_HEARTBEAT_INTERVAL. + * + * OZONE_SCM_STALENODE_INTERVAL - This is the time that must elapse + * from the last heartbeat for us to mark a node as stale. In this test + * we set that to 3. That is if a node has not heartbeat SCM for last 3 + * seconds we will mark it as stale. + * + * OZONE_SCM_DEADNODE_INTERVAL - This is the time that must elapse + * from the last heartbeat for a node to be marked dead. We have an + * additional constraint that this must be at least 2 times bigger than + * Stale node Interval. + * + * With these we are trying to explore the state of this cluster with + * various timeouts. Each section is commented so that you can keep + * track of the state of the cluster nodes. + * + */ + + @Test + public void testScmClusterIsInExpectedState1() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + + + /** + * Cluster state: Healthy: All nodes are heartbeat-ing like normal. + */ + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + DatanodeDetails healthyNode = + TestUtils.createRandomDatanodeAndRegister(nodeManager); + DatanodeDetails staleNode = + TestUtils.createRandomDatanodeAndRegister(nodeManager); + DatanodeDetails deadNode = + TestUtils.createRandomDatanodeAndRegister(nodeManager); + nodeManager.processHeartbeat(healthyNode); + nodeManager.processHeartbeat(staleNode); + nodeManager.processHeartbeat(deadNode); + + // Sleep so that heartbeat processing thread gets to run. + Thread.sleep(500); + + //Assert all nodes are healthy. + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + + /** + * Cluster state: Quiesced: We are going to sleep for 3 seconds. Which + * means that no node is heartbeating. All nodes should move to Stale. + */ + Thread.sleep(3 * 1000); + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(STALE)); + + + /** + * Cluster State : Move healthy node back to healthy state, move other 2 + * nodes to Stale State. + * + * We heartbeat healthy node after 1 second and let other 2 nodes elapse + * the 3 second windows. 
+ */ + + nodeManager.processHeartbeat(healthyNode); + nodeManager.processHeartbeat(staleNode); + nodeManager.processHeartbeat(deadNode); + + Thread.sleep(1500); + nodeManager.processHeartbeat(healthyNode); + Thread.sleep(2 * 1000); + assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + + + // 3.5 seconds from last heartbeat for the stale and deadNode. So those + // 2 nodes must move to Stale state and the healthy node must + // remain in the healthy State. + List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY); + assertEquals("Expected one healthy node", 1, healthyList.size()); + assertEquals("Healthy node is not the expected ID", healthyNode + .getUuid(), healthyList.get(0).getUuid()); + + assertEquals(2, nodeManager.getNodeCount(STALE)); + + /** + * Cluster State: Allow healthyNode to remain in healthy state and + * staleNode to move to stale state and deadNode to move to dead state. + */ + + nodeManager.processHeartbeat(healthyNode); + nodeManager.processHeartbeat(staleNode); + Thread.sleep(1500); + nodeManager.processHeartbeat(healthyNode); + Thread.sleep(2 * 1000); + + // 3.5 seconds have elapsed for stale node, so it moves into Stale. + // 7 seconds have elapsed for dead node, so it moves into dead. + // 2 Seconds have elapsed for healthy node, so it stays in healhty state. + healthyList = nodeManager.getNodes(HEALTHY); + List<DatanodeDetails> staleList = nodeManager.getNodes(STALE); + List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); + + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + assertEquals(1, nodeManager.getNodeCount(STALE)); + assertEquals(1, nodeManager.getNodeCount(DEAD)); + + assertEquals("Expected one healthy node", + 1, healthyList.size()); + assertEquals("Healthy node is not the expected ID", healthyNode + .getUuid(), healthyList.get(0).getUuid()); + + assertEquals("Expected one stale node", + 1, staleList.size()); + assertEquals("Stale node is not the expected ID", staleNode + .getUuid(), staleList.get(0).getUuid()); + + assertEquals("Expected one dead node", + 1, deadList.size()); + assertEquals("Dead node is not the expected ID", deadNode + .getUuid(), deadList.get(0).getUuid()); + /** + * Cluster State : let us heartbeat all the nodes and verify that we get + * back all the nodes in healthy state. + */ + nodeManager.processHeartbeat(healthyNode); + nodeManager.processHeartbeat(staleNode); + nodeManager.processHeartbeat(deadNode); + Thread.sleep(500); + //Assert all nodes are healthy. + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + } + } + + /** + * Heartbeat a given set of nodes at a specified frequency. + * + * @param manager - Node Manager + * @param list - List of datanodeIDs + * @param sleepDuration - Duration to sleep between heartbeats. + * @throws InterruptedException + */ + private void heartbeatNodeSet(SCMNodeManager manager, + List<DatanodeDetails> list, + int sleepDuration) throws InterruptedException { + while (!Thread.currentThread().isInterrupted()) { + for (DatanodeDetails dn : list) { + manager.processHeartbeat(dn); + } + Thread.sleep(sleepDuration); + } + } + + /** + * Create a set of Nodes with a given prefix. + * + * @param count - number of nodes. + * @return List of Nodes. 
+ */ + private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager, int + count) { + List<DatanodeDetails> list = new ArrayList<>(); + for (int x = 0; x < count; x++) { + DatanodeDetails datanodeDetails = TestUtils + .createRandomDatanodeAndRegister(nodeManager); + list.add(datanodeDetails); + } + return list; + } + + /** + * Function that tells us if we found the right number of stale nodes. + * + * @param nodeManager - node manager + * @param count - number of stale nodes to look for. + * @return true if we found the expected number. + */ + private boolean findNodes(NodeManager nodeManager, int count, + HddsProtos.NodeState state) { + return count == nodeManager.getNodeCount(state); + } + + /** + * Asserts that we can create a set of nodes that send its heartbeats from + * different threads and NodeManager behaves as expected. + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testScmClusterIsInExpectedState2() throws IOException, + InterruptedException, TimeoutException { + final int healthyCount = 5000; + final int staleCount = 100; + final int deadCount = 10; + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager, + healthyCount); + List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager, + staleCount); + List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager, + deadCount); + + Runnable healthyNodeTask = () -> { + try { + // 2 second heartbeat makes these nodes stay healthy. + heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000); + } catch (InterruptedException ignored) { + } + }; + + Runnable staleNodeTask = () -> { + try { + // 4 second heartbeat makes these nodes go to stale and back to + // healthy again. + heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000); + } catch (InterruptedException ignored) { + } + }; + + + // No Thread just one time HBs the node manager, so that these will be + // marked as dead nodes eventually. + for (DatanodeDetails dn : deadNodeList) { + nodeManager.processHeartbeat(dn); + } + + + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + + Thread thread2 = new Thread(staleNodeTask); + thread2.setDaemon(true); + thread2.start(); + + Thread.sleep(10 * 1000); + + // Assert all healthy nodes are healthy now, this has to be a greater + // than check since Stale nodes can be healthy when we check the state. + + assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount); + + assertEquals(deadCount, nodeManager.getNodeCount(DEAD)); + + List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD); + + for (DatanodeDetails node : deadList) { + assertTrue(deadNodeList.contains(node)); + } + + + + // Checking stale nodes is tricky since they have to move between + // healthy and stale to avoid becoming dead nodes. So we search for + // that state for a while, if we don't find that state waitfor will + // throw. + GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), + 500, 4 * 1000); + + thread1.interrupt(); + thread2.interrupt(); + } + } + + /** + * Asserts that we can handle 6000+ nodes heartbeating SCM. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmCanHandleScale() throws IOException, + InterruptedException, TimeoutException { + final int healthyCount = 3000; + final int staleCount = 3000; + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, + MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, + SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3 * 1000, + MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6 * 1000, + MILLISECONDS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List<DatanodeDetails> healthyList = createNodeSet(nodeManager, + healthyCount); + List<DatanodeDetails> staleList = createNodeSet(nodeManager, + staleCount); + + Runnable healthyNodeTask = () -> { + try { + heartbeatNodeSet(nodeManager, healthyList, 2 * 1000); + } catch (InterruptedException ignored) { + + } + }; + + Runnable staleNodeTask = () -> { + try { + heartbeatNodeSet(nodeManager, staleList, 4 * 1000); + } catch (InterruptedException ignored) { + } + }; + + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + Thread thread2 = new Thread(staleNodeTask); + thread2.setDaemon(true); + thread2.start(); + Thread.sleep(3 * 1000); + + GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), + 500, 20 * 1000); + assertEquals("Node count mismatch", + healthyCount + staleCount, nodeManager.getAllNodes().size()); + + thread1.interrupt(); + thread2.interrupt(); + } + } + + /** + * Test multiple nodes sending initial heartbeat with their node report. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + @Ignore + // TODO: Enable this after we implement NodeReportEvent handler. + public void testScmStatsFromNodeReport() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000, + MILLISECONDS); + final int nodeCount = 10; + final long capacity = 2000; + final long used = 100; + final long remaining = capacity - used; + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + for (int x = 0; x < nodeCount; x++) { + DatanodeDetails datanodeDetails = TestUtils + .createRandomDatanodeAndRegister(nodeManager); + UUID dnId = datanodeDetails.getUuid(); + long free = capacity - used; + String storagePath = testDir.getAbsolutePath() + "/" + dnId; + StorageReportProto report = TestUtils + .createStorageReport(dnId, storagePath, capacity, used, free, null); + nodeManager.processHeartbeat(datanodeDetails); + } + //TODO: wait for heartbeat to be processed + Thread.sleep(4 * 1000); + assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); + assertEquals(capacity * nodeCount, (long) nodeManager.getStats() + .getCapacity().get()); + assertEquals(used * nodeCount, (long) nodeManager.getStats() + .getScmUsed().get()); + assertEquals(remaining * nodeCount, (long) nodeManager.getStats() + .getRemaining().get()); + } + } + + /** + * Test single node stat update based on nodereport from different heartbeat + * status (healthy, stale and dead). + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + @Ignore + // TODO: Enable this after we implement NodeReportEvent handler. 
+ public void testScmNodeReportUpdate() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = getConf(); + final int heartbeatCount = 5; + final int nodeCount = 1; + final int interval = 100; + + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, interval, + MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + DatanodeDetails datanodeDetails = + TestUtils.createRandomDatanodeAndRegister(nodeManager); + final long capacity = 2000; + final long usedPerHeartbeat = 100; + UUID dnId = datanodeDetails.getUuid(); + for (int x = 0; x < heartbeatCount; x++) { + long scmUsed = x * usedPerHeartbeat; + long remaining = capacity - scmUsed; + String storagePath = testDir.getAbsolutePath() + "/" + dnId; + StorageReportProto report = TestUtils + .createStorageReport(dnId, storagePath, capacity, scmUsed, + remaining, null); + + nodeManager.processHeartbeat(datanodeDetails); + Thread.sleep(100); + } + + final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1); + final long expectedRemaining = capacity - expectedScmUsed; + + GenericTestUtils.waitFor( + () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, + 100, 4 * 1000); + + long foundCapacity = nodeManager.getStats().getCapacity().get(); + assertEquals(capacity, foundCapacity); + + long foundScmUsed = nodeManager.getStats().getScmUsed().get(); + assertEquals(expectedScmUsed, foundScmUsed); + + long foundRemaining = nodeManager.getStats().getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + + // Test NodeManager#getNodeStats + assertEquals(nodeCount, nodeManager.getNodeStats().size()); + long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get() + .getCapacity().get(); + assertEquals(capacity, nodeCapacity); + + foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() + .get(); + assertEquals(expectedScmUsed, foundScmUsed); + + foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() + .getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + + // Compare the result from + // NodeManager#getNodeStats and NodeManager#getNodeStat + SCMNodeStat stat1 = nodeManager.getNodeStats(). + get(datanodeDetails.getUuid()); + SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get(); + assertEquals(stat1, stat2); + + // Wait up to 4s so that the node becomes stale + // Verify the usage info should be unchanged. + GenericTestUtils.waitFor( + () -> nodeManager.getNodeCount(STALE) == 1, 100, + 4 * 1000); + assertEquals(nodeCount, nodeManager.getNodeStats().size()); + + foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() + .getCapacity().get(); + assertEquals(capacity, foundCapacity); + foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get() + .getScmUsed().get(); + assertEquals(expectedScmUsed, foundScmUsed); + + foundRemaining = nodeManager.getNodeStat(datanodeDetails).get(). + getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + + // Wait up to 4 more seconds so the node becomes dead + // Verify usage info should be updated. 
+ GenericTestUtils.waitFor( + () -> nodeManager.getNodeCount(DEAD) == 1, 100, + 4 * 1000); + + assertEquals(0, nodeManager.getNodeStats().size()); + foundCapacity = nodeManager.getStats().getCapacity().get(); + assertEquals(0, foundCapacity); + + foundScmUsed = nodeManager.getStats().getScmUsed().get(); + assertEquals(0, foundScmUsed); + + foundRemaining = nodeManager.getStats().getRemaining().get(); + assertEquals(0, foundRemaining); + + nodeManager.processHeartbeat(datanodeDetails); + + // Wait up to 5 seconds so that the dead node becomes healthy + // Verify usage info should be updated. + GenericTestUtils.waitFor( + () -> nodeManager.getNodeCount(HEALTHY) == 1, + 100, 5 * 1000); + GenericTestUtils.waitFor( + () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, + 100, 4 * 1000); + assertEquals(nodeCount, nodeManager.getNodeStats().size()); + foundCapacity = nodeManager.getNodeStat(datanodeDetails).get() + .getCapacity().get(); + assertEquals(capacity, foundCapacity); + foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed() + .get(); + assertEquals(expectedScmUsed, foundScmUsed); + foundRemaining = nodeManager.getNodeStat(datanodeDetails).get() + .getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); + } + } + + @Test + public void testHandlingSCMCommandEvent() throws IOException { + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + 100, TimeUnit.MILLISECONDS); + + DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); + UUID dnId = datanodeDetails.getUuid(); + String storagePath = testDir.getAbsolutePath() + "/" + dnId; + StorageReportProto report = + TestUtils.createStorageReport(dnId, storagePath, 100, 10, 90, null); + + EventQueue eq = new EventQueue(); + try (SCMNodeManager nodemanager = createNodeManager(conf)) { + eq.addHandler(DATANODE_COMMAND, nodemanager); + + nodemanager + .register(datanodeDetails, TestUtils.createNodeReport(report), + TestUtils.getRandomPipelineReports()); + eq.fireEvent(DATANODE_COMMAND, + new CommandForDatanode<>(datanodeDetails.getUuid(), + new CloseContainerCommand(1L, + PipelineID.randomId()))); + + eq.processAll(1000L); + List<SCMCommand> command = + nodemanager.processHeartbeat(datanodeDetails); + Assert.assertEquals(1, command.size()); + Assert + .assertEquals(command.get(0).getClass(), CloseContainerCommand.class); + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + } + +}
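
For readers skimming the tests above, the following minimal sketch distills the node-state timing pattern they all rely on: shrink the stale/dead intervals, register a datanode, heartbeat it, and poll with GenericTestUtils.waitFor rather than fixed sleeps. This is an illustrative sketch, not part of the committed patch; it assumes the getConf() and createNodeManager(conf) helpers plus the static imports of this test class, and the method name testNodeLifecycleTimingSketch is hypothetical.

  @Test
  public void testNodeLifecycleTimingSketch() throws IOException,
      InterruptedException, TimeoutException {
    OzoneConfiguration conf = getConf();
    // Short intervals so the whole healthy -> stale -> dead cycle fits in a
    // few seconds, matching the configuration used by the tests above.
    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
        MILLISECONDS);
    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);

    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
      DatanodeDetails dn =
          TestUtils.createRandomDatanodeAndRegister(nodeManager);

      // One heartbeat only; with no further heartbeats the node should pass
      // through STALE after ~3 seconds and become DEAD after ~6 seconds.
      nodeManager.processHeartbeat(dn);
      GenericTestUtils.waitFor(() -> nodeManager.getNodeCount(STALE) == 1,
          100, 4 * 1000);
      GenericTestUtils.waitFor(() -> nodeManager.getNodeCount(DEAD) == 1,
          100, 8 * 1000);

      // A fresh heartbeat brings the dead node back to HEALTHY.
      nodeManager.processHeartbeat(dn);
      GenericTestUtils.waitFor(() -> nodeManager.getNodeCount(HEALTHY) == 1,
          100, 5 * 1000);
    }
  }

Polling with waitFor keeps these timing-sensitive assertions robust on slow machines, which is why the tests above generally prefer it over bare Thread.sleep for state checks.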
