HDFS-10268. Ozone: end-to-end integration for create/get volumes, buckets and keys. Contributed by Chris Nauroth.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fedb22d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fedb22d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fedb22d9

Branch: refs/heads/HDFS-7240
Commit: fedb22d9b642da94f6cd3fb79239924708ec34eb
Parents: b3044db
Author: Anu Engineer <aengin...@apache.org>
Authored: Thu Apr 7 14:38:54 2016 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Thu Apr 7 14:38:54 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/datanode/DataNode.java   |  25 +-
 .../server/datanode/ObjectStoreHandler.java     |  60 ++++-
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   2 +
 .../container/common/helpers/ChunkUtils.java    |   3 +-
 .../ozone/container/common/impl/Dispatcher.java |   6 +-
 .../common/transport/client/XceiverClient.java  |  14 +-
 .../transport/client/XceiverClientHandler.java  |   2 +-
 .../transport/client/XceiverClientManager.java  |  83 ++++++
 .../common/transport/client/package-info.java   |  24 ++
 .../common/transport/server/package-info.java   |  24 ++
 .../ozone/storage/StorageContainerManager.java  | 110 +++++++-
 .../hadoop/ozone/web/client/OzoneBucket.java    |   9 +-
 .../ozone/web/exceptions/OzoneException.java    |   2 +-
 .../hadoop/ozone/web/request/OzoneQuota.java    |  10 +
 .../ozone/web/storage/ChunkInputStream.java     | 193 ++++++++++++++
 .../ozone/web/storage/ChunkOutputStream.java    | 193 ++++++++++++++
 .../web/storage/ContainerProtocolCalls.java     | 198 ++++++++++++++
 .../web/storage/DistributedStorageHandler.java  | 266 +++++++++++++++----
 .../web/storage/OzoneContainerTranslation.java  | 261 ++++++++++++++++++
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |  71 +++--
 .../ozone/web/TestOzoneRestWithMiniCluster.java | 253 ++++++++++++++++++
 21 files changed, 1695 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a5d5015..ff71653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1846,16 +1846,6 @@ public class DataNode extends ReconfigurableBase
   public void shutdown() {
     stopMetricsLogger();
 
-    if(this.ozoneEnabled) {
-      if(ozoneServer != null) {
-        try {
-          ozoneServer.stop();
-        } catch (Exception e) {
-          LOG.error("Error is ozone shutdown. ex {}", e.toString());
-        }
-      }
-    }
-
     if (plugins != null) {
       for (ServicePlugin p : plugins) {
         try {
@@ -1914,6 +1904,21 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
+    // Stop the object store handler
+    if (this.objectStoreHandler != null) {
+      this.objectStoreHandler.close();
+    }
+
+    if(this.ozoneEnabled) {
+      if(ozoneServer != null) {
+        try {
+          ozoneServer.stop();
+        } catch (Exception e) {
+          LOG.error("Error in ozone shutdown. ex {}", e.toString());
+        }
+      }
+    }
+
     if (pauseMonitor != null) {
       pauseMonitor.stop();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
index 6413ac0..b8c6a13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
@@ -17,36 +17,58 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
-import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_DEFAULT_PORT;
+import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
+import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
 import com.sun.jersey.api.container.ContainerFactory;
 import com.sun.jersey.api.core.ApplicationAdapter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.ObjectStoreApplication;
 import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
 import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler;
 import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Implements object store handling within the DataNode process.  This class is
  * responsible for initializing and maintaining the RPC clients and servers and
  * the web application required for the object store implementation.
  */
-public final class ObjectStoreHandler {
+public final class ObjectStoreHandler implements Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ObjectStoreHandler.class);
 
   private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
+  private final StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
 
   /**
    * Creates a new ObjectStoreHandler.
@@ -57,14 +79,32 @@ public final class ObjectStoreHandler {
   public ObjectStoreHandler(Configuration conf) throws IOException {
     String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY,
         DFS_STORAGE_HANDLER_TYPE_DEFAULT);
+    LOG.info("ObjectStoreHandler initializing with {}: {}",
+        DFS_STORAGE_HANDLER_TYPE_KEY, shType);
     boolean ozoneTrace = conf.getBoolean(DFS_OBJECTSTORE_TRACE_ENABLED_KEY,
         DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT);
     final StorageHandler storageHandler;
+
+    // Initialize Jersey container for object store web application.
     if ("distributed".equalsIgnoreCase(shType)) {
-      storageHandler = new DistributedStorageHandler();
+      RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+          ProtobufRpcEngine.class);
+      long version =
+          RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
+      InetSocketAddress address = conf.getSocketAddr(
+          DFS_STORAGE_RPC_BIND_HOST_KEY, DFS_STORAGE_RPC_ADDRESS_KEY,
+          DFS_STORAGE_RPC_ADDRESS_DEFAULT, DFS_STORAGE_RPC_DEFAULT_PORT);
+      this.storageContainerLocationClient =
+          new StorageContainerLocationProtocolClientSideTranslatorPB(
+              RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
+              address, UserGroupInformation.getCurrentUser(), conf,
+              NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
+      storageHandler = new DistributedStorageHandler(new OzoneConfiguration(),
+          this.storageContainerLocationClient);
     } else {
       if ("local".equalsIgnoreCase(shType)) {
         storageHandler = new LocalStorageHandler(conf);
+        this.storageContainerLocationClient = null;
       } else {
         throw new IllegalArgumentException(
             String.format("Unrecognized value for %s: %s",
@@ -91,4 +131,12 @@ public final class ObjectStoreHandler {
   public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() {
     return this.objectStoreJerseyContainer;
   }
+
+  @Override
+  public void close() {
+    LOG.info("Closing ObjectStoreHandler.");
+    if (this.storageContainerLocationClient != null) {
+      this.storageContainerLocationClient.close();
+    }
+  }
 }
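
Since ObjectStoreHandler now implements Closeable, callers can manage its
lifecycle with try-with-resources.  A minimal usage sketch, not part of this
patch; the configuration key string is an assumption based on
DFS_STORAGE_HANDLER_TYPE_KEY:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
    import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;

    class HandlerLifecycleSketch {
      static void demo() throws Exception {
        Configuration conf = new Configuration();
        // Assumed key string; "local" avoids the SCM RPC proxy,
        // "distributed" creates one.
        conf.set("dfs.storage.handler.type", "local");
        try (ObjectStoreHandler handler = new ObjectStoreHandler(conf)) {
          ObjectStoreJerseyContainer jersey =
              handler.getObjectStoreJerseyContainer();
          // hand the Jersey container to the DataNode's Netty HTTP pipeline
        } // close() releases the SCM RPC client when one was created
      }
    }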

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index bebbb78..1ffaa2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -68,6 +68,8 @@ public final class OzoneConsts {
   public static final String FILE_HASH = "SHA-256";
   public final static String CHUNK_OVERWRITE = "OverWriteRequested";
 
+  public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
+
   /**
    * Supports Bucket Versioning.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
index 15e4524..b4c8aa6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
@@ -169,7 +169,8 @@ public final class ChunkUtils {
               StandardOpenOption.SPARSE,
               StandardOpenOption.SYNC);
       lock = file.lock().get();
-      if (!chunkInfo.getChecksum().isEmpty()) {
+      if (chunkInfo.getChecksum() != null &&
+          !chunkInfo.getChecksum().isEmpty()) {
         verifyChecksum(chunkInfo, data, log);
       }
      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index 66ff1ba..bad1d23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -127,7 +127,7 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getCreateContainer().getContainerData().getName(),
           msg.getCmdType().name(),
           msg.getTraceID(),
-          ex.toString());
+          ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
       return ContainerUtils.getContainerResponse(msg,
@@ -169,7 +169,7 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getCreateContainer().getContainerData().getName(),
           msg.getCmdType().name(),
           msg.getTraceID(),
-          ex.toString());
+          ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
       return ContainerUtils.getContainerResponse(msg,
@@ -210,7 +210,7 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getCreateContainer().getContainerData().getName(),
           msg.getCmdType().name(),
           msg.getTraceID(),
-          ex.toString());
+          ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
       return ContainerUtils.getContainerResponse(msg,
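
The appended trailing argument is what makes SLF4J print the stack trace: when
the last argument of a logging call is a Throwable not consumed by a "{}"
placeholder, it is rendered as an attached stack trace.  A small illustrative
sketch (class and message are hypothetical):

    import java.io.IOException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(LoggingSketch.class);

      static void demo() {
        IOException ex = new IOException("disk failure");
        // "{}" is filled by ex.toString(); the trailing ex is recognized as
        // a Throwable and logged with its full stack trace.
        LOG.error("Container op failed: {}", ex.toString(), ex);
      }
    }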

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
index 3b9ba8d..e6d914a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
@@ -34,12 +34,13 @@ import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 /**
  * A Client for the storageContainer protocol.
  */
-public class XceiverClient {
+public class XceiverClient implements Closeable {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
   private final Pipeline pipeline;
   private final Configuration config;
@@ -92,6 +93,7 @@ public class XceiverClient {
   /**
    * Close the client.
    */
+  @Override
   public void close() {
     if(group != null) {
       group.shutdownGracefully();
@@ -103,6 +105,16 @@ public class XceiverClient {
   }
 
   /**
+   * Returns the pipeline of machines that host the container used by this
+   * client.
+   *
+   * @return pipeline of machines that host the container
+   */
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  /**
    * Sends a given command to server and gets the reply back.
    * @param request Request
    * @return Response to the command

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
index a219e4e..c9a3ad3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
@@ -94,7 +94,7 @@ public class XceiverClientHandler extends
     ContainerProtos.ContainerCommandResponseProto response;
     channel.writeAndFlush(request);
     boolean interrupted = false;
-    for (; ; ) {
+    for (;;) {
       try {
         response = responses.take();
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java
new file mode 100644
index 0000000..8123ae9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.client;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+
+/**
+ * XceiverClientManager is responsible for the lifecycle of XceiverClient
+ * instances.  Callers use this class to acquire an XceiverClient instance
+ * connected to the desired container pipeline.  When done, the caller also uses
+ * this class to release the previously acquired XceiverClient instance.
+ *
+ * This class may evolve to implement efficient lifecycle management policies by
+ * caching container location information and pooling connected client instances
+ * for reuse without needing to reestablish a socket connection.  The current
+ * implementation simply allocates and closes a new instance every time.
+ */
+public class XceiverClientManager {
+
+  private final OzoneConfiguration conf;
+
+  /**
+   * Creates a new XceiverClientManager.
+   *
+   * @param conf configuration
+   */
+  public XceiverClientManager(OzoneConfiguration conf) {
+    Preconditions.checkNotNull(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * Acquires a XceiverClient connected to a container capable of storing the
+   * specified key.
+   *
+   * @param pipeline the container pipeline for the client connection
+   * @return XceiverClient connected to a container
+   * @throws IOException if an XceiverClient cannot be acquired
+   */
+  public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkArgument(pipeline.getMachines() != null);
+    Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
+    XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
+    try {
+      xceiverClient.connect();
+    } catch (Exception e) {
+      throw new IOException("Exception connecting XceiverClient.", e);
+    }
+    return xceiverClient;
+  }
+
+  /**
+   * Releases an XceiverClient after use.
+   *
+   * @param xceiverClient client to release
+   */
+  public void releaseClient(XceiverClient xceiverClient) {
+    Preconditions.checkNotNull(xceiverClient);
+    xceiverClient.close();
+  }
+}
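
The acquire/release pair above is the intended calling pattern, even though
the current implementation opens and closes a connection per acquisition.  A
minimal sketch, assuming a Pipeline already obtained from
StorageContainerManager:

    import org.apache.hadoop.ozone.OzoneConfiguration;
    import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
    import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
    import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;

    class ClientManagerSketch {
      static void demo(Pipeline pipeline) throws Exception {
        XceiverClientManager manager =
            new XceiverClientManager(new OzoneConfiguration());
        XceiverClient client = manager.acquireClient(pipeline); // connects
        try {
          // issue container protocol calls via client.sendCommand(...)
        } finally {
          manager.releaseClient(client); // today this closes the connection
        }
      }
    }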

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java
new file mode 100644
index 0000000..d3c0278
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.client;
+
+/**
+ * This package contains classes for the client of the storage container
+ * protocol.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
new file mode 100644
index 0000000..59c96f1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+/**
+ * This package contains classes for the server of the storage container
+ * protocol.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
index 90e200a..16863c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
@@ -48,10 +48,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -81,6 +86,10 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
 import org.apache.hadoop.ozone.protocol.LocatedContainer;
 import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
@@ -94,11 +103,14 @@ import org.apache.hadoop.util.StringUtils;
  *
  * The current implementation is a stub suitable to begin end-to-end testing of
  * Ozone service interactions.  DataNodes report to StorageContainerManager
- * using the existing heartbeat messages.  StorageContainerManager tells clients
- * container locations by reporting that all registered nodes are a viable
- * location.  This will evolve from a stub to a full-fledged implementation
- * capable of partitioning the keyspace across multiple containers, with
- * appropriate distribution across nodes.
+ * using the existing heartbeat messages.  StorageContainerManager lazily
+ * initializes a single storage container to be served by those DataNodes.
+ * All subsequent requests for container locations will reply with that single
+ * pipeline, using all registered nodes.
+ *
+ * This will evolve from a stub to a full-fledged implementation capable of
+ * partitioning the keyspace across multiple containers, with appropriate
+ * distribution across nodes.
  */
 @InterfaceAudience.Private
 public class StorageContainerManager
@@ -109,6 +121,8 @@ public class StorageContainerManager
 
   private final StorageContainerNameService ns;
   private final BlockManager blockManager;
+  private final XceiverClientManager xceiverClientManager;
+  private Pipeline singlePipeline;
 
   /** The RPC server that listens to requests from DataNodes. */
   private final RPC.Server serviceRpcServer;
@@ -128,11 +142,12 @@ public class StorageContainerManager
    *
    * @param conf configuration
    */
-  public StorageContainerManager(Configuration conf)
+  public StorageContainerManager(OzoneConfiguration conf)
       throws IOException {
     ns = new StorageContainerNameService();
     boolean haEnabled = false;
     blockManager = new BlockManager(ns, haEnabled, conf);
+    xceiverClientManager = new XceiverClientManager(conf);
 
     RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
@@ -193,20 +208,20 @@ public class StorageContainerManager
   public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
       throws IOException {
     LOG.trace("getStorageContainerLocations keys = {}", keys);
+    Pipeline pipeline = initSingleContainerPipeline();
     List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
     blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
     if (liveNodes.isEmpty()) {
       throw new IOException("Storage container locations not found.");
     }
-    String containerName = UUID.randomUUID().toString();
     Set<DatanodeInfo> locations =
         Sets.<DatanodeInfo>newLinkedHashSet(liveNodes);
     DatanodeInfo leader = liveNodes.get(0);
     Set<LocatedContainer> locatedContainers =
         Sets.newLinkedHashSetWithExpectedSize(keys.size());
     for (String key: keys) {
-      locatedContainers.add(new LocatedContainer(key, key, containerName,
-          locations, leader));
+      locatedContainers.add(new LocatedContainer(key, key,
+          pipeline.getContainerName(), locations, leader));
     }
     LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}",
         keys, locatedContainers);
@@ -416,6 +431,56 @@ public class StorageContainerManager
   }
 
   /**
+   * Lazily initializes a single container pipeline using all registered
+   * DataNodes via a synchronous call to the container protocol.  This single
+   * container pipeline will be reused for container requests for the lifetime
+   * of this StorageContainerManager.
+   *
+   * @throws IOException if there is an I/O error
+   */
+  private synchronized Pipeline initSingleContainerPipeline()
+      throws IOException {
+    if (singlePipeline == null) {
+      List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
+      blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
+      if (liveNodes.isEmpty()) {
+        throw new IOException("Storage container locations not found.");
+      }
+      Pipeline newPipeline = newPipelineFromNodes(liveNodes,
+          UUID.randomUUID().toString());
+      XceiverClient xceiverClient =
+          xceiverClientManager.acquireClient(newPipeline);
+      try {
+        ContainerData containerData = ContainerData
+            .newBuilder()
+            .setName(newPipeline.getContainerName())
+            .build();
+        CreateContainerRequestProto createContainerRequest =
+            CreateContainerRequestProto.newBuilder()
+            .setPipeline(newPipeline.getProtobufMessage())
+            .setContainerData(containerData)
+            .build();
+        ContainerCommandRequestProto request = ContainerCommandRequestProto
+            .newBuilder()
+            .setCmdType(Type.CreateContainer)
+            .setCreateContainer(createContainerRequest)
+            .build();
+        ContainerCommandResponseProto response = xceiverClient.sendCommand(
+            request);
+        Result result = response.getResult();
+        if (result != Result.SUCCESS) {
+          throw new IOException(
+              "Failed to initialize container due to result code: " + result);
+        }
+        singlePipeline = newPipeline;
+      } finally {
+        xceiverClientManager.releaseClient(xceiverClient);
+      }
+    }
+    return singlePipeline;
+  }
+
+  /**
    * Builds a message for logging startup information about an RPC server.
    *
    * @param description RPC server description
@@ -430,6 +495,25 @@ public class StorageContainerManager
   }
 
   /**
+   * Translates a list of nodes, ordered such that the first is the leader, into
+   * a corresponding {@link Pipeline} object.
+   *
+   * @param nodes list of nodes
+   * @param containerName container name
+   * @return pipeline corresponding to nodes
+   */
+  private static Pipeline newPipelineFromNodes(List<DatanodeDescriptor> nodes,
+      String containerName) {
+    String leaderId = nodes.get(0).getDatanodeUuid();
+    Pipeline pipeline = new Pipeline(leaderId);
+    for (DatanodeDescriptor node : nodes) {
+      pipeline.addMember(node);
+    }
+    pipeline.setContainerName(containerName);
+    return pipeline;
+  }
+
+  /**
    * Starts an RPC server, if configured.
    *
    * @param conf configuration
@@ -443,7 +527,7 @@ public class StorageContainerManager
    * @return RPC server, or null if addr is null
    * @throws IOException if there is an I/O error while creating RPC server
    */
-  private static RPC.Server startRpcServer(Configuration conf,
+  private static RPC.Server startRpcServer(OzoneConfiguration conf,
       InetSocketAddress addr, Class<?> protocol, BlockingService instance,
       String bindHostKey, String handlerCountKey, int handlerCountDefault)
       throws IOException {
@@ -480,7 +564,7 @@ public class StorageContainerManager
    * @param rpcServer started RPC server.  If null, then the server was not
    *     started, and this method is a no-op.
    */
-  private static InetSocketAddress updateListenAddress(Configuration conf,
+  private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
       String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
     if (rpcServer == null) {
       return null;
@@ -502,7 +586,7 @@ public class StorageContainerManager
     StringUtils.startupShutdownMessage(
         StorageContainerManager.class, argv, LOG);
     StorageContainerManager scm = new StorageContainerManager(
-        new Configuration());
+        new OzoneConfiguration());
     scm.start();
     scm.join();
   }
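
With this stub, every key resolves to the same lazily created container.  A
hedged sketch of a location lookup through the client-side translator that
ObjectStoreHandler wires up (the method signature is assumed from the
StorageContainerLocationProtocol interface; error handling elided):

    import java.util.Collections;
    import java.util.Set;
    import org.apache.hadoop.ozone.protocol.LocatedContainer;
    import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;

    class LocationLookupSketch {
      static void demo(
          StorageContainerLocationProtocolClientSideTranslatorPB client)
          throws Exception {
        Set<LocatedContainer> located =
            client.getStorageContainerLocations(
                Collections.singleton("myKey"));
        // Every key shares the one pipeline created lazily by
        // initSingleContainerPipeline(), so all entries name that container.
      }
    }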

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java
index 4df303a..3441bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java
@@ -174,9 +174,12 @@ public class OzoneBucket {
 
       InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING));
       putRequest.setEntity(new InputStreamEntity(is, data.length()));
-      putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is));
-      putRequest
-          .setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(data.length()));
+      is.mark(data.length());
+      try {
+        putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is));
+      } finally {
+        is.reset();
+      }
       executePutKey(putRequest, httpClient);
 
     } catch (IOException | URISyntaxException ex) {
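
The mark/reset pair matters because DigestUtils.md5Hex(InputStream) consumes
the stream; without rewinding, the subsequent PUT entity would read zero
bytes.  ByteArrayInputStream supports mark/reset natively, so the pattern is
safe here.  In isolation:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import org.apache.commons.codec.digest.DigestUtils;

    class Md5Sketch {
      static void demo(String data) throws Exception {
        InputStream is = new ByteArrayInputStream(data.getBytes("UTF-8"));
        is.mark(data.length());              // remember the start position
        String md5 = DigestUtils.md5Hex(is); // reads the stream to EOF
        is.reset();                          // rewind so the upload sees the body
      }
    }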

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java
index 2bd4a69..c2e64da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java
@@ -92,7 +92,7 @@ public class OzoneException extends Exception {
    */
   public OzoneException(long httpCode, String shortMessage, String message) {
     this.shortMessage = shortMessage;
-    this.resource = message;
+    this.message = message;
     this.httpCode = httpCode;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
index ca6ddeb..501b239 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
@@ -78,6 +78,16 @@ public class OzoneQuota {
   }
 
   /**
+   * Formats a quota as a string.
+   *
+   * @param quota the quota to format
+   * @return string representation of quota
+   */
+  public static String formatQuota(OzoneQuota quota) {
+    return String.valueOf(quota.size) + quota.unit;
+  }
+
+  /**
    * Parses a user provided string and returns the
    * Quota Object.
    *
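
A hedged round-trip sketch; the parser method name below is assumed from the
truncated Javadoc ("Parses a user provided string and returns the Quota
Object"):

    import org.apache.hadoop.ozone.web.request.OzoneQuota;

    class QuotaSketch {
      static void demo() {
        OzoneQuota quota = OzoneQuota.parseQuota("100GB"); // assumed name
        String text = OzoneQuota.formatQuota(quota);       // "100GB"
      }
    }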

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java
new file mode 100644
index 0000000..166e71c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+
+/**
+ * An {@link InputStream} used by the REST service in combination with the
+ * {@link DistributedStorageHandler} to read the value of a key from a sequence
+ * of container chunks.  All bytes of the key value are stored in container
+ * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
+ * instances.  This class encapsulates all state management for iterating
+ * through the sequence of chunks and the sequence of buffers within each chunk.
+ */
+class ChunkInputStream extends InputStream {
+
+  private static final int EOF = -1;
+
+  private final String key;
+  private final UserArgs args;
+  private XceiverClientManager xceiverClientManager;
+  private XceiverClient xceiverClient;
+  private List<ChunkInfo> chunks;
+  private int chunkOffset;
+  private List<ByteBuffer> buffers;
+  private int bufferOffset;
+
+  /**
+   * Creates a new ChunkInputStream.
+   *
+   * @param key chunk key
+   * @param xceiverClientManager client manager that controls client
+   * @param xceiverClient client to perform container calls
+   * @param chunks list of chunks to read
+   * @param args container protocol call args
+   */
+  public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
+      XceiverClient xceiverClient, List<ChunkInfo> chunks, UserArgs args) {
+    this.key = key;
+    this.args = args;
+    this.xceiverClientManager = xceiverClientManager;
+    this.xceiverClient = xceiverClient;
+    this.chunks = chunks;
+    this.chunkOffset = 0;
+    this.buffers = null;
+    this.bufferOffset = 0;
+  }
+
+  @Override
+  public synchronized int read()
+      throws IOException {
+    checkOpen();
+    int available = prepareRead(1);
+    return available == EOF ? EOF : buffers.get(bufferOffset).get() & 0xff;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // According to the JavaDocs for InputStream, it is recommended that
+    // subclasses provide an override of bulk read if possible for performance
+    // reasons.  In addition to performance, we need to do it for correctness
+    // reasons.  The Ozone REST service uses PipedInputStream and
+    // PipedOutputStream to relay HTTP response data between a Jersey thread and
+    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
+    // have a subtle dependency (bug?) on the wrapped stream providing separate
+    // implementations of single-byte read and bulk read.  Without this, get key
+    // responses might close the connection before writing all of the bytes
+    // advertised in the Content-Length.
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    checkOpen();
+    int available = prepareRead(len);
+    if (available == EOF) {
+      return EOF;
+    }
+    buffers.get(bufferOffset).get(b, off, available);
+    return available;
+  }
+
+  @Override
+  public synchronized void close() {
+    if (xceiverClientManager != null && xceiverClient != null) {
+      xceiverClientManager.releaseClient(xceiverClient);
+      xceiverClientManager = null;
+      xceiverClient = null;
+    }
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throws an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  private synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("ChunkInputStream has been closed.");
+    }
+  }
+
+  /**
+   * Prepares to read by advancing through chunks and buffers as needed until it
+   * finds data to return or encounters EOF.
+   *
+   * @param len desired length of data to read
+   * @return length of data available to read, possibly less than desired length
+   */
+  private synchronized int prepareRead(int len) throws IOException {
+    for (;;) {
+      if (chunks == null || chunks.isEmpty()) {
+        // This must be an empty key.
+        return EOF;
+      } else if (buffers == null) {
+        // The first read triggers fetching the first chunk.
+        readChunkFromContainer(0);
+      } else if (!buffers.isEmpty() &&
+          buffers.get(bufferOffset).hasRemaining()) {
+        // Data is available from the current buffer.
+        ByteBuffer bb = buffers.get(bufferOffset);
+        return len > bb.remaining() ? bb.remaining() : len;
+      } else if (!buffers.isEmpty() &&
+          !buffers.get(bufferOffset).hasRemaining() &&
+          bufferOffset < buffers.size() - 1) {
+        // There are additional buffers available.
+        ++bufferOffset;
+      } else if (chunkOffset < chunks.size() - 1) {
+        // There are additional chunks available.
+        readChunkFromContainer(chunkOffset + 1);
+      } else {
+        // All available input has been consumed.
+        return EOF;
+      }
+    }
+  }
+
+  /**
+   * Attempts to read the chunk at the specified offset in the chunk list.  If
+   * successful, then the data of the read chunk is saved so that its bytes can
+   * be returned from subsequent read calls.
+   *
+   * @param readChunkOffset offset in the chunk list of which chunk to read
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void readChunkFromContainer(int readChunkOffset)
+      throws IOException {
+    final ReadChunkResponseProto readChunkResponse;
+    try {
+      readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset),
+          key, args);
+    } catch (OzoneException e) {
+      throw new IOException("Unexpected OzoneException", e);
+    }
+    chunkOffset = readChunkOffset;
+    ByteString byteString = readChunkResponse.getData();
+    buffers = byteString.asReadOnlyByteBufferList();
+  }
+}
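
A hedged sketch of how a caller such as DistributedStorageHandler might drain
this stream; construction of the stream itself is elided because its
arguments are internal to the handler:

    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;

    class ReadKeySketch {
      // 'in' would be a ChunkInputStream built by DistributedStorageHandler.
      static byte[] demo(InputStream in) throws Exception {
        try {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf, 0, buf.length)) != -1) { // bulk read path
            out.write(buf, 0, n);
          }
          return out.toByteArray();
        } finally {
          in.close(); // releases the XceiverClient back to its manager
        }
      }
    }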

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java
new file mode 100644
index 0000000..d4e639f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
+import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * {@link DistributedStorageHandler} to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written to
+ * the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier.  This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data.  The list of chunks is updated all at once.  Therefore, a
+ * concurrent reader never can see an intermediate state in which different
+ * chunks of data from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+class ChunkOutputStream extends OutputStream {
+
+  private final String containerKey;
+  private final KeyInfo key;
+  private final UserArgs args;
+  private final KeyData.Builder containerKeyData;
+  private XceiverClientManager xceiverClientManager;
+  private XceiverClient xceiverClient;
+  private ByteBuffer buffer;
+  private final String streamId;
+  private int chunkIndex;
+
+  /**
+   * Creates a new ChunkOutputStream.
+   *
+   * @param containerKey container key
+   * @param key chunk key
+   * @param xceiverClientManager client manager that controls client
+   * @param xceiverClient client to perform container calls
+   * @param args container protocol call args
+   */
+  public ChunkOutputStream(String containerKey, KeyInfo key,
+      XceiverClientManager xceiverClientManager, XceiverClient xceiverClient,
+      UserArgs args) {
+    this.containerKey = containerKey;
+    this.key = key;
+    this.args = args;
+    this.containerKeyData = fromKeyToContainerKeyDataBuilder(
+        xceiverClient.getPipeline().getContainerName(), containerKey, key);
+    this.xceiverClientManager = xceiverClientManager;
+    this.xceiverClient = xceiverClient;
+    this.buffer = ByteBuffer.allocate(CHUNK_SIZE);
+    this.streamId = UUID.randomUUID().toString();
+    this.chunkIndex = 0;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    checkOpen();
+    int rollbackPosition = buffer.position();
+    int rollbackLimit = buffer.limit();
+    buffer.put((byte)b);
+    if (buffer.position() == CHUNK_SIZE) {
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    checkOpen();
+    if (buffer.position() > 0) {
+      int rollbackPosition = buffer.position();
+      int rollbackLimit = buffer.limit();
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (xceiverClientManager != null && xceiverClient != null &&
+        buffer != null) {
+      try {
+        if (buffer.position() > 0) {
+          writeChunkToContainer();
+        }
+        putKey(xceiverClient, containerKeyData.build(), args);
+      } catch (OzoneException e) {
+        throw new IOException("Unexpected OzoneException", e);
+      } finally {
+        xceiverClientManager.releaseClient(xceiverClient);
+        xceiverClientManager = null;
+        xceiverClient = null;
+        buffer = null;
+      }
+    }
+
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throws an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  private synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("ChunkOutputStream has been closed.");
+    }
+  }
+
+  /**
+   * Attempts to flush buffered writes by writing a new chunk to the container.
+   * If successful, then clears the buffer to prepare to receive writes for a
+   * new chunk.
+   *
+   * @param rollbackPosition position to restore in buffer if write fails
+   * @param rollbackLimit limit to restore in buffer if write fails
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void flushBufferToChunk(int rollbackPosition,
+      int rollbackLimit) throws IOException {
+    boolean success = false;
+    try {
+      writeChunkToContainer();
+      success = true;
+    } finally {
+      if (success) {
+        buffer.clear();
+      } else {
+        buffer.position(rollbackPosition);
+        buffer.limit(rollbackLimit);
+      }
+    }
+  }
+
+  /**
+   * Writes buffered data as a new chunk to the container and saves chunk
+   * information to be used later in putKey call.
+   *
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void writeChunkToContainer() throws IOException {
+    buffer.flip();
+    ByteString data = ByteString.copyFrom(buffer);
+    ChunkInfo chunk = ChunkInfo
+        .newBuilder()
+        .setChunkName(
+            key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex)
+        .setOffset(0)
+        .setLen(data.size())
+        .build();
+    try {
+      writeChunk(xceiverClient, chunk, key.getKeyName(), data, args);
+    } catch (OzoneException e) {
+      throw new IOException("Unexpected OzoneException", e);
+    }
+    containerKeyData.addChunks(chunk);
+  }
+}
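
A hedged sketch of the write path; each 1 MB (CHUNK_SIZE) of buffered data
becomes one chunk, and close() issues the final putKey that publishes the full
chunk list atomically.  The stream construction is elided as above:

    import java.io.OutputStream;

    class WriteKeySketch {
      // 'out' would be a ChunkOutputStream built by DistributedStorageHandler.
      static void demo(OutputStream out, byte[] payload) throws Exception {
        try {
          out.write(payload); // spills a chunk at each CHUNK_SIZE boundary
        } finally {
          out.close();        // writes any trailing chunk, then calls putKey
        }
      }
    }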

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java
new file mode 100644
index 0000000..4cb3ab9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+
+import java.io.IOException;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+
+/**
+ * Implementation of all container protocol calls performed by
+ * {@link DistributedStorageHandler}.
+ */
+final class ContainerProtocolCalls {
+
+  /**
+   * Calls the container protocol to get a container key.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerKeyData key data to identify container
+   * @param args container protocol call args
+   * @return container protocol get key response
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
+      KeyData containerKeyData, UserArgs args) throws IOException,
+      OzoneException {
+    GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyData(containerKeyData);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.GetKey)
+        .setTraceID(args.getRequestID())
+        .setGetKey(readKeyRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+    return response.getGetKey();
+  }
+
+  /**
+   * Calls the container protocol to put a container key.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerKeyData key data to identify container
+   * @param args container protocol call args
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static void putKey(XceiverClient xceiverClient,
+      KeyData containerKeyData, UserArgs args) throws IOException,
+      OzoneException {
+    PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyData(containerKeyData);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.PutKey)
+        .setTraceID(args.getRequestID())
+        .setPutKey(createKeyRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+  }
+
+  /**
+   * Calls the container protocol to read a chunk.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to read
+   * @param key the key name
+   * @param args container protocol call args
+   * @return container protocol read chunk response
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
+      ChunkInfo chunk, String key, UserArgs args)
+      throws IOException, OzoneException {
+    ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyName(key)
+        .setChunkData(chunk);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.ReadChunk)
+        .setTraceID(args.getRequestID())
+        .setReadChunk(readChunkRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+    return response.getReadChunk();
+  }
+
+  /**
+   * Calls the container protocol to write a chunk.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to write
+   * @param key the key name
+   * @param data the data of the chunk to write
+   * @param args container protocol call args
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
+      String key, ByteString data, UserArgs args)
+      throws IOException, OzoneException {
+    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyName(key)
+        .setChunkData(chunk)
+        .setData(data);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.WriteChunk)
+        .setTraceID(args.getRequestID())
+        .setWriteChunk(writeChunkRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+  }
+
+  /**
+   * Validates a response from a container protocol call.  Any non-successful
+   * return code is mapped to a corresponding exception and thrown.
+   *
+   * @param response container protocol call response
+   * @param args container protocol call args
+   * @throws OzoneException if the container protocol call failed
+   */
+  private static void validateContainerResponse(
+      ContainerCommandResponseProto response, UserArgs args)
+      throws OzoneException {
+    switch (response.getResult()) {
+    case SUCCESS:
+      break;
+    case MALFORMED_REQUEST:
+      throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST,
+          "badRequest", "Bad container request."), args);
+    case UNSUPPORTED_REQUEST:
+      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
+          "internalServerError", "Unsupported container request."), args);
+    case CONTAINER_INTERNAL_ERROR:
+      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
+          "internalServerError", "Container internal error."), args);
+    default:
+      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
+          "internalServerError", "Unrecognized container response."), args);
+    }
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private ContainerProtocolCalls() {
+  }
+}

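For context, a hedged usage sketch (not part of the patch) of how these
helpers are driven. XceiverClient, KeyData, and UserArgs are the real types
from this patch, but the container key below is hypothetical and the client
and args are assumed to be acquired elsewhere (in the patch, from
XceiverClientManager and the REST handlers respectively):

package org.apache.hadoop.ozone.web.storage;

import java.io.IOException;

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.UserArgs;

// Sketch only: write container key metadata, then read it back.
final class ContainerCallsExample {
  static GetKeyResponseProto putThenGet(XceiverClient client, UserArgs args)
      throws IOException, OzoneException {
    KeyData keyData = KeyData.newBuilder()
        .setContainerName(client.getPipeline().getContainerName())
        .setName("/volume1")  // hypothetical container key
        .build();
    ContainerProtocolCalls.putKey(client, keyData, args);
    return ContainerProtocolCalls.getKey(client, keyData, args);
  }
}
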
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 9cb8430..8d4868c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -18,7 +18,32 @@
 
 package org.apache.hadoop.ozone.web.storage;
 
+import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
+import org.apache.hadoop.ozone.protocol.LocatedContainer;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.BucketArgs;
 import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@@ -27,13 +52,13 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
 import org.apache.hadoop.ozone.web.response.ListBuckets;
 import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.response.ListVolumes;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
-
-import java.io.IOException;
-import java.io.OutputStream;
+import org.apache.hadoop.ozone.web.response.VolumeOwner;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link StorageHandler} implementation that distributes object storage
@@ -41,156 +66,283 @@ import java.io.OutputStream;
  */
 public final class DistributedStorageHandler implements StorageHandler {
 
-  @Override
-  public void createVolume(VolumeArgs args) throws
-      IOException, OzoneException {
+  private final StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocation;
+  private final XceiverClientManager xceiverClientManager;
+
+  /**
+   * Creates a new DistributedStorageHandler.
+   *
+   * @param conf configuration
+   * @param storageContainerLocation StorageContainerLocationProtocol proxy
+   */
+  public DistributedStorageHandler(OzoneConfiguration conf,
+      StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocation) {
+    this.storageContainerLocation = storageContainerLocation;
+    this.xceiverClientManager = new XceiverClientManager(conf);
+  }
 
+  @Override
+  public void createVolume(VolumeArgs args) throws IOException, OzoneException {
+    String containerKey = buildContainerKey(args.getVolumeName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      VolumeInfo volume = new VolumeInfo();
+      volume.setVolumeName(args.getVolumeName());
+      volume.setQuota(args.getQuota());
+      volume.setOwner(new VolumeOwner(args.getUserName()));
+      volume.setCreatedOn(dateToString(new Date()));
+      volume.setCreatedBy(args.getAdminName());
+      KeyData containerKeyData = fromVolumeToContainerKeyData(
+          xceiverClient.getPipeline().getContainerName(), containerKey, volume);
+      putKey(xceiverClient, containerKeyData, args);
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
   @Override
   public void setVolumeOwner(VolumeArgs args) throws
       IOException, OzoneException {
-
+    throw new UnsupportedOperationException("setVolumeOwner not implemented");
   }
 
   @Override
   public void setVolumeQuota(VolumeArgs args, boolean remove)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("setVolumeQuota not implemented");
   }
 
   @Override
   public boolean checkVolumeAccess(VolumeArgs args)
       throws IOException, OzoneException {
-    return false;
+    throw new UnsupportedOperationException("checkVolumeAccessnot 
implemented");
   }
 
   @Override
   public ListVolumes listVolumes(UserArgs args)
       throws IOException, OzoneException {
-    return null;
+    throw new UnsupportedOperationException("listVolumes not implemented");
   }
 
   @Override
   public void deleteVolume(VolumeArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("deleteVolume not implemented");
   }
 
   @Override
   public VolumeInfo getVolumeInfo(VolumeArgs args)
       throws IOException, OzoneException {
-    return null;
+    String containerKey = buildContainerKey(args.getVolumeName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      KeyData containerKeyData = containerKeyDataForRead(
+          xceiverClient.getPipeline().getContainerName(), containerKey);
+      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
+          args);
+      return fromContainerKeyValueListToVolume(
+          response.getKeyData().getMetadataList());
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
   @Override
-  public void createBucket(BucketArgs args)
+  public void createBucket(final BucketArgs args)
       throws IOException, OzoneException {
-
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      BucketInfo bucket = new BucketInfo();
+      bucket.setVolumeName(args.getVolumeName());
+      bucket.setBucketName(args.getBucketName());
+      bucket.setAcls(args.getAddAcls());
+      bucket.setVersioning(args.getVersioning());
+      bucket.setStorageType(args.getStorageType());
+      KeyData containerKeyData = fromBucketToContainerKeyData(
+          xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
+      putKey(xceiverClient, containerKeyData, args);
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
   @Override
   public void setBucketAcls(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("setBucketAcls not implemented");
   }
 
   @Override
   public void setBucketVersioning(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException(
+        "setBucketVersioning not implemented");
   }
 
   @Override
   public void setBucketStorageClass(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException(
+        "setBucketStorageClass not implemented");
   }
 
   @Override
   public void deleteBucket(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("deleteBucket not implemented");
   }
 
   @Override
   public void checkBucketAccess(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException(
+        "checkBucketAccess not implemented");
   }
 
   @Override
   public ListBuckets listBuckets(VolumeArgs args)
       throws IOException, OzoneException {
-    return null;
+    throw new UnsupportedOperationException("listBuckets not implemented");
   }
 
   @Override
   public BucketInfo getBucketInfo(BucketArgs args)
       throws IOException, OzoneException {
-    return null;
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      KeyData containerKeyData = containerKeyDataForRead(
+          xceiverClient.getPipeline().getContainerName(), containerKey);
+      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
+          args);
+      return fromContainerKeyValueListToBucket(
+          response.getKeyData().getMetadataList());
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
-  /**
-   * Writes a key in an existing bucket.
-   *
-   * @param args KeyArgs
-   * @return InputStream
-   * @throws OzoneException
-   */
   @Override
   public OutputStream newKeyWriter(KeyArgs args) throws IOException,
       OzoneException {
-    return null;
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName(), args.getKeyName());
+    KeyInfo key = new KeyInfo();
+    key.setKeyName(args.getKeyName());
+    key.setCreatedOn(dateToString(new Date()));
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    return new ChunkOutputStream(containerKey, key, xceiverClientManager,
+        xceiverClient, args);
   }
 
-  /**
-   * Tells the file system that the object has been written out completely and
-   * it can do any house keeping operation that needs to be done.
-   *
-   * @param args   Key Args
-   * @param stream
-   * @throws IOException
-   */
   @Override
   public void commitKey(KeyArgs args, OutputStream stream) throws
       IOException, OzoneException {
+    stream.close();
+  }
+
+  @Override
+  public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
+      OzoneException {
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName(), args.getKeyName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    boolean success = false;
+    try {
+      KeyData containerKeyData = containerKeyDataForRead(
+          xceiverClient.getPipeline().getContainerName(), containerKey);
+      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
+          args);
+      long length = 0;
+      List<ChunkInfo> chunks = response.getKeyData().getChunksList();
+      for (ChunkInfo chunk : chunks) {
+        length += chunk.getLen();
+      }
+      success = true;
+      return new LengthInputStream(new ChunkInputStream(
+          containerKey, xceiverClientManager, xceiverClient, chunks, args),
+          length);
+    } finally {
+      if (!success) {
+        xceiverClientManager.releaseClient(xceiverClient);
+      }
+    }
+  }
+
+  @Override
+  public void deleteKey(KeyArgs args) throws IOException, OzoneException {
+    throw new UnsupportedOperationException("deleteKey not implemented");
+  }
 
+  @Override
+  public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
+    throw new UnsupportedOperationException("listKeys not implemented");
   }
 
   /**
-   * Reads a key from an existing bucket.
+   * Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes
+   * capable of serving container protocol operations.  The container is
+   * selected based on the specified container key.
    *
-   * @param args KeyArgs
-   * @return LengthInputStream
-   * @throws IOException
+   * @param containerKey container key
+   * @return XceiverClient connected to a container
+   * @throws IOException if an XceiverClient cannot be acquired
    */
-  @Override
-  public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
-      OzoneException {
-    return null;
+  private XceiverClient acquireXceiverClient(String containerKey)
+      throws IOException {
+    Set<LocatedContainer> locatedContainers =
+        storageContainerLocation.getStorageContainerLocations(
+            new HashSet<>(Arrays.asList(containerKey)));
+    Pipeline pipeline = newPipelineFromLocatedContainer(
+        locatedContainers.iterator().next());
+    return xceiverClientManager.acquireClient(pipeline);
   }
 
   /**
-   * Deletes an existing key.
+   * Creates a container key from any number of components by combining all
+   * components with a delimiter.
    *
-   * @param args KeyArgs
-   * @throws OzoneException
+   * @param parts container key components
+   * @return container key
    */
-  @Override
-  public void deleteKey(KeyArgs args) throws IOException, OzoneException {
+  private static String buildContainerKey(String... parts) {
+    return '/' + StringUtils.join('/', parts);
+  }
 
+  /**
+   * Formats a date in the expected string format.
+   *
+   * @param date the date to format
+   * @return formatted string representation of date
+   */
+  private static String dateToString(Date date) {
+    SimpleDateFormat sdf =
+        new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
+    sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
+    return sdf.format(date);
   }
 
   /**
-   * Returns a list of Key.
+   * Translates a set of container locations, ordered such that the first is the
+   * leader, into a corresponding {@link Pipeline} object.
    *
-   * @param args KeyArgs
-   * @return BucketList
-   * @throws IOException
+   * @param locatedContainer container location
+   * @return pipeline corresponding to container locations
    */
-  @Override
-  public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
-    return null;
+  private static Pipeline newPipelineFromLocatedContainer(
+      LocatedContainer locatedContainer) {
+    Set<DatanodeInfo> locations = locatedContainer.getLocations();
+    String leaderId = locations.iterator().next().getDatanodeUuid();
+    Pipeline pipeline = new Pipeline(leaderId);
+    for (DatanodeInfo location : locations) {
+      pipeline.addMember(location);
+    }
+    pipeline.setContainerName(locatedContainer.getContainerName());
+    return pipeline;
   }
 }

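A hedged sketch (not part of the patch) of the access pattern every
implemented operation above shares, written as a fragment as it would read
inside DistributedStorageHandler; the volume and bucket names are
placeholders:

// Sketch only: resolve the container from the key, borrow a pooled client,
// always release it in finally.
String containerKey = buildContainerKey("volume1", "bucket1"); // "/volume1/bucket1"
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
try {
  // translate the Ozone object to KeyData, then call putKey or getKey
} finally {
  xceiverClientManager.releaseClient(xceiverClient);
}

The one asymmetry is newKeyReader, which releases the client only on failure;
on success the client is handed to the returned ChunkInputStream, presumably
so the stream can release it back to the manager when closed.
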
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fedb22d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java
new file mode 100644
index 0000000..9333fe6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.web.request.OzoneQuota;
+import org.apache.hadoop.ozone.web.response.BucketInfo;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.response.VolumeOwner;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class contains methods that define the translation between the Ozone
+ * domain model and the storage container domain model.
+ */
+final class OzoneContainerTranslation {
+
+  private static final String ACLS = "ACLS";
+  private static final String BUCKET = "BUCKET";
+  private static final String BUCKET_NAME = "BUCKET_NAME";
+  private static final String CREATED_BY = "CREATED_BY";
+  private static final String CREATED_ON = "CREATED_ON";
+  private static final String KEY = "KEY";
+  private static final String OWNER = "OWNER";
+  private static final String QUOTA = "QUOTA";
+  private static final String STORAGE_TYPE = "STORAGE_TYPE";
+  private static final String TYPE = "TYPE";
+  private static final String VERSIONING = "VERSIONING";
+  private static final String VOLUME = "VOLUME";
+  private static final String VOLUME_NAME = "VOLUME_NAME";
+
+  /**
+   * Creates key data intended for reading a container key.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @return KeyData intended for reading the container key
+   */
+  public static KeyData containerKeyDataForRead(String containerName,
+      String containerKey) {
+    return KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .build();
+  }
+
+  /**
+   * Translates a bucket to its container representation.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param bucket the bucket to translate
+   * @return KeyData representation of bucket
+   */
+  public static KeyData fromBucketToContainerKeyData(
+      String containerName, String containerKey, BucketInfo bucket) {
+    KeyData.Builder containerKeyData = KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, BUCKET))
+        .addMetadata(newKeyValue(VOLUME_NAME, bucket.getVolumeName()))
+        .addMetadata(newKeyValue(BUCKET_NAME, bucket.getBucketName()));
+
+    if (bucket.getAcls() != null) {
+      containerKeyData.addMetadata(newKeyValue(ACLS,
+          StringUtils.join(',', bucket.getAcls())));
+    }
+
+    if (bucket.getVersioning() != null &&
+        bucket.getVersioning() != Versioning.NOT_DEFINED) {
+      containerKeyData.addMetadata(newKeyValue(VERSIONING,
+          bucket.getVersioning().name()));
+    }
+
+    if (bucket.getStorageType() != StorageType.RAM_DISK) {
+      containerKeyData.addMetadata(newKeyValue(STORAGE_TYPE,
+          bucket.getStorageType().name()));
+    }
+
+    return containerKeyData.build();
+  }
+
+  /**
+   * Translates a bucket from its container representation.
+   *
+   * @param metadata container metadata representing the bucket
+   * @return bucket translated from container representation
+   */
+  public static BucketInfo fromContainerKeyValueListToBucket(
+      List<KeyValue> metadata) {
+    BucketInfo bucket = new BucketInfo();
+    for (KeyValue keyValue : metadata) {
+      switch (keyValue.getKey()) {
+      case VOLUME_NAME:
+        bucket.setVolumeName(keyValue.getValue());
+        break;
+      case BUCKET_NAME:
+        bucket.setBucketName(keyValue.getValue());
+        break;
+      case VERSIONING:
+        bucket.setVersioning(
+            Enum.valueOf(Versioning.class, keyValue.getValue()));
+        break;
+      case STORAGE_TYPE:
+        bucket.setStorageType(
+            Enum.valueOf(StorageType.class, keyValue.getValue()));
+        break;
+      default:
+        break;
+      }
+    }
+    return bucket;
+  }
+
+  /**
+   * Translates a volume from its container representation.
+   *
+   * @param metadata container metadata representing the volume
+   * @return volume translated from container representation
+   */
+  public static VolumeInfo fromContainerKeyValueListToVolume(
+      List<KeyValue> metadata) {
+    VolumeInfo volume = new VolumeInfo();
+    for (KeyValue keyValue : metadata) {
+      switch (keyValue.getKey()) {
+      case VOLUME_NAME:
+        volume.setVolumeName(keyValue.getValue());
+        break;
+      case CREATED_BY:
+        volume.setCreatedBy(keyValue.getValue());
+        break;
+      case CREATED_ON:
+        volume.setCreatedOn(keyValue.getValue());
+        break;
+      case OWNER:
+        volume.setOwner(new VolumeOwner(keyValue.getValue()));
+        break;
+      case QUOTA:
+        volume.setQuota(OzoneQuota.parseQuota(keyValue.getValue()));
+        break;
+      default:
+        break;
+      }
+    }
+    return volume;
+  }
+
+  /**
+   * Translates a key to its container representation.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param key key information received from call
+   * @return KeyData representation of key
+   */
+  public static KeyData fromKeyToContainerKeyData(String containerName,
+      String containerKey, KeyInfo key) {
+    return KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, KEY))
+        .build();
+  }
+
+  /**
+   * Translates a key to its container representation.  The return value is a
+   * builder that can be manipulated further before building the result.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param key key information received from call
+   * @return KeyData builder
+   */
+  public static KeyData.Builder fromKeyToContainerKeyDataBuilder(
+      String containerName, String containerKey, KeyInfo key) {
+    return KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, KEY));
+  }
+
+  /**
+   * Translates a volume to its container representation.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param volume the volume to translate
+   * @return KeyData representation of volume
+   */
+  public static KeyData fromVolumeToContainerKeyData(
+      String containerName, String containerKey, VolumeInfo volume) {
+    KeyData.Builder containerKeyData = KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, VOLUME))
+        .addMetadata(newKeyValue(VOLUME_NAME, volume.getVolumeName()))
+        .addMetadata(newKeyValue(CREATED_ON, volume.getCreatedOn()));
+
+    if (volume.getQuota() != null && volume.getQuota().sizeInBytes() != -1L) {
+      containerKeyData.addMetadata(newKeyValue(QUOTA,
+          OzoneQuota.formatQuota(volume.getQuota())));
+    }
+
+    if (volume.getOwner() != null && volume.getOwner().getName() != null &&
+        !volume.getOwner().getName().isEmpty()) {
+      containerKeyData.addMetadata(newKeyValue(OWNER,
+          volume.getOwner().getName()));
+    }
+
+    if (volume.getCreatedBy() != null && !volume.getCreatedBy().isEmpty()) {
+      containerKeyData.addMetadata(
+          newKeyValue(CREATED_BY, volume.getCreatedBy()));
+    }
+
+    return containerKeyData.build();
+  }
+
+  /**
+   * Translates a key-value pair to its container representation.
+   *
+   * @param key the key
+   * @param value the value
+   * @return container representation of key-value pair
+   */
+  private static KeyValue newKeyValue(String key, Object value) {
+    return KeyValue.newBuilder().setKey(key).setValue(value.toString()).build();
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private OzoneContainerTranslation() {
+  }
+}

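To make the translation contract concrete, a hedged round-trip sketch (not
part of the patch); the volume name, date string, and container names are
hypothetical:

package org.apache.hadoop.ozone.web.storage;

import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.ozone.web.response.VolumeInfo;

// Sketch only: serialize a volume to container metadata and parse it back.
final class TranslationRoundTripExample {
  public static void main(String[] args) {
    VolumeInfo in = new VolumeInfo();
    in.setVolumeName("volume1");
    // Any string round-trips; real callers format dates with
    // OzoneConsts.OZONE_DATE_FORMAT (exact pattern assumed here).
    in.setCreatedOn("Thu, 07 Apr 2016 21:38:54 GMT");
    KeyData keyData = OzoneContainerTranslation.fromVolumeToContainerKeyData(
        "container1", "/volume1", in);
    VolumeInfo out = OzoneContainerTranslation.fromContainerKeyValueListToVolume(
        keyData.getMetadataList());
    assert "volume1".equals(out.getVolumeName());
    // Quota, owner, and createdBy also survive the round trip when set.
  }
}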