http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java
new file mode 100644
index 0000000..a7ce60b
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/TestRocksDBStoreMBean.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+
+/**
+ * Test the JMX interface for the RocksDB metadata store implementation.
+ */
+public class TestRocksDBStoreMBean {
+
+  @Test
+  public void testJmxBeans() throws Exception {
+    File testDir =
+        GenericTestUtils.getTestDir(getClass().getSimpleName() + "-withstat");
+
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
+        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB);
+
+    RocksDBStore metadataStore =
+        (RocksDBStore) MetadataStoreBuilder.newBuilder().setConf(conf)
+            .setCreateIfMissing(true).setDbFile(testDir).build();
+
+    for (int i = 0; i < 10; i++) {
+      metadataStore.put("key".getBytes(), "value".getBytes());
+    }
+
+    MBeanServer platformMBeanServer =
+        ManagementFactory.getPlatformMBeanServer();
+    Thread.sleep(2000);
+
+    Object keysWritten = platformMBeanServer
+        .getAttribute(metadataStore.getStatMBeanName(), "NUMBER_KEYS_WRITTEN");
+
+    Assert.assertEquals(10L, keysWritten);
+
+    Object dbWriteAverage = platformMBeanServer
+        .getAttribute(metadataStore.getStatMBeanName(), "DB_WRITE_AVERAGE");
+    Assert.assertTrue((double) dbWriteAverage > 0);
+
+    metadataStore.close();
+
+  }
+
+  @Test
+  public void testDisabledStat() throws Exception {
+    File testDir = GenericTestUtils
+        .getTestDir(getClass().getSimpleName() + "-withoutstat");
+
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
+        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS,
+        OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF);
+
+    RocksDBStore metadataStore =
+        (RocksDBStore) MetadataStoreBuilder.newBuilder().setConf(conf)
+            .setCreateIfMissing(true).setDbFile(testDir).build();
+
+    Assert.assertNull(metadataStore.getStatMBeanName());
+
+    metadataStore.close();
+  }
+}
\ No newline at end of file
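
A quick JMX-client sketch of the same lookup the test performs, assuming (as
the calls above imply) that getStatMBeanName() returns a
javax.management.ObjectName and that counter attributes come back as numbers:

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public final class RocksDBStatPoller {
      private RocksDBStatPoller() {
      }

      /** Reads one RocksDB statistics attribute, e.g. "NUMBER_KEYS_WRITTEN". */
      public static long readCounter(ObjectName statMBeanName, String attribute)
          throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Same call the test uses; values arrive boxed (Long, Double, ...).
        return ((Number) server.getAttribute(statMBeanName, attribute))
            .longValue();
      }
    }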

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
new file mode 100644
index 0000000..736272d
--- /dev/null
+++ b/hadoop-hdds/container-service/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-hdds</artifactId>
+    <version>3.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>hadoop-hdds-container-service</artifactId>
+  <version>3.2.0-SNAPSHOT</version>
+  <description>Apache HDDS Container server</description>
+  <name>Apache HDDS Container server</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <hadoop.component>hdds</hadoop.component>
+    <is.hadoop.component>true</is.hadoop.component>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdds-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdds-server-framework</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>2.2.0</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>
+                  ${basedir}/../../hadoop-common-project/hadoop-common/src/main/proto
+                </param>
+                <param>
+                  ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/
+                </param>
+                <param>
+                  ${basedir}/../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto/
+                </param>
+                <param>
+                  ${basedir}/../../hadoop-hdds/common/src/main/proto/
+                </param>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>StorageContainerDatanodeProtocol.proto</include>
+                </includes>
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
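
Once the compile-protoc execution above runs, the generated Java classes
become available to this module. A hedged smoke-test sketch; the outer class
and field names (ContainerInfo, containerName, finalhash) are assumed from the
imports and getters used elsewhere in this commit, not confirmed against the
.proto file itself:

    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.ContainerInfo;

    public final class GeneratedProtoSmokeTest {
      public static void main(String[] args) {
        // Field names mirror the getters used by ContainerReport below.
        ContainerInfo info = ContainerInfo.newBuilder()
            .setContainerName("container-1")
            .setFinalhash("deadbeef")
            .build();
        System.out.println(info.getContainerName());
      }
    }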

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
new file mode 100644
index 0000000..956aef2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/HddsServerUtil.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdds.HddsUtils.*;
+import static org.apache.hadoop.hdds.server.ServerUtils.sanitizeUserArgs;
+
+/**
+ * Hdds stateless helper functions for server side components.
+ */
+public class HddsServerUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      HddsServerUtil.class);
+
+  /**
+   * Retrieve the socket address that should be used by DataNodes to connect
+   * to the SCM.
+   *
+   * @param conf - Configuration
+   * @return Target InetSocketAddress for the SCM service endpoint.
+   */
+  public static InetSocketAddress getScmAddressForDataNodes(
+      Configuration conf) {
+    // We try the following settings in decreasing priority to retrieve the
+    // target host.
+    // - OZONE_SCM_DATANODE_ADDRESS_KEY
+    // - OZONE_SCM_CLIENT_ADDRESS_KEY
+    //
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
+        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
+
+    if (!host.isPresent()) {
+      throw new IllegalArgumentException(
+          ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY +
+              " must be defined. See" +
+              " https://wiki.apache.org/hadoop/Ozone#Configuration "
+              + "for details on configuring Ozone.");
+    }
+
+    // If no port number is specified then we'll just try the defaultBindPort.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
+
+    InetSocketAddress addr = NetUtils.createSocketAddr(host.get() + ":" +
+        port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+
+    return addr;
+  }
+
+  /**
+   * Retrieve the socket address that should be used by clients to connect
+   * to the SCM.
+   *
+   * @param conf - Configuration
+   * @return Target InetSocketAddress for the SCM client endpoint.
+   */
+  public static InetSocketAddress getScmClientBindAddress(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_KEY);
+
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(ScmConfigKeys.OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" +
+            port.or(ScmConfigKeys.OZONE_SCM_CLIENT_PORT_DEFAULT));
+  }
+
+  /**
+   * Retrieve the socket address that should be used by clients to connect
+   * to the SCM Block service.
+   *
+   * @param conf - Configuration
+   * @return Target InetSocketAddress for the SCM block client endpoint.
+   */
+  public static InetSocketAddress getScmBlockClientBindAddress(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_KEY);
+
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_BIND_HOST_DEFAULT) +
+            ":" + port.or(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT));
+  }
+
+  /**
+   * Retrieve the socket address that should be used by DataNodes to connect
+   * to the SCM.
+   *
+   * @param conf - Configuration
+   * @return Target InetSocketAddress for the SCM service endpoint.
+   */
+  public static InetSocketAddress getScmDataNodeBindAddress(
+      Configuration conf) {
+    final Optional<String> host = getHostNameFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_KEY);
+
+    // If no port number is specified then we'll just try the defaultBindPort.
+    final Optional<Integer> port = getPortNumberFromConfigKeys(conf,
+        ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY);
+
+    return NetUtils.createSocketAddr(
+        host.or(ScmConfigKeys.OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
+            port.or(ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT));
+  }
+
+
+  /**
+   * Returns the interval in which the heartbeat processor thread runs.
+   *
+   * @param conf - Configuration
+   * @return long in Milliseconds.
+   */
+  public static long getScmheartbeatCheckerInterval(Configuration conf) {
+    return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Heartbeat Interval - Defines the heartbeat frequency from a datanode to
+   * SCM.
+   *
+   * @param conf - Ozone Config
+   * @return - HB interval in seconds.
+   */
+  public static long getScmHeartbeatInterval(Configuration conf) {
+    return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_HEARBEAT_INTERVAL_DEFAULT,
+        TimeUnit.SECONDS);
+  }
+
+  /**
+   * Get the Stale Node interval, which is used by SCM to flag a datanode as
+   * stale, if the heartbeat from that node has been missing for this duration.
+   *
+   * @param conf - Configuration.
+   * @return - Long, Milliseconds to wait before flagging a node as stale.
+   */
+  public static long getStaleNodeInterval(Configuration conf) {
+
+    long staleNodeIntervalMs =
+        conf.getTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
+            OZONE_SCM_STALENODE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+
+    long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf);
+
+    long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000;
+
+
+    // Make sure that StaleNodeInterval is configured way above the frequency
+    // at which we run the heartbeat thread.
+    //
+    // Here we check that staleNodeInterval is at least five times more than the
+    // frequency at which the accounting thread is going to run.
+    try {
+      sanitizeUserArgs(staleNodeIntervalMs, heartbeatThreadFrequencyMs,
+          5, 1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Stale Node Interval is cannot be honored due to " +
+              "mis-configured {}. ex:  {}",
+          OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, ex);
+      throw ex;
+    }
+
+    // Make sure that stale node value is greater than configured value that
+    // datanodes are going to send HBs.
+    try {
+      sanitizeUserArgs(staleNodeIntervalMs, heartbeatIntervalMs, 3, 1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Stale Node Interval MS is cannot be honored due to " +
+          "mis-configured {}. ex:  {}", OZONE_SCM_HEARTBEAT_INTERVAL, ex);
+      throw ex;
+    }
+    return staleNodeIntervalMs;
+  }
+
+  /**
+   * Gets the interval for dead node flagging. This has to be a value that is
+   * greater than stale node value,  and by transitive relation we also know
+   * that this value is greater than heartbeat interval and heartbeatProcess
+   * Interval.
+   *
+   * @param conf - Configuration.
+   * @return - the interval for dead node flagging.
+   */
+  public static long getDeadNodeInterval(Configuration conf) {
+    long staleNodeIntervalMs = getStaleNodeInterval(conf);
+    long deadNodeIntervalMs = conf.getTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
+        OZONE_SCM_DEADNODE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    try {
+      // Make sure that dead nodes Ms is at least twice the time for staleNodes
+      // with a max of 1000 times the staleNodes.
+      sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Dead Node Interval MS is cannot be honored due to " +
+          "mis-configured {}. ex:  {}", OZONE_SCM_STALENODE_INTERVAL, ex);
+      throw ex;
+    }
+    return deadNodeIntervalMs;
+  }
+
+  /**
+   * Returns the maximum number of heartbeats to process per loop of the process
+   * thread.
+   * @param conf Configuration
+   * @return - int -- Number of HBs to process
+   */
+  public static int getMaxHBToProcessPerLoop(Configuration conf) {
+    return conf.getInt(ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
+        ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
+  }
+
+  /**
+   * Timeout value for the RPC from Datanode to SCM, primarily used for
+   * Heartbeats and container reports.
+   *
+   * @param conf - Ozone Config
+   * @return - Rpc timeout in Milliseconds.
+   */
+  public static long getScmRpcTimeOutInMilliseconds(Configuration conf) {
+    return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT,
+        OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Log Warn interval.
+   *
+   * @param conf - Ozone Config
+   * @return - Log warn interval.
+   */
+  public static int getLogWarnInterval(Configuration conf) {
+    return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT,
+        OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT);
+  }
+
+  /**
+   * Returns the container port.
+   * @param conf - Conf
+   * @return port number.
+   */
+  public static int getContainerPort(Configuration conf) {
+    return conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+  }
+
+
+  /**
+   * Return the list of service addresses for the Ozone SCM. This method is used
+   * by the DataNodes to determine the service instances to connect to.
+   *
+   * @param conf - Configuration
+   * @return list of SCM service addresses.
+   */
+  public static Map<String, ? extends Map<String, InetSocketAddress>>
+      getScmServiceRpcAddresses(Configuration conf) {
+
+    final Map<String, InetSocketAddress> serviceInstances = new HashMap<>();
+    serviceInstances.put(OZONE_SCM_SERVICE_INSTANCE_ID,
+        getScmAddressForDataNodes(conf));
+
+    final Map<String, Map<String, InetSocketAddress>> services =
+        new HashMap<>();
+    services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
+    return services;
+  }
+}
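
A minimal usage sketch for the address helpers above. It exercises the
fallback the comments describe: the host may come from the client address key,
while the port is read only from the datanode address key and otherwise falls
back to OZONE_SCM_DATANODE_PORT_DEFAULT:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.HddsServerUtil;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public final class ScmAddressExample {
      public static void main(String[] args) {
        Configuration conf = new OzoneConfiguration();
        // Only the client address key is set; it supplies the host.
        conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "scm.example.com");
        InetSocketAddress addr = HddsServerUtil.getScmAddressForDataNodes(conf);
        // The port resolves to OZONE_SCM_DATANODE_PORT_DEFAULT because no
        // datanode address key is configured.
        System.out.println(addr.getHostName() + ":" + addr.getPort());
      }
    }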

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java
new file mode 100644
index 0000000..4e52046
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/hdds/scm/VersionInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+/**
+ * This is a class that tracks versions of SCM.
+ */
+public final class VersionInfo {
+
+  // We will just be normal and use positive counting numbers for versions.
+  private final static VersionInfo[] VERSION_INFOS =
+      {new VersionInfo("First version of SCM", 1)};
+
+
+  public static final String DESCRIPTION_KEY = "Description";
+  private final String description;
+  private final int version;
+
+  /**
+   * Never created outside this class.
+   *
+   * @param description -- description
+   * @param version     -- version number
+   */
+  private VersionInfo(String description, int version) {
+    this.description = description;
+    this.version = version;
+  }
+
+  /**
+   * Returns all versions.
+   *
+   * @return Version info array.
+   */
+  public static VersionInfo[] getAllVersions() {
+    return VERSION_INFOS.clone();
+  }
+
+  /**
+   * Returns the latest version.
+   *
+   * @return versionInfo
+   */
+  public static VersionInfo getLatestVersion() {
+    return VERSION_INFOS[VERSION_INFOS.length - 1];
+  }
+
+  /**
+   * Return description.
+   *
+   * @return String
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * Return the version.
+   *
+   * @return int.
+   */
+  public int getVersion() {
+    return version;
+  }
+
+}
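
A short usage sketch for VersionInfo above; with the single entry currently
registered it prints version 1 and its description:

    import org.apache.hadoop.hdds.scm.VersionInfo;

    public final class VersionBanner {
      public static void main(String[] args) {
        VersionInfo latest = VersionInfo.getLatestVersion();
        // Prints: SCM version 1: First version of SCM
        System.out.println("SCM version " + latest.getVersion() + ": "
            + latest.getDescription());
      }
    }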

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
new file mode 100644
index 0000000..7213e7e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeServicePlugin;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.UUID;
+
+/**
+ * Datanode service plugin to start the HDDS container services.
+ */
+public class HddsDatanodeService implements DataNodeServicePlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      HddsDatanodeService.class);
+
+  private final boolean isOzoneEnabled;
+
+  private Configuration conf;
+  private DatanodeDetails datanodeDetails;
+  private DatanodeStateMachine datanodeStateMachine;
+
+  public HddsDatanodeService() {
+    try {
+      OzoneConfiguration.activate();
+      this.conf = new OzoneConfiguration();
+      this.isOzoneEnabled = HddsUtils.isHddsEnabled(conf);
+      if (isOzoneEnabled) {
+        this.datanodeDetails = getDatanodeDetails(conf);
+        String hostname = DataNode.getHostName(conf);
+        String ip = InetAddress.getByName(hostname).getHostAddress();
+        this.datanodeDetails.setHostName(hostname);
+        this.datanodeDetails.setIpAddress(ip);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Can't start the HDDS datanode plugin", e);
+    }
+  }
+
+  @Override
+  public void start(Object service) {
+    if (isOzoneEnabled) {
+      try {
+        DataNode dataNode = (DataNode) service;
+        datanodeDetails.setInfoPort(dataNode.getInfoPort());
+        datanodeDetails.setInfoSecurePort(dataNode.getInfoSecurePort());
+        datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf);
+        datanodeStateMachine.startDaemon();
+      } catch (IOException e) {
+        throw new RuntimeException("Can't start the HDDS datanode plugin", e);
+      }
+    }
+  }
+
+  /**
+   * Returns the DatanodeDetails for this datanode: read from the datanode ID
+   * file when it exists, otherwise freshly created with a random UUID.
+   *
+   * @return DatanodeDetails
+   */
+  private static DatanodeDetails getDatanodeDetails(Configuration conf)
+      throws IOException {
+    String idFilePath = HddsUtils.getDatanodeIdFilePath(conf);
+    if (idFilePath == null || idFilePath.isEmpty()) {
+      LOG.error("A valid file path is needed for config setting {}",
+          ScmConfigKeys.OZONE_SCM_DATANODE_ID);
+      throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_DATANODE_ID +
+          " must be defined. See" +
+          " https://wiki.apache.org/hadoop/Ozone#Configuration"; +
+          " for details on configuring Ozone.");
+    }
+
+    Preconditions.checkNotNull(idFilePath);
+    File idFile = new File(idFilePath);
+    if (idFile.exists()) {
+      return ContainerUtils.readDatanodeDetailsFrom(idFile);
+    } else {
+      // There is no datanode.id file, this might be the first time datanode
+      // is started.
+      String datanodeUuid = UUID.randomUUID().toString();
+      return DatanodeDetails.newBuilder().setUuid(datanodeUuid).build();
+    }
+  }
+
+  /**
+   * Returns DatanodeDetails if set, null otherwise.
+   *
+   * @return DatanodeDetails
+   */
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  @InterfaceAudience.Private
+  public DatanodeStateMachine getDatanodeStateMachine() {
+    return datanodeStateMachine;
+  }
+
+  @Override
+  public void stop() {
+    if (datanodeStateMachine != null) {
+      datanodeStateMachine.stopDaemon();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}
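
The first-start identity logic in getDatanodeDetails(...) above (reuse the
persisted ID when the file exists, otherwise mint a fresh UUID) reduces to the
pattern sketched below. The plain-text file format here is a simplified
stand-in; the real code delegates parsing to
ContainerUtils.readDatanodeDetailsFrom:

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.util.UUID;

    public final class DatanodeIdBootstrap {
      private DatanodeIdBootstrap() {
      }

      /** Returns the persisted UUID, or creates and persists a new one. */
      public static String loadOrCreateUuid(File idFile) throws IOException {
        if (idFile.exists()) {
          return new String(Files.readAllBytes(idFile.toPath()),
              StandardCharsets.UTF_8).trim();
        }
        String uuid = UUID.randomUUID().toString();
        Files.write(idFile.toPath(), uuid.getBytes(StandardCharsets.UTF_8));
        return uuid;
      }
    }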

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
new file mode 100644
index 0000000..68bf442
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CHECKSUM_MISMATCH;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .INVALID_WRITE_SIZE;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .IO_EXCEPTION;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .OVERWRITE_FLAG_REQUIRED;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNABLE_TO_FIND_CHUNK;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNABLE_TO_FIND_DATA_DIR;
+
+/**
+ * Set of utility functions used by the chunk Manager.
+ */
+public final class ChunkUtils {
+
+  /* Never constructed. */
+  private ChunkUtils() {
+  }
+
+  /**
+   * Checks if we are getting a request to overwrite an existing range of
+   * chunk.
+   *
+   * @param chunkFile - File
+   * @param chunkInfo - Buffer to write
+   * @return bool
+   */
+  public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
+      chunkInfo) {
+
+    if (!chunkFile.exists()) {
+      return false;
+    }
+
+    long offset = chunkInfo.getOffset();
+    return offset < chunkFile.length();
+  }
+
+  /**
+   * Overwrite is permitted if and only if the user explicitly asks for it. We
+   * permit this iff the key/value pair contains a flag called
+   * [OverWriteRequested, true].
+   *
+   * @param chunkInfo - Chunk info
+   * @return true if the user asks for it.
+   */
+  public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
+    String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
+    return (overWrite != null) &&
+        (!overWrite.isEmpty()) &&
+        (Boolean.valueOf(overWrite));
+  }
+
+  /**
+   * Validates chunk data and returns a file object to Chunk File that we are
+   * expected to write data to.
+   *
+   * @param pipeline - pipeline.
+   * @param data - container data.
+   * @param info - chunk info.
+   * @return File
+   * @throws StorageContainerException
+   */
+  public static File validateChunk(Pipeline pipeline, ContainerData data,
+      ChunkInfo info) throws StorageContainerException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+    File chunkFile = getChunkFile(pipeline, data, info);
+    if (ChunkUtils.isOverWriteRequested(chunkFile, info)) {
+      if (!ChunkUtils.isOverWritePermitted(info)) {
+        log.error("Rejecting write chunk request. Chunk overwrite " +
+            "without explicit request. {}", info.toString());
+        throw new StorageContainerException("Rejecting write chunk request. " +
+            "OverWrite flag required." + info.toString(),
+            OVERWRITE_FLAG_REQUIRED);
+      }
+    }
+    return chunkFile;
+  }
+
+  /**
+   * Validates that Path to chunk file exists.
+   *
+   * @param pipeline - Container Info.
+   * @param data - Container Data
+   * @param info - Chunk info
+   * @return - File.
+   * @throws StorageContainerException
+   */
+  public static File getChunkFile(Pipeline pipeline, ContainerData data,
+      ChunkInfo info) throws StorageContainerException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+    if (data == null) {
+      log.error("Invalid container Name: {}", pipeline.getContainerName());
+      throw new StorageContainerException("Unable to find the container Name:" 
+
+          " " +
+          pipeline.getContainerName(), CONTAINER_NOT_FOUND);
+    }
+
+    File dataDir = ContainerUtils.getDataDirectory(data).toFile();
+    if (!dataDir.exists()) {
+      log.error("Unable to find the data directory: {}", dataDir);
+      throw new StorageContainerException("Unable to find the data directory:" 
+
+          " " + dataDir, UNABLE_TO_FIND_DATA_DIR);
+    }
+
+    return dataDir.toPath().resolve(info.getChunkName()).toFile();
+
+  }
+
+  /**
+   * Writes the data in chunk Info to the specified location in the chunkfile.
+   *
+   * @param chunkFile - File to write data to.
+   * @param chunkInfo - Data stream to write.
+   * @param data - The data buffer.
+   * @throws StorageContainerException
+   */
+  public static void writeData(File chunkFile, ChunkInfo chunkInfo,
+      byte[] data) throws
+      StorageContainerException, ExecutionException, InterruptedException,
+      NoSuchAlgorithmException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+    if (data.length != chunkInfo.getLen()) {
+      String err = String.format("data array does not match the length " +
+              "specified. DataLen: %d Byte Array: %d",
+          chunkInfo.getLen(), data.length);
+      log.error(err);
+      throw new StorageContainerException(err, INVALID_WRITE_SIZE);
+    }
+
+    AsynchronousFileChannel file = null;
+    FileLock lock = null;
+
+    try {
+      file =
+          AsynchronousFileChannel.open(chunkFile.toPath(),
+              StandardOpenOption.CREATE,
+              StandardOpenOption.WRITE,
+              StandardOpenOption.SPARSE,
+              StandardOpenOption.SYNC);
+      lock = file.lock().get();
+      if (chunkInfo.getChecksum() != null &&
+          !chunkInfo.getChecksum().isEmpty()) {
+        verifyChecksum(chunkInfo, data, log);
+      }
+      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
+      if (size != data.length) {
+        log.error("Invalid write size found. Size:{}  Expected: {} ", size,
+            data.length);
+        throw new StorageContainerException("Invalid write size found. " +
+            "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
+      }
+    } catch (IOException e) {
+      throw new StorageContainerException(e, IO_EXCEPTION);
+
+    } finally {
+      if (lock != null) {
+        try {
+          lock.release();
+        } catch (IOException e) {
+          log.error("Unable to release lock ??, Fatal Error.");
+          throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
+
+        }
+      }
+      if (file != null) {
+        try {
+          file.close();
+        } catch (IOException e) {
+          throw new StorageContainerException("Error closing chunk file",
+              e, CONTAINER_INTERNAL_ERROR);
+        }
+      }
+    }
+  }
+
+  /**
+   * Verifies the checksum of a chunk against the data buffer.
+   *
+   * @param chunkInfo - Chunk Info.
+   * @param data - data buffer
+   * @param log - log
+   * @throws NoSuchAlgorithmException
+   * @throws StorageContainerException
+   */
+  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
+      log) throws NoSuchAlgorithmException, StorageContainerException {
+    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha.update(data);
+    if (!Hex.encodeHexString(sha.digest()).equals(
+        chunkInfo.getChecksum())) {
+      log.error("Checksum mismatch. Provided: {} , computed: {}",
+          chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
+      throw new StorageContainerException("Checksum mismatch. Provided: " +
+          chunkInfo.getChecksum() + " , computed: " +
+          DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH);
+    }
+  }
+
+  /**
+   * Reads data from an existing chunk file.
+   *
+   * @param chunkFile - file where data lives.
+   * @param data - chunk definition.
+   * @return ByteBuffer
+   * @throws StorageContainerException
+   * @throws ExecutionException
+   * @throws InterruptedException
+   */
+  public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
+      StorageContainerException, ExecutionException, InterruptedException,
+      NoSuchAlgorithmException {
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+    if (!chunkFile.exists()) {
+      log.error("Unable to find the chunk file. chunk info : {}",
+          data.toString());
+      throw new StorageContainerException("Unable to find the chunk file. " +
+          "chunk info " +
+          data.toString(), UNABLE_TO_FIND_CHUNK);
+    }
+
+    AsynchronousFileChannel file = null;
+    FileLock lock = null;
+    try {
+      file =
+          AsynchronousFileChannel.open(chunkFile.toPath(),
+              StandardOpenOption.READ);
+      lock = file.lock(data.getOffset(), data.getLen(), true).get();
+
+      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
+      file.read(buf, data.getOffset()).get();
+
+      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
+        verifyChecksum(data, buf.array(), log);
+      }
+
+      return buf;
+    } catch (IOException e) {
+      throw new StorageContainerException(e, IO_EXCEPTION);
+    } finally {
+      if (lock != null) {
+        try {
+          lock.release();
+        } catch (IOException e) {
+          log.error("I/O error is lock release.");
+        }
+      }
+      if (file != null) {
+        IOUtils.closeStream(file);
+      }
+    }
+  }
+
+  /**
+   * Returns a chunk response. This call is used by chunk operations which have
+   * null success responses.
+   *
+   * @param msg Request
+   * @return Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+    return ContainerUtils.getContainerResponse(msg);
+  }
+
+  /**
+   * Gets a response to the read chunk calls.
+   *
+   * @param msg - Msg
+   * @param data - Data
+   * @param info - Info
+   * @return Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg,
+      byte[] data, ChunkInfo info) {
+    Preconditions.checkNotNull(msg);
+
+    ContainerProtos.ReadChunkResponseProto.Builder response =
+        ContainerProtos.ReadChunkResponseProto.newBuilder();
+    response.setChunkData(info.getProtoBufMessage());
+    response.setData(ByteString.copyFrom(data));
+    response.setPipeline(msg.getReadChunk().getPipeline());
+
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
+            .SUCCESS, "");
+    builder.setReadChunk(response);
+    return builder.build();
+  }
+}
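
The verifyChecksum(...) logic above amounts to hashing the chunk bytes and
comparing hex digests. A standalone sketch, assuming OzoneConsts.FILE_HASH
resolves to "SHA-256" (consistent with the sha256Hex calls in the error path):

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public final class ChunkChecksum {
      private ChunkChecksum() {
      }

      /** True when the SHA-256 hex digest of data equals expectedHex. */
      public static boolean matches(byte[] data, String expectedHex)
          throws NoSuchAlgorithmException {
        MessageDigest sha = MessageDigest.getInstance("SHA-256");
        StringBuilder hex = new StringBuilder();
        for (byte b : sha.digest(data)) {
          hex.append(String.format("%02x", b));
        }
        return hex.toString().equals(expectedHex);
      }

      public static void main(String[] args) throws Exception {
        byte[] chunk = "hello".getBytes(StandardCharsets.UTF_8);
        // Prints true: this is the SHA-256 digest of "hello".
        System.out.println(matches(chunk,
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"));
      }
    }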

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
new file mode 100644
index 0000000..c29374c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.Time;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class maintains the information about a container in the ozone world.
+ * <p>
+ * A container is a name, along with metadata, which is a set of key-value
+ * pairs.
+ */
+public class ContainerData {
+
+  private final String containerName;
+  private final Map<String, String> metadata;
+  private String dbPath;  // Path to Level DB Store.
+  // Path to Physical file system where container and checksum are stored.
+  private String containerFilePath;
+  private String hash;
+  private AtomicLong bytesUsed;
+  private long maxSize;
+  private Long containerID;
+  private HddsProtos.LifeCycleState state;
+
+  /**
+   * Constructs a ContainerData object.
+   *
+   * @param containerName - Name
+   * @param containerID - Container ID
+   * @param conf - Configuration
+   */
+  public ContainerData(String containerName, Long containerID,
+      Configuration conf) {
+    this.metadata = new TreeMap<>();
+    this.containerName = containerName;
+    this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
+        ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
+    this.bytesUsed =  new AtomicLong(0L);
+    this.containerID = containerID;
+    this.state = HddsProtos.LifeCycleState.OPEN;
+  }
+
+  /**
+   * Constructs a ContainerData object from ProtoBuf classes.
+   *
+   * @param protoData - ProtoBuf Message
+   * @throws IOException
+   */
+  public static ContainerData getFromProtBuf(
+      ContainerProtos.ContainerData protoData, Configuration conf)
+      throws IOException {
+    ContainerData data = new ContainerData(protoData.getName(),
+        protoData.getContainerID(), conf);
+    for (int x = 0; x < protoData.getMetadataCount(); x++) {
+      data.addMetadata(protoData.getMetadata(x).getKey(),
+          protoData.getMetadata(x).getValue());
+    }
+
+    if (protoData.hasContainerPath()) {
+      data.setContainerPath(protoData.getContainerPath());
+    }
+
+    if (protoData.hasDbPath()) {
+      data.setDBPath(protoData.getDbPath());
+    }
+
+    if (protoData.hasState()) {
+      data.setState(protoData.getState());
+    }
+
+    if(protoData.hasHash()) {
+      data.setHash(protoData.getHash());
+    }
+
+    if (protoData.hasBytesUsed()) {
+      data.setBytesUsed(protoData.getBytesUsed());
+    }
+
+    if (protoData.hasSize()) {
+      data.setMaxSize(protoData.getSize());
+    }
+    return data;
+  }
+
+  /**
+   * Returns a ProtoBuf Message from ContainerData.
+   *
+   * @return Protocol Buffer Message
+   */
+  public ContainerProtos.ContainerData getProtoBufMessage() {
+    ContainerProtos.ContainerData.Builder builder = ContainerProtos
+        .ContainerData.newBuilder();
+    builder.setName(this.getContainerName());
+    builder.setContainerID(this.getContainerID());
+
+    if (this.getDBPath() != null) {
+      builder.setDbPath(this.getDBPath());
+    }
+
+    if (this.getHash() != null) {
+      builder.setHash(this.getHash());
+    }
+
+    if (this.getContainerPath() != null) {
+      builder.setContainerPath(this.getContainerPath());
+    }
+
+    builder.setState(this.getState());
+
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      HddsProtos.KeyValue.Builder keyValBuilder =
+          HddsProtos.KeyValue.newBuilder();
+      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+          .setValue(entry.getValue()).build());
+    }
+
+    if (this.getBytesUsed() >= 0) {
+      builder.setBytesUsed(this.getBytesUsed());
+    }
+
+    if (this.getKeyCount() >= 0) {
+      builder.setKeyCount(this.getKeyCount());
+    }
+
+    if (this.getMaxSize() >= 0) {
+      builder.setSize(this.getMaxSize());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Returns the name of the container.
+   *
+   * @return - name
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Adds metadata.
+   */
+  public void addMetadata(String key, String value) throws IOException {
+    synchronized (this.metadata) {
+      if (this.metadata.containsKey(key)) {
+        throw new IOException("This key already exists. Key " + key);
+      }
+      metadata.put(key, value);
+    }
+  }
+
+  /**
+   * Returns all metadata.
+   */
+  public Map<String, String> getAllMetadata() {
+    synchronized (this.metadata) {
+      return Collections.unmodifiableMap(this.metadata);
+    }
+  }
+
+  /**
+   * Returns value of a key.
+   */
+  public String getValue(String key) {
+    synchronized (this.metadata) {
+      return metadata.get(key);
+    }
+  }
+
+  /**
+   * Deletes a metadata entry from the map.
+   *
+   * @param key - Key
+   */
+  public void deleteKey(String key) {
+    synchronized (this.metadata) {
+      metadata.remove(key);
+    }
+  }
+
+  /**
+   * Returns path.
+   *
+   * @return - path
+   */
+  public String getDBPath() {
+    return dbPath;
+  }
+
+  /**
+   * Sets path.
+   *
+   * @param path - String.
+   */
+  public void setDBPath(String path) {
+    this.dbPath = path;
+  }
+
+  /**
+   * This function serves as the generic key for the ContainerCache class. Both
+   * ContainerData and ContainerKeyData override this function to appropriately
+   * return the right name that can be used in ContainerCache.
+   *
+   * @return String Name.
+   */
+  public String getName() {
+    return getContainerName();
+  }
+
+  /**
+   * Get container file path.
+   * @return - Physical path where container file and checksum is stored.
+   */
+  public String getContainerPath() {
+    return containerFilePath;
+  }
+
+  /**
+   * Set container Path.
+   * @param containerPath - File path.
+   */
+  public void setContainerPath(String containerPath) {
+    this.containerFilePath = containerPath;
+  }
+
+  /**
+   * Get container ID.
+   * @return - container ID.
+   */
+  public synchronized Long getContainerID() {
+    return containerID;
+  }
+
+  public synchronized void setState(HddsProtos.LifeCycleState state) {
+    this.state = state;
+  }
+
+  public synchronized HddsProtos.LifeCycleState getState() {
+    return this.state;
+  }
+
+  /**
+   * checks if the container is open.
+   * @return - boolean
+   */
+  public synchronized boolean isOpen() {
+    return HddsProtos.LifeCycleState.OPEN == state;
+  }
+
+  /**
+   * Marks this container as closed.
+   */
+  public synchronized void closeContainer() {
+    // TODO: closed or closing here
+    setState(HddsProtos.LifeCycleState.CLOSED);
+
+    // Something brain-dead for now: name + timestamp of when we get the close
+    // container message.
+    setHash(DigestUtils.sha256Hex(this.getContainerName() +
+        Long.toString(Time.monotonicNow())));
+  }
+
+  /**
+   * Final hash for this container.
+   * @return - Hash
+   */
+  public String getHash() {
+    return hash;
+  }
+
+  public void setHash(String hash) {
+    this.hash = hash;
+  }
+
+  public void setMaxSize(long maxSize) {
+    this.maxSize = maxSize;
+  }
+
+  public long getMaxSize() {
+    return maxSize;
+  }
+
+  public long getKeyCount() {
+    return metadata.size();
+  }
+
+  public void setBytesUsed(long used) {
+    this.bytesUsed.set(used);
+  }
+
+  public long addBytesUsed(long delta) {
+    return this.bytesUsed.addAndGet(delta);
+  }
+
+  public long getBytesUsed() {
+    return bytesUsed.get();
+  }
+}
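
A usage sketch for ContainerData above: build one, attach metadata, and
round-trip it through the protobuf form via getProtoBufMessage() and
getFromProtBuf(...). Default Configuration values are assumed to be adequate
for the max-size lookup:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerData;

    public final class ContainerDataRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ContainerData data = new ContainerData("container-1", 1L, conf);
        data.addMetadata("owner", "ozone");

        ContainerProtos.ContainerData proto = data.getProtoBufMessage();
        ContainerData copy = ContainerData.getFromProtBuf(proto, conf);
        // Metadata survives the round trip; prints: container-1 keys=1
        System.out.println(copy.getName() + " keys=" + copy.getKeyCount());
      }
    }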

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
new file mode 100644
index 0000000..d4d732b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ *
+ * This class is for maintaining the various Storage Container
+ * DataNode statistics and publishing them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ * <p>
+ * This class has a number of metrics variables that are publicly accessible;
+ * these variables (objects) have methods to update their values;
+ *  for example:
+ *  <p> {@link #numOps}.incr()
+ *
+ */
+@InterfaceAudience.Private
+@Metrics(about="Storage Container DataNode Metrics", context="dfs")
+public class ContainerMetrics {
+  @Metric private MutableCounterLong numOps;
+  private MutableCounterLong[] numOpsArray;
+  private MutableCounterLong[] opsBytesArray;
+  private MutableRate[] opsLatency;
+  private MutableQuantiles[][] opsLatQuantiles;
+  private MetricsRegistry registry = null;
+
+  public ContainerMetrics(int[] intervals) {
+    int numEnumEntries = ContainerProtos.Type.values().length;
+    final int len = intervals.length;
+    this.numOpsArray = new MutableCounterLong[numEnumEntries];
+    this.opsBytesArray = new MutableCounterLong[numEnumEntries];
+    this.opsLatency = new MutableRate[numEnumEntries];
+    this.opsLatQuantiles = new MutableQuantiles[numEnumEntries][len];
+    this.registry = new MetricsRegistry("StorageContainerMetrics");
+    for (int i = 0; i < numEnumEntries; i++) {
+      numOpsArray[i] = registry.newCounter(
+          "num" + ContainerProtos.Type.valueOf(i + 1),
+          "number of " + ContainerProtos.Type.valueOf(i + 1) + " ops",
+          (long) 0);
+      opsBytesArray[i] = registry.newCounter(
+          "bytes" + ContainerProtos.Type.valueOf(i + 1),
+          "bytes used by " + ContainerProtos.Type.valueOf(i + 1) + "op",
+          (long) 0);
+      opsLatency[i] = registry.newRate(
+          "latency" + ContainerProtos.Type.valueOf(i + 1),
+          ContainerProtos.Type.valueOf(i + 1) + " op");
+
+      for (int j = 0; j < len; j++) {
+        int interval = intervals[j];
+        String quantileName = ContainerProtos.Type.valueOf(i + 1) + "Nanos"
+            + interval + "s";
+        opsLatQuantiles[i][j] = registry.newQuantiles(quantileName,
+            "latency of Container ops", "ops", "latency", interval);
+      }
+    }
+  }
+
+  public static ContainerMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    // Percentile measurement is off by default; quantiles are tracked only
+    // for the intervals configured below.
+    int[] intervals =
+             conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    return ms.register("StorageContainerMetrics",
+                       "Storage Container Node Metrics",
+                       new ContainerMetrics(intervals));
+  }
+
+  public void incContainerOpcMetrics(ContainerProtos.Type type){
+    numOps.incr();
+    numOpsArray[type.ordinal()].incr();
+  }
+
+  public long getContainerOpsMetrics(ContainerProtos.Type type){
+    return numOpsArray[type.ordinal()].value();
+  }
+
+  public void incContainerOpsLatencies(ContainerProtos.Type type,
+                                       long latencyNanos) {
+    opsLatency[type.ordinal()].add(latencyNanos);
+    for (MutableQuantiles q: opsLatQuantiles[type.ordinal()]) {
+      q.add(latencyNanos);
+    }
+  }
+
+  public void incContainerBytesStats(ContainerProtos.Type type, long bytes) {
+    opsBytesArray[type.ordinal()].incr(bytes);
+  }
+
+  public long getContainerBytesMetrics(ContainerProtos.Type type){
+    return opsBytesArray[type.ordinal()].value();
+  }
+}
\ No newline at end of file
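
A sketch of the intended call pattern for ContainerMetrics above: count the
op, add its payload size, then record latency once the op completes.
WriteChunk is assumed here to be one of the ContainerProtos.Type enum values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;

    public final class MetricsUsage {
      public static void main(String[] args) {
        ContainerMetrics metrics = ContainerMetrics.create(new Configuration());
        long start = System.nanoTime();
        // ... perform a WriteChunk op of 4096 bytes here ...
        metrics.incContainerOpcMetrics(ContainerProtos.Type.WriteChunk);
        metrics.incContainerBytesStats(ContainerProtos.Type.WriteChunk, 4096);
        metrics.incContainerOpsLatencies(ContainerProtos.Type.WriteChunk,
            System.nanoTime() - start);
      }
    }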

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
new file mode 100644
index 0000000..50d2da3
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+
+/**
+ * Container report for a closed container, sent from the DataNode to SCM as
+ * part of the container report.
+ */
+public class ContainerReport {
+  private static final int UNKNOWN = -1;
+  private final String containerName;
+  private final String finalhash;
+  private long size;
+  private long keyCount;
+  private long bytesUsed;
+  private long readCount;
+  private long writeCount;
+  private long readBytes;
+  private long writeBytes;
+  private long containerID;
+
+  public long getContainerID() {
+    return containerID;
+  }
+
+  public void setContainerID(long containerID) {
+    this.containerID = containerID;
+  }
+
+
+  /**
+   * Constructs the ContainerReport.
+   *
+   * @param containerName - Container Name.
+   * @param finalhash - Final Hash.
+   */
+  public ContainerReport(String containerName, String finalhash) {
+    this.containerName = containerName;
+    this.finalhash = finalhash;
+    this.size = UNKNOWN;
+    this.keyCount = UNKNOWN;
+    this.bytesUsed = 0L;
+    this.readCount = 0L;
+    this.readBytes = 0L;
+    this.writeCount = 0L;
+    this.writeBytes = 0L;
+  }
+
+  /**
+   * Gets a containerReport from protobuf class.
+   *
+   * @param info - ContainerInfo.
+   * @return - ContainerReport.
+   */
+  public static ContainerReport getFromProtoBuf(ContainerInfo info) {
+    Preconditions.checkNotNull(info);
+    ContainerReport report = new ContainerReport(info.getContainerName(),
+        info.getFinalhash());
+    if (info.hasSize()) {
+      report.setSize(info.getSize());
+    }
+    if (info.hasKeyCount()) {
+      report.setKeyCount(info.getKeyCount());
+    }
+    if (info.hasUsed()) {
+      report.setBytesUsed(info.getUsed());
+    }
+    if (info.hasReadCount()) {
+      report.setReadCount(info.getReadCount());
+    }
+    if (info.hasReadBytes()) {
+      report.setReadBytes(info.getReadBytes());
+    }
+    if (info.hasWriteCount()) {
+      report.setWriteCount(info.getWriteCount());
+    }
+    if (info.hasWriteBytes()) {
+      report.setWriteBytes(info.getWriteBytes());
+    }
+
+    report.setContainerID(info.getContainerID());
+    return report;
+  }
+
+  /**
+   * Gets the container name.
+   *
+   * @return - Name
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Returns the final signature for this container.
+   *
+   * @return - hash
+   */
+  public String getFinalhash() {
+    return finalhash;
+  }
+
+  /**
+   * Returns a positive number when the size is known, or -1 if not.
+   *
+   * @return size or -1
+   */
+  public long getSize() {
+    return size;
+  }
+
+  /**
+   * Sets the size of the container on disk.
+   *
+   * @param size - long
+   */
+  public void setSize(long size) {
+    this.size = size;
+  }
+
+  /**
+   * Gets number of keys in the container if known.
+   *
+   * @return - Number of keys, or -1 if not known.
+   */
+  public long getKeyCount() {
+    return keyCount;
+  }
+
+  /**
+   * Sets the key count.
+   *
+   * @param keyCount - Key Count
+   */
+  public void setKeyCount(long keyCount) {
+    this.keyCount = keyCount;
+  }
+
+  public long getReadCount() {
+    return readCount;
+  }
+
+  public void setReadCount(long readCount) {
+    this.readCount = readCount;
+  }
+
+  public long getWriteCount() {
+    return writeCount;
+  }
+
+  public void setWriteCount(long writeCount) {
+    this.writeCount = writeCount;
+  }
+
+  public long getReadBytes() {
+    return readBytes;
+  }
+
+  public void setReadBytes(long readBytes) {
+    this.readBytes = readBytes;
+  }
+
+  public long getWriteBytes() {
+    return writeBytes;
+  }
+
+  public void setWriteBytes(long writeBytes) {
+    this.writeBytes = writeBytes;
+  }
+
+  public long getBytesUsed() {
+    return bytesUsed;
+  }
+
+  public void setBytesUsed(long bytesUsed) {
+    this.bytesUsed = bytesUsed;
+  }
+
+  /**
+   * Gets a containerInfo protobuf message from ContainerReports.
+   *
+   * @return ContainerInfo
+   */
+  public ContainerInfo getProtoBufMessage() {
+    return ContainerInfo.newBuilder()
+        .setContainerName(this.getContainerName())
+        .setKeyCount(this.getKeyCount())
+        .setSize(this.getSize())
+        .setUsed(this.getBytesUsed())
+        .setReadCount(this.getReadCount())
+        .setReadBytes(this.getReadBytes())
+        .setWriteCount(this.getWriteCount())
+        .setWriteBytes(this.getWriteBytes())
+        .setFinalhash(this.getFinalhash())
+        .setContainerID(this.getContainerID())
+        .build();
+  }
+}

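A sketch of the protobuf round-trip supported by ContainerReport (not part of
the patch; the container name, hash and sizes are placeholder values):

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;

public class ContainerReportSketch {
  public static void main(String[] args) {
    ContainerReport report = new ContainerReport("container-0001", "hash-0");
    report.setContainerID(1L);
    report.setSize(1024L);
    report.setKeyCount(10L);
    report.setBytesUsed(512L);

    // Serialize to the wire format and parse it back; optional fields set
    // on the message are copied onto the new report.
    ContainerInfo info = report.getProtoBufMessage();
    ContainerReport copy = ContainerReport.getFromProtoBuf(info);
    System.out.println(copy.getContainerName() + " size=" + copy.getSize());
  }
}
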
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
new file mode 100644
index 0000000..1818188
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.apache.commons.io.FilenameUtils.removeExtension;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .INVALID_ARGUMENT;
+import static org.apache.hadoop.hdds.protocol.proto.ContainerProtos.Result
+    .UNABLE_TO_FIND_DATA_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
+
+/**
+ * A set of helper functions to create proper responses.
+ */
+public final class ContainerUtils {
+
+  private ContainerUtils() {
+    //never constructed.
+  }
+
+  /**
+   * Returns a CreateContainer Response. This call is used by the create and
+   * delete container operations, whose success responses carry no payload.
+   *
+   * @param msg Request
+   * @return Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+    return builder.build();
+  }
+
+  /**
+   * Returns a ReadContainer Response.
+   *
+   * @param msg Request
+   * @param containerData - data
+   * @return Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
+      ContainerData containerData) {
+    Preconditions.checkNotNull(containerData);
+
+    ContainerProtos.ReadContainerResponseProto.Builder response =
+        ContainerProtos.ReadContainerResponseProto.newBuilder();
+    response.setContainerData(containerData.getProtoBufMessage());
+
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+    builder.setReadContainer(response);
+    return builder.build();
+  }
+
+  /**
+   * Builds a response builder for the given request with the supplied result
+   * and message.
+   *
+   * @param msg - Protobuf message.
+   * @param result - result
+   * @param message - Error message.
+   * @return ContainerCommandResponseProto.Builder.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto.Builder
+      getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
+      ContainerProtos.Result result, String message) {
+    return
+        ContainerProtos.ContainerCommandResponseProto.newBuilder()
+            .setCmdType(msg.getCmdType())
+            .setTraceID(msg.getTraceID())
+            .setResult(result)
+            .setMessage(message);
+  }
+
+  /**
+   * Logs the error and returns a response to the caller.
+   *
+   * @param log - Logger
+   * @param ex - Exception
+   * @param msg - Request Object
+   * @return Response
+   */
+  public static ContainerProtos.ContainerCommandResponseProto logAndReturnError(
+      Logger log, StorageContainerException ex,
+      ContainerProtos.ContainerCommandRequestProto msg) {
+    log.info("Operation: {} : Trace ID: {} : Message: {} : Result: {}",
+        msg.getCmdType().name(), msg.getTraceID(),
+        ex.getMessage(), ex.getResult().getValueDescriptor().getName());
+    return getContainerResponse(msg, ex.getResult(), ex.getMessage()).build();
+  }
+
+  /**
+   * Logs the error and returns a response to the caller.
+   *
+   * @param log - Logger
+   * @param ex - Exception
+   * @param msg - Request Object
+   * @return Response
+   */
+  public static ContainerProtos.ContainerCommandResponseProto logAndReturnError(
+      Logger log, RuntimeException ex,
+      ContainerProtos.ContainerCommandRequestProto msg) {
+    log.info("Operation: {} : Trace ID: {} : Message: {} ",
+        msg.getCmdType().name(), msg.getTraceID(), ex.getMessage());
+    return getContainerResponse(msg, INVALID_ARGUMENT, ex.getMessage()).build();
+  }
+
+  /**
+   * We found a command type but no associated payload for the command. Hence
+   * we return a MALFORMED_REQUEST response.
+   *
+   * @param msg - Protobuf message.
+   * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
+    return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST,
+        "Cmd type does not match the payload.").build();
+  }
+
+  /**
+   * We found a command type that is not supported yet.
+   *
+   * @param msg - Protobuf message.
+   * @return ContainerCommandResponseProto - UNSUPPORTED_REQUEST.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
+    return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST,
+        "Server does not support this command yet.").build();
+  }
+
+  /**
+   * Gets the container name from a container file.
+   *
+   * @param containerFile - File
+   * @return Name of the container.
+   */
+  public static String getContainerNameFromFile(File containerFile) {
+    Preconditions.checkNotNull(containerFile);
+    return Paths.get(containerFile.getParent()).resolve(
+        removeExtension(containerFile.getName())).toString();
+  }
+
+  /**
+   * Verifies that this is indeed a new container.
+   *
+   * @param containerFile - Container File to verify
+   * @param metadataFile - metadata File to verify
+   * @throws IOException
+   */
+  public static void verifyIsNewContainer(File containerFile, File metadataFile)
+      throws IOException {
+    Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
+    if (containerFile.exists()) {
+      log.error("container already exists on disk. File: {}",
+          containerFile.toPath());
+      throw new FileAlreadyExistsException("container already exists on " +
+          "disk.");
+    }
+
+    if (metadataFile.exists()) {
+      log.error("metadata found on disk, but missing container. Refusing to" +
+          " write this container. File: {} ", metadataFile.toPath());
+      throw new FileAlreadyExistsException(("metadata found on disk, but " +
+          "missing container. Refusing to write this container."));
+    }
+
+    File parentPath = new File(containerFile.getParent());
+
+    if (!parentPath.exists() && !parentPath.mkdirs()) {
+      log.error("Unable to create parent path. Path: {}",
+          parentPath.toString());
+      throw new IOException("Unable to create container directory.");
+    }
+
+    if (!containerFile.createNewFile()) {
+      log.error("creation of a new container file failed. File: {}",
+          containerFile.toPath());
+      throw new IOException("creation of a new container file failed.");
+    }
+
+    if (!metadataFile.createNewFile()) {
+      log.error("creation of the metadata file failed. File: {}",
+          metadataFile.toPath());
+      throw new IOException("creation of a new container file failed.");
+    }
+  }
+
+  public static String getContainerDbFileName(String containerName) {
+    return containerName + OzoneConsts.DN_CONTAINER_DB;
+  }
+
+  /**
+   * Creates a metadata DB for the specified container.
+   *
+   * @param containerPath - Container Path.
+   * @throws IOException
+   */
+  public static Path createMetadata(Path containerPath, String containerName,
+      Configuration conf)
+      throws IOException {
+    Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
+    Preconditions.checkNotNull(containerPath);
+    Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
+    if (!metadataPath.toFile().mkdirs()) {
+      log.error("Unable to create directory for metadata storage. Path: {}",
+          metadataPath);
+      throw new IOException("Unable to create directory for metadata storage." 
+
+          " Path: " + metadataPath);
+    }
+    MetadataStore store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(metadataPath
+            .resolve(getContainerDbFileName(containerName)).toFile())
+        .build();
+
+    // we close since the SCM pre-creates containers.
+    // we will open and put Db handle into a cache when keys are being created
+    // in a container.
+
+    store.close();
+
+    Path dataPath = containerPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
+    if (!dataPath.toFile().mkdirs()) {
+
+      // If we failed to create data directory, we cleanup the
+      // metadata directory completely. That is, we will delete the
+      // whole directory including LevelDB file.
+      log.error("Unable to create directory for data storage. cleaning up the" 
+
+              " container path: {} dataPath: {}",
+          containerPath, dataPath);
+      FileUtils.deleteDirectory(containerPath.toFile());
+      throw new IOException("Unable to create directory for data storage." +
+          " Path: " + dataPath);
+    }
+    return metadataPath;
+  }
+
+  /**
+   * Returns Metadata location.
+   *
+   * @param containerData - Data
+   * @param location - Path
+   * @return Path
+   */
+  public static File getMetadataFile(ContainerData containerData,
+      Path location) {
+    return location.resolve(containerData
+        .getContainerName().concat(CONTAINER_META))
+        .toFile();
+  }
+
+  /**
+   * Returns container file location.
+   *
+   * @param containerData - Data
+   * @param location - Root path
+   * @return Path
+   */
+  public static File getContainerFile(ContainerData containerData,
+      Path location) {
+    return location.resolve(containerData
+        .getContainerName().concat(CONTAINER_EXTENSION))
+        .toFile();
+  }
+
+  /**
+   * Container metadata directory -- here is where the level DB lives.
+   *
+   * @param cData - container data.
+   * @return Path to the parent directory where the DB lives.
+   */
+  public static Path getMetadataDirectory(ContainerData cData) {
+    Path dbPath = Paths.get(cData.getDBPath());
+    Preconditions.checkNotNull(dbPath);
+    Preconditions.checkState(dbPath.toString().length() > 0);
+    return dbPath.getParent();
+  }
+
+  /**
+   * Returns the path where data or chunks live for a given container.
+   *
+   * @param cData - container data.
+   * @return - Path
+   * @throws StorageContainerException
+   */
+  public static Path getDataDirectory(ContainerData cData)
+      throws StorageContainerException {
+    Path path = getMetadataDirectory(cData);
+    Preconditions.checkNotNull(path);
+    Path parentPath = path.getParent();
+    if (parentPath == null) {
+      throw new StorageContainerException("Unable to get Data directory."
+          + path, UNABLE_TO_FIND_DATA_DIR);
+    }
+    return parentPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
+  }
+
+  /**
+   * Removes a container if it is empty, or forcibly if forceDelete is set.
+   * <p/>
+   * There are three things we need to delete:
+   * <p/>
+   * 1. The container file and metadata file.
+   * 2. The Level DB file.
+   * 3. The path that we created on the data location.
+   *
+   * @param containerData - Data of the container to remove.
+   * @param conf - configuration of the cluster.
+   * @param forceDelete - whether this container should be deleted forcibly.
+   * @throws IOException
+   */
+  public static void removeContainer(ContainerData containerData,
+      Configuration conf, boolean forceDelete) throws IOException {
+    Preconditions.checkNotNull(containerData);
+    Path dbPath = Paths.get(containerData.getDBPath());
+
+    MetadataStore db = KeyUtils.getDB(containerData, conf);
+    // If the container is not empty and cannot be deleted forcibly,
+    // then throw a SCE to stop deleting.
+    if (!forceDelete && !db.isEmpty()) {
+      throw new StorageContainerException(
+          "Container cannot be deleted because it is not empty.",
+          ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+    }
+    // Close the DB connection and remove the DB handler from cache
+    KeyUtils.removeDB(containerData, conf);
+
+    // Delete the DB File.
+    FileUtils.forceDelete(dbPath.toFile());
+    dbPath = dbPath.getParent();
+
+    // Delete all metadata in the data directories for this container.
+    if (dbPath != null) {
+      FileUtils.deleteDirectory(dbPath.toFile());
+      dbPath = dbPath.getParent();
+    }
+
+    // now delete the container directory, this means that all key data dirs
+    // will be removed too.
+    if (dbPath != null) {
+      FileUtils.deleteDirectory(dbPath.toFile());
+    }
+
+    // Delete the container metadata from the metadata locations.
+    String rootPath = getContainerNameFromFile(new File(containerData
+        .getContainerPath()));
+    Path containerPath = Paths.get(rootPath.concat(CONTAINER_EXTENSION));
+    Path metaPath = Paths.get(rootPath.concat(CONTAINER_META));
+
+    FileUtils.forceDelete(containerPath.toFile());
+    FileUtils.forceDelete(metaPath.toFile());
+  }
+
+  /**
+   * Persists a {@link DatanodeDetails} to a local file.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  public static synchronized void writeDatanodeDetailsTo(
+      DatanodeDetails datanodeDetails, File path) throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the datanode ID file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create datanode ID directories.");
+      }
+    }
+    try (FileOutputStream out = new FileOutputStream(path)) {
+      HddsProtos.DatanodeDetailsProto proto =
+          datanodeDetails.getProtoBufMessage();
+      proto.writeTo(out);
+    }
+  }
+
+  /**
+   * Read {@link DatanodeDetails} from a local ID file.
+   *
+   * @param path ID file local path
+   * @return {@link DatanodeDetails}
+   * @throws IOException if the ID file is malformed or another I/O error occurs
+   */
+  public static synchronized DatanodeDetails readDatanodeDetailsFrom(File path)
+      throws IOException {
+    if (!path.exists()) {
+      throw new IOException("Datanode ID file not found.");
+    }
+    try (FileInputStream in = new FileInputStream(path)) {
+      return DatanodeDetails.getFromProtoBuf(
+          HddsProtos.DatanodeDetailsProto.parseFrom(in));
+    } catch (IOException e) {
+      throw new IOException("Failed to parse DatanodeDetails from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+}

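The response helpers above are meant to be called from a command dispatcher. A
sketch of that pattern follows; it is illustrative only, and the
hasCreateContainer accessor is assumed from the generated protobuf code for
ContainerCommandRequestProto.

import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers
    .StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DispatcherSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(DispatcherSketch.class);

  public ContainerProtos.ContainerCommandResponseProto dispatch(
      ContainerProtos.ContainerCommandRequestProto request) {
    try {
      if (!request.hasCreateContainer()) {
        // Command type and payload do not match.
        return ContainerUtils.malformedRequest(request);
      }
      // ... create the container here ...
      return ContainerUtils.getContainerResponse(request);
    } catch (StorageContainerException e) {
      // Logs cmd type, trace ID, message and result, and builds the
      // matching error response.
      return ContainerUtils.logAndReturnError(LOG, e, request);
    }
  }
}
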
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
new file mode 100644
index 0000000..ade162a
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A helper class that wraps information about container blocks pending deletion.
+ */
+public final class DeletedContainerBlocksSummary {
+
+  private final List<DeletedBlocksTransaction> blocks;
+  // key : txID
+  // value : number of times this tx has been processed
+  private final Map<Long, Integer> txSummary;
+  // key : container name
+  // value : the number of blocks to be deleted in this container;
+  // if the message contains multiple entries for the same container,
+  // the block counts are merged
+  private final Map<String, Integer> blockSummary;
+  // total number of blocks in this message
+  private int numOfBlocks;
+
+  private DeletedContainerBlocksSummary(List<DeletedBlocksTransaction> blocks) {
+    this.blocks = blocks;
+    txSummary = Maps.newHashMap();
+    blockSummary = Maps.newHashMap();
+    blocks.forEach(entry -> {
+      txSummary.put(entry.getTxID(), entry.getCount());
+      blockSummary.merge(entry.getContainerName(), entry.getBlockIDCount(),
+          Integer::sum);
+      numOfBlocks += entry.getBlockIDCount();
+    });
+  }
+
+  public static DeletedContainerBlocksSummary getFrom(
+      List<DeletedBlocksTransaction> blocks) {
+    return new DeletedContainerBlocksSummary(blocks);
+  }
+
+  public int getNumOfBlocks() {
+    return numOfBlocks;
+  }
+
+  public int getNumOfContainers() {
+    return blockSummary.size();
+  }
+
+  public String getTXIDs() {
+    return String.join(",", txSummary.keySet()
+        .stream().map(String::valueOf).collect(Collectors.toList()));
+  }
+
+  public String getTxIDSummary() {
+    List<String> txSummaryEntry = txSummary.entrySet().stream()
+        .map(entry -> entry.getKey() + "(" + entry.getValue() + ")")
+        .collect(Collectors.toList());
+    return "[" + String.join(",", txSummaryEntry) + "]";
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (DeletedBlocksTransaction blks : blocks) {
+      sb.append(" ")
+          .append("TXID=")
+          .append(blks.getTxID())
+          .append(", ")
+          .append("TimesProceed=")
+          .append(blks.getCount())
+          .append(", ")
+          .append(blks.getContainerName())
+          .append(" : [")
+          .append(String.join(",", blks.getBlockIDList())).append("]")
+          .append("\n");
+    }
+    return sb.toString();
+  }
+}

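A usage sketch for the summary class (not part of the patch; the
DeletedBlocksTransaction builder methods are assumed from the getters used in
the class above, i.e. getTxID, getContainerName, getBlockIDList and getCount):

import java.util.Collections;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.container.common.helpers
    .DeletedContainerBlocksSummary;

public class DeletedBlocksSummarySketch {
  public static void main(String[] args) {
    // One transaction asking to delete two blocks in one container.
    DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
        .setTxID(1L)
        .setContainerName("container-0001")
        .addBlockID("block-1")
        .addBlockID("block-2")
        .setCount(0)
        .build();

    DeletedContainerBlocksSummary summary =
        DeletedContainerBlocksSummary.getFrom(Collections.singletonList(tx));
    System.out.println("blocks=" + summary.getNumOfBlocks()
        + ", containers=" + summary.getNumOfContainers()
        + ", txs=" + summary.getTxIDSummary());
  }
}
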
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
new file mode 100644
index 0000000..566db02
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+
+/**
+ * FileUtils provides the helper routines used by the putSmallFile and
+ * getSmallFile RPCs.
+ */
+public final class FileUtils {
+  /**
+   * Never Constructed.
+   */
+  private FileUtils() {
+  }
+
+  /**
+   * Gets a response for the putSmallFile RPC.
+   * @param msg - ContainerCommandRequestProto
+   * @return - ContainerCommandResponseProto
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getPutFileResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+    ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
+        ContainerProtos.PutSmallFileResponseProto.newBuilder();
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
+            .SUCCESS, "");
+    builder.setCmdType(ContainerProtos.Type.PutSmallFile);
+    builder.setPutSmallFile(getResponse);
+    return builder.build();
+  }
+
+  /**
+   * Gets a response to the read small file call.
+   * @param msg - Msg
+   * @param data  - Data
+   * @param info  - Info
+   * @return    Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getGetSmallFileResponse(ContainerProtos.ContainerCommandRequestProto msg,
+      byte[] data, ChunkInfo info) {
+    Preconditions.checkNotNull(msg);
+
+    ContainerProtos.ReadChunkResponseProto.Builder readChunkresponse =
+        ContainerProtos.ReadChunkResponseProto.newBuilder();
+    readChunkresponse.setChunkData(info.getProtoBufMessage());
+    readChunkresponse.setData(ByteString.copyFrom(data));
+    readChunkresponse.setPipeline(msg.getGetSmallFile().getKey().getPipeline());
+
+    ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
+        ContainerProtos.GetSmallFileResponseProto.newBuilder();
+    getSmallFile.setData(readChunkresponse.build());
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
+            .SUCCESS, "");
+    builder.setCmdType(ContainerProtos.Type.GetSmallFile);
+    builder.setGetSmallFile(getSmallFile);
+    return builder.build();
+  }
+
+}

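Finally, a sketch of how a small-file handler might use these helpers. This is
illustrative only: the request is expected to carry a getSmallFile payload with
a key pipeline, and the chunk payload and ChunkInfo would come from the chunk
manager in practice.

import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.FileUtils;

public class SmallFileSketch {
  public static ContainerProtos.ContainerCommandResponseProto ackPut(
      ContainerProtos.ContainerCommandRequestProto request) {
    // putSmallFile succeeds with an empty response body.
    return FileUtils.getPutFileResponse(request);
  }

  public static ContainerProtos.ContainerCommandResponseProto readBack(
      ContainerProtos.ContainerCommandRequestProto request,
      byte[] payload, ChunkInfo info) {
    // Wraps the chunk payload and its metadata for the reader.
    return FileUtils.getGetSmallFileResponse(request, payload, info);
  }
}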
