Repository: hadoop Updated Branches: refs/heads/HDFS-7240 d10f39e75 -> 52925ef82
HDFS-11108. Ozone: use containers with the state machine. Contributed by Anu Engineer Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52925ef8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52925ef8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52925ef8 Branch: refs/heads/HDFS-7240 Commit: 52925ef824910c7509daa4e43cef7278283ea431 Parents: d10f39e Author: Anu Engineer <[email protected]> Authored: Thu Nov 17 13:01:09 2016 -0800 Committer: Anu Engineer <[email protected]> Committed: Thu Nov 17 13:01:09 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hdfs/server/datanode/DataNode.java | 17 +++-- .../statemachine/DatanodeStateMachine.java | 40 ++++++++--- .../common/transport/server/XceiverServer.java | 6 +- .../container/ozoneimpl/OzoneContainer.java | 72 ++++++++------------ .../common/TestDatanodeStateMachine.java | 31 ++++----- .../container/ozoneimpl/TestOzoneContainer.java | 10 +-- 6 files changed, 92 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/52925ef8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index eab1956..86aed61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -72,7 +72,6 @@ import java.net.Socket; import java.net.URI; import java.net.UnknownHostException; import java.nio.channels.ServerSocketChannel; -import 
java.nio.channels.SocketChannel; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; @@ -116,6 +115,8 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -192,7 +193,6 @@ import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SaslPropertiesResolver; import org.apache.hadoop.security.SecurityUtil; @@ -396,7 +396,7 @@ public class DataNode extends ReconfigurableBase private DiskBalancer diskBalancer; private final SocketFactory socketFactory; - private OzoneContainer ozoneServer; + private DatanodeStateMachine datanodeStateMachine; private static Tracer createTracer(Configuration conf) { return new Tracer.Builder("DataNode"). @@ -1615,11 +1615,10 @@ public class DataNode extends ReconfigurableBase initDirectoryScanner(getConf()); if(this.ozoneEnabled) { try { - ozoneServer = new OzoneContainer(getConf(), this.getFSDataset()); - ozoneServer.start(); + datanodeStateMachine = DatanodeStateMachine.initStateMachine(getConf()); LOG.info("Ozone container server started."); - } catch (Exception ex) { - LOG.error("Unable to start Ozone. ex: {}", ex.toString()); + } catch (IOException ex) { + LOG.error("Unable to start Ozone. 
ex: {}", ex); } } initDiskBalancer(data, getConf()); @@ -1975,9 +1974,9 @@ public class DataNode extends ReconfigurableBase } if(this.ozoneEnabled) { - if(ozoneServer != null) { + if(datanodeStateMachine != null) { try { - ozoneServer.stop(); + datanodeStateMachine.close(); } catch (Exception e) { LOG.error("Error is ozone shutdown. ex {}", e.toString()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52925ef8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 5574c9f..8063e23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.slf4j.Logger; @@ -43,27 +44,26 @@ public class DatanodeStateMachine implements Closeable { private final ExecutorService executorService; private final Configuration conf; private final SCMConnectionManager connectionManager; - private final long taskWaitTime; private final long heartbeatFrequency; private StateContext context; + private final OzoneContainer container; /** - * Constructs a container state machine. 
+ * Constructs a datanode state machine. * * @param conf - Configration. */ - public DatanodeStateMachine(Configuration conf) { + public DatanodeStateMachine(Configuration conf) throws IOException { this.conf = conf; executorService = HadoopExecutors.newScheduledThreadPool( this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS, OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT), new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Container State Machine Thread - %d").build()); + .setNameFormat("Datanode State Machine Thread - %d").build()); connectionManager = new SCMConnectionManager(conf); context = new StateContext(this.conf, DatanodeStates.getInitState(), this); - taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT, - OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT); heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf); + container = new OzoneContainer(conf); } /** @@ -81,10 +81,12 @@ public class DatanodeStateMachine implements Closeable { public void start() throws IOException { long now = 0; long nextHB = 0; + container.start(); while (context.getState() != DatanodeStates.SHUTDOWN) { try { nextHB = Time.monotonicNow() + heartbeatFrequency; - context.execute(executorService, taskWaitTime, TimeUnit.SECONDS); + context.execute(executorService, heartbeatFrequency, + TimeUnit.MILLISECONDS); now = Time.monotonicNow(); if (now < nextHB) { Thread.sleep(nextHB - now); @@ -144,6 +146,10 @@ public class DatanodeStateMachine implements Closeable { for (EndpointStateMachine endPoint : connectionManager.getValues()) { endPoint.close(); } + + if(container != null) { + container.stop(); + } } /** @@ -159,7 +165,7 @@ public class DatanodeStateMachine implements Closeable { /** * Constructs ContainerStates.
* - * @param value + * @param value Enum Value */ DatanodeStates(int value) { this.value = value; @@ -210,4 +216,22 @@ public class DatanodeStateMachine implements Closeable { return getLastState(); } } + + public static DatanodeStateMachine initStateMachine(Configuration conf) + throws IOException { + DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf); + Runnable startStateMachineTask = () -> { + try { + stateMachine.start(); + } catch (Exception ex) { + LOG.error("Unable to start the DatanodeState Machine", ex); + } + }; + Thread thread = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Datanode State Machine Thread - %d") + .build().newThread(startStateMachineTask); + thread.start(); + return stateMachine; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52925ef8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java index 19b849d..6f765e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import java.io.IOException; + /** * Creates a netty server endpoint that acts as the communication layer for * Ozone containers. @@ -58,9 +60,9 @@ public final class XceiverServer { /** * Starts running the server. 
* - * @throws Exception + * @throws IOException */ - public void start() throws Exception { + public void start() throws IOException { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); channel = new ServerBootstrap() http://git-wip-us.apache.org/repos/asf/hadoop/blob/52925ef8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 2565a04..d088b4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * <p/> + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the @@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl; import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; @@ -38,6 +36,8 @@ import java.io.IOException; import java.util.LinkedList; import java.util.List; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; + /** * Ozone main class sets up the network server and initializes the container * layer. @@ -57,12 +57,11 @@ public class OzoneContainer { * Creates a network endpoint and enables Ozone container. * * @param ozoneConfig - Config - * @param dataSet - FsDataset. * @throws IOException */ public OzoneContainer( - Configuration ozoneConfig, - FsDatasetSpi<? extends FsVolumeSpi> dataSet) throws Exception { + Configuration ozoneConfig) throws IOException { + this.ozoneConfig = ozoneConfig; List<StorageLocation> locations = new LinkedList<>(); String[] paths = ozoneConfig.getStrings( OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS); @@ -71,11 +70,9 @@ public class OzoneContainer { locations.add(StorageLocation.parse(p)); } } else { - getDataDir(dataSet, locations); + getDataDir(locations); } - this.ozoneConfig = ozoneConfig; - manager = new ContainerManagerImpl(); manager.init(this.ozoneConfig, locations); this.chunkManager = new ChunkManagerImpl(manager); @@ -90,43 +87,44 @@ public class OzoneContainer { /** * Starts serving requests to ozone container. - * @throws Exception + * + * @throws IOException */ - public void start() throws Exception { + public void start() throws IOException { server.start(); } /** * Stops the ozone container. 
- * - * Shutdown logic is not very obvious from the following code. - * if you need to modify the logic, please keep these comments in mind. - * Here is the shutdown sequence. - * + * <p> + * Shutdown logic is not very obvious from the following code. if you need to + * modify the logic, please keep these comments in mind. Here is the shutdown + * sequence. + * <p> * 1. We shutdown the network ports. - * + * <p> * 2. Now we need to wait for all requests in-flight to finish. - * - * 3. The container manager lock is a read-write lock with "Fairness" enabled. - * + * <p> + * 3. The container manager lock is a read-write lock with "Fairness" + * enabled. + * <p> * 4. This means that the waiting threads are served in a "first-come-first * -served" manner. Please note that this applies to waiting threads only. - * + * <p> * 5. Since write locks are exclusive, if we are waiting to get a lock it * implies that we are waiting for in-flight operations to complete. - * + * <p> * 6. if there are other write operations waiting on the reader-writer lock, * fairness guarantees that they will proceed before the shutdown lock * request. - * + * <p> * 7. Since all operations either take a reader or writer lock of container * manager, we are guaranteed that we are the last operation since we have * closed the network port, and we wait until close is successful. - * + * <p> * 8. We take the writer lock and call shutdown on each of the managers in * reverse order. That is chunkManager, keyManager and containerManager is * shutdown. - * */ public void stop() { LOG.info("Attempting to stop container services."); @@ -144,26 +142,14 @@ public class OzoneContainer { /** * Returns a paths to data dirs. - * @param dataset - FSDataset. + * * @param pathList - List of paths. * @throws IOException */ - private void getDataDir( - FsDatasetSpi<? 
extends FsVolumeSpi> dataset, - List<StorageLocation> pathList) throws IOException { - FsDatasetSpi.FsVolumeReferences references; - try { - synchronized (dataset) { - references = dataset.getFsVolumeReferences(); - for (int ndx = 0; ndx < references.size(); ndx++) { - FsVolumeSpi vol = references.get(ndx); - pathList.add(StorageLocation.parse(vol.getBaseURI().getPath())); - } - references.close(); - } - } catch (IOException ex) { - LOG.error("Unable to get volume paths.", ex); - throw new IOException("Internal error", ex); + private void getDataDir(List<StorageLocation> pathList) throws IOException { + for (String dir : ozoneConfig.getStrings(DFS_DATANODE_DATA_DIR_KEY)) { + StorageLocation location = StorageLocation.parse(dir); + pathList.add(location); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52925ef8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java index b75a925..28658bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java @@ -20,19 +20,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.statemachine - .DatanodeStateMachine; - -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - 
.SCMConnectionManager; - +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.container.common.states.datanode - .InitDatanodeState; -import org.apache.hadoop.ozone.container.common.states.datanode - .RunningDatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.After; @@ -54,18 +47,20 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; + /** * Tests the datanode state machine class and its states. 
*/ public class TestDatanodeStateMachine { + private static final Logger LOG = + LoggerFactory.getLogger(TestDatanodeStateMachine.class); private final int scmServerCount = 3; private List<String> serverAddresses; private List<RPC.Server> scmServers; private List<ScmTestMock> mockServers; private ExecutorService executorService; private Configuration conf; - private static final Logger LOG = - LoggerFactory.getLogger(TestDatanodeStateMachine.class); @Before public void setUp() throws Exception { @@ -91,14 +86,15 @@ public class TestDatanodeStateMachine { String path = p.getPath().concat( TestDatanodeStateMachine.class.getSimpleName()); File f = new File(path); - if(!f.mkdirs()) { + if (!f.mkdirs()) { LOG.info("Required directories already exist."); } - + conf.set(DFS_DATANODE_DATA_DIR_KEY, path); path = Paths.get(path.toString(), TestDatanodeStateMachine.class.getSimpleName() + ".id").toString(); conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path); + executorService = HadoopExecutors.newScheduledThreadPool( conf.getInt( OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS, @@ -122,7 +118,6 @@ public class TestDatanodeStateMachine { /** * Assert that starting statemachine executes the Init State. 
* - * @throws IOException * @throws InterruptedException */ @Test @@ -132,7 +127,7 @@ public class TestDatanodeStateMachine { Runnable startStateMachineTask = () -> { try { stateMachine.start(); - } catch (IOException ex) { + } catch (Exception ex) { } }; Thread thread1 = new Thread(startStateMachineTask); http://git-wip-us.apache.org/repos/asf/hadoop/blob/52925ef8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index e765ad6..7921782 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -32,6 +32,9 @@ import org.junit.rules.Timeout; import java.net.URL; +/** + * Tests ozone containers. + */ public class TestOzoneContainer { /** * Set the timeout for every test. @@ -55,12 +58,11 @@ public class TestOzoneContainer { // We don't start Ozone Container via data node, we will do it // independently in our test path. 
- Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline - (containerName); + Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline( + containerName); conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline.getLeader().getContainerPort()); - OzoneContainer container = new OzoneContainer(conf, cluster.getDataNodes - ().get(0).getFSDataset()); + OzoneContainer container = new OzoneContainer(conf); container.start(); XceiverClient client = new XceiverClient(pipeline, conf); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
