Repository: hadoop Updated Branches: refs/heads/trunk ff64d3571 -> ab90248b3
HDDS-400. Check global replication state for containers of dead node. Contributed by Elek, Marton. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab90248b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab90248b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab90248b Branch: refs/heads/trunk Commit: ab90248b30c2355cd8ae6660ea8af9758f95356a Parents: ff64d35 Author: Hanisha Koneru <hanishakon...@apache.org> Authored: Fri Sep 7 11:20:25 2018 -0700 Committer: Hanisha Koneru <hanishakon...@apache.org> Committed: Fri Sep 7 11:20:25 2018 -0700 ---------------------------------------------------------------------- .../scm/container/ContainerReportHandler.java | 49 +-- .../scm/container/ContainerStateManager.java | 38 +- .../hadoop/hdds/scm/node/DeadNodeHandler.java | 12 + .../org/apache/hadoop/hdds/scm/TestUtils.java | 45 ++ .../container/TestContainerReportHandler.java | 20 +- .../container/TestContainerStateManager.java | 96 +++++ .../hdds/scm/node/TestDeadNodeHandler.java | 95 +++-- .../container/TestContainerStateManager.java | 415 ------------------- .../TestContainerStateManagerIntegration.java | 415 +++++++++++++++++++ 9 files changed, 694 insertions(+), 491 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 5ca2bcb..dcbd49c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hdds.scm.container; import java.io.IOException; @@ -23,18 +22,16 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.replication - .ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.scm.node.states.ReportResult; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .ContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -59,22 +56,21 @@ public class ContainerReportHandler implements private ReplicationActivityStatus replicationStatus; - public ContainerReportHandler(Mapping containerMapping, Node2ContainerMap node2ContainerMap, ReplicationActivityStatus replicationActivityStatus) { Preconditions.checkNotNull(containerMapping); Preconditions.checkNotNull(node2ContainerMap); 
Preconditions.checkNotNull(replicationActivityStatus); + this.containerStateManager = containerMapping.getStateManager(); this.containerMapping = containerMapping; this.node2ContainerMap = node2ContainerMap; - this.containerStateManager = containerMapping.getStateManager(); this.replicationStatus = replicationActivityStatus; } @Override public void onMessage(ContainerReportFromDatanode containerReportFromDatanode, - EventPublisher publisher) { + EventPublisher publisher) { DatanodeDetails datanodeOrigin = containerReportFromDatanode.getDatanodeDetails(); @@ -88,7 +84,8 @@ public class ContainerReportHandler implements .processContainerReports(datanodeOrigin, containerReport, false); Set<ContainerID> containerIds = containerReport.getReportsList().stream() - .map(containerProto -> containerProto.getContainerID()) + .map(StorageContainerDatanodeProtocolProtos + .ContainerInfo::getContainerID) .map(ContainerID::new) .collect(Collectors.toSet()); @@ -102,13 +99,12 @@ public class ContainerReportHandler implements for (ContainerID containerID : reportResult.getMissingContainers()) { containerStateManager .removeContainerReplica(containerID, datanodeOrigin); - emitReplicationRequestEvent(containerID, publisher); + checkReplicationState(containerID, publisher); } for (ContainerID containerID : reportResult.getNewContainers()) { containerStateManager.addContainerReplica(containerID, datanodeOrigin); - - emitReplicationRequestEvent(containerID, publisher); + checkReplicationState(containerID, publisher); } } catch (IOException e) { @@ -119,8 +115,9 @@ public class ContainerReportHandler implements } - private void emitReplicationRequestEvent(ContainerID containerID, - EventPublisher publisher) throws SCMException { + private void checkReplicationState(ContainerID containerID, + EventPublisher publisher) + throws SCMException { ContainerInfo container = containerStateManager.getContainer(containerID); if (container == null) { @@ -134,18 +131,18 @@ public class 
ContainerReportHandler implements if (container.isContainerOpen()) { return; } - if (replicationStatus.isReplicationEnabled()) { - - int existingReplicas = - containerStateManager.getContainerReplicas(containerID).size(); - - int expectedReplicas = container.getReplicationFactor().getNumber(); - - if (existingReplicas != expectedReplicas) { + ReplicationRequest replicationState = + containerStateManager.checkReplicationState(containerID); + if (replicationState != null) { + if (replicationStatus.isReplicationEnabled()) { publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - new ReplicationRequest(containerID.getId(), existingReplicas, - container.getReplicationFactor().getNumber())); + replicationState); + } else { + LOG.warn( + "Over/under replicated container but the replication is not " + + "(yet) enabled: " + + replicationState.toString()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 421d34e..eb8f2e3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -27,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.container.states.ContainerState; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.exceptions.SCMException; @@ -40,6 +42,7 @@ import org.apache.hadoop.ozone.common.statemachine .InvalidStateTransitionException; import org.apache.hadoop.ozone.common.statemachine.StateMachine; import org.apache.hadoop.util.Time; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +151,7 @@ public class ContainerStateManager implements Closeable { finalStates); initializeStateMachine(); - this.containerSize =(long)configuration.getStorageSize( + this.containerSize = (long) configuration.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); @@ -399,7 +402,7 @@ public class ContainerStateManager implements Closeable { // container ID. ContainerState key = new ContainerState(owner, type, factor); ContainerID lastID = lastUsedMap.get(key); - if(lastID == null) { + if (lastID == null) { lastID = matchingSet.first(); } @@ -426,7 +429,7 @@ public class ContainerStateManager implements Closeable { selectedContainer = findContainerWithSpace(size, resultSet, owner); } // Update the allocated Bytes on this container. - if(selectedContainer != null) { + if (selectedContainer != null) { selectedContainer.updateAllocatedBytes(size); } return selectedContainer; @@ -539,9 +542,36 @@ public class ContainerStateManager implements Closeable { DatanodeDetails dn) throws SCMException { return containers.removeContainerReplica(containerID, dn); } - + + /** + * Compare the existing replication number with the expected one. 
+ */ + public ReplicationRequest checkReplicationState(ContainerID containerID) + throws SCMException { + int existingReplicas = getContainerReplicas(containerID).size(); + int expectedReplicas = getContainer(containerID) + .getReplicationFactor().getNumber(); + if (existingReplicas != expectedReplicas) { + return new ReplicationRequest(containerID.getId(), existingReplicas, + expectedReplicas); + } + return null; + } + + /** + * Checks if the container is open. + */ + public boolean isOpen(ContainerID containerID) { + Preconditions.checkNotNull(containerID); + ContainerInfo container = Preconditions + .checkNotNull(getContainer(containerID), + "Container can't be found " + containerID); + return container.isContainerOpen(); + } + @VisibleForTesting public ContainerStateMap getContainerStateMap() { return containers; } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java index c853b3b..d694a10 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -23,6 +23,8 @@ import java.util.Set; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import 
org.apache.hadoop.hdds.server.events.EventHandler; @@ -62,6 +64,16 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> { try { containerStateManager.removeContainerReplica(container, datanodeDetails); + + if (!containerStateManager.isOpen(container)) { + ReplicationRequest replicationRequest = + containerStateManager.checkReplicationState(container); + + if (replicationRequest != null) { + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + replicationRequest); + } + } } catch (SCMException e) { LOG.error("Can't remove container from containerStateMap {}", container .getId(), e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index d617680..7af9dda 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -17,6 +17,11 @@ package org.apache.hadoop.hdds.scm; import com.google.common.base.Preconditions; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.protocol.proto @@ -31,12 +36,18 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageTypeProto; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import 
org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -376,5 +387,39 @@ public final class TestUtils { return report.build(); } + public static + org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo + allocateContainer(ContainerStateManager containerStateManager) + throws IOException { + + PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class); + + Pipeline pipeline = new Pipeline("leader", HddsProtos.LifeCycleState.CLOSED, + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.THREE, + PipelineID.randomId()); + when(pipelineSelector + .getReplicationPipeline(HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline); + + return containerStateManager + .allocateContainer(pipelineSelector, + HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.THREE, "root").getContainerInfo(); + + } + + public static void closeContainer(ContainerStateManager containerStateManager, + org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo + container) + throws SCMException { + + containerStateManager.getContainerStateMap() + .updateState(container, container.getState(), LifeCycleState.CLOSING); + + containerStateManager.getContainerStateMap() + .updateState(container, container.getState(), LifeCycleState.CLOSED); + + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 66f0966..d74a32f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -71,9 +71,7 @@ public class TestContainerReportHandler implements EventPublisher { @Test public void test() throws IOException { - - //given - + //GIVEN OzoneConfiguration conf = new OzoneConfiguration(); Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); Mapping mapping = Mockito.mock(Mapping.class); @@ -133,19 +131,9 @@ public class TestContainerReportHandler implements EventPublisher { long c3 = cont3.getContainerID(); // Close remaining containers - try { - containerStateManager.getContainerStateMap() - .updateState(cont1, cont1.getState(), LifeCycleState.CLOSING); - containerStateManager.getContainerStateMap() - .updateState(cont1, cont1.getState(), LifeCycleState.CLOSED); - containerStateManager.getContainerStateMap() - .updateState(cont2, cont2.getState(), LifeCycleState.CLOSING); - containerStateManager.getContainerStateMap() - .updateState(cont2, cont2.getState(), LifeCycleState.CLOSED); - - } catch (IOException e) { - LOG.info("Failed to change state of open containers.", e); - } + TestUtils.closeContainer(containerStateManager, cont1); + TestUtils.closeContainer(containerStateManager, cont2); + //when //initial reports before replication is enabled. 2 containers w 3 replicas. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java new file mode 100644 index 0000000..fe92ee5 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container; + +import java.io.IOException; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Testing ContainerStatemanager. + */ +public class TestContainerStateManager { + + private ContainerStateManager containerStateManager; + + @Before + public void init() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + Mapping mapping = Mockito.mock(Mapping.class); + containerStateManager = new ContainerStateManager(conf, mapping); + + } + + @Test + public void checkReplicationStateOK() throws IOException { + //GIVEN + ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager); + + DatanodeDetails d1 = TestUtils.randomDatanodeDetails(); + DatanodeDetails d2 = TestUtils.randomDatanodeDetails(); + DatanodeDetails d3 = TestUtils.randomDatanodeDetails(); + + addReplica(c1, d1); + addReplica(c1, d2); + addReplica(c1, d3); + + //WHEN + ReplicationRequest replicationRequest = containerStateManager + .checkReplicationState(new ContainerID(c1.getContainerID())); + + //THEN + Assert.assertNull(replicationRequest); + } + + @Test + public void checkReplicationStateMissingReplica() throws IOException { + //GIVEN + + ContainerInfo c1 = TestUtils.allocateContainer(containerStateManager); + + DatanodeDetails d1 = TestUtils.randomDatanodeDetails(); + DatanodeDetails d2 = TestUtils.randomDatanodeDetails(); + + addReplica(c1, d1); + addReplica(c1, d2); + + //WHEN + ReplicationRequest replicationRequest = containerStateManager + .checkReplicationState(new ContainerID(c1.getContainerID())); + + Assert + .assertEquals(c1.getContainerID(), 
replicationRequest.getContainerId()); + Assert.assertEquals(2, replicationRequest.getReplicationCount()); + Assert.assertEquals(3, replicationRequest.getExpecReplicationCount()); + } + + private void addReplica(ContainerInfo c1, DatanodeDetails d1) { + containerStateManager + .addContainerReplica(new ContainerID(c1.getContainerID()), d1); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 4be10e1..0b69f5f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -18,76 +18,76 @@ package org.apache.hadoop.hdds.scm.node; -import java.util.HashSet; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerInfo; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerStateManager; import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import 
org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import static org.mockito.Matchers.eq; import org.mockito.Mockito; /** * Test DeadNodeHandler. */ public class TestDeadNodeHandler { + + private List<ReplicationRequest> sentEvents = new ArrayList<>(); + @Test - public void testOnMessage() throws SCMException { + public void testOnMessage() throws IOException { //GIVEN DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails(); DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails(); - ContainerInfo container1 = TestUtils.getRandomContainerInfo(1); - ContainerInfo container2 = TestUtils.getRandomContainerInfo(2); - ContainerInfo container3 = TestUtils.getRandomContainerInfo(3); - Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); ContainerStateManager containerStateManager = new ContainerStateManager( new OzoneConfiguration(), Mockito.mock(Mapping.class) ); + + ContainerInfo container1 = + TestUtils.allocateContainer(containerStateManager); + ContainerInfo container2 = + TestUtils.allocateContainer(containerStateManager); + ContainerInfo container3 = + TestUtils.allocateContainer(containerStateManager); + DeadNodeHandler handler = new DeadNodeHandler(node2ContainerMap, containerStateManager); - node2ContainerMap - .insertNewDatanode(datanode1.getUuid(), new HashSet<ContainerID>() {{ - add(new ContainerID(container1.getContainerID())); - add(new ContainerID(container2.getContainerID())); - }}); + registerReplicas(node2ContainerMap, datanode1, container1, container2); + registerReplicas(node2ContainerMap, datanode2, container1, container3); - node2ContainerMap - .insertNewDatanode(datanode2.getUuid(), new HashSet<ContainerID>() {{ - add(new ContainerID(container1.getContainerID())); - add(new ContainerID(container3.getContainerID())); - }}); + registerReplicas(containerStateManager, container1, 
datanode1, datanode2); + registerReplicas(containerStateManager, container2, datanode1); + registerReplicas(containerStateManager, container3, datanode2); - containerStateManager.getContainerStateMap() - .addContainerReplica(new ContainerID(container1.getContainerID()), - datanode1, datanode2); + TestUtils.closeContainer(containerStateManager, container1); - containerStateManager.getContainerStateMap() - .addContainerReplica(new ContainerID(container2.getContainerID()), - datanode1); - - containerStateManager.getContainerStateMap() - .addContainerReplica(new ContainerID(container3.getContainerID()), - datanode2); + EventPublisher publisher = Mockito.mock(EventPublisher.class); //WHEN datanode1 is dead - handler.onMessage(datanode1, Mockito.mock(EventPublisher.class)); + handler.onMessage(datanode1, publisher); //THEN - //node2ContainerMap has not been changed Assert.assertEquals(2, node2ContainerMap.size()); @@ -108,5 +108,40 @@ public class TestDeadNodeHandler { Assert.assertEquals(1, container3Replicas.size()); Assert.assertEquals(datanode2, container3Replicas.iterator().next()); + ArgumentCaptor<ReplicationRequest> replicationRequestParameter = + ArgumentCaptor.forClass(ReplicationRequest.class); + + Mockito.verify(publisher) + .fireEvent(eq(SCMEvents.REPLICATE_CONTAINER), + replicationRequestParameter.capture()); + + Assert + .assertEquals(container1.getContainerID(), + replicationRequestParameter.getValue().getContainerId()); + Assert + .assertEquals(1, + replicationRequestParameter.getValue().getReplicationCount()); + Assert + .assertEquals(3, + replicationRequestParameter.getValue().getExpecReplicationCount()); + } + + private void registerReplicas(ContainerStateManager containerStateManager, + ContainerInfo container, DatanodeDetails... 
datanodes) { + containerStateManager.getContainerStateMap() + .addContainerReplica(new ContainerID(container.getContainerID()), + datanodes); } + + private void registerReplicas(Node2ContainerMap node2ContainerMap, + DatanodeDetails datanode, + ContainerInfo... containers) + throws SCMException { + node2ContainerMap + .insertNewDatanode(datanode.getUuid(), + Arrays.stream(containers) + .map(container -> new ContainerID(container.getContainerID())) + .collect(Collectors.toSet())); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java deleted file mode 100644 index 9e209af..0000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ /dev/null @@ -1,415 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import com.google.common.primitives.Longs; -import java.util.Set; -import java.util.UUID; -import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.hdds.scm.XceiverClientManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.LambdaTestUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.NavigableSet; -import java.util.Random; -import org.slf4j.event.Level; - -/** - * Tests for ContainerStateManager. 
- */ -public class TestContainerStateManager { - - private OzoneConfiguration conf; - private MiniOzoneCluster cluster; - private XceiverClientManager xceiverClientManager; - private StorageContainerManager scm; - private Mapping scmContainerMapping; - private ContainerStateManager containerStateManager; - private String containerOwner = "OZONE"; - - - @Before - public void setup() throws Exception { - conf = new OzoneConfiguration(); - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); - cluster.waitForClusterToBeReady(); - xceiverClientManager = new XceiverClientManager(conf); - scm = cluster.getStorageContainerManager(); - scmContainerMapping = scm.getScmContainerManager(); - containerStateManager = scmContainerMapping.getStateManager(); - } - - @After - public void cleanUp() { - if (cluster != null) { - cluster.shutdown(); - } - } - - @Test - public void testAllocateContainer() throws IOException { - // Allocate a container and verify the container info - ContainerWithPipeline container1 = scm.getClientProtocolServer() - .allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - ContainerInfo info = containerStateManager - .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.ALLOCATED); - Assert.assertEquals(container1.getContainerInfo().getContainerID(), - info.getContainerID()); - Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes()); - Assert.assertEquals(containerOwner, info.getOwner()); - Assert.assertEquals(xceiverClientManager.getType(), - info.getReplicationType()); - Assert.assertEquals(xceiverClientManager.getFactor(), - info.getReplicationFactor()); - Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState()); - - // Check there are two containers in ALLOCATED state after allocation - ContainerWithPipeline container2 = scm.getClientProtocolServer() - 
.allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - int numContainers = containerStateManager - .getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.ALLOCATED).size(); - Assert.assertNotEquals(container1.getContainerInfo().getContainerID(), - container2.getContainerInfo().getContainerID()); - Assert.assertEquals(2, numContainers); - } - - @Test - public void testContainerStateManagerRestart() throws IOException { - // Allocate 5 containers in ALLOCATED state and 5 in CREATING state - - List<ContainerInfo> containers = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - ContainerWithPipeline container = scm.getClientProtocolServer() - .allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - containers.add(container.getContainerInfo()); - if (i >= 5) { - scm.getScmContainerManager().updateContainerState(container - .getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATE); - } - } - - // New instance of ContainerStateManager should load all the containers in - // container store. - ContainerStateManager stateManager = - new ContainerStateManager(conf, scmContainerMapping - ); - int matchCount = stateManager - .getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.ALLOCATED).size(); - Assert.assertEquals(5, matchCount); - matchCount = stateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.CREATING).size(); - Assert.assertEquals(5, matchCount); - } - - @Test - public void testGetMatchingContainer() throws IOException { - ContainerWithPipeline container1 = scm.getClientProtocolServer(). 
- allocateContainer(xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - scmContainerMapping - .updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATE); - scmContainerMapping - .updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATED); - - ContainerWithPipeline container2 = scm.getClientProtocolServer(). - allocateContainer(xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - - ContainerInfo info = containerStateManager - .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.OPEN); - Assert.assertEquals(container1.getContainerInfo().getContainerID(), - info.getContainerID()); - - info = containerStateManager - .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.ALLOCATED); - Assert.assertEquals(container2.getContainerInfo().getContainerID(), - info.getContainerID()); - - scmContainerMapping - .updateContainerState(container2.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATE); - scmContainerMapping - .updateContainerState(container2.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATED); - - // space has already been allocated in container1, now container 2 should - // be chosen. 
- info = containerStateManager - .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.OPEN); - Assert.assertEquals(container2.getContainerInfo().getContainerID(), - info.getContainerID()); - } - - @Test - public void testUpdateContainerState() throws IOException { - NavigableSet<ContainerID> containerList = containerStateManager - .getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.ALLOCATED); - int containers = containerList == null ? 0 : containerList.size(); - Assert.assertEquals(0, containers); - - // Allocate container1 and update its state from ALLOCATED -> CREATING -> - // OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED - ContainerWithPipeline container1 = scm.getClientProtocolServer() - .allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.ALLOCATED).size(); - Assert.assertEquals(1, containers); - - scmContainerMapping - .updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATE); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.CREATING).size(); - Assert.assertEquals(1, containers); - - scmContainerMapping - .updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATED); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.OPEN).size(); - Assert.assertEquals(1, containers); - - scmContainerMapping - 
.updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.FINALIZE); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.CLOSING).size(); - Assert.assertEquals(1, containers); - - scmContainerMapping - .updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CLOSE); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.CLOSED).size(); - Assert.assertEquals(1, containers); - - scmContainerMapping - .updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.DELETE); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.DELETING).size(); - Assert.assertEquals(1, containers); - - scmContainerMapping - .updateContainerState(container1.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CLEANUP); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.DELETED).size(); - Assert.assertEquals(1, containers); - - // Allocate container1 and update its state from ALLOCATED -> CREATING -> - // DELETING - ContainerWithPipeline container2 = scm.getClientProtocolServer() - .allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - scmContainerMapping - .updateContainerState(container2.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATE); - scmContainerMapping - .updateContainerState(container2.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.TIMEOUT); - containers = 
containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.DELETING).size(); - Assert.assertEquals(1, containers); - - // Allocate container1 and update its state from ALLOCATED -> CREATING -> - // OPEN -> CLOSING -> CLOSED - ContainerWithPipeline container3 = scm.getClientProtocolServer() - .allocateContainer( - xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - scmContainerMapping - .updateContainerState(container3.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATE); - scmContainerMapping - .updateContainerState(container3.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATED); - scmContainerMapping - .updateContainerState(container3.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.FINALIZE); - scmContainerMapping - .updateContainerState(container3.getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CLOSE); - containers = containerStateManager.getMatchingContainerIDs(containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.CLOSED).size(); - Assert.assertEquals(1, containers); - } - - @Test - public void testUpdatingAllocatedBytes() throws Exception { - ContainerWithPipeline container1 = scm.getClientProtocolServer() - .allocateContainer(xceiverClientManager.getType(), - xceiverClientManager.getFactor(), containerOwner); - scmContainerMapping.updateContainerState(container1 - .getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATE); - scmContainerMapping.updateContainerState(container1 - .getContainerInfo().getContainerID(), - HddsProtos.LifeCycleEvent.CREATED); - - Random ran = new Random(); - long allocatedSize = 0; - for (int i = 0; i<5; i++) { - long size = Math.abs(ran.nextLong() % OzoneConsts.GB); - allocatedSize += size; - // trigger allocating bytes by calling getMatchingContainer - 
ContainerInfo info = containerStateManager - .getMatchingContainer(size, containerOwner, - xceiverClientManager.getType(), xceiverClientManager.getFactor(), - HddsProtos.LifeCycleState.OPEN); - Assert.assertEquals(container1.getContainerInfo().getContainerID(), - info.getContainerID()); - - ContainerMapping containerMapping = - (ContainerMapping) scmContainerMapping; - // manually trigger a flush, this will persist the allocated bytes value - // to disk - containerMapping.flushContainerInfo(); - - // the persisted value should always be equal to allocated size. - byte[] containerBytes = containerMapping.getContainerStore().get( - Longs.toByteArray(container1.getContainerInfo().getContainerID())); - HddsProtos.SCMContainerInfo infoProto = - HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); - ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto); - Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes()); - } - } - - @Test - public void testReplicaMap() throws Exception { - GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG); - GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer - .captureLogs(ContainerStateMap.getLOG()); - DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1") - .setIpAddress("1.1.1.1") - .setUuid(UUID.randomUUID().toString()).build(); - DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2") - .setIpAddress("2.2.2.2") - .setUuid(UUID.randomUUID().toString()).build(); - - // Test 1: no replica's exist - ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong()); - Set<DatanodeDetails> replicaSet; - LambdaTestUtils.intercept(SCMException.class, "", () -> { - containerStateManager.getContainerReplicas(containerID); - }); - - // Test 2: Add replica nodes and then test - containerStateManager.addContainerReplica(containerID, dn1); - containerStateManager.addContainerReplica(containerID, dn2); - replicaSet = 
containerStateManager.getContainerReplicas(containerID); - Assert.assertEquals(2, replicaSet.size()); - Assert.assertTrue(replicaSet.contains(dn1)); - Assert.assertTrue(replicaSet.contains(dn2)); - - // Test 3: Remove one replica node and then test - containerStateManager.removeContainerReplica(containerID, dn1); - replicaSet = containerStateManager.getContainerReplicas(containerID); - Assert.assertEquals(1, replicaSet.size()); - Assert.assertFalse(replicaSet.contains(dn1)); - Assert.assertTrue(replicaSet.contains(dn2)); - - // Test 3: Remove second replica node and then test - containerStateManager.removeContainerReplica(containerID, dn2); - replicaSet = containerStateManager.getContainerReplicas(containerID); - Assert.assertEquals(0, replicaSet.size()); - Assert.assertFalse(replicaSet.contains(dn1)); - Assert.assertFalse(replicaSet.contains(dn2)); - - // Test 4: Re-insert dn1 - containerStateManager.addContainerReplica(containerID, dn1); - replicaSet = containerStateManager.getContainerReplicas(containerID); - Assert.assertEquals(1, replicaSet.size()); - Assert.assertTrue(replicaSet.contains(dn1)); - Assert.assertFalse(replicaSet.contains(dn2)); - - // Re-insert dn2 - containerStateManager.addContainerReplica(containerID, dn2); - replicaSet = containerStateManager.getContainerReplicas(containerID); - Assert.assertEquals(2, replicaSet.size()); - Assert.assertTrue(replicaSet.contains(dn1)); - Assert.assertTrue(replicaSet.contains(dn2)); - - Assert.assertFalse(logCapturer.getOutput().contains( - "ReplicaMap already contains entry for container Id: " + containerID - .toString() + ",DataNode: " + dn1.toString())); - // Re-insert dn1 - containerStateManager.addContainerReplica(containerID, dn1); - replicaSet = containerStateManager.getContainerReplicas(containerID); - Assert.assertEquals(2, replicaSet.size()); - Assert.assertTrue(replicaSet.contains(dn1)); - Assert.assertTrue(replicaSet.contains(dn2)); - Assert.assertTrue(logCapturer.getOutput().contains( - "ReplicaMap 
already contains entry for container Id: " + containerID - .toString() + ",DataNode: " + dn1.toString())); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab90248b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java new file mode 100644 index 0000000..c6e819b --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hdds.scm.container; + +import com.google.common.primitives.Longs; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import org.slf4j.event.Level; + +/** + * Tests for ContainerStateManager. 
+ */ +public class TestContainerStateManagerIntegration { + + private OzoneConfiguration conf; + private MiniOzoneCluster cluster; + private XceiverClientManager xceiverClientManager; + private StorageContainerManager scm; + private Mapping scmContainerMapping; + private ContainerStateManager containerStateManager; + private String containerOwner = "OZONE"; + + + @Before + public void setup() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); + cluster.waitForClusterToBeReady(); + xceiverClientManager = new XceiverClientManager(conf); + scm = cluster.getStorageContainerManager(); + scmContainerMapping = scm.getScmContainerManager(); + containerStateManager = scmContainerMapping.getStateManager(); + } + + @After + public void cleanUp() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testAllocateContainer() throws IOException { + // Allocate a container and verify the container info + ContainerWithPipeline container1 = scm.getClientProtocolServer() + .allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + ContainerInfo info = containerStateManager + .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(container1.getContainerInfo().getContainerID(), + info.getContainerID()); + Assert.assertEquals(OzoneConsts.GB * 3, info.getAllocatedBytes()); + Assert.assertEquals(containerOwner, info.getOwner()); + Assert.assertEquals(xceiverClientManager.getType(), + info.getReplicationType()); + Assert.assertEquals(xceiverClientManager.getFactor(), + info.getReplicationFactor()); + Assert.assertEquals(HddsProtos.LifeCycleState.ALLOCATED, info.getState()); + + // Check there are two containers in ALLOCATED state after allocation + ContainerWithPipeline container2 = 
scm.getClientProtocolServer() + .allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + int numContainers = containerStateManager + .getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.ALLOCATED).size(); + Assert.assertNotEquals(container1.getContainerInfo().getContainerID(), + container2.getContainerInfo().getContainerID()); + Assert.assertEquals(2, numContainers); + } + + @Test + public void testContainerStateManagerRestart() throws IOException { + // Allocate 5 containers in ALLOCATED state and 5 in CREATING state + + List<ContainerInfo> containers = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + ContainerWithPipeline container = scm.getClientProtocolServer() + .allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + containers.add(container.getContainerInfo()); + if (i >= 5) { + scm.getScmContainerManager().updateContainerState(container + .getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATE); + } + } + + // New instance of ContainerStateManager should load all the containers in + // container store. + ContainerStateManager stateManager = + new ContainerStateManager(conf, scmContainerMapping + ); + int matchCount = stateManager + .getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.ALLOCATED).size(); + Assert.assertEquals(5, matchCount); + matchCount = stateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CREATING).size(); + Assert.assertEquals(5, matchCount); + } + + @Test + public void testGetMatchingContainer() throws IOException { + ContainerWithPipeline container1 = scm.getClientProtocolServer(). 
+ allocateContainer(xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + scmContainerMapping + .updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATE); + scmContainerMapping + .updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATED); + + ContainerWithPipeline container2 = scm.getClientProtocolServer(). + allocateContainer(xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + + ContainerInfo info = containerStateManager + .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.OPEN); + Assert.assertEquals(container1.getContainerInfo().getContainerID(), + info.getContainerID()); + + info = containerStateManager + .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(container2.getContainerInfo().getContainerID(), + info.getContainerID()); + + scmContainerMapping + .updateContainerState(container2.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATE); + scmContainerMapping + .updateContainerState(container2.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATED); + + // space has already been allocated in container1, now container 2 should + // be chosen. 
+ info = containerStateManager + .getMatchingContainer(OzoneConsts.GB * 3, containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.OPEN); + Assert.assertEquals(container2.getContainerInfo().getContainerID(), + info.getContainerID()); + } + + @Test + public void testUpdateContainerState() throws IOException { + NavigableSet<ContainerID> containerList = containerStateManager + .getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.ALLOCATED); + int containers = containerList == null ? 0 : containerList.size(); + Assert.assertEquals(0, containers); + + // Allocate container1 and update its state from ALLOCATED -> CREATING -> + // OPEN -> CLOSING -> CLOSED -> DELETING -> DELETED + ContainerWithPipeline container1 = scm.getClientProtocolServer() + .allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.ALLOCATED).size(); + Assert.assertEquals(1, containers); + + scmContainerMapping + .updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATE); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CREATING).size(); + Assert.assertEquals(1, containers); + + scmContainerMapping + .updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATED); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.OPEN).size(); + Assert.assertEquals(1, containers); + + scmContainerMapping + 
.updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.FINALIZE); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CLOSING).size(); + Assert.assertEquals(1, containers); + + scmContainerMapping + .updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CLOSE); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CLOSED).size(); + Assert.assertEquals(1, containers); + + scmContainerMapping + .updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.DELETE); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.DELETING).size(); + Assert.assertEquals(1, containers); + + scmContainerMapping + .updateContainerState(container1.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CLEANUP); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.DELETED).size(); + Assert.assertEquals(1, containers); + + // Allocate container2 and update its state from ALLOCATED -> CREATING -> + // DELETING + ContainerWithPipeline container2 = scm.getClientProtocolServer() + .allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + scmContainerMapping + .updateContainerState(container2.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATE); + scmContainerMapping + .updateContainerState(container2.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.TIMEOUT); + containers =
containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.DELETING).size(); + Assert.assertEquals(1, containers); + + // Allocate container3 and update its state from ALLOCATED -> CREATING -> + // OPEN -> CLOSING -> CLOSED + ContainerWithPipeline container3 = scm.getClientProtocolServer() + .allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + scmContainerMapping + .updateContainerState(container3.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATE); + scmContainerMapping + .updateContainerState(container3.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATED); + scmContainerMapping + .updateContainerState(container3.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.FINALIZE); + scmContainerMapping + .updateContainerState(container3.getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CLOSE); + containers = containerStateManager.getMatchingContainerIDs(containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.CLOSED).size(); + Assert.assertEquals(1, containers); + } + + @Test + public void testUpdatingAllocatedBytes() throws Exception { + ContainerWithPipeline container1 = scm.getClientProtocolServer() + .allocateContainer(xceiverClientManager.getType(), + xceiverClientManager.getFactor(), containerOwner); + scmContainerMapping.updateContainerState(container1 + .getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATE); + scmContainerMapping.updateContainerState(container1 + .getContainerInfo().getContainerID(), + HddsProtos.LifeCycleEvent.CREATED); + + Random ran = new Random(); + long allocatedSize = 0; + for (int i = 0; i<5; i++) { + long size = Math.abs(ran.nextLong() % OzoneConsts.GB); + allocatedSize += size; + // trigger allocating bytes by calling getMatchingContainer +
ContainerInfo info = containerStateManager + .getMatchingContainer(size, containerOwner, + xceiverClientManager.getType(), xceiverClientManager.getFactor(), + HddsProtos.LifeCycleState.OPEN); + Assert.assertEquals(container1.getContainerInfo().getContainerID(), + info.getContainerID()); + + ContainerMapping containerMapping = + (ContainerMapping) scmContainerMapping; + // manually trigger a flush, this will persist the allocated bytes value + // to disk + containerMapping.flushContainerInfo(); + + // the persisted value should always be equal to allocated size. + byte[] containerBytes = containerMapping.getContainerStore().get( + Longs.toByteArray(container1.getContainerInfo().getContainerID())); + HddsProtos.SCMContainerInfo infoProto = + HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); + ContainerInfo currentInfo = ContainerInfo.fromProtobuf(infoProto); + Assert.assertEquals(allocatedSize, currentInfo.getAllocatedBytes()); + } + } + + @Test + public void testReplicaMap() throws Exception { + GenericTestUtils.setLogLevel(ContainerStateMap.getLOG(), Level.DEBUG); + GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer + .captureLogs(ContainerStateMap.getLOG()); + DatanodeDetails dn1 = DatanodeDetails.newBuilder().setHostName("host1") + .setIpAddress("1.1.1.1") + .setUuid(UUID.randomUUID().toString()).build(); + DatanodeDetails dn2 = DatanodeDetails.newBuilder().setHostName("host2") + .setIpAddress("2.2.2.2") + .setUuid(UUID.randomUUID().toString()).build(); + + // Test 1: no replicas exist + ContainerID containerID = ContainerID.valueof(RandomUtils.nextLong()); + Set<DatanodeDetails> replicaSet; + LambdaTestUtils.intercept(SCMException.class, "", () -> { + containerStateManager.getContainerReplicas(containerID); + }); + + // Test 2: Add replica nodes and then test + containerStateManager.addContainerReplica(containerID, dn1); + containerStateManager.addContainerReplica(containerID, dn2); + replicaSet =
containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(2, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + + // Test 3: Remove one replica node and then test + containerStateManager.removeContainerReplica(containerID, dn1); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(1, replicaSet.size()); + Assert.assertFalse(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + + // Test 4: Remove second replica node and then test + containerStateManager.removeContainerReplica(containerID, dn2); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(0, replicaSet.size()); + Assert.assertFalse(replicaSet.contains(dn1)); + Assert.assertFalse(replicaSet.contains(dn2)); + + // Test 5: Re-insert dn1 + containerStateManager.addContainerReplica(containerID, dn1); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(1, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertFalse(replicaSet.contains(dn2)); + + // Re-insert dn2 + containerStateManager.addContainerReplica(containerID, dn2); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(2, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + + Assert.assertFalse(logCapturer.getOutput().contains( + "ReplicaMap already contains entry for container Id: " + containerID + .toString() + ",DataNode: " + dn1.toString())); + // Re-insert dn1 + containerStateManager.addContainerReplica(containerID, dn1); + replicaSet = containerStateManager.getContainerReplicas(containerID); + Assert.assertEquals(2, replicaSet.size()); + Assert.assertTrue(replicaSet.contains(dn1)); + Assert.assertTrue(replicaSet.contains(dn2)); + Assert.assertTrue(logCapturer.getOutput().contains( + "ReplicaMap
already contains entry for container Id: " + containerID + .toString() + ",DataNode: " + dn1.toString())); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org