http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java
new file mode 100644
index 0000000..0630df2
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli.container;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java
new file mode 100644
index 0000000..4762d55
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/pom.xml b/hadoop-hdsl/client/pom.xml
deleted file mode 100644
index 1f1eaf0..0000000
--- a/hadoop-hdsl/client/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.hadoop</groupId>
-    <artifactId>hadoop-hdsl</artifactId>
-    <version>3.2.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>hadoop-hdsl-client</artifactId>
-  <version>3.2.0-SNAPSHOT</version>
-  <description>Apache Hadoop HDSL Client libraries</description>
-  <name>Apache Hadoop HDSL Client</name>
-  <packaging>jar</packaging>
-
-  <properties>
-    <hadoop.component>hdsl</hadoop.component>
-    <is.hadoop.component>true</is.hadoop.component>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdsl-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-all</artifactId>
-    </dependency>
-
-  </dependencies>
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
deleted file mode 100644
index c77f965..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client;
-
-import java.text.ParseException;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.scm.ScmConfigKeys;
-
-import com.google.common.base.Preconditions;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility methods for Ozone and Container Clients.
- *
- * The methods to retrieve SCM service endpoints assume there is a single
- * SCM service instance. This will change when we switch to replicated service
- * instances for redundancy.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public final class OzoneClientUtils {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      OzoneClientUtils.class);
-
-  private static final int NO_PORT = -1;
-
-  private OzoneClientUtils() {
-  }
-
-  /**
-   * Date format used in Ozone. The format is thread-safe to use.
-   */
-  private static final ThreadLocal<DateTimeFormatter> DATE_FORMAT =
-      ThreadLocal.withInitial(() -> {
-        DateTimeFormatter format =
-            DateTimeFormatter.ofPattern(OzoneConsts.OZONE_DATE_FORMAT);
-        return format.withZone(ZoneId.of(OzoneConsts.OZONE_TIME_ZONE));
-      });
-
-  /**
-   * Returns the cache value to be used for list calls.
-   * @param conf Configuration object
-   * @return list cache size
-   */
-  public static int getListCacheSize(Configuration conf) {
-    return conf.getInt(OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE,
-        OzoneConfigKeys.OZONE_CLIENT_LIST_CACHE_SIZE_DEFAULT);
-  }
-
-  /**
-   * @return a default instance of {@link CloseableHttpClient}.
-   */
-  public static CloseableHttpClient newHttpClient() {
-    return OzoneClientUtils.newHttpClient(new OzoneConfiguration());
-  }
-
-  /**
-   * Returns a {@link CloseableHttpClient} configured by given configuration.
-   * If conf is null, returns a default instance.
-   *
-   * @param conf configuration
-   * @return a {@link CloseableHttpClient} instance.
-   */
-  public static CloseableHttpClient newHttpClient(Configuration conf) {
-    long socketTimeout = OzoneConfigKeys
-        .OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT;
-    long connectionTimeout = OzoneConfigKeys
-        .OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT;
-    if (conf != null) {
-      socketTimeout = conf.getTimeDuration(
-          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
-          OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
-          TimeUnit.MILLISECONDS);
-      connectionTimeout = conf.getTimeDuration(
-          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT,
-          OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_DEFAULT,
-          TimeUnit.MILLISECONDS);
-    }
-
-    CloseableHttpClient client = HttpClients.custom()
-        .setDefaultRequestConfig(
-            RequestConfig.custom()
-                .setSocketTimeout(Math.toIntExact(socketTimeout))
-                .setConnectTimeout(Math.toIntExact(connectionTimeout))
-                .build())
-        .build();
-    return client;
-  }
-
-  /**
-   * Verifies that a bucket or volume name is a valid DNS name.
-   *
-   * @param resName Bucket or volume Name to be validated
-   *
-   * @throws IllegalArgumentException
-   */
-  public static void verifyResourceName(String resName)
-      throws IllegalArgumentException {
-
-    if (resName == null) {
-      throw new IllegalArgumentException("Bucket or Volume name is null");
-    }
-
-    if ((resName.length() < OzoneConsts.OZONE_MIN_BUCKET_NAME_LENGTH) ||
-        (resName.length() > OzoneConsts.OZONE_MAX_BUCKET_NAME_LENGTH)) {
-      throw new IllegalArgumentException(
-          "Bucket or Volume length is illegal, " +
-              "valid length is 3-63 characters");
-    }
-
-    if ((resName.charAt(0) == '.') || (resName.charAt(0) == '-')) {
-      throw new IllegalArgumentException(
-          "Bucket or Volume name cannot start with a period or dash");
-    }
-
-    if ((resName.charAt(resName.length() - 1) == '.') ||
-        (resName.charAt(resName.length() - 1) == '-')) {
-      throw new IllegalArgumentException(
-          "Bucket or Volume name cannot end with a period or dash");
-    }
-
-    boolean isIPv4 = true;
-    char prev = (char) 0;
-
-    for (int index = 0; index < resName.length(); index++) {
-      char currChar = resName.charAt(index);
-
-      if (currChar != '.') {
-        isIPv4 = ((currChar >= '0') && (currChar <= '9')) && isIPv4;
-      }
-
-      if (currChar > 'A' && currChar < 'Z') {
-        throw new IllegalArgumentException(
-            "Bucket or Volume name does not support uppercase characters");
-      }
-
-      if ((currChar != '.') && (currChar != '-')) {
-        if ((currChar < '0') || (currChar > '9' && currChar < 'a') ||
-            (currChar > 'z')) {
-          throw new IllegalArgumentException("Bucket or Volume name has an " +
-              "unsupported character : " +
-              currChar);
-        }
-      }
-
-      if ((prev == '.') && (currChar == '.')) {
-        throw new IllegalArgumentException("Bucket or Volume name should not " 
+
-            "have two contiguous periods");
-      }
-
-      if ((prev == '-') && (currChar == '.')) {
-        throw new IllegalArgumentException(
-            "Bucket or Volume name should not have period after dash");
-      }
-
-      if ((prev == '.') && (currChar == '-')) {
-        throw new IllegalArgumentException(
-            "Bucket or Volume name should not have dash after period");
-      }
-      prev = currChar;
-    }
-
-    if (isIPv4) {
-      throw new IllegalArgumentException(
-          "Bucket or Volume name cannot be an IPv4 address or all numeric");
-    }
-  }
-
-  /**
-   * Convert time in millisecond to a human readable format required in ozone.
-   * @return a human readable string for the input time
-   */
-  public static String formatDateTime(long millis) {
-    ZonedDateTime dateTime = ZonedDateTime.ofInstant(
-        Instant.ofEpochSecond(millis), DATE_FORMAT.get().getZone());
-    return DATE_FORMAT.get().format(dateTime);
-  }
-
-  /**
-   * Convert time in ozone date format to millisecond.
-   * @return time in milliseconds
-   */
-  public static long formatDateTime(String date) throws ParseException {
-    Preconditions.checkNotNull(date, "Date string should not be null.");
-    return ZonedDateTime.parse(date, DATE_FORMAT.get())
-        .toInstant().getEpochSecond();
-  }
-
-  /**
-   * Returns the maximum no of outstanding async requests to be handled by
-   * Standalone and Ratis client.
-   */
-  public static int getMaxOutstandingRequests(Configuration config) {
-    return config
-        .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
-            ScmConfigKeys
-                .SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
-  }
-}
\ No newline at end of file

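The validation rules above (3-63 lowercase characters, no leading or trailing period/dash, no contiguous periods, no dash-dot runs, not purely numeric) are easiest to see from the caller's side. A minimal sketch, assuming the deleted OzoneClientUtils class is still on the classpath; the sample names are illustrative only:

    import org.apache.hadoop.ozone.client.OzoneClientUtils;

    public final class ResourceNameCheck {
      public static void main(String[] args) {
        // Passes: lowercase DNS-style name within 3-63 characters.
        OzoneClientUtils.verifyResourceName("my-bucket.images");

        // Each of these violates one rule and throws IllegalArgumentException.
        for (String bad : new String[] {"Bucket", "-leading-dash",
            "double..dot", "10.0.0.1", "ab"}) {
          try {
            OzoneClientUtils.verifyResourceName(bad);
          } catch (IllegalArgumentException e) {
            System.out.println(bad + " rejected: " + e.getMessage());
          }
        }
      }
    }
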
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
deleted file mode 100644
index 754ce84..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/ozone/client/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client;
-
-/**
- * Generic helper classes for the client side of hdsl workflows.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
deleted file mode 100644
index 06b1e99..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.scm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-
-/**
- * A Client for the storageContainer protocol.
- */
-public class XceiverClient extends XceiverClientSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
-  private final Pipeline pipeline;
-  private final Configuration config;
-  private Channel channel;
-  private Bootstrap b;
-  private EventLoopGroup group;
-  private final Semaphore semaphore;
-
-  /**
-   * Constructs a client that can communicate with the Container framework on
-   * data nodes.
-   *
-   * @param pipeline - Pipeline that defines the machines.
-   * @param config -- Ozone Config
-   */
-  public XceiverClient(Pipeline pipeline, Configuration config) {
-    super();
-    Preconditions.checkNotNull(pipeline);
-    Preconditions.checkNotNull(config);
-    this.pipeline = pipeline;
-    this.config = config;
-    this.semaphore =
-        new Semaphore(OzoneClientUtils.getMaxOutstandingRequests(config));
-  }
-
-  @Override
-  public void connect() throws Exception {
-    if (channel != null && channel.isActive()) {
-      throw new IOException("This client is already connected to a host.");
-    }
-
-    group = new NioEventLoopGroup();
-    b = new Bootstrap();
-    b.group(group)
-        .channel(NioSocketChannel.class)
-        .handler(new LoggingHandler(LogLevel.INFO))
-        .handler(new XceiverClientInitializer(this.pipeline, semaphore));
-    DatanodeDetails leader = this.pipeline.getLeader();
-
-    // read port from the data node, on failure use default configured
-    // port.
-    int port = leader.getContainerPort();
-    if (port == 0) {
-      port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
-    }
-    LOG.debug("Connecting to server Port : " + port);
-    channel = b.connect(leader.getHostName(), port).sync().channel();
-  }
-
-  /**
-   * Returns whether the Xceiver client is connected to a server.
-   *
-   * @return True if the connection is alive, false otherwise.
-   */
-  @VisibleForTesting
-  public boolean isConnected() {
-    return channel.isActive();
-  }
-
-  @Override
-  public void close() {
-    if (group != null) {
-      group.shutdownGracefully().awaitUninterruptibly();
-    }
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public ContainerProtos.ContainerCommandResponseProto sendCommand(
-      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
-    try {
-      if ((channel == null) || (!channel.isActive())) {
-        throw new IOException("This channel is not connected.");
-      }
-      XceiverClientHandler handler =
-          channel.pipeline().get(XceiverClientHandler.class);
-
-      return handler.sendCommand(request);
-    } catch (ExecutionException | InterruptedException e) {
-      /**
-       * In case the netty channel handler throws an exception,
-       * the exception thrown will be wrapped within {@link ExecutionException}.
-       * Unwrapping here so that the original exception gets passed
-       * to the client.
-       */
-      if (e instanceof ExecutionException) {
-        Throwable cause = e.getCause();
-        if (cause instanceof IOException) {
-          throw (IOException) cause;
-        }
-      }
-      throw new IOException(
-          "Unexpected exception during execution:" + e.getMessage());
-    }
-  }
-
-  /**
-   * Sends a given command to server gets a waitable future back.
-   *
-   * @param request Request
-   * @return Response to the command
-   * @throws IOException
-   */
-  @Override
-  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-      sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
-      throws IOException, ExecutionException, InterruptedException {
-    if ((channel == null) || (!channel.isActive())) {
-      throw new IOException("This channel is not connected.");
-    }
-    XceiverClientHandler handler =
-        channel.pipeline().get(XceiverClientHandler.class);
-    return handler.sendCommandAsync(request);
-  }
-
-  /**
-   * Create a pipeline.
-   *
-   * @param pipelineID - Name of the pipeline.
-   * @param datanodes - Datanodes
-   */
-  @Override
-  public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
-      throws IOException {
-    // For stand alone pipeline, there is no notion called setup pipeline.
-    return;
-  }
-
-  /**
-   * Returns pipeline Type.
-   *
-   * @return - Stand Alone as the type.
-   */
-  @Override
-  public HdslProtos.ReplicationType getPipelineType() {
-    return HdslProtos.ReplicationType.STAND_ALONE;
-  }
-}

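One detail worth noting in sendCommand above: the async machinery wraps failures in ExecutionException, and the method unwraps it so callers see the original IOException. The same pattern in isolation, a sketch using only the JDK (getUnwrapped is a hypothetical helper name):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public final class UnwrapDemo {
      // Blocks on a future and rethrows the root IOException, if any,
      // instead of leaking the ExecutionException wrapper to the caller.
      static <T> T getUnwrapped(CompletableFuture<T> future) throws IOException {
        try {
          return future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting for response", e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw (IOException) cause; // pass the original failure through
          }
          throw new IOException("Unexpected exception during execution", e);
        }
      }
    }
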
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
deleted file mode 100644
index 4b2d6c4..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.scm;
-
-import com.google.common.base.Preconditions;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-
-/**
- * Netty client handler.
- */
-public class XceiverClientHandler extends
-    SimpleChannelInboundHandler<ContainerCommandResponseProto> {
-
-  static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
-  private final ConcurrentMap<String, ResponseFuture> responses =
-      new ConcurrentHashMap<>();
-
-  private final Pipeline pipeline;
-  private volatile Channel channel;
-  private XceiverClientMetrics metrics;
-  private final Semaphore semaphore;
-
-  /**
-   * Constructs a client that can communicate to a container server.
-   */
-  public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
-    super(false);
-    Preconditions.checkNotNull(pipeline);
-    this.pipeline = pipeline;
-    this.metrics = XceiverClientManager.getXceiverClientMetrics();
-    this.semaphore = semaphore;
-  }
-
-  /**
-   * <strong>Please keep in mind that this method will be renamed to {@code
-   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
-   * <p>
-   * Is called for each message of type {@link ContainerProtos
-   * .ContainerCommandResponseProto}.
-   *
-   * @param ctx the {@link ChannelHandlerContext} which this {@link
-   * SimpleChannelInboundHandler} belongs to
-   * @param msg the message to handle
-   * @throws Exception is thrown if an error occurred
-   */
-  @Override
-  public void channelRead0(ChannelHandlerContext ctx,
-      ContainerProtos.ContainerCommandResponseProto msg)
-      throws Exception {
-    Preconditions.checkNotNull(msg);
-    metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
-
-    String key = msg.getTraceID();
-    ResponseFuture response = responses.remove(key);
-    semaphore.release();
-
-    if (response != null) {
-      response.getFuture().complete(msg);
-
-      long requestTime = response.getRequestTime();
-      metrics.addContainerOpsLatency(msg.getCmdType(),
-          Time.monotonicNowNanos() - requestTime);
-    } else {
-      LOG.error("A reply received for message that was not queued. trace " +
-          "ID: {}", msg.getTraceID());
-    }
-  }
-
-  @Override
-  public void channelRegistered(ChannelHandlerContext ctx) {
-    LOG.debug("channelRegistered: Connected to ctx");
-    channel = ctx.channel();
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOG.info("Exception in client " + cause.toString());
-    Iterator<String> keyIterator = responses.keySet().iterator();
-    while (keyIterator.hasNext()) {
-      ResponseFuture response = responses.remove(keyIterator.next());
-      response.getFuture().completeExceptionally(cause);
-      semaphore.release();
-    }
-    ctx.close();
-  }
-
-  /**
-   * Since netty is async, we send a work request and then wait until a response
-   * appears in the reply queue. This is a simple sync interface for clients. We
-   * should consider building async interfaces for clients if this turns out to
-   * be a performance bottleneck.
-   *
-   * @param request - request.
-   * @return -- response
-   */
-
-  public ContainerCommandResponseProto sendCommand(
-      ContainerProtos.ContainerCommandRequestProto request)
-      throws ExecutionException, InterruptedException {
-    Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
-    return future.get();
-  }
-
-  /**
-   * sendCommandAsync queues a command to the Netty subsystem and returns a
-   * CompletableFuture. This future is marked completed in channelRead0
-   * when the call comes back.
-   * @param request - Request to execute
-   * @return CompletableFuture
-   */
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerProtos.ContainerCommandRequestProto request)
-      throws InterruptedException {
-
-    // Throw an exception if the request doesn't have a traceId.
-    if (StringUtils.isEmpty(request.getTraceID())) {
-      throw new IllegalArgumentException("Invalid trace ID");
-    }
-
-    // Setting the datanode ID in the commands, so that we can distinguish
-    // commands when the cluster simulator is running.
-    if(!request.hasDatanodeUuid()) {
-      throw new IllegalArgumentException("Invalid Datanode ID");
-    }
-
-    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
-
-    CompletableFuture<ContainerCommandResponseProto> future
-        = new CompletableFuture<>();
-    ResponseFuture response = new ResponseFuture(future,
-        Time.monotonicNowNanos());
-    semaphore.acquire();
-    ResponseFuture previous = responses.putIfAbsent(
-        request.getTraceID(), response);
-    if (previous != null) {
-      LOG.error("Command with Trace already exists. Ignoring this command. " +
-              "{}. Previous Command: {}", request.getTraceID(),
-          previous.toString());
-      throw new IllegalStateException("Duplicate trace ID. Command with this " +
-          "trace ID is already executing. Please ensure that " +
-          "trace IDs are not reused. ID: " + request.getTraceID());
-    }
-
-    channel.writeAndFlush(request);
-    return response.getFuture();
-  }
-
-  /**
-   * Class wraps response future info.
-   */
-  static class ResponseFuture {
-    private final long requestTime;
-    private final CompletableFuture<ContainerCommandResponseProto> future;
-
-    ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future,
-        long requestTime) {
-      this.future = future;
-      this.requestTime = requestTime;
-    }
-
-    public long getRequestTime() {
-      return requestTime;
-    }
-
-    public CompletableFuture<ContainerCommandResponseProto> getFuture() {
-      return future;
-    }
-  }
-}

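The handler above pairs each reply with its request through a trace-ID keyed map and bounds outstanding work with a semaphore. A condensed, Netty-free sketch of that bookkeeping; the class and method names here are illustrative, not from the patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Semaphore;

    final class ReplyCorrelator<R> {
      private final ConcurrentMap<String, CompletableFuture<R>> pending =
          new ConcurrentHashMap<>();
      private final Semaphore inFlight;

      ReplyCorrelator(int maxOutstanding) {
        this.inFlight = new Semaphore(maxOutstanding);
      }

      // Send path: registers a future under the request's trace ID.
      CompletableFuture<R> register(String traceId) throws InterruptedException {
        inFlight.acquire(); // backpressure on outstanding ops
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pending.putIfAbsent(traceId, future) != null) {
          inFlight.release();
          throw new IllegalStateException("Duplicate trace ID: " + traceId);
        }
        return future;
      }

      // Receive path: completes and removes the matching future.
      void complete(String traceId, R reply) {
        CompletableFuture<R> future = pending.remove(traceId);
        inFlight.release();
        if (future != null) {
          future.complete(reply);
        }
      }
    }
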
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
deleted file mode 100644
index c70a686..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.scm;
-
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * Setup the netty pipeline.
- */
-public class XceiverClientInitializer extends
-    ChannelInitializer<SocketChannel> {
-  private final Pipeline pipeline;
-  private final Semaphore semaphore;
-
-  /**
-   * Constructs an Initializer for the client pipeline.
-   * @param pipeline  - Pipeline.
-   */
-  public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
-    this.pipeline = pipeline;
-    this.semaphore = semaphore;
-  }
-
-  /**
-   * This method will be called once when the Channel is registered. After
-   * the method returns this instance will be removed from the
-   * ChannelPipeline of the Channel.
-   *
-   * @param ch   Channel which was registered.
-   * @throws Exception is thrown if an error occurs. In that case the
-   *                   Channel will be closed.
-   */
-  @Override
-  protected void initChannel(SocketChannel ch) throws Exception {
-    ChannelPipeline p = ch.pipeline();
-
-    p.addLast(new ProtobufVarint32FrameDecoder());
-    p.addLast(new ProtobufDecoder(ContainerProtos
-        .ContainerCommandResponseProto.getDefaultInstance()));
-
-    p.addLast(new ProtobufVarint32LengthFieldPrepender());
-    p.addLast(new ProtobufEncoder());
-
-    p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
deleted file mode 100644
index 3f62a3a..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.scm;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.Callable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
-import static org.apache.hadoop.scm.ScmConfigKeys
-    .SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT;
-import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos
-    .ReplicationType.RATIS;
-
-/**
- * XceiverClientManager is responsible for the lifecycle of XceiverClient
- * instances.  Callers use this class to acquire an XceiverClient instance
- * connected to the desired container pipeline.  When done, the caller also uses
- * this class to release the previously acquired XceiverClient instance.
- *
- *
- * This class caches connection to container for reuse purpose, such that
- * accessing same container frequently will be through the same connection
- * without reestablishing connection. But the connection will be closed if
- * not being used for a period of time.
- */
-public class XceiverClientManager implements Closeable {
-
-  //TODO : change this to SCM configuration class
-  private final Configuration conf;
-  private final Cache<String, XceiverClientSpi> clientCache;
-  private final boolean useRatis;
-
-  private static XceiverClientMetrics metrics;
-  /**
-   * Creates a new XceiverClientManager.
-   *
-   * @param conf configuration
-   */
-  public XceiverClientManager(Configuration conf) {
-    Preconditions.checkNotNull(conf);
-    int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
-        SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT);
-    long staleThresholdMs = conf.getTimeDuration(
-        SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
-        SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
-    this.useRatis = conf.getBoolean(
-        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
-        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
-    this.conf = conf;
-    this.clientCache = CacheBuilder.newBuilder()
-        .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
-        .maximumSize(maxSize)
-        .removalListener(
-            new RemovalListener<String, XceiverClientSpi>() {
-            @Override
-            public void onRemoval(
-                RemovalNotification<String, XceiverClientSpi>
-                  removalNotification) {
-              synchronized (clientCache) {
-                // Mark the entry as evicted
-                XceiverClientSpi info = removalNotification.getValue();
-                info.setEvicted();
-              }
-            }
-          }).build();
-  }
-
-  @VisibleForTesting
-  public Cache<String, XceiverClientSpi> getClientCache() {
-    return clientCache;
-  }
-
-  /**
-   * Acquires a XceiverClientSpi connected to a container capable of
-   * storing the specified key.
-   *
-   * If there is already a cached XceiverClientSpi, simply return
-   * the cached one; otherwise create a new one.
-   *
-   * @param pipeline the container pipeline for the client connection
-   * @return XceiverClientSpi connected to a container
-   * @throws IOException if a XceiverClientSpi cannot be acquired
-   */
-  public XceiverClientSpi acquireClient(Pipeline pipeline)
-      throws IOException {
-    Preconditions.checkNotNull(pipeline);
-    Preconditions.checkArgument(pipeline.getMachines() != null);
-    Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
-
-    synchronized (clientCache) {
-      XceiverClientSpi info = getClient(pipeline);
-      info.incrementReference();
-      return info;
-    }
-  }
-
-  /**
-   * Releases a XceiverClientSpi after use.
-   *
-   * @param client client to release
-   */
-  public void releaseClient(XceiverClientSpi client) {
-    Preconditions.checkNotNull(client);
-    synchronized (clientCache) {
-      client.decrementReference();
-    }
-  }
-
-  private XceiverClientSpi getClient(Pipeline pipeline)
-      throws IOException {
-    String containerName = pipeline.getContainerName();
-    try {
-      return clientCache.get(containerName,
-          new Callable<XceiverClientSpi>() {
-          @Override
-          public XceiverClientSpi call() throws Exception {
-            XceiverClientSpi client = pipeline.getType() == RATIS ?
-                    XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
-                    : new XceiverClient(pipeline, conf);
-            client.connect();
-            return client;
-          }
-        });
-    } catch (Exception e) {
-      throw new IOException(
-          "Exception getting XceiverClient: " + e.toString(), e);
-    }
-  }
-
-  /**
-   * Close and remove all the cached clients.
-   */
-  public void close() {
-    //closing is done through RemovalListener
-    clientCache.invalidateAll();
-    clientCache.cleanUp();
-
-    if (metrics != null) {
-      metrics.unRegister();
-    }
-  }
-
-  /**
-   * Tells us if Ratis is enabled for this cluster.
-   * @return True if Ratis is enabled.
-   */
-  public boolean isUseRatis() {
-    return useRatis;
-  }
-
-  /**
-   * Returns the replication factor: THREE when Ratis is enabled, ONE otherwise.
-   * @return replication factor
-   */
-  public  HdslProtos.ReplicationFactor getFactor() {
-    if(isUseRatis()) {
-      return HdslProtos.ReplicationFactor.THREE;
-    }
-    return HdslProtos.ReplicationFactor.ONE;
-  }
-
-  /**
-   * Returns the default replication type.
-   * @return Ratis or Standalone
-   */
-  public HdslProtos.ReplicationType getType() {
-    // TODO : Fix me and make Ratis default before release.
-    // TODO: Remove this as replication factor and type are pipeline properties
-    if(isUseRatis()) {
-      return HdslProtos.ReplicationType.RATIS;
-    }
-    return HdslProtos.ReplicationType.STAND_ALONE;
-  }
-
-  /**
-   * Get xceiver client metric.
-   */
-  public synchronized static XceiverClientMetrics getXceiverClientMetrics() {
-    if (metrics == null) {
-      metrics = XceiverClientMetrics.create();
-    }
-
-    return metrics;
-  }
-}

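The cache wiring above is standard Guava: idle connections fall out via expireAfterAccess and the removal listener gets a chance to mark them for cleanup. A self-contained sketch of the same combination, with placeholder key/value types standing in for XceiverClientSpi:

    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalListener;

    public final class IdleEvictionCache {
      public static void main(String[] args) throws Exception {
        // In XceiverClientManager the listener marks the evicted client so
        // it can be closed once its reference count drops to zero.
        RemovalListener<String, StringBuilder> onEvict = notification ->
            System.out.println("evicted: " + notification.getKey());

        Cache<String, StringBuilder> cache = CacheBuilder.newBuilder()
            .expireAfterAccess(30, TimeUnit.SECONDS) // drop idle entries
            .maximumSize(256)                        // bound the pool
            .removalListener(onEvict)
            .build();

        // The loader runs only on a cache miss, mirroring getClient() above.
        cache.get("container-1", () -> new StringBuilder("connection"));
        cache.invalidateAll(); // fires the removal listener
        cache.cleanUp();
      }
    }
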
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
deleted file mode 100644
index bcece23..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.scm;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
-
-/**
- * The client metrics for the Storage Container protocol.
- */
-@InterfaceAudience.Private
-@Metrics(about = "Storage Container Client Metrics", context = "dfs")
-public class XceiverClientMetrics {
-  public static final String SOURCE_NAME = XceiverClientMetrics.class
-      .getSimpleName();
-
-  private @Metric MutableCounterLong pendingOps;
-  private MutableCounterLong[] pendingOpsArray;
-  private MutableRate[] containerOpsLatency;
-  private MetricsRegistry registry;
-
-  public XceiverClientMetrics() {
-    int numEnumEntries = ContainerProtos.Type.values().length;
-    this.registry = new MetricsRegistry(SOURCE_NAME);
-
-    this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
-    this.containerOpsLatency = new MutableRate[numEnumEntries];
-    for (int i = 0; i < numEnumEntries; i++) {
-      pendingOpsArray[i] = registry.newCounter(
-          "numPending" + ContainerProtos.Type.valueOf(i + 1),
-          "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
-          (long) 0);
-
-      containerOpsLatency[i] = registry.newRate(
-          ContainerProtos.Type.valueOf(i + 1) + "Latency",
-          "latency of " + ContainerProtos.Type.valueOf(i + 1)
-          + " ops");
-    }
-  }
-
-  public static XceiverClientMetrics create() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register(SOURCE_NAME, "Storage Container Client Metrics",
-        new XceiverClientMetrics());
-  }
-
-  public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
-    pendingOps.incr();
-    pendingOpsArray[type.ordinal()].incr();
-  }
-
-  public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) {
-    pendingOps.incr(-1);
-    pendingOpsArray[type.ordinal()].incr(-1);
-  }
-
-  public void addContainerOpsLatency(ContainerProtos.Type type,
-      long latencyNanos) {
-    containerOpsLatency[type.ordinal()].add(latencyNanos);
-  }
-
-  public long getContainerOpsMetrics(ContainerProtos.Type type) {
-    return pendingOpsArray[type.ordinal()].value();
-  }
-
-  public void unRegister() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    ms.unregisterSource(SOURCE_NAME);
-  }
-}

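A short sketch of how a caller would drive these metrics around one container op, assuming the deleted classes above are on the classpath (the CreateContainer constant is just an example op type):

    import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
    import org.apache.hadoop.scm.XceiverClientMetrics;
    import org.apache.hadoop.util.Time;

    public final class MetricsUsage {
      public static void main(String[] args) {
        XceiverClientMetrics metrics = XceiverClientMetrics.create();
        ContainerProtos.Type op = ContainerProtos.Type.CreateContainer;

        long start = Time.monotonicNowNanos();
        metrics.incrPendingContainerOpsMetrics(op); // request queued
        // ... send the request and wait for the reply ...
        metrics.decrPendingContainerOpsMetrics(op); // reply received
        metrics.addContainerOpsLatency(op, Time.monotonicNowNanos() - start);

        metrics.unRegister(); // on shutdown
      }
    }
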
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
deleted file mode 100644
index 084e3e5..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.scm;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * An abstract implementation of {@link XceiverClientSpi} using Ratis.
- * The underlying RPC mechanism can be chosen via the constructor.
- */
-public final class XceiverClientRatis extends XceiverClientSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
-
-  public static XceiverClientRatis newXceiverClientRatis(
-      Pipeline pipeline, Configuration ozoneConf) {
-    final String rpcType = ozoneConf.get(
-        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final int maxOutstandingRequests =
-        OzoneClientUtils.getMaxOutstandingRequests(ozoneConf);
-    return new XceiverClientRatis(pipeline,
-        SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests);
-  }
-
-  private final Pipeline pipeline;
-  private final RpcType rpcType;
-  private final AtomicReference<RaftClient> client = new AtomicReference<>();
-  private final int maxOutstandingRequests;
-
-  /**
-   * Constructs a client.
-   */
-  private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
-      int maxOutStandingChunks) {
-    super();
-    this.pipeline = pipeline;
-    this.rpcType = rpcType;
-    this.maxOutstandingRequests = maxOutStandingChunks;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void createPipeline(String clusterId, List<DatanodeDetails> datanodes)
-      throws IOException {
-    RaftGroup group = RatisHelper.newRaftGroup(datanodes);
-    LOG.debug("initializing pipeline:{} with nodes:{}", clusterId,
-        group.getPeers());
-    reinitialize(datanodes, group);
-  }
-
-  /**
-   * Returns Ratis as pipeline Type.
-   *
-   * @return - Ratis
-   */
-  @Override
-  public HdslProtos.ReplicationType getPipelineType() {
-    return HdslProtos.ReplicationType.RATIS;
-  }
-
-  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    IOException exception = null;
-    for (DatanodeDetails d : datanodes) {
-      try {
-        reinitialize(d, group);
-      } catch (IOException ioe) {
-        if (exception == null) {
-          exception = new IOException(
-              "Failed to reinitialize some of the RaftPeer(s)", ioe);
-        } else {
-          exception.addSuppressed(ioe);
-        }
-      }
-    }
-    if (exception != null) {
-      throw exception;
-    }
-  }
-
-  /**
-   * Adds a new peer to the Ratis ring.
-   *
-   * @param datanode - new datanode
-   * @param group    - Raft group
-   * @throws IOException - on Failure.
-   */
-  private void reinitialize(DatanodeDetails datanode, RaftGroup group)
-      throws IOException {
-    final RaftPeer p = RatisHelper.toRaftPeer(datanode);
-    try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
-      client.reinitialize(group, p.getId());
-    } catch (IOException ioe) {
-      LOG.error("Failed to reinitialize RaftPeer:{} datanode: {}  ",
-          p, datanode, ioe);
-      throw new IOException("Failed to reinitialize RaftPeer " + p
-          + "(datanode=" + datanode + ")", ioe);
-    }
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public void connect() throws Exception {
-    LOG.debug("Connecting to pipeline:{} leader:{}",
-        getPipeline().getPipelineName(),
-        RatisHelper.toRaftPeerId(pipeline.getLeader()));
-    // TODO : XceiverClient ratis should pass the config value of
-    // maxOutstandingRequests so as to set the upper bound on max no of async
-    // requests to be handled by raft client
-    if (!client.compareAndSet(null,
-        RatisHelper.newRaftClient(rpcType, getPipeline()))) {
-      throw new IllegalStateException("Client is already connected.");
-    }
-  }
-
-  @Override
-  public void close() {
-    final RaftClient c = client.getAndSet(null);
-    if (c != null) {
-      try {
-        c.close();
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  }
-
-  private RaftClient getClient() {
-    return Objects.requireNonNull(client.get(), "client is null");
-  }
-
-  private boolean isReadOnly(ContainerCommandRequestProto proto) {
-    switch (proto.getCmdType()) {
-    case ReadContainer:
-    case ReadChunk:
-    case ListKey:
-    case GetKey:
-    case GetSmallFile:
-    case ListContainer:
-    case ListChunk:
-      return true;
-    case CloseContainer:
-    case WriteChunk:
-    case UpdateContainer:
-    case CompactChunk:
-    case CreateContainer:
-    case DeleteChunk:
-    case DeleteContainer:
-    case DeleteKey:
-    case PutKey:
-    case PutSmallFile:
-    default:
-      return false;
-    }
-  }
-
-  private RaftClientReply sendRequest(ContainerCommandRequestProto request)
-      throws IOException {
-    boolean isReadOnlyRequest = isReadOnly(request);
-    ByteString byteString =
-        ShadedProtoUtil.asShadedByteString(request.toByteArray());
-    LOG.debug("sendCommand {} {}", isReadOnlyRequest, request);
-    final RaftClientReply reply =  isReadOnlyRequest ?
-        getClient().sendReadOnly(() -> byteString) :
-        getClient().send(() -> byteString);
-    LOG.debug("reply {} {}", isReadOnlyRequest, reply);
-    return reply;
-  }
-
-  private CompletableFuture<RaftClientReply> sendRequestAsync(
-      ContainerCommandRequestProto request) throws IOException {
-    boolean isReadOnlyRequest = isReadOnly(request);
-    ByteString byteString =
-        ShadedProtoUtil.asShadedByteString(request.toByteArray());
-    LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request);
-    return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
-        getClient().sendAsync(() -> byteString);
-  }
-
-  @Override
-  public ContainerCommandResponseProto sendCommand(
-      ContainerCommandRequestProto request) throws IOException {
-    final RaftClientReply reply = sendRequest(request);
-    Preconditions.checkState(reply.isSuccess());
-    return ContainerCommandResponseProto.parseFrom(
-        ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
-  }
-
-  /**
-   * Sends a given command to server gets a waitable future back.
-   *
-   * @param request Request
-   * @return Response to the command
-   * @throws IOException
-   */
-  @Override
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerCommandRequestProto request)
-      throws IOException, ExecutionException, InterruptedException {
-    return sendRequestAsync(request).whenComplete((reply, e) ->
-          LOG.debug("received reply {} for request: {} exception: {}", request,
-              reply, e))
-        .thenApply(reply -> {
-          try {
-            return ContainerCommandResponseProto.parseFrom(
-                ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
-          } catch (InvalidProtocolBufferException e) {
-            throw new CompletionException(e);
-          }
-        });
-  }
-}

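sendCommandAsync above has to smuggle a checked InvalidProtocolBufferException out of a thenApply stage, so it wraps it in CompletionException. The same trick in isolation, a JDK-only sketch where parse() stands in for the protobuf parseFrom call:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;

    public final class AsyncParse {
      static class ParseException extends Exception {}

      // Stand-in for ContainerCommandResponseProto.parseFrom(...).
      static String parse(byte[] raw) throws ParseException {
        return new String(raw);
      }

      static CompletableFuture<String> decode(CompletableFuture<byte[]> reply) {
        return reply.thenApply(raw -> {
          try {
            return parse(raw);
          } catch (ParseException e) {
            // Checked exceptions cannot escape a thenApply lambda directly;
            // CompletionException propagates through the future chain instead.
            throw new CompletionException(e);
          }
        });
      }

      public static void main(String[] args) {
        decode(CompletableFuture.completedFuture("ok".getBytes()))
            .thenAccept(System.out::println)
            .join();
      }
    }
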
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
deleted file mode 100644
index 08ddfd6..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.scm.client;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerData;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ReadContainerResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
-import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.UUID;
-
-import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState.ALLOCATED;
-import static org.apache.hadoop.hdsl.protocol.proto.HdslProtos.LifeCycleState.OPEN;
-
-/**
- * This class provides the client-facing APIs of container operations.
- */
-public class ContainerOperationClient implements ScmClient {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerOperationClient.class);
-  private static long containerSizeB = -1;
-  private final StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-  private final XceiverClientManager xceiverClientManager;
-
-  public ContainerOperationClient(
-      StorageContainerLocationProtocolClientSideTranslatorPB
-          storageContainerLocationClient,
-      XceiverClientManager xceiverClientManager) {
-    this.storageContainerLocationClient = storageContainerLocationClient;
-    this.xceiverClientManager = xceiverClientManager;
-  }
-
-  /**
-   * Return the capacity of containers. The current assumption is that all
-   * containers have the same capacity, so a single static value suffices
-   * for every container.
-   * @return The capacity of one container in number of bytes.
-   */
-  public static long getContainerSizeB() {
-    return containerSizeB;
-  }
-
-  /**
-   * Set the capacity of containers. Should be called exactly once on system start.
-   * @param size Capacity of one container in number of bytes.
-   */
-  public static void setContainerSizeB(long size) {
-    containerSizeB = size;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Pipeline createContainer(String containerId, String owner)
-      throws IOException {
-    XceiverClientSpi client = null;
-    try {
-      Pipeline pipeline =
-          storageContainerLocationClient.allocateContainer(
-              xceiverClientManager.getType(),
-              xceiverClientManager.getFactor(), containerId, owner);
-      client = xceiverClientManager.acquireClient(pipeline);
-
-      // Allocated State means that SCM has allocated this pipeline in its
-      // namespace. The client needs to create the pipeline on the machines
-      // chosen by the SCM.
-      Preconditions.checkState(pipeline.getLifeCycleState() == ALLOCATED ||
-          pipeline.getLifeCycleState() == OPEN, "Unexpected pipeline state");
-      if (pipeline.getLifeCycleState() == ALLOCATED) {
-        createPipeline(client, pipeline);
-      }
-      // TODO : Container Client State needs to be updated.
-      // TODO : Return ContainerInfo instead of Pipeline
-      createContainer(containerId, client, pipeline);
-      return pipeline;
-    } finally {
-      if (client != null) {
-        xceiverClientManager.releaseClient(client);
-      }
-    }
-  }
-
-  /**
-   * Create a container over pipeline specified by the SCM.
-   *
-   * @param containerId - Container ID
-   * @param client - Client to communicate with Datanodes
-   * @param pipeline - A pipeline that is already created.
-   * @throws IOException
-   */
-  public void createContainer(String containerId, XceiverClientSpi client,
-      Pipeline pipeline) throws IOException {
-    String traceID = UUID.randomUUID().toString();
-    storageContainerLocationClient.notifyObjectStageChange(
-        ObjectStageChangeRequestProto.Type.container,
-        containerId,
-        ObjectStageChangeRequestProto.Op.create,
-        ObjectStageChangeRequestProto.Stage.begin);
-    ContainerProtocolCalls.createContainer(client, traceID);
-    storageContainerLocationClient.notifyObjectStageChange(
-        ObjectStageChangeRequestProto.Type.container,
-        containerId,
-        ObjectStageChangeRequestProto.Op.create,
-        ObjectStageChangeRequestProto.Stage.complete);
-
-    // Let us log this info after we let SCM know that we have completed the
-    // creation state.
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created container " + containerId
-          + " leader:" + pipeline.getLeader()
-          + " machines:" + pipeline.getMachines());
-    }
-  }
-
-  /**
-   * Creates a pipeline over the machines chosen by the SCM.
-   *
-   * @param client - Client
-   * @param pipeline - pipeline to be created on Datanodes.
-   * @throws IOException
-   */
-  private void createPipeline(XceiverClientSpi client, Pipeline pipeline)
-      throws IOException {
-
-    Preconditions.checkNotNull(pipeline.getPipelineName(), "Pipeline " +
-        "name cannot be null when client create flag is set.");
-
-    // Pipeline creation is a three step process.
-    //
-    // 1. Notify SCM that this client is doing a create pipeline on
-    // datanodes.
-    //
-    // 2. Talk to Datanodes to create the pipeline.
-    //
-    // 3. Update SCM that the pipeline creation was successful.
-    storageContainerLocationClient.notifyObjectStageChange(
-        ObjectStageChangeRequestProto.Type.pipeline,
-        pipeline.getPipelineName(),
-        ObjectStageChangeRequestProto.Op.create,
-        ObjectStageChangeRequestProto.Stage.begin);
-
-    client.createPipeline(pipeline.getPipelineName(),
-        pipeline.getMachines());
-
-    storageContainerLocationClient.notifyObjectStageChange(
-        ObjectStageChangeRequestProto.Type.pipeline,
-        pipeline.getPipelineName(),
-        ObjectStageChangeRequestProto.Op.create,
-        ObjectStageChangeRequestProto.Stage.complete);
-
-    // TODO : Should we change the state on the client side ??
-    // That makes sense, but it is not needed for the client to work.
-    LOG.debug("Pipeline creation successful. Pipeline: {}",
-        pipeline.toString());
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Pipeline createContainer(HdslProtos.ReplicationType type,
-      HdslProtos.ReplicationFactor factor,
-      String containerId, String owner) throws IOException {
-    XceiverClientSpi client = null;
-    try {
-      // allocate container on SCM.
-      Pipeline pipeline =
-          storageContainerLocationClient.allocateContainer(type, factor,
-              containerId, owner);
-      client = xceiverClientManager.acquireClient(pipeline);
-
-      // Allocated State means that SCM has allocated this pipeline in its
-      // namespace. The client needs to create the pipeline on the machines
-      // chosen by the SCM.
-      if (pipeline.getLifeCycleState() == ALLOCATED) {
-        createPipeline(client, pipeline);
-      }
-
-      // TODO : Return ContainerInfo instead of Pipeline
-      // Connect to the pipeline leader and create the container on the
-      // leader datanode. The client acquired above already points at this
-      // pipeline; acquiring it again here would leak a client reference.
-      createContainer(containerId, client, pipeline);
-      return pipeline;
-    } finally {
-      if (client != null) {
-        xceiverClientManager.releaseClient(client);
-      }
-    }
-  }
-
-  /**
-   * Returns a set of Nodes that meet the query criteria.
-   *
-   * @param nodeStatuses - A set of criteria that we want the nodes to have.
-   * @param queryScope - Query scope - Cluster or pool.
-   * @param poolName - if the scope is pool, a pool name is required.
-   * @return A set of nodes that meet the requested criteria.
-   * @throws IOException
-   */
-  @Override
-  public HdslProtos.NodePool queryNode(EnumSet<HdslProtos.NodeState>
-      nodeStatuses, HdslProtos.QueryScope queryScope, String poolName)
-      throws IOException {
-    return storageContainerLocationClient.queryNode(nodeStatuses, queryScope,
-        poolName);
-  }
-
-  /**
-   * Creates a specified replication pipeline.
-   */
-  @Override
-  public Pipeline createReplicationPipeline(HdslProtos.ReplicationType type,
-      HdslProtos.ReplicationFactor factor, HdslProtos.NodePool nodePool)
-      throws IOException {
-    return storageContainerLocationClient.createReplicationPipeline(type,
-        factor, nodePool);
-  }
-
-  /**
-   * Delete the container; this releases any resources it uses.
-   * @param pipeline - Pipeline that represents the container.
-   * @param force - True to forcibly delete the container.
-   * @throws IOException
-   */
-  @Override
-  public void deleteContainer(Pipeline pipeline, boolean force)
-      throws IOException {
-    XceiverClientSpi client = null;
-    try {
-      client = xceiverClientManager.acquireClient(pipeline);
-      String traceID = UUID.randomUUID().toString();
-      ContainerProtocolCalls.deleteContainer(client, force, traceID);
-      storageContainerLocationClient
-          .deleteContainer(pipeline.getContainerName());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Deleted container {}, leader: {}, machines: {} ",
-            pipeline.getContainerName(),
-            pipeline.getLeader(),
-            pipeline.getMachines());
-      }
-    } finally {
-      if (client != null) {
-        xceiverClientManager.releaseClient(client);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<ContainerInfo> listContainer(String startName,
-      String prefixName, int count)
-      throws IOException {
-    return storageContainerLocationClient.listContainer(
-        startName, prefixName, count);
-  }
-
-  /**
-   * Get metadata from an existing container.
-   *
-   * @param pipeline - pipeline that represents the container.
-   * @return ContainerData - a protobuf message containing the basic info
-   * of a container.
-   * @throws IOException
-   */
-  @Override
-  public ContainerData readContainer(Pipeline pipeline) throws IOException {
-    XceiverClientSpi client = null;
-    try {
-      client = xceiverClientManager.acquireClient(pipeline);
-      String traceID = UUID.randomUUID().toString();
-      ReadContainerResponseProto response =
-          ContainerProtocolCalls.readContainer(client,
-              pipeline.getContainerName(), traceID);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Read container {}, leader: {}, machines: {} ",
-            pipeline.getContainerName(),
-            pipeline.getLeader(),
-            pipeline.getMachines());
-      }
-      return response.getContainerData();
-    } finally {
-      if (client != null) {
-        xceiverClientManager.releaseClient(client);
-      }
-    }
-  }
-
-  /**
-   * Given an id, return the pipeline associated with the container.
-   * @param containerId - String Container ID
-   * @return Pipeline of the existing container, corresponding to the given id.
-   * @throws IOException
-   */
-  @Override
-  public Pipeline getContainer(String containerId) throws
-      IOException {
-    return storageContainerLocationClient.getContainer(containerId);
-  }
-
-  /**
-   * Close a container.
-   *
-   * @param pipeline the container to be closed.
-   * @throws IOException
-   */
-  @Override
-  public void closeContainer(Pipeline pipeline) throws IOException {
-    XceiverClientSpi client = null;
-    try {
-      LOG.debug("Close container {}", pipeline);
-      /*
-      TODO: two possible orderings here, revisit this later:
-      1. close on SCM first, then on the datanode
-      2. close on the datanode first, then on SCM
-
-      With 1: if the client fails after closing on SCM, there is a container
-      that SCM considers closed but that is actually open. SCM will no longer
-      allocate blocks to it, which is fine, but SCM may later try to
-      replicate this "closed" container, and it is not clear that is safe.
-
-      With 2: if the client fails after closing on the datanode, there is a
-      container that SCM considers open but that is actually closed. SCM will
-      still try to allocate blocks to it, which will fail when the write is
-      actually attempted. No more data can be written, but at least the
-      correctness and consistency of the existing data are maintained.
-
-      For now, take approach #2.
-       */
-      // Actually close the container on Datanode
-      client = xceiverClientManager.acquireClient(pipeline);
-      String traceID = UUID.randomUUID().toString();
-
-      String containerId = pipeline.getContainerName();
-
-      storageContainerLocationClient.notifyObjectStageChange(
-          ObjectStageChangeRequestProto.Type.container,
-          containerId,
-          ObjectStageChangeRequestProto.Op.close,
-          ObjectStageChangeRequestProto.Stage.begin);
-
-      ContainerProtocolCalls.closeContainer(client, traceID);
-      // Notify SCM to close the container
-      storageContainerLocationClient.notifyObjectStageChange(
-          ObjectStageChangeRequestProto.Type.container,
-          containerId,
-          ObjectStageChangeRequestProto.Op.close,
-          ObjectStageChangeRequestProto.Stage.complete);
-    } finally {
-      if (client != null) {
-        xceiverClientManager.releaseClient(client);
-      }
-    }
-  }
-
-  /**
-   * Get the current usage information.
-   * @param pipeline - Pipeline
-   * @return the size of the given container.
-   * @throws IOException
-   */
-  @Override
-  public long getContainerSize(Pipeline pipeline) throws IOException {
-    // TODO : Pipeline can be null, handle it correctly.
-    long size = getContainerSizeB();
-    if (size == -1) {
-      throw new IOException("Container size unknown!");
-    }
-    return size;
-  }
-}
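
Throughout the removed ContainerOperationClient, every datanode-side mutation (create pipeline, create container, close container) is bracketed by begin/complete stage notifications to SCM, so SCM can tell a completed operation apart from one whose client died midway. Here is a minimal sketch of that bracketing pattern; the Coordinator and Action types are hypothetical stand-ins for the SCM location protocol and the container protocol calls, not the real interfaces.

public final class TwoPhaseNotifySketch {

  /** Hypothetical stand-in for the SCM-side stage-change protocol. */
  interface Coordinator {
    void notifyBegin(String objectId, String op);
    void notifyComplete(String objectId, String op);
  }

  @FunctionalInterface
  interface Action {
    void run() throws Exception;
  }

  // Notify begin, perform the datanode-side action, then notify complete.
  // If the action throws, complete is never sent, so the coordinator still
  // sees the operation as in flight and can react to the stalled client.
  static void bracketed(Coordinator scm, String objectId, String op, Action action)
      throws Exception {
    scm.notifyBegin(objectId, op);
    action.run();
    scm.notifyComplete(objectId, op);
  }

  public static void main(String[] args) throws Exception {
    Coordinator logging = new Coordinator() {
      public void notifyBegin(String id, String op) {
        System.out.println("begin " + op + " " + id);
      }
      public void notifyComplete(String id, String op) {
        System.out.println("complete " + op + " " + id);
      }
    };
    bracketed(logging, "container-1", "create",
        () -> System.out.println("creating container on datanodes"));
  }
}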

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java
 
b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java
deleted file mode 100644
index 9febb0ac..0000000
--- 
a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/client/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.scm.client;
-
-/**
- * Client facing classes for the container operations.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java 
b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java
deleted file mode 100644
index e23763d..0000000
--- a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.scm;
-
-/**
- * Classes for different type of container service client.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
 
b/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
deleted file mode 100644
index 8c54d65..0000000
--- 
a/hadoop-hdsl/client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.scm.storage;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ReadChunkResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.XceiverClientManager;
-
-/**
- * An {@link InputStream} used by the REST service in combination with the
- * SCMClient to read the value of a key from a sequence
- * of container chunks.  All bytes of the key value are stored in container
- * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
- * instances.  This class encapsulates all state management for iterating
- * through the sequence of chunks and the sequence of buffers within
- * each chunk.
- */
-public class ChunkInputStream extends InputStream implements Seekable {
-
-  private static final int EOF = -1;
-
-  private final String key;
-  private final String traceID;
-  private XceiverClientManager xceiverClientManager;
-  private XceiverClientSpi xceiverClient;
-  private List<ChunkInfo> chunks;
-  private int chunkIndex;
-  private long[] chunkOffset;
-  private List<ByteBuffer> buffers;
-  private int bufferIndex;
-
-  /**
-   * Creates a new ChunkInputStream.
-   *
-   * @param key chunk key
-   * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param chunks list of chunks to read
-   * @param traceID container protocol call traceID
-   */
-  public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
-      XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
-    this.key = key;
-    this.traceID = traceID;
-    this.xceiverClientManager = xceiverClientManager;
-    this.xceiverClient = xceiverClient;
-    this.chunks = chunks;
-    this.chunkIndex = -1;
-    // chunkOffset[i] stores offset at which chunk i stores data in
-    // ChunkInputStream
-    this.chunkOffset = new long[this.chunks.size()];
-    initializeChunkOffset();
-    this.buffers = null;
-    this.bufferIndex = 0;
-  }
-
-  private void initializeChunkOffset() {
-    int tempOffset = 0;
-    for (int i = 0; i < chunks.size(); i++) {
-      chunkOffset[i] = tempOffset;
-      tempOffset += chunks.get(i).getLen();
-    }
-  }
-
-  @Override
-  public synchronized int read()
-      throws IOException {
-    checkOpen();
-    int available = prepareRead(1);
-    return available == EOF ? EOF :
-        Byte.toUnsignedInt(buffers.get(bufferIndex).get());
-  }
-
-  @Override
-  public synchronized int read(byte[] b, int off, int len) throws IOException {
-    // According to the JavaDocs for InputStream, it is recommended that
-    // subclasses provide an override of bulk read if possible for performance
-    // reasons.  In addition to performance, we need to do it for correctness
-    // reasons.  The Ozone REST service uses PipedInputStream and
-    // PipedOutputStream to relay HTTP response data between a Jersey thread and
-    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
-    // have a subtle dependency (bug?) on the wrapped stream providing separate
-    // implementations of single-byte read and bulk read.  Without this, get key
-    // responses might close the connection before writing all of the bytes
-    // advertised in the Content-Length.
-    if (b == null) {
-      throw new NullPointerException();
-    }
-    if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (len == 0) {
-      return 0;
-    }
-    checkOpen();
-    int available = prepareRead(len);
-    if (available == EOF) {
-      return EOF;
-    }
-    buffers.get(bufferIndex).get(b, off, available);
-    return available;
-  }
-
-  @Override
-  public synchronized void close() {
-    if (xceiverClientManager != null && xceiverClient != null) {
-      xceiverClientManager.releaseClient(xceiverClient);
-      xceiverClientManager = null;
-      xceiverClient = null;
-    }
-  }
-
-  /**
-   * Checks if the stream is open.  If not, throws an exception.
-   *
-   * @throws IOException if stream is closed
-   */
-  private synchronized void checkOpen() throws IOException {
-    if (xceiverClient == null) {
-      throw new IOException("ChunkInputStream has been closed.");
-    }
-  }
-
-  /**
-   * Prepares to read by advancing through chunks and buffers as needed
-   * until it finds data to return or encounters EOF.
-   *
-   * @param len desired length of data to read
-   * @return length of data available to read, possibly less than the
-   * desired length
-   */
-  private synchronized int prepareRead(int len) throws IOException {
-    for (;;) {
-      if (chunks == null || chunks.isEmpty()) {
-        // This must be an empty key.
-        return EOF;
-      } else if (buffers == null) {
-        // The first read triggers fetching the first chunk.
-        readChunkFromContainer();
-      } else if (!buffers.isEmpty() &&
-          buffers.get(bufferIndex).hasRemaining()) {
-        // Data is available from the current buffer.
-        ByteBuffer bb = buffers.get(bufferIndex);
-        return len > bb.remaining() ? bb.remaining() : len;
-      } else if (!buffers.isEmpty() &&
-          !buffers.get(bufferIndex).hasRemaining() &&
-          bufferIndex < buffers.size() - 1) {
-        // There are additional buffers available.
-        ++bufferIndex;
-      } else if (chunkIndex < chunks.size() - 1) {
-        // There are additional chunks available.
-        readChunkFromContainer();
-      } else {
-        // All available input has been consumed.
-        return EOF;
-      }
-    }
-  }
-
-  /**
-   * Attempts to read the chunk at the specified offset in the chunk list.  If
-   * successful, then the data of the read chunk is saved so that its bytes can
-   * be returned from subsequent read calls.
-   *
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void readChunkFromContainer() throws IOException {
-    // Increment chunkIndex on every chunk read so that the next chunk is
-    // fetched.
-    chunkIndex += 1;
-    final ReadChunkResponseProto readChunkResponse;
-    try {
-      readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
-          chunks.get(chunkIndex), key, traceID);
-    } catch (IOException e) {
-      throw new IOException("Unexpected exception while reading chunk: " + e, e);
-    }
-    ByteString byteString = readChunkResponse.getData();
-    buffers = byteString.asReadOnlyByteBufferList();
-    bufferIndex = 0;
-  }
-
-  @Override
-  public synchronized void seek(long pos) throws IOException {
-    if (pos < 0 || (chunks.size() == 0 && pos > 0)
-        || pos >= chunkOffset[chunks.size() - 1]
-        + chunks.get(chunks.size() - 1).getLen()) {
-      throw new EOFException(
-          "EOF encountered pos: " + pos + " container key: " + key);
-    }
-    if (chunkIndex == -1) {
-      chunkIndex = Arrays.binarySearch(chunkOffset, pos);
-    } else if (pos < chunkOffset[chunkIndex]) {
-      chunkIndex = Arrays.binarySearch(chunkOffset, 0, chunkIndex, pos);
-    } else if (pos >= chunkOffset[chunkIndex] + chunks.get(chunkIndex)
-        .getLen()) {
-      chunkIndex =
-          Arrays.binarySearch(chunkOffset, chunkIndex + 1, chunks.size(), pos);
-    }
-    if (chunkIndex < 0) {
-      // Binary search returns -insertionPoint - 1  if element is not present
-      // in the array. insertionPoint is the point at which element would be
-      // inserted in the sorted array. We need to adjust the chunkIndex
-      // accordingly so that chunkIndex = insertionPoint - 1
-      chunkIndex = -chunkIndex - 2;
-    }
-    // adjust chunkIndex so that readChunkFromContainer reads the correct chunk
-    chunkIndex -= 1;
-    readChunkFromContainer();
-    adjustBufferIndex(pos);
-  }
-
-  private void adjustBufferIndex(long pos) {
-    long tempOffset = chunkOffset[chunkIndex];
-    for (int i = 0; i < buffers.size(); i++) {
-      if (pos - tempOffset >= buffers.get(i).capacity()) {
-        tempOffset += buffers.get(i).capacity();
-      } else {
-        bufferIndex = i;
-        break;
-      }
-    }
-    buffers.get(bufferIndex).position((int) (pos - tempOffset));
-  }
-
-  @Override
-  public synchronized long getPos() throws IOException {
-    return chunkIndex == -1 ? 0 :
-        chunkOffset[chunkIndex] + buffers.get(bufferIndex).position();
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    return false;
-  }
-}
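
The seek() implementation above locates the target chunk with Arrays.binarySearch over the cumulative chunk offsets; when pos is not an exact chunk start, binarySearch returns -insertionPoint - 1, and -idx - 2 recovers the index of the containing chunk. The following self-contained sketch exercises just that index arithmetic with made-up chunk lengths; the chunkFor helper is illustrative, not part of the original class.

import java.util.Arrays;

public final class SeekIndexSketch {

  /** Returns the index of the chunk containing pos, given cumulative offsets. */
  static int chunkFor(long[] chunkOffset, long pos) {
    int idx = Arrays.binarySearch(chunkOffset, pos);
    if (idx < 0) {
      // binarySearch returns -insertionPoint - 1 when pos is not an exact
      // chunk start; insertionPoint - 1 is the containing chunk, which is
      // recovered here as -idx - 2.
      idx = -idx - 2;
    }
    return idx;
  }

  public static void main(String[] args) {
    long[] offsets = {0, 100, 250};  // chunks of length 100, 150, 50
    System.out.println(chunkFor(offsets, 0));    // 0 (exact chunk start)
    System.out.println(chunkFor(offsets, 150));  // 1 (binarySearch -> -3, then -(-3) - 2 = 1)
    System.out.println(chunkFor(offsets, 250));  // 2 (exact start of third chunk)
    System.out.println(chunkFor(offsets, 299));  // 2 (last byte of last chunk)
  }
}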

