http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index fc9afd6..d8b9958 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -63,9 +63,10 @@ import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeReportHandler; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; -import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler; -import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler; -import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler; +import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -149,6 +150,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl * State Managers of SCM. */ private final NodeManager scmNodeManager; + private final PipelineManager pipelineManager; private final ContainerManager containerManager; private final BlockManager scmBlockManager; private final SCMStorage scmStorage; @@ -201,8 +203,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl scmNodeManager = new SCMNodeManager( conf, scmStorage.getClusterID(), this, eventQueue); + pipelineManager = new SCMPipelineManager(conf, scmNodeManager, eventQueue); containerManager = new SCMContainerManager( - conf, scmNodeManager, eventQueue); + conf, scmNodeManager, pipelineManager, eventQueue); scmBlockManager = new BlockManagerImpl( conf, scmNodeManager, containerManager, eventQueue); @@ -213,14 +216,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl NodeReportHandler nodeReportHandler = new NodeReportHandler(scmNodeManager); PipelineReportHandler pipelineReportHandler = - new PipelineReportHandler( - containerManager.getPipelineSelector()); + new PipelineReportHandler(pipelineManager, conf); CommandStatusReportHandler cmdStatusReportHandler = new CommandStatusReportHandler(); NewNodeHandler newNodeHandler = new NewNodeHandler(scmNodeManager); StaleNodeHandler staleNodeHandler = - new StaleNodeHandler(containerManager.getPipelineSelector()); + new StaleNodeHandler(scmNodeManager, pipelineManager); DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager, containerManager); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); @@ -231,11 +233,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl new ContainerReportHandler(containerManager, scmNodeManager, replicationStatus); - PipelineActionEventHandler pipelineActionEventHandler = - new PipelineActionEventHandler(); - - PipelineCloseHandler pipelineCloseHandler = - new PipelineCloseHandler(containerManager.getPipelineSelector()); + PipelineActionHandler pipelineActionHandler = + new PipelineActionHandler(pipelineManager); long watcherTimeout = conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, @@ -294,10 +293,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); - eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, - pipelineActionEventHandler); - eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler); - + eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); @@ -771,6 +767,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl LOG.error("SCM Event Queue stop failed", ex); } IOUtils.cleanupWithLogger(LOG, containerManager); + IOUtils.cleanupWithLogger(LOG, pipelineManager); } /** @@ -815,6 +812,16 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl return scmNodeManager; } + /** + * Returns pipeline manager. + * + * @return - Pipeline Manager + */ + @VisibleForTesting + public PipelineManager getPipelineManager() { + return pipelineManager; + } + @VisibleForTesting public BlockManager getScmBlockManager() { return scmBlockManager;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java index a9c6906..32e8640 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.scm.server.SCMStorage; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -58,6 +60,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB; public class TestBlockManager implements EventHandler<Boolean> { private static SCMContainerManager mapping; private static MockNodeManager nodeManager; + private static PipelineManager pipelineManager; private static BlockManagerImpl blockManager; private static File testDir; private final static long DEFAULT_BLOCK_SIZE = 128 * MB; @@ -83,7 +86,10 @@ public class TestBlockManager implements EventHandler<Boolean> { throw new IOException("Unable to create test directory path"); } nodeManager = new MockNodeManager(true, 10); - mapping = new SCMContainerManager(conf, nodeManager, eventQueue); + pipelineManager = + new SCMPipelineManager(conf, nodeManager, eventQueue); + mapping = new SCMContainerManager(conf, nodeManager, pipelineManager, + eventQueue); blockManager = new BlockManagerImpl(conf, nodeManager, mapping, eventQueue); eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager); @@ -101,6 +107,7 @@ public class TestBlockManager implements EventHandler<Boolean> { @After public void cleanup() throws IOException { blockManager.close(); + pipelineManager.close(); mapping.close(); FileUtil.fullyDelete(testDir); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index 5b76137..06f4f5e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -24,12 +24,11 @@ import org.apache.hadoop.hdds.scm.container.SCMContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto @@ -102,12 +101,13 @@ public class TestDeletedBlockLog { ContainerInfo containerInfo = new ContainerInfo.Builder().setContainerID(1).build(); - Pipeline pipeline = - new Pipeline(null, LifeCycleState.CLOSED, - ReplicationType.RATIS, ReplicationFactor.THREE, null); - pipeline.addMember(dnList.get(0)); - pipeline.addMember(dnList.get(1)); - pipeline.addMember(dnList.get(2)); + Pipeline pipeline = Pipeline.newBuilder() + .setType(ReplicationType.RATIS) + .setFactor(ReplicationFactor.THREE) + .setState(Pipeline.PipelineState.CLOSED) + .setId(PipelineID.randomId()) + .setNodes(dnList) + .build(); ContainerWithPipeline containerWithPipeline = new ContainerWithPipeline(containerInfo, pipeline); when(containerManager.getContainerWithPipeline(anyObject())) @@ -383,11 +383,15 @@ public class TestDeletedBlockLog { private void mockContainerInfo(long containerID, DatanodeDetails dd) throws IOException { - Pipeline pipeline = - new Pipeline("fake", LifeCycleState.OPEN, - ReplicationType.STAND_ALONE, ReplicationFactor.ONE, - PipelineID.randomId()); - pipeline.addMember(dd); + List<DatanodeDetails> dns = new ArrayList<>(); + dns.add(dd); + Pipeline pipeline = Pipeline.newBuilder() + .setType(ReplicationType.STAND_ALONE) + .setFactor(ReplicationFactor.ONE) + .setState(Pipeline.PipelineState.OPEN) + .setId(PipelineID.randomId()) + .setNodes(dns) + .build(); ContainerInfo.Builder builder = new ContainerInfo.Builder(); builder.setPipelineID(pipeline.getId()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index f4ce102..8d36d29 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.exceptions.SCMException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index 517bc67..44a8deb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; @@ -65,8 +67,11 @@ public class TestCloseContainerEventHandler { configuration .set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); nodeManager = new MockNodeManager(true, 10); - containerManager = new SCMContainerManager(configuration, nodeManager, - new EventQueue()); + PipelineManager pipelineManager = + new SCMPipelineManager(configuration, nodeManager, eventQueue); + containerManager = new + SCMContainerManager(configuration, nodeManager, + pipelineManager, new EventQueue()); eventQueue = new EventQueue(); eventQueue.addHandler(CLOSE_CONTAINER, new CloseContainerEventHandler(containerManager)); @@ -110,11 +115,12 @@ public class TestCloseContainerEventHandler { GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer .captureLogs(CloseContainerEventHandler.LOG); ContainerWithPipeline containerWithPipeline = containerManager - .allocateContainer(HddsProtos.ReplicationType.STAND_ALONE, + .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE, "ozone"); ContainerID id = new ContainerID( containerWithPipeline.getContainerInfo().getContainerID()); - DatanodeDetails datanode = containerWithPipeline.getPipeline().getLeader(); + DatanodeDetails datanode = + containerWithPipeline.getPipeline().getFirstNode(); int closeCount = nodeManager.getCommandCount(datanode); eventQueue.fireEvent(CLOSE_CONTAINER, id); eventQueue.processAll(1000); @@ -149,13 +155,13 @@ public class TestCloseContainerEventHandler { eventQueue.processAll(1000); int i = 0; for (DatanodeDetails details : containerWithPipeline.getPipeline() - .getMachines()) { + .getNodes()) { closeCount[i] = nodeManager.getCommandCount(details); i++; } i = 0; for (DatanodeDetails details : containerWithPipeline.getPipeline() - .getMachines()) { + .getNodes()) { Assert.assertEquals(closeCount[i], nodeManager.getCommandCount(details)); i++; } @@ -166,7 +172,7 @@ public class TestCloseContainerEventHandler { i = 0; // Make sure close is queued for each datanode on the pipeline for (DatanodeDetails details : containerWithPipeline.getPipeline() - .getMachines()) { + .getNodes()) { Assert.assertEquals(closeCount[i] + 1, nodeManager.getCommandCount(details)); Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING, http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 7135173..861d241 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.container.replication .ReplicationActivityStatus; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.Event; @@ -73,8 +75,11 @@ public class TestContainerReportHandler implements EventPublisher { //GIVEN OzoneConfiguration conf = new OzoneConfiguration(); conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir); + EventQueue eventQueue = new EventQueue(); + PipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, eventQueue); SCMContainerManager containerManager = new SCMContainerManager( - conf, nodeManager, new EventQueue()); + conf, nodeManager, pipelineManager, eventQueue); ReplicationActivityStatus replicationActivityStatus = new ReplicationActivityStatus(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java index 69a3b31..446eb58 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; +import java.util.ArrayList; import java.util.Set; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -25,9 +26,10 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -102,19 +104,20 @@ public class TestContainerStateManager { private ContainerInfo allocateContainer() throws IOException { - PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class); + PipelineManager pipelineManager = Mockito.mock(SCMPipelineManager.class); - Pipeline pipeline = new Pipeline("leader", HddsProtos.LifeCycleState.CLOSED, - HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.THREE, - PipelineID.randomId()); + Pipeline pipeline = + Pipeline.newBuilder().setState(Pipeline.PipelineState.CLOSED) + .setId(PipelineID.randomId()) + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(HddsProtos.ReplicationFactor.THREE) + .setNodes(new ArrayList<>()).build(); - when(pipelineSelector - .getReplicationPipeline(HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline); + when(pipelineManager.createPipeline(HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.THREE)).thenReturn(pipeline); - return containerStateManager.allocateContainer( - pipelineSelector, HddsProtos.ReplicationType.STAND_ALONE, + return containerStateManager.allocateContainer(pipelineManager, + HddsProtos.ReplicationType.STAND_ALONE, HddsProtos.ReplicationFactor.THREE, "root"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java index 75f8b8c..fa0f084 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java @@ -24,13 +24,15 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; @@ -59,6 +61,7 @@ import java.util.concurrent.TimeUnit; public class TestSCMContainerManager { private static SCMContainerManager containerManager; private static MockNodeManager nodeManager; + private static PipelineManager pipelineManager; private static File testDir; private static XceiverClientManager xceiverClientManager; private static String containerOwner = "OZONE"; @@ -85,8 +88,10 @@ public class TestSCMContainerManager { throw new IOException("Unable to create test directory path"); } nodeManager = new MockNodeManager(true, 10); + pipelineManager = + new SCMPipelineManager(conf, nodeManager, new EventQueue()); containerManager = new SCMContainerManager(conf, nodeManager, - new EventQueue()); + pipelineManager, new EventQueue()); xceiverClientManager = new XceiverClientManager(conf); random = new Random(); } @@ -96,6 +101,9 @@ public class TestSCMContainerManager { if(containerManager != null) { containerManager.close(); } + if (pipelineManager != null) { + pipelineManager.close(); + } FileUtil.fullyDelete(testDir); } @@ -130,7 +138,7 @@ public class TestSCMContainerManager { Assert.assertNotNull(containerInfo); Assert.assertNotNull(containerInfo.getPipeline()); - pipelineList.add(containerInfo.getPipeline().getLeader() + pipelineList.add(containerInfo.getPipeline().getFirstNode() .getUuid()); } Assert.assertTrue(pipelineList.size() > 5); @@ -145,8 +153,8 @@ public class TestSCMContainerManager { Pipeline pipeline = containerInfo.getPipeline(); Assert.assertNotNull(pipeline); Pipeline newPipeline = containerInfo.getPipeline(); - Assert.assertEquals(pipeline.getLeader().getUuid(), - newPipeline.getLeader().getUuid()); + Assert.assertEquals(pipeline.getFirstNode().getUuid(), + newPipeline.getFirstNode().getUuid()); } @Test @@ -191,15 +199,15 @@ public class TestSCMContainerManager { contInfo = containerManager.getContainer(contInfo.containerID()); Assert.assertEquals(contInfo.getState(), LifeCycleState.CLOSED); Pipeline pipeline = containerWithPipeline.getPipeline(); - containerManager.getPipelineSelector().finalizePipeline(pipeline); + pipelineManager.finalizePipeline(pipeline.getId()); ContainerWithPipeline containerWithPipeline2 = containerManager .getContainerWithPipeline(contInfo.containerID()); pipeline = containerWithPipeline2.getPipeline(); Assert.assertNotEquals(containerWithPipeline, containerWithPipeline2); Assert.assertNotNull("Pipeline should not be null", pipeline); - Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn1.getHostName())); - Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn2.getHostName())); + Assert.assertTrue(pipeline.getNodes().contains(dn1)); + Assert.assertTrue(pipeline.getNodes().contains(dn2)); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index b0951c8..571a5fb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -20,23 +20,22 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.stream.IntStream; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationRequestToRepeat; import org.apache.hadoop.hdds.scm.events.SCMEvents; @@ -227,18 +226,16 @@ public class TestReplicationManager { public static Pipeline createPipeline(Iterable<DatanodeDetails> ids) throws IOException { Objects.requireNonNull(ids, "ids == null"); - final Iterator<DatanodeDetails> i = ids.iterator(); - Preconditions.checkArgument(i.hasNext()); - final DatanodeDetails leader = i.next(); - final Pipeline pipeline = - new Pipeline(leader.getUuidString(), LifeCycleState.OPEN, - ReplicationType.STAND_ALONE, ReplicationFactor.ONE, - PipelineID.randomId()); - pipeline.addMember(leader); - while (i.hasNext()) { - pipeline.addMember(i.next()); - } - return pipeline; + Preconditions.checkArgument(ids.iterator().hasNext()); + List<DatanodeDetails> dns = new ArrayList<>(); + ids.forEach(dns::add); + return Pipeline.newBuilder() + .setState(Pipeline.PipelineState.OPEN) + .setId(PipelineID.randomId()) + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(ReplicationFactor.ONE) + .setNodes(dns) + .build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index fb08ad2..e283732 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -102,7 +104,9 @@ public class TestContainerPlacement { EventQueue eventQueue = new EventQueue(); final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, OZONE_SCM_DB_CACHE_SIZE_DEFAULT); - return new SCMContainerManager(config, scmNodeManager, + PipelineManager pipelineManager = + new SCMPipelineManager(config, scmNodeManager, eventQueue); + return new SCMContainerManager(config, scmNodeManager, pipelineManager, eventQueue); } @@ -156,7 +160,7 @@ public class TestContainerPlacement { xceiverClientManager.getType(), xceiverClientManager.getFactor(), "OZONE"); assertEquals(xceiverClientManager.getFactor().getNumber(), - containerWithPipeline.getPipeline().getMachines().size()); + containerWithPipeline.getPipeline().getNodes().size()); } finally { IOUtils.closeQuietly(containerManager); IOUtils.closeQuietly(nodeManager); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index d971e68..985fa2c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -75,7 +77,10 @@ public class TestDeadNodeHandler { conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir); eventQueue = new EventQueue(); nodeManager = new SCMNodeManager(conf, "cluster1", null, eventQueue); - containerManager = new SCMContainerManager(conf, nodeManager, eventQueue); + PipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, eventQueue); + containerManager = new SCMContainerManager(conf, nodeManager, + pipelineManager, eventQueue); deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager); eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); publisher = Mockito.mock(EventPublisher.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index ed95709..c899bda 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 390746f..b2ddf39 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.statemachine http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 069f1af..fbc3420 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -20,8 +20,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.exceptions.SCMException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java index 0135df3..bf37718 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/InfoSubcommand.java @@ -82,10 +82,7 @@ public class InfoSubcommand implements Callable<Void> { LOG.info("Container Metadata: {}", metadataStr); // Print pipeline of an existing container. - LOG.info("LeaderID: {}", container.getPipeline() - .getLeader().getHostName()); - String machinesStr = container.getPipeline() - .getMachines().stream().map( + String machinesStr = container.getPipeline().getNodes().stream().map( DatanodeDetails::getHostName).collect(Collectors.joining(",")); LOG.info("Datanodes: [{}]", machinesStr); return null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java index 3772c59..0c09fc8 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -282,7 +282,10 @@ public class ChunkGroupInputStream extends InputStream implements Seekable { // irrespective of the container state, we will always read via Standalone // protocol. - pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE); + if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) { + pipeline = Pipeline.newBuilder(pipeline) + .setType(HddsProtos.ReplicationType.STAND_ALONE).build(); + } XceiverClientSpi xceiverClient = xceiverClientManager .acquireClient(pipeline); boolean success = false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java index 7a0fa5c..74cbea4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java @@ -24,9 +24,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.junit.After; @@ -50,7 +47,7 @@ public class TestNode2PipelineMap { private static StorageContainerManager scm; private static ContainerWithPipeline ratisContainer; private static ContainerManager containerManager; - private static PipelineSelector pipelineSelector; + private static PipelineManager pipelineManager; /** * Create a MiniDFSCluster for testing. @@ -66,7 +63,7 @@ public class TestNode2PipelineMap { containerManager = scm.getContainerManager(); ratisContainer = containerManager.allocateContainer( RATIS, THREE, "testOwner"); - pipelineSelector = containerManager.getPipelineSelector(); + pipelineManager = scm.getPipelineManager(); } /** @@ -83,15 +80,15 @@ public class TestNode2PipelineMap { @Test public void testPipelineMap() throws IOException { - Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline( - ratisContainer.getPipeline().getId()); + Set<ContainerID> set = pipelineManager + .getContainersInPipeline(ratisContainer.getPipeline().getId()); ContainerID cId = ratisContainer.getContainerInfo().containerID(); Assert.assertEquals(1, set.size()); set.forEach(containerID -> Assert.assertEquals(containerID, cId)); - List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines(); + List<DatanodeDetails> dns = ratisContainer.getPipeline().getNodes(); Assert.assertEquals(3, dns.size()); // get pipeline details by dnid @@ -112,18 +109,14 @@ public class TestNode2PipelineMap { .updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE); containerManager .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE); - Set<ContainerID> set2 = pipelineSelector.getOpenContainerIDsByPipeline( + Set<ContainerID> set2 = pipelineManager.getContainersInPipeline( ratisContainer.getPipeline().getId()); Assert.assertEquals(0, set2.size()); - try { - pipelineSelector.updatePipelineState(ratisContainer.getPipeline(), - HddsProtos.LifeCycleEvent.CLOSE); - Assert.fail("closing of pipeline without finalize should fail"); - } catch (Exception e) { - Assert.assertTrue(e instanceof SCMException); - Assert.assertEquals(((SCMException)e).getResult(), - SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE); - } + pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId()); + pipelineManager.removePipeline(ratisContainer.getPipeline().getId()); + pipelines = scm.getScmNodeManager() + .getPipelineByDnID(dns.get(0).getUuid()); + Assert.assertEquals(0, pipelines.size()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java index f3e1ece..45886c6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java @@ -20,12 +20,10 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -53,6 +51,7 @@ public class TestNodeFailure { private static ContainerWithPipeline ratisContainer1; private static ContainerWithPipeline ratisContainer2; private static ContainerManager containerManager; + private static PipelineManager pipelineManager; private static long timeForFailure; /** @@ -76,6 +75,7 @@ public class TestNodeFailure { cluster.waitForClusterToBeReady(); StorageContainerManager scm = cluster.getStorageContainerManager(); containerManager = scm.getContainerManager(); + pipelineManager = scm.getPipelineManager(); ratisContainer1 = containerManager.allocateContainer( RATIS, THREE, "testOwner"); ratisContainer2 = containerManager.allocateContainer( @@ -102,21 +102,21 @@ public class TestNodeFailure { @Test public void testPipelineFail() throws InterruptedException, IOException, TimeoutException { - Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(), - HddsProtos.LifeCycleState.OPEN); + Assert.assertEquals(ratisContainer1.getPipeline().getPipelineState(), + Pipeline.PipelineState.OPEN); Pipeline pipelineToFail = ratisContainer1.getPipeline(); - DatanodeDetails dnToFail = pipelineToFail.getMachines().get(0); + DatanodeDetails dnToFail = pipelineToFail.getFirstNode(); cluster.shutdownHddsDatanode(dnToFail); // wait for sufficient time for the callback to be triggered Thread.sleep(3 * timeForFailure); - Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED, - ratisContainer1.getPipeline().getLifeCycleState()); - Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, - ratisContainer2.getPipeline().getLifeCycleState()); - Assert.assertNull(containerManager.getPipelineSelector() - .getPipeline(pipelineToFail.getId())); + Assert.assertEquals(Pipeline.PipelineState.CLOSED, + pipelineManager.getPipeline(ratisContainer1.getPipeline().getId()) + .getPipelineState()); + Assert.assertEquals(Pipeline.PipelineState.OPEN, + pipelineManager.getPipeline(ratisContainer2.getPipeline().getId()) + .getPipelineState()); // Now restart the datanode and make sure that a new pipeline is created. cluster.restartHddsDatanode(dnToFail); ContainerWithPipeline ratisContainer3 = http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java index 52a493d..42d3063 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java @@ -24,8 +24,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.common.helpers .ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.junit.AfterClass; @@ -50,7 +48,7 @@ public class TestPipelineClose { private static ContainerWithPipeline ratisContainer1; private static ContainerWithPipeline ratisContainer2; private static ContainerManager containerManager; - private static PipelineSelector pipelineSelector; + private static PipelineManager pipelineManager; /** * Create a MiniDFSCluster for testing. @@ -68,7 +66,7 @@ public class TestPipelineClose { .allocateContainer(RATIS, THREE, "testOwner"); ratisContainer2 = containerManager .allocateContainer(RATIS, THREE, "testOwner"); - pipelineSelector = containerManager.getPipelineSelector(); + pipelineManager = scm.getPipelineManager(); // At this stage, there should be 2 pipeline one with 1 open container each. // Try closing the both the pipelines, one with a closed container and // the other with an open container. @@ -87,8 +85,8 @@ public class TestPipelineClose { @Test public void testPipelineCloseWithClosedContainer() throws IOException { - Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline( - ratisContainer1.getPipeline().getId()); + Set<ContainerID> set = pipelineManager + .getContainersInPipeline(ratisContainer1.getPipeline().getId()); ContainerID cId = ratisContainer1.getContainerInfo().containerID(); Assert.assertEquals(1, set.size()); @@ -105,17 +103,17 @@ public class TestPipelineClose { containerManager .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE); - Set<ContainerID> setClosed = pipelineSelector.getOpenContainerIDsByPipeline( - ratisContainer1.getPipeline().getId()); + Set<ContainerID> setClosed = pipelineManager + .getContainersInPipeline(ratisContainer1.getPipeline().getId()); Assert.assertEquals(0, setClosed.size()); - pipelineSelector.finalizePipeline(ratisContainer1.getPipeline()); - Pipeline pipeline1 = pipelineSelector + pipelineManager.finalizePipeline(ratisContainer1.getPipeline().getId()); + Pipeline pipeline1 = pipelineManager .getPipeline(ratisContainer1.getPipeline().getId()); - Assert.assertNull(pipeline1); - Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(), - HddsProtos.LifeCycleState.CLOSED); - for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) { + Assert.assertEquals(pipeline1.getPipelineState(), + Pipeline.PipelineState.CLOSED); + pipelineManager.removePipeline(pipeline1.getId()); + for (DatanodeDetails dn : ratisContainer1.getPipeline().getNodes()) { // Assert that the pipeline has been removed from Node2PipelineMap as well Assert.assertEquals(scm.getScmNodeManager().getPipelineByDnID( dn.getUuid()).size(), 0); @@ -125,7 +123,7 @@ public class TestPipelineClose { @Test public void testPipelineCloseWithOpenContainer() throws IOException, TimeoutException, InterruptedException { - Set<ContainerID> setOpen = pipelineSelector.getOpenContainerIDsByPipeline( + Set<ContainerID> setOpen = pipelineManager.getContainersInPipeline( ratisContainer2.getPipeline().getId()); Assert.assertEquals(1, setOpen.size()); @@ -134,12 +132,13 @@ public class TestPipelineClose { .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATE); containerManager .updateContainerState(cId2, HddsProtos.LifeCycleEvent.CREATED); - pipelineSelector.finalizePipeline(ratisContainer2.getPipeline()); - Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(), - HddsProtos.LifeCycleState.CLOSING); - Pipeline pipeline2 = pipelineSelector + pipelineManager.finalizePipeline(ratisContainer2.getPipeline().getId()); + Assert.assertEquals( + pipelineManager.getPipeline(ratisContainer2.getPipeline().getId()) + .getPipelineState(), Pipeline.PipelineState.CLOSED); + Pipeline pipeline2 = pipelineManager .getPipeline(ratisContainer2.getPipeline().getId()); - Assert.assertEquals(pipeline2.getLifeCycleState(), - HddsProtos.LifeCycleState.CLOSING); + Assert.assertEquals(pipeline2.getPipelineState(), + Pipeline.PipelineState.CLOSED); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index 49fb2bc..fd6f76b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -91,7 +91,7 @@ public class TestPipelineStateManager { } // verify pipeline returned is same - Pipeline pipeline1 = stateManager.getPipeline(pipeline.getID()); + Pipeline pipeline1 = stateManager.getPipeline(pipeline.getId()); Assert.assertTrue(pipeline == pipeline1); // clean up @@ -102,15 +102,17 @@ public class TestPipelineStateManager { public void testGetPipelines() throws IOException { Set<Pipeline> pipelines = new HashSet<>(); Pipeline pipeline = createDummyPipeline(1); - pipelines.add(pipeline); stateManager.addPipeline(pipeline); - pipeline = createDummyPipeline(1); + stateManager.openPipeline(pipeline.getId()); pipelines.add(pipeline); + pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); + stateManager.openPipeline(pipeline.getId()); + pipelines.add(pipeline); - Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelinesByType( + Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelines( HddsProtos.ReplicationType.RATIS)); - Assert.assertEquals(pipelines, pipelines1); + Assert.assertEquals(pipelines1.size(), pipelines.size()); // clean up for (Pipeline pipeline1 : pipelines) { removePipeline(pipeline1); @@ -131,16 +133,16 @@ public class TestPipelineStateManager { stateManager.addPipeline(pipeline); pipelines.add(pipeline); - // 5 pipelines in allocated state for each type and factor + // 5 pipelines in open state for each type and factor pipeline = createDummyPipeline(type, factor, factor.getNumber()); stateManager.addPipeline(pipeline); - stateManager.openPipeline(pipeline.getID()); + stateManager.openPipeline(pipeline.getId()); pipelines.add(pipeline); - // 5 pipelines in allocated state for each type and factor + // 5 pipelines in closed state for each type and factor pipeline = createDummyPipeline(type, factor, factor.getNumber()); stateManager.addPipeline(pipeline); - stateManager.finalizePipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getId()); pipelines.add(pipeline); } } @@ -152,8 +154,8 @@ public class TestPipelineStateManager { .values()) { // verify pipelines received List<Pipeline> pipelines1 = - stateManager.getPipelinesByTypeAndFactor(type, factor); - Assert.assertEquals(5, pipelines1.size()); + stateManager.getPipelines(type, factor); + Assert.assertEquals(15, pipelines1.size()); pipelines1.stream().forEach(p -> { Assert.assertEquals(p.getType(), type); Assert.assertEquals(p.getFactor(), factor); @@ -168,40 +170,79 @@ public class TestPipelineStateManager { } @Test + public void testGetPipelinesByTypeAndState() throws IOException { + Set<Pipeline> pipelines = new HashSet<>(); + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + for (int i = 0; i < 5; i++) { + // 5 pipelines in allocated state for each type and factor + Pipeline pipeline = + createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + pipelines.add(pipeline); + + // 5 pipelines in open state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.openPipeline(pipeline.getId()); + pipelines.add(pipeline); + + // 5 pipelines in closed state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.finalizePipeline(pipeline.getId()); + pipelines.add(pipeline); + } + } + + for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType + .values()) { + // verify pipelines received + List<Pipeline> pipelines1 = stateManager + .getPipelines(type, Pipeline.PipelineState.OPEN); + Assert.assertEquals(5, pipelines1.size()); + pipelines1.forEach(p -> { + Assert.assertEquals(p.getType(), type); + Assert.assertEquals(p.getPipelineState(), Pipeline.PipelineState.OPEN); + }); + + pipelines1 = stateManager + .getPipelines(type, Pipeline.PipelineState.OPEN, + Pipeline.PipelineState.CLOSED, Pipeline.PipelineState.ALLOCATED); + Assert.assertEquals(15, pipelines1.size()); + } + + //clean up + for (Pipeline pipeline : pipelines) { + removePipeline(pipeline); + } + } + + @Test public void testAddAndGetContainer() throws IOException { long containerID = 0; Pipeline pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); - pipeline = stateManager.getPipeline(pipeline.getID()); - - try { - stateManager.addContainerToPipeline(pipeline.getID(), - ContainerID.valueof(++containerID)); - Assert.fail("Container should not have been added"); - } catch (IOException e) { - // add container possible only in container with open state - Assert.assertTrue(e.getMessage().contains("is not in open state")); - } + pipeline = stateManager.getPipeline(pipeline.getId()); + stateManager.addContainerToPipeline(pipeline.getId(), + ContainerID.valueof(++containerID)); // move pipeline to open state - stateManager.openPipeline(pipeline.getID()); - - // add three containers - stateManager.addContainerToPipeline(pipeline.getID(), - ContainerID.valueof(containerID)); - stateManager.addContainerToPipeline(pipeline.getID(), + stateManager.openPipeline(pipeline.getId()); + stateManager.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(++containerID)); - stateManager.addContainerToPipeline(pipeline.getID(), + stateManager.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(++containerID)); //verify the number of containers returned Set<ContainerID> containerIDs = - stateManager.getContainers(pipeline.getID()); + stateManager.getContainers(pipeline.getId()); Assert.assertEquals(containerIDs.size(), containerID); removePipeline(pipeline); try { - stateManager.addContainerToPipeline(pipeline.getID(), + stateManager.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(++containerID)); Assert.fail("Container should not have been added"); } catch (IOException e) { @@ -215,12 +256,12 @@ public class TestPipelineStateManager { Pipeline pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); // close the pipeline - stateManager.openPipeline(pipeline.getID()); + stateManager.openPipeline(pipeline.getId()); stateManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1)); + .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1)); try { - stateManager.removePipeline(pipeline.getID()); + stateManager.removePipeline(pipeline.getId()); Assert.fail("Pipeline should not have been removed"); } catch (IOException e) { // can not remove a pipeline which already has containers @@ -228,10 +269,10 @@ public class TestPipelineStateManager { } // close the pipeline - stateManager.finalizePipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getId()); try { - stateManager.removePipeline(pipeline.getID()); + stateManager.removePipeline(pipeline.getId()); Assert.fail("Pipeline should not have been removed"); } catch (IOException e) { // can not remove a pipeline which already has containers @@ -248,33 +289,33 @@ public class TestPipelineStateManager { Pipeline pipeline = createDummyPipeline(1); // create an open pipeline in stateMap stateManager.addPipeline(pipeline); - stateManager.openPipeline(pipeline.getID()); + stateManager.openPipeline(pipeline.getId()); - stateManager.addContainerToPipeline(pipeline.getID(), + stateManager.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(containerID)); - Assert.assertEquals(1, stateManager.getContainers(pipeline.getID()).size()); - stateManager.removeContainerFromPipeline(pipeline.getID(), + Assert.assertEquals(1, stateManager.getContainers(pipeline.getId()).size()); + stateManager.removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(containerID)); - Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size()); + Assert.assertEquals(0, stateManager.getContainers(pipeline.getId()).size()); // add two containers in the pipeline - stateManager.addContainerToPipeline(pipeline.getID(), + stateManager.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(++containerID)); - stateManager.addContainerToPipeline(pipeline.getID(), + stateManager.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(++containerID)); - Assert.assertEquals(2, stateManager.getContainers(pipeline.getID()).size()); + Assert.assertEquals(2, stateManager.getContainers(pipeline.getId()).size()); // move pipeline to closing state - stateManager.finalizePipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getId()); - stateManager.removeContainerFromPipeline(pipeline.getID(), + stateManager.removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(containerID)); - stateManager.removeContainerFromPipeline(pipeline.getID(), + stateManager.removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(--containerID)); - Assert.assertEquals(0, stateManager.getContainers(pipeline.getID()).size()); + Assert.assertEquals(0, stateManager.getContainers(pipeline.getId()).size()); // clean up - stateManager.removePipeline(pipeline.getID()); + stateManager.removePipeline(pipeline.getId()); } @Test @@ -282,30 +323,30 @@ public class TestPipelineStateManager { Pipeline pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); // finalize on ALLOCATED pipeline - stateManager.finalizePipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getId()); Assert.assertEquals(Pipeline.PipelineState.CLOSED, - stateManager.getPipeline(pipeline.getID()).getPipelineState()); + stateManager.getPipeline(pipeline.getId()).getPipelineState()); // clean up removePipeline(pipeline); pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); - stateManager.openPipeline(pipeline.getID()); + stateManager.openPipeline(pipeline.getId()); // finalize on OPEN pipeline - stateManager.finalizePipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getId()); Assert.assertEquals(Pipeline.PipelineState.CLOSED, - stateManager.getPipeline(pipeline.getID()).getPipelineState()); + stateManager.getPipeline(pipeline.getId()).getPipelineState()); // clean up removePipeline(pipeline); pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); - stateManager.openPipeline(pipeline.getID()); - stateManager.finalizePipeline(pipeline.getID()); + stateManager.openPipeline(pipeline.getId()); + stateManager.finalizePipeline(pipeline.getId()); // finalize should work on already closed pipeline - stateManager.finalizePipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getId()); Assert.assertEquals(Pipeline.PipelineState.CLOSED, - stateManager.getPipeline(pipeline.getID()).getPipelineState()); + stateManager.getPipeline(pipeline.getId()).getPipelineState()); // clean up removePipeline(pipeline); } @@ -315,25 +356,25 @@ public class TestPipelineStateManager { Pipeline pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); // open on ALLOCATED pipeline - stateManager.openPipeline(pipeline.getID()); + stateManager.openPipeline(pipeline.getId()); Assert.assertEquals(Pipeline.PipelineState.OPEN, - stateManager.getPipeline(pipeline.getID()).getPipelineState()); + stateManager.getPipeline(pipeline.getId()).getPipelineState()); - stateManager.openPipeline(pipeline.getID()); + stateManager.openPipeline(pipeline.getId()); // open should work on already open pipeline Assert.assertEquals(Pipeline.PipelineState.OPEN, - stateManager.getPipeline(pipeline.getID()).getPipelineState()); + stateManager.getPipeline(pipeline.getId()).getPipelineState()); // clean up removePipeline(pipeline); } private void removePipeline(Pipeline pipeline) throws IOException { - stateManager.finalizePipeline(pipeline.getID()); + stateManager.finalizePipeline(pipeline.getId()); Set<ContainerID> containerIDs = - stateManager.getContainers(pipeline.getID()); + stateManager.getContainers(pipeline.getId()); for (ContainerID containerID : containerIDs) { - stateManager.removeContainerFromPipeline(pipeline.getID(), containerID); + stateManager.removeContainerFromPipeline(pipeline.getId(), containerID); } - stateManager.removePipeline(pipeline.getID()); + stateManager.removePipeline(pipeline.getId()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java index 184143a..0025c2e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -58,7 +58,7 @@ public class TestRatisPipelineProvider { Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; @@ -71,7 +71,7 @@ public class TestRatisPipelineProvider { Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline1.getFactor(), factor); Assert.assertEquals(pipeline1.getPipelineState(), - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); } @@ -86,19 +86,20 @@ public class TestRatisPipelineProvider { @Test public void testCreatePipelineWithNodes() throws IOException { HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; - Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber())); + Pipeline pipeline = + provider.create(factor, createListOfNodes(factor.getNumber())); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals( - pipeline.getPipelineState(), Pipeline.PipelineState.ALLOCATED); + pipeline.getPipelineState(), Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; - pipeline = provider.create(createListOfNodes(factor.getNumber())); + pipeline = provider.create(factor, createListOfNodes(factor.getNumber())); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 0f9ad55..dab7fb6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; -import org.apache.hadoop.hdds.scm.container.TestSCMContainerManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -54,7 +53,7 @@ public class TestSCMPipelineManager { public static void setUp() throws Exception { conf = new OzoneConfiguration(); testDir = GenericTestUtils - .getTestDir(TestSCMContainerManager.class.getSimpleName()); + .getTestDir(TestSCMPipelineManager.class.getSimpleName()); conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); boolean folderExisted = testDir.exists() || testDir.mkdirs(); if (!folderExisted) { @@ -83,16 +82,18 @@ public class TestSCMPipelineManager { // new pipeline manager should be able to load the pipelines from the db pipelineManager = - new SCMPipelineManager(conf, nodeManager, - new EventQueue()); + new SCMPipelineManager(conf, nodeManager, new EventQueue()); + for (Pipeline p : pipelines) { + pipelineManager.openPipeline(p.getId()); + } List<Pipeline> pipelineList = - pipelineManager.getPipelinesByType(HddsProtos.ReplicationType.RATIS); + pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS); Assert.assertEquals(pipelines, new HashSet<>(pipelineList)); // clean up for (Pipeline pipeline : pipelines) { - pipelineManager.finalizePipeline(pipeline.getID()); - pipelineManager.removePipeline(pipeline.getID()); + pipelineManager.finalizePipeline(pipeline.getId()); + pipelineManager.removePipeline(pipeline.getId()); } pipelineManager.close(); } @@ -104,13 +105,13 @@ public class TestSCMPipelineManager { Pipeline pipeline = pipelineManager .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); - pipelineManager.openPipeline(pipeline.getID()); + pipelineManager.openPipeline(pipeline.getId()); pipelineManager - .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1)); - pipelineManager.finalizePipeline(pipeline.getID()); + .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1)); + pipelineManager.finalizePipeline(pipeline.getId()); pipelineManager - .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(1)); - pipelineManager.removePipeline(pipeline.getID()); + .removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(1)); + pipelineManager.removePipeline(pipeline.getId()); pipelineManager.close(); // new pipeline manager should not be able to load removed pipelines @@ -118,7 +119,7 @@ public class TestSCMPipelineManager { new SCMPipelineManager(conf, nodeManager, new EventQueue()); try { - pipelineManager.getPipeline(pipeline.getID()); + pipelineManager.getPipeline(pipeline.getId()); Assert.fail("Pipeline should not have been retrieved"); } catch (IOException e) { Assert.assertTrue(e.getMessage().contains("not found")); @@ -138,36 +139,36 @@ public class TestSCMPipelineManager { .createPipeline(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE); Assert - .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isHealthy()); + .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); Assert - .assertFalse(pipelineManager.getPipeline(pipeline.getID()).isOpen()); + .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); // get pipeline report from each dn in the pipeline PipelineReportHandler pipelineReportHandler = new PipelineReportHandler(pipelineManager, conf); for (DatanodeDetails dn: pipeline.getNodes()) { PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID()); + TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId()); // pipeline is not healthy until all dns report Assert.assertFalse( - pipelineManager.getPipeline(pipeline.getID()).isHealthy()); + pipelineManager.getPipeline(pipeline.getId()).isHealthy()); pipelineReportHandler .onMessage(pipelineReportFromDatanode, new EventQueue()); } // pipeline is healthy when all dns report Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isHealthy()); + .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy()); // pipeline should now move to open state Assert - .assertTrue(pipelineManager.getPipeline(pipeline.getID()).isOpen()); + .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen()); // close the pipeline - pipelineManager.finalizePipeline(pipeline.getID()); + pipelineManager.finalizePipeline(pipeline.getId()); for (DatanodeDetails dn: pipeline.getNodes()) { PipelineReportFromDatanode pipelineReportFromDatanode = - TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getID()); + TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId()); // pipeline report for a closed pipeline should destroy the pipeline // and remove it from the pipeline manager pipelineReportHandler @@ -175,7 +176,7 @@ public class TestSCMPipelineManager { } try { - pipelineManager.getPipeline(pipeline.getID()); + pipelineManager.getPipeline(pipeline.getId()); Assert.fail("Pipeline should not have been retrieved"); } catch (IOException e) { Assert.assertTrue(e.getMessage().contains("not found")); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java index bac4022..0fa8649 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java @@ -19,9 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.junit.AfterClass; @@ -48,6 +46,7 @@ public class TestSCMRestart { private static Pipeline ratisPipeline2; private static ContainerManager containerManager; private static ContainerManager newContainerManager; + private static PipelineManager pipelineManager; /** * Create a MiniDFSCluster for testing. @@ -65,6 +64,7 @@ public class TestSCMRestart { cluster.waitForClusterToBeReady(); StorageContainerManager scm = cluster.getStorageContainerManager(); containerManager = scm.getContainerManager(); + pipelineManager = scm.getPipelineManager(); ratisPipeline1 = containerManager.allocateContainer( RATIS, THREE, "Owner1").getPipeline(); ratisPipeline2 = containerManager.allocateContainer( @@ -75,6 +75,7 @@ public class TestSCMRestart { cluster.restartStorageContainerManager(); newContainerManager = cluster.getStorageContainerManager() .getContainerManager(); + pipelineManager = cluster.getStorageContainerManager().getPipelineManager(); } /** @@ -90,25 +91,15 @@ public class TestSCMRestart { @Test public void testPipelineWithScmRestart() throws IOException { // After restart make sure that the pipeline are still present - Pipeline ratisPipeline1AfterRestart = newContainerManager - .getPipelineSelector().getPipeline(ratisPipeline1.getId()); - Pipeline ratisPipeline2AfterRestart = newContainerManager - .getPipelineSelector().getPipeline(ratisPipeline2.getId()); + Pipeline ratisPipeline1AfterRestart = + pipelineManager.getPipeline(ratisPipeline1.getId()); + Pipeline ratisPipeline2AfterRestart = + pipelineManager.getPipeline(ratisPipeline2.getId()); Assert.assertNotSame(ratisPipeline1AfterRestart, ratisPipeline1); Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2); Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1); Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2); - for (DatanodeDetails dn : ratisPipeline1.getMachines()) { - Assert.assertEquals(dn, ratisPipeline1AfterRestart.getDatanodes() - .get(dn.getUuidString())); - } - - for (DatanodeDetails dn : ratisPipeline2.getMachines()) { - Assert.assertEquals(dn, ratisPipeline2AfterRestart.getDatanodes() - .get(dn.getUuidString())); - } - // Try creating a new ratis pipeline, it should be from the same pipeline // as was before restart Pipeline newRatisPipeline = newContainerManager http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java index b44dbef..22fd95b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java @@ -57,7 +57,7 @@ public class TestSimplePipelineProvider { HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; @@ -67,7 +67,7 @@ public class TestSimplePipelineProvider { HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline1.getFactor(), factor); Assert.assertEquals(pipeline1.getPipelineState(), - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); } @@ -82,21 +82,22 @@ public class TestSimplePipelineProvider { @Test public void testCreatePipelineWithNodes() throws IOException { HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; - Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber())); + Pipeline pipeline = + provider.create(factor, createListOfNodes(factor.getNumber())); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); factor = HddsProtos.ReplicationFactor.ONE; - pipeline = provider.create(createListOfNodes(factor.getNumber())); + pipeline = provider.create(factor, createListOfNodes(factor.getNumber())); Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.STAND_ALONE); Assert.assertEquals(pipeline.getFactor(), factor); Assert.assertEquals(pipeline.getPipelineState(), - Pipeline.PipelineState.ALLOCATED); + Pipeline.PipelineState.OPEN); Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index a83c16e..871f389 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -20,7 +20,7 @@ package org.apache.hadoop.ozone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; import org.apache.hadoop.ozone.client.rpc.RpcClient; import org.apache.hadoop.hdds.protocol.DatanodeDetails; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java index c69a94c..78a8511 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms. ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms. http://git-wip-us.apache.org/repos/asf/hadoop/blob/dce4ebe8/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index bf6a189..e616eef 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -21,7 +21,7 @@ package org.apache.hadoop.ozone; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; @@ -29,7 +29,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.XceiverClientGrpc; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.TestGenericTestUtils; import org.junit.AfterClass; @@ -40,6 +40,7 @@ import org.junit.Test; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -91,18 +92,20 @@ public class TestMiniOzoneCluster { assertEquals(numberOfNodes, datanodes.size()); for(HddsDatanodeService dn : datanodes) { // Create a single member pipe line - DatanodeDetails datanodeDetails = dn.getDatanodeDetails(); - final Pipeline pipeline = - new Pipeline(datanodeDetails.getUuidString(), - HddsProtos.LifeCycleState.OPEN, - HddsProtos.ReplicationType.STAND_ALONE, - HddsProtos.ReplicationFactor.ONE, PipelineID.randomId()); - pipeline.addMember(datanodeDetails); + List<DatanodeDetails> dns = new ArrayList<>(); + dns.add(dn.getDatanodeDetails()); + Pipeline pipeline = Pipeline.newBuilder() + .setState(Pipeline.PipelineState.OPEN) + .setId(PipelineID.randomId()) + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(HddsProtos.ReplicationFactor.ONE) + .setNodes(dns) + .build(); // Verify client is able to connect to the container try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){ client.connect(); - assertTrue(client.isConnected(pipeline.getLeader())); + assertTrue(client.isConnected(pipeline.getFirstNode())); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
