http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
deleted file mode 100644
index 08f47a2..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-
-import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.HdslUtils;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.hadoop.hdsl.HdslUtils.getSCMAddresses;
-
-/**
- * Init Datanode State is the task that gets run when we are in Init State.
- */
-public class InitDatanodeState implements DatanodeState,
-    Callable<DatanodeStateMachine.DatanodeStates> {
-  static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class);
-  private final SCMConnectionManager connectionManager;
-  private final Configuration conf;
-  private final StateContext context;
-  private Future<DatanodeStateMachine.DatanodeStates> result;
-
-  /**
-   *  Create InitDatanodeState Task.
-   *
-   * @param conf - Conf
-   * @param connectionManager - Connection Manager
-   * @param context - Current Context
-   */
-  public InitDatanodeState(Configuration conf,
-                           SCMConnectionManager connectionManager,
-                           StateContext context) {
-    this.conf = conf;
-    this.connectionManager = connectionManager;
-    this.context = context;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates call() throws Exception {
-    Collection<InetSocketAddress> addresses = null;
-    try {
-      addresses = getSCMAddresses(conf);
-    } catch (IllegalArgumentException e) {
-      if(!Strings.isNullOrEmpty(e.getMessage())) {
-        LOG.error("Failed to get SCM addresses: " + e.getMessage());
-      }
-      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-    }
-
-    if (addresses == null || addresses.isEmpty()) {
-      LOG.error("Null or empty SCM address list found.");
-      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-    } else {
-      for (InetSocketAddress addr : addresses) {
-        connectionManager.addSCMServer(addr);
-      }
-    }
-
-    // If datanode ID is set, persist it to the ID file.
-    persistContainerDatanodeDetails();
-
-    return this.context.getState().getNextState();
-  }
-
-  /**
-   * Persist DatanodeDetails to datanode.id file.
-   */
-  private void persistContainerDatanodeDetails() throws IOException {
-    String dataNodeIDPath = HdslUtils.getDatanodeIdFilePath(conf);
-    File idPath = new File(dataNodeIDPath);
-    DatanodeDetails datanodeDetails = this.context.getParent()
-        .getDatanodeDetails();
-    if (datanodeDetails != null && !idPath.exists()) {
-      ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
-      LOG.info("DatanodeDetails is persisted to {}", dataNodeIDPath);
-    }
-  }
-
-  /**
-   * Called before entering this state.
-   */
-  @Override
-  public void onEnter() {
-    LOG.trace("Entering init container state");
-  }
-
-  /**
-   * Called After exiting this state.
-   */
-  @Override
-  public void onExit() {
-    LOG.trace("Exiting init container state");
-  }
-
-  /**
-   * Executes one or more tasks that is needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  @Override
-  public void execute(ExecutorService executor) {
-    result = executor.submit(this);
-  }
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param time     - Time
-   * @param timeUnit - Unit of time.
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates await(long time,
-      TimeUnit timeUnit) throws InterruptedException,
-      ExecutionException, TimeoutException {
-    return result.get(time, timeUnit);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
deleted file mode 100644
index 7a8c17b..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
-import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
-import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Class that implements handshake with SCM.
- */
-public class RunningDatanodeState implements DatanodeState {
-  static final Logger
-      LOG = LoggerFactory.getLogger(RunningDatanodeState.class);
-  private final SCMConnectionManager connectionManager;
-  private final Configuration conf;
-  private final StateContext context;
-  private CompletionService<EndpointStateMachine.EndPointStates> ecs;
-
-  public RunningDatanodeState(Configuration conf,
-      SCMConnectionManager connectionManager,
-      StateContext context) {
-    this.connectionManager = connectionManager;
-    this.conf = conf;
-    this.context = context;
-  }
-
-  /**
-   * Called before entering this state.
-   */
-  @Override
-  public void onEnter() {
-    LOG.trace("Entering handshake task.");
-  }
-
-  /**
-   * Called After exiting this state.
-   */
-  @Override
-  public void onExit() {
-    LOG.trace("Exiting handshake task.");
-  }
-
-  /**
-   * Executes one or more tasks that is needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  @Override
-  public void execute(ExecutorService executor) {
-    ecs = new ExecutorCompletionService<>(executor);
-    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
-      Callable<EndpointStateMachine.EndPointStates> endpointTask
-          = getEndPointTask(endpoint);
-      ecs.submit(endpointTask);
-    }
-  }
-  //TODO : Cache some of these tasks instead of creating them
-  //all the time.
-  private Callable<EndpointStateMachine.EndPointStates>
-      getEndPointTask(EndpointStateMachine endpoint) {
-    switch (endpoint.getState()) {
-    case GETVERSION:
-      return new VersionEndpointTask(endpoint, conf);
-    case REGISTER:
-      return  RegisterEndpointTask.newBuilder()
-          .setConfig(conf)
-          .setEndpointStateMachine(endpoint)
-          .setDatanodeDetails(context.getParent().getDatanodeDetails())
-          .build();
-    case HEARTBEAT:
-      return HeartbeatEndpointTask.newBuilder()
-          .setConfig(conf)
-          .setEndpointStateMachine(endpoint)
-          .setDatanodeDetails(context.getParent().getDatanodeDetails())
-          .setContext(context)
-          .build();
-    case SHUTDOWN:
-      break;
-    default:
-      throw new IllegalArgumentException("Illegal Argument.");
-    }
-    return null;
-  }
-
-  /**
-   * Computes the next state the container state machine must move to by looking
-   * at all the state of endpoints.
-   * <p>
-   * if any endpoint state has moved to Shutdown, either we have an
-   * unrecoverable error or we have been told to shutdown. Either case the
-   * datanode state machine should move to Shutdown state, otherwise we
-   * remain in the Running state.
-   *
-   * @return next container state.
-   */
-  private DatanodeStateMachine.DatanodeStates
-      computeNextContainerState(
-      List<Future<EndpointStateMachine.EndPointStates>> results) {
-    for (Future<EndpointStateMachine.EndPointStates> state : results) {
-      try {
-        if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) {
-          // if any endpoint tells us to shutdown we move to shutdown state.
-          return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Error in executing end point task.", e);
-      }
-    }
-    return DatanodeStateMachine.DatanodeStates.RUNNING;
-  }
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param duration - Time
-   * @param timeUnit - Unit of duration.
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates
-      await(long duration, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    int count = connectionManager.getValues().size();
-    int returned = 0;
-    long timeLeft = timeUnit.toMillis(duration);
-    long startTime = Time.monotonicNow();
-    List<Future<EndpointStateMachine.EndPointStates>> results = new
-        LinkedList<>();
-
-    while (returned < count && timeLeft > 0) {
-      Future<EndpointStateMachine.EndPointStates> result =
-          ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
-      if (result != null) {
-        results.add(result);
-        returned++;
-      }
-      timeLeft = timeLeft - (Time.monotonicNow() - startTime);
-    }
-    return computeNextContainerState(results);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
deleted file mode 100644
index 6b8d16c..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-/**
- This package contians files that guide the state transitions from
- Init->Running->Shutdown for the datanode.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
deleted file mode 100644
index 29f1f9c..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto;
-import org.apache.hadoop.ozone.container.common.helpers
-    .DeletedContainerBlocksSummary;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine.EndPointStates;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
-import org.apache.hadoop.hdsl.protocol.proto
-     .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.ZonedDateTime;
-import java.util.concurrent.Callable;
-
-/**
- * Heartbeat class for SCMs.
- */
-public class HeartbeatEndpointTask
-    implements Callable<EndpointStateMachine.EndPointStates> {
-  static final Logger LOG =
-      LoggerFactory.getLogger(HeartbeatEndpointTask.class);
-  private final EndpointStateMachine rpcEndpoint;
-  private final Configuration conf;
-  private DatanodeDetailsProto datanodeDetailsProto;
-  private StateContext context;
-
-  /**
-   * Constructs a SCM heart beat.
-   *
-   * @param conf Config.
-   */
-  public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
-      Configuration conf, StateContext context) {
-    this.rpcEndpoint = rpcEndpoint;
-    this.conf = conf;
-    this.context = context;
-  }
-
-  /**
-   * Get the container Node ID proto.
-   *
-   * @return ContainerNodeIDProto
-   */
-  public DatanodeDetailsProto getDatanodeDetailsProto() {
-    return datanodeDetailsProto;
-  }
-
-  /**
-   * Set container node ID proto.
-   *
-   * @param datanodeDetailsProto - the node id.
-   */
-  public void setDatanodeDetailsProto(DatanodeDetailsProto
-      datanodeDetailsProto) {
-    this.datanodeDetailsProto = datanodeDetailsProto;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public EndpointStateMachine.EndPointStates call() throws Exception {
-    rpcEndpoint.lock();
-    try {
-      Preconditions.checkState(this.datanodeDetailsProto != null);
-
-      SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
-          .sendHeartbeat(datanodeDetailsProto, this.context.getNodeReport(),
-              this.context.getContainerReportState());
-      processResponse(reponse, datanodeDetailsProto);
-      rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
-      rpcEndpoint.zeroMissedCount();
-    } catch (IOException ex) {
-      rpcEndpoint.logIfNeeded(ex);
-    } finally {
-      rpcEndpoint.unlock();
-    }
-    return rpcEndpoint.getState();
-  }
-
-  /**
-   * Returns a builder class for HeartbeatEndpointTask task.
-   * @return   Builder.
-   */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Add this command to command processing Queue.
-   *
-   * @param response - SCMHeartbeat response.
-   */
-  private void processResponse(SCMHeartbeatResponseProto response,
-      final DatanodeDetailsProto datanodeDetails) {
-    for (SCMCommandResponseProto commandResponseProto : response
-        .getCommandsList()) {
-      // Verify the response is indeed for this datanode.
-      Preconditions.checkState(commandResponseProto.getDatanodeUUID()
-          .equalsIgnoreCase(datanodeDetails.getUuid()),
-          "Unexpected datanode ID in the response.");
-      switch (commandResponseProto.getCmdType()) {
-      case sendContainerReport:
-        this.context.addCommand(SendContainerCommand.getFromProtobuf(
-            commandResponseProto.getSendReport()));
-        break;
-      case reregisterCommand:
-        if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Received SCM notification to register."
-                + " Interrupt HEARTBEAT and transit to REGISTER state.");
-          }
-          rpcEndpoint.setState(EndPointStates.REGISTER);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Illegal state {} found, expecting {}.",
-                rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
-          }
-        }
-        break;
-      case deleteBlocksCommand:
-        DeleteBlocksCommand db = DeleteBlocksCommand
-            .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
-        if (!db.blocksTobeDeleted().isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(DeletedContainerBlocksSummary
-                .getFrom(db.blocksTobeDeleted())
-                .toString());
-          }
-          this.context.addCommand(db);
-        }
-        break;
-      case closeContainerCommand:
-        CloseContainerCommand closeContainer =
-            CloseContainerCommand.getFromProtobuf(
-                commandResponseProto.getCloseContainerProto());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Received SCM container close request for container {}",
-              closeContainer.getContainerName());
-        }
-        this.context.addCommand(closeContainer);
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown response : "
-            + commandResponseProto.getCmdType().name());
-      }
-    }
-  }
-
-  /**
-   * Builder class for HeartbeatEndpointTask.
-   */
-  public static class Builder {
-    private EndpointStateMachine endPointStateMachine;
-    private Configuration conf;
-    private DatanodeDetails datanodeDetails;
-    private StateContext context;
-
-    /**
-     * Constructs the builder class.
-     */
-    public Builder() {
-    }
-
-    /**
-     * Sets the endpoint state machine.
-     *
-     * @param rpcEndPoint - Endpoint state machine.
-     * @return Builder
-     */
-    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
-      this.endPointStateMachine = rpcEndPoint;
-      return this;
-    }
-
-    /**
-     * Sets the Config.
-     *
-     * @param config - config
-     * @return Builder
-     */
-    public Builder setConfig(Configuration config) {
-      this.conf = config;
-      return this;
-    }
-
-    /**
-     * Sets the NodeID.
-     *
-     * @param dnDetails - NodeID proto
-     * @return Builder
-     */
-    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
-      this.datanodeDetails = dnDetails;
-      return this;
-    }
-
-    /**
-     * Sets the context.
-     * @param stateContext - State context.
-     * @return this.
-     */
-    public Builder setContext(StateContext stateContext) {
-      this.context = stateContext;
-      return this;
-    }
-
-    public HeartbeatEndpointTask build() {
-      if (endPointStateMachine == null) {
-        LOG.error("No endpoint specified.");
-        throw new IllegalArgumentException("A valid endpoint state machine is" +
-            " needed to construct HeartbeatEndpointTask task");
-      }
-
-      if (conf == null) {
-        LOG.error("No config specified.");
-        throw new IllegalArgumentException("A valid configration is needed to" +
-            " construct HeartbeatEndpointTask task");
-      }
-
-      if (datanodeDetails == null) {
-        LOG.error("No datanode specified.");
-        throw new IllegalArgumentException("A vaild Node ID is needed to " +
-            "construct HeartbeatEndpointTask task");
-      }
-
-      HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
-          .endPointStateMachine, this.conf, this.context);
-      task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
-      return task;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
deleted file mode 100644
index bfe6a28..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos.DatanodeDetailsProto;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-
-/**
- * Register a container with SCM.
- */
-public final class RegisterEndpointTask implements
-    Callable<EndpointStateMachine.EndPointStates> {
-  static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class);
-
-  private final EndpointStateMachine rpcEndPoint;
-  private final Configuration conf;
-  private Future<EndpointStateMachine.EndPointStates> result;
-  private DatanodeDetailsProto datanodeDetailsProto;
-
-  /**
-   * Creates a register endpoint task.
-   *
-   * @param rpcEndPoint - endpoint
-   * @param conf - conf
-   */
-  @VisibleForTesting
-  public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
-      Configuration conf) {
-    this.rpcEndPoint = rpcEndPoint;
-    this.conf = conf;
-
-  }
-
-  /**
-   * Get the DatanodeDetailsProto Proto.
-   *
-   * @return DatanodeDetailsProto
-   */
-  public DatanodeDetailsProto getDatanodeDetailsProto() {
-    return datanodeDetailsProto;
-  }
-
-  /**
-   * Set the contiainerNodeID Proto.
-   *
-   * @param datanodeDetailsProto - Container Node ID.
-   */
-  public void setDatanodeDetailsProto(
-      DatanodeDetailsProto datanodeDetailsProto) {
-    this.datanodeDetailsProto = datanodeDetailsProto;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public EndpointStateMachine.EndPointStates call() throws Exception {
-
-    if (getDatanodeDetailsProto() == null) {
-      LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " +
-          "shutting down the endpoint.");
-      return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
-    }
-
-    rpcEndPoint.lock();
-    try {
-
-      // TODO : Add responses to the command Queue.
-      rpcEndPoint.getEndPoint().register(datanodeDetailsProto,
-          conf.getStrings(ScmConfigKeys.OZONE_SCM_NAMES));
-      EndpointStateMachine.EndPointStates nextState =
-          rpcEndPoint.getState().getNextState();
-      rpcEndPoint.setState(nextState);
-      rpcEndPoint.zeroMissedCount();
-    } catch (IOException ex) {
-      rpcEndPoint.logIfNeeded(ex
-      );
-    } finally {
-      rpcEndPoint.unlock();
-    }
-
-    return rpcEndPoint.getState();
-  }
-
-  /**
-   * Returns a builder class for RegisterEndPoint task.
-   *
-   * @return Builder.
-   */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Builder class for RegisterEndPoint task.
-   */
-  public static class Builder {
-    private EndpointStateMachine endPointStateMachine;
-    private Configuration conf;
-    private DatanodeDetails datanodeDetails;
-
-    /**
-     * Constructs the builder class.
-     */
-    public Builder() {
-    }
-
-    /**
-     * Sets the endpoint state machine.
-     *
-     * @param rpcEndPoint - Endpoint state machine.
-     * @return Builder
-     */
-    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
-      this.endPointStateMachine = rpcEndPoint;
-      return this;
-    }
-
-    /**
-     * Sets the Config.
-     *
-     * @param config - config
-     * @return Builder.
-     */
-    public Builder setConfig(Configuration config) {
-      this.conf = config;
-      return this;
-    }
-
-    /**
-     * Sets the NodeID.
-     *
-     * @param dnDetails - NodeID proto
-     * @return Builder
-     */
-    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
-      this.datanodeDetails = dnDetails;
-      return this;
-    }
-
-    public RegisterEndpointTask build() {
-      if (endPointStateMachine == null) {
-        LOG.error("No endpoint specified.");
-        throw new IllegalArgumentException("A valid endpoint state machine is" +
-            " needed to construct RegisterEndPoint task");
-      }
-
-      if (conf == null) {
-        LOG.error("No config specified.");
-        throw new IllegalArgumentException("A valid configration is needed to" +
-            " construct RegisterEndpoint task");
-      }
-
-      if (datanodeDetails == null) {
-        LOG.error("No datanode specified.");
-        throw new IllegalArgumentException("A vaild Node ID is needed to " +
-            "construct RegisterEndpoint task");
-      }
-
-      RegisterEndpointTask task = new RegisterEndpointTask(this
-          .endPointStateMachine, this.conf);
-      task.setDatanodeDetailsProto(datanodeDetails.getProtoBufMessage());
-      return task;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
deleted file mode 100644
index 502d827..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import org.apache.hadoop.ozone.protocol.VersionResponse;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-
-/**
- * Task that returns version.
- */
-public class VersionEndpointTask implements
-    Callable<EndpointStateMachine.EndPointStates> {
-  private final EndpointStateMachine rpcEndPoint;
-  private final Configuration configuration;
-
-  public VersionEndpointTask(EndpointStateMachine rpcEndPoint,
-      Configuration conf) {
-    this.rpcEndPoint = rpcEndPoint;
-    this.configuration = conf;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public EndpointStateMachine.EndPointStates call() throws Exception {
-    rpcEndPoint.lock();
-    try{
-      SCMVersionResponseProto versionResponse =
-          rpcEndPoint.getEndPoint().getVersion(null);
-      rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse));
-
-      EndpointStateMachine.EndPointStates nextState =
-          rpcEndPoint.getState().getNextState();
-      rpcEndPoint.setState(nextState);
-      rpcEndPoint.zeroMissedCount();
-    } catch (IOException ex) {
-      rpcEndPoint.logIfNeeded(ex);
-    } finally {
-      rpcEndPoint.unlock();
-    }
-    return rpcEndPoint.getState();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
deleted file mode 100644
index 1122598..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-/**
- This package contains code for RPC endpoints transitions.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
deleted file mode 100644
index 92c953f..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.states;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
deleted file mode 100644
index bd180ef..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-
-/**
- * Creates a netty server endpoint that acts as the communication layer for
- * Ozone containers.
- */
-public final class XceiverServer implements XceiverServerSpi {
-  private static final Logger
-      LOG = LoggerFactory.getLogger(XceiverServer.class);
-  private int port;
-  private final ContainerDispatcher storageContainer;
-
-  private EventLoopGroup bossGroup;
-  private EventLoopGroup workerGroup;
-  private Channel channel;
-
-  /**
-   * Constructs a netty server class.
-   *
-   * @param conf - Configuration
-   */
-  public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf,
-                       ContainerDispatcher dispatcher) {
-    Preconditions.checkNotNull(conf);
-
-    this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
-    // Get an available port on current node and
-    // use that as the container port
-    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        this.port = socket.getLocalPort();
-        LOG.info("Found a free port for the server : {}", this.port);
-      } catch (IOException e) {
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", this.port, e);
-      }
-    }
-    datanodeDetails.setContainerPort(port);
-    this.storageContainer = dispatcher;
-  }
-
-  @Override
-  public int getIPCPort() {
-    return this.port;
-  }
-
-  /**
-   * Returns the Replication type supported by this end-point.
-   *
-   * @return enum -- {Stand_Alone, Ratis, Chained}
-   */
-  @Override
-  public HdslProtos.ReplicationType getServerType() {
-    return HdslProtos.ReplicationType.STAND_ALONE;
-  }
-
-  @Override
-  public void start() throws IOException {
-    bossGroup = new NioEventLoopGroup();
-    workerGroup = new NioEventLoopGroup();
-    channel = new ServerBootstrap()
-        .group(bossGroup, workerGroup)
-        .channel(NioServerSocketChannel.class)
-        .handler(new LoggingHandler(LogLevel.INFO))
-        .childHandler(new XceiverServerInitializer(storageContainer))
-        .bind(port)
-        .syncUninterruptibly()
-        .channel();
-  }
-
-  @Override
-  public void stop() {
-    if (storageContainer != null) {
-      storageContainer.shutdown();
-    }
-    if (bossGroup != null) {
-      bossGroup.shutdownGracefully();
-    }
-    if (workerGroup != null) {
-      workerGroup.shutdownGracefully();
-    }
-    if (channel != null) {
-      channel.close().awaitUninterruptibly();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
deleted file mode 100644
index 6c42c84e1..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-
-/**
- * Netty server handlers that respond to Network events.
- */
-public class XceiverServerHandler extends
-    SimpleChannelInboundHandler<ContainerCommandRequestProto> {
-
-  static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
-  private final ContainerDispatcher dispatcher;
-
-  /**
-   * Constructor for server handler.
-   * @param dispatcher - Dispatcher interface
-   */
-  public XceiverServerHandler(ContainerDispatcher dispatcher) {
-    this.dispatcher = dispatcher;
-  }
-
-  /**
-   * <strong>Please keep in mind that this method will be renamed to {@code
-   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
-   * <p>
-   * Is called for each message of type {@link ContainerCommandRequestProto}.
-   *
-   * @param ctx the {@link ChannelHandlerContext} which this {@link
-   *            SimpleChannelInboundHandler} belongs to
-   * @param msg the message to handle
-   * @throws Exception is thrown if an error occurred
-   */
-  @Override
-  public void channelRead0(ChannelHandlerContext ctx,
-                           ContainerCommandRequestProto msg) throws
-      Exception {
-    ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
-    LOG.debug("Writing the reponse back to client.");
-    ctx.writeAndFlush(response);
-
-  }
-
-  /**
-   * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
-   * Sub-classes may override this method to change behavior.
-   *
-   * @param ctx   - Channel Handler Context
-   * @param cause - Exception
-   */
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-      throws Exception {
-    LOG.error("An exception caught in the pipeline : " + cause.toString());
-    super.exceptionCaught(ctx, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
deleted file mode 100644
index 036a654..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import com.google.common.base.Preconditions;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-
-/**
- * Creates a channel for the XceiverServer.
- */
-public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
-  private final ContainerDispatcher dispatcher;
-  public XceiverServerInitializer(ContainerDispatcher dispatcher) {
-    Preconditions.checkNotNull(dispatcher);
-    this.dispatcher = dispatcher;
-  }
-
-  /**
-   * This method will be called once the Channel is registered. After
-   * the method returns this instance will be removed from the {@link
-   * ChannelPipeline}
-   *
-   * @param ch the  which was registered.
-   * @throws Exception is thrown if an error occurs. In that case the channel
-   * will be closed.
-   */
-  @Override
-  protected void initChannel(SocketChannel ch) throws Exception {
-    ChannelPipeline pipeline = ch.pipeline();
-    pipeline.addLast(new ProtobufVarint32FrameDecoder());
-    pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
-        .getDefaultInstance()));
-    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
-    pipeline.addLast(new ProtobufEncoder());
-    pipeline.addLast(new XceiverServerHandler(dispatcher));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
deleted file mode 100644
index 09bd6fc..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-
-import java.io.IOException;
-
-/** A server endpoint that acts as the communication layer for Ozone
- * containers. */
-public interface XceiverServerSpi {
-  /** Starts the server. */
-  void start() throws IOException;
-
-  /** Stops a running server. */
-  void stop();
-
-  /** Get server IPC port. */
-  int getIPCPort();
-
-  /**
-   * Returns the Replication type supported by this end-point.
-   * @return enum -- {Stand_Alone, Ratis, Chained}
-   */
-  HdslProtos.ReplicationType getServerType();
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
deleted file mode 100644
index 59c96f1..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-/**
- * This package contains classes for the server of the storage container
- * protocol.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
deleted file mode 100644
index 08a9614..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos.WriteChunkRequestProto;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.StateMachineStorage;
-import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ConcurrentHashMap;
-
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
- *
- * The stateMachine is responsible for handling different types of container
- * requests. The container requests can be divided into readonly and write
- * requests.
- *
- * Read only requests are classified in
- * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
- * and these readonly requests are replied from the {@link #query(Message)}.
- *
- * The write requests can be divided into requests with user data
- * (WriteChunkRequest) and other request without user data.
- *
- * Inorder to optimize the write throughput, the writeChunk request is
- * processed in 2 phases. The 2 phases are divided in
- * {@link #startTransaction(RaftClientRequest)}, in the first phase the user
- * data is written directly into the state machine via
- * {@link #writeStateMachineData} and in the second phase the
- * transaction is committed via {@link #applyTransaction(TransactionContext)}
- *
- * For the requests with no stateMachine data, the transaction is directly
- * committed through
- * {@link #applyTransaction(TransactionContext)}
- *
- * There are 2 ordering operation which are enforced right now in the code,
- * 1) Write chunk operation are executed after the create container operation,
- * the write chunk operation will fail otherwise as the container still hasn't
- * been created. Hence the create container operation has been split in the
- * {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing
- * the calls in {@link #writeStateMachineData}
- *
- * 2) Write chunk commit operation is executed after write chunk state machine
- * operation. This will ensure that commit operation is sync'd with the state
- * machine operation.
- * */
-public class ContainerStateMachine extends BaseStateMachine {
-  static final Logger LOG = LoggerFactory.getLogger(
-      ContainerStateMachine.class);
-  private final SimpleStateMachineStorage storage
-      = new SimpleStateMachineStorage();
-  private final ContainerDispatcher dispatcher;
-  private ThreadPoolExecutor writeChunkExecutor;
-  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-      writeChunkFutureMap;
-  private final ConcurrentHashMap<String, CompletableFuture<Message>>
-      createContainerFutureMap;
-
-  ContainerStateMachine(ContainerDispatcher dispatcher,
-      ThreadPoolExecutor writeChunkExecutor) {
-    this.dispatcher = dispatcher;
-    this.writeChunkExecutor = writeChunkExecutor;
-    this.writeChunkFutureMap = new ConcurrentHashMap<>();
-    this.createContainerFutureMap = new ConcurrentHashMap<>();
-  }
-
-  @Override
-  public StateMachineStorage getStateMachineStorage() {
-    return storage;
-  }
-
-  @Override
-  public void initialize(
-      RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
-      throws IOException {
-    super.initialize(id, properties, raftStorage);
-    storage.init(raftStorage);
-    //  TODO handle snapshots
-
-    // TODO: Add a flag that tells you that initialize has been called.
-    // Check with Ratis if this feature is done in Ratis.
-  }
-
-  @Override
-  public TransactionContext startTransaction(RaftClientRequest request)
-      throws IOException {
-    final ContainerCommandRequestProto proto =
-        getRequestProto(request.getMessage().getContent());
-
-    final SMLogEntryProto log;
-    if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) {
-      final WriteChunkRequestProto write = proto.getWriteChunk();
-      // create the state machine data proto
-      final WriteChunkRequestProto dataWriteChunkProto =
-          WriteChunkRequestProto
-              .newBuilder(write)
-              .setStage(ContainerProtos.Stage.WRITE_DATA)
-              .build();
-      ContainerCommandRequestProto dataContainerCommandProto =
-          ContainerCommandRequestProto
-              .newBuilder(proto)
-              .setWriteChunk(dataWriteChunkProto)
-              .build();
-
-      // create the log entry proto
-      final WriteChunkRequestProto commitWriteChunkProto =
-          WriteChunkRequestProto.newBuilder()
-              .setPipeline(write.getPipeline())
-              .setKeyName(write.getKeyName())
-              .setChunkData(write.getChunkData())
-              // skipping the data field as it is
-              // already set in statemachine data proto
-              .setStage(ContainerProtos.Stage.COMMIT_DATA)
-              .build();
-      ContainerCommandRequestProto commitContainerCommandProto =
-          ContainerCommandRequestProto
-              .newBuilder(proto)
-              .setWriteChunk(commitWriteChunkProto)
-              .build();
-
-      log = SMLogEntryProto.newBuilder()
-          .setData(getShadedByteString(commitContainerCommandProto))
-          .setStateMachineData(getShadedByteString(dataContainerCommandProto))
-          .build();
-    } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
-      log = SMLogEntryProto.newBuilder()
-          .setData(request.getMessage().getContent())
-          .setStateMachineData(request.getMessage().getContent())
-          .build();
-    } else {
-      log = SMLogEntryProto.newBuilder()
-          .setData(request.getMessage().getContent())
-          .build();
-    }
-    return new TransactionContextImpl(this, request, log);
-  }
-
-  private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
-    return ShadedProtoUtil.asShadedByteString(proto.toByteArray());
-  }
-
-  private ContainerCommandRequestProto getRequestProto(ByteString request)
-      throws InvalidProtocolBufferException {
-    return ContainerCommandRequestProto.parseFrom(
-        ShadedProtoUtil.asByteString(request));
-  }
-
-  private Message runCommand(ContainerCommandRequestProto requestProto) {
-    LOG.trace("dispatch {}", requestProto);
-    ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
-    LOG.trace("response {}", response);
-    return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
-  }
-
-  private CompletableFuture<Message> handleWriteChunk(
-      ContainerCommandRequestProto requestProto, long entryIndex) {
-    final WriteChunkRequestProto write = requestProto.getWriteChunk();
-    String containerName = write.getPipeline().getContainerName();
-    CompletableFuture<Message> future =
-        createContainerFutureMap.get(containerName);
-    CompletableFuture<Message> writeChunkFuture;
-    if (future != null) {
-      writeChunkFuture = future.thenApplyAsync(
-          v -> runCommand(requestProto), writeChunkExecutor);
-    } else {
-      writeChunkFuture = CompletableFuture.supplyAsync(
-          () -> runCommand(requestProto), writeChunkExecutor);
-    }
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
-    return writeChunkFuture;
-  }
-
-  private CompletableFuture<Message> handleCreateContainer(
-      ContainerCommandRequestProto requestProto) {
-    String containerName =
-        requestProto.getCreateContainer().getContainerData().getName();
-    createContainerFutureMap.
-        computeIfAbsent(containerName, k -> new CompletableFuture<>());
-    return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
-  }
-
-  @Override
-  public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) 
{
-    try {
-      final ContainerCommandRequestProto requestProto =
-          getRequestProto(entry.getSmLogEntry().getStateMachineData());
-      ContainerProtos.Type cmdType = requestProto.getCmdType();
-      switch (cmdType) {
-      case CreateContainer:
-        return handleCreateContainer(requestProto);
-      case WriteChunk:
-        return handleWriteChunk(requestProto, entry.getIndex());
-      default:
-        throw new IllegalStateException("Cmd Type:" + cmdType
-            + " should not have state machine data");
-      }
-    } catch (IOException e) {
-      return completeExceptionally(e);
-    }
-  }
-
-  /**
-   * Runs the container command carried by the request on the calling
-   * thread.
-   *
-   * @param request message whose content is a serialized container command.
-   * @return a completed future with the response, or a failed future if the
-   *     request cannot be parsed.
-   */
-  @Override
-  public CompletableFuture<Message> query(Message request) {
-    final ContainerCommandRequestProto requestProto;
-    try {
-      requestProto = getRequestProto(request.getContent());
-    } catch (IOException e) {
-      return completeExceptionally(e);
-    }
-    final Message reply = runCommand(requestProto);
-    return CompletableFuture.completedFuture(reply);
-  }
-
-  /**
-   * Applies a committed transaction by running its container command.
-   * <p>
-   * For WriteChunk the command is run only after the future recorded by
-   * {@code handleWriteChunk} for the same log index has completed. For
-   * CreateContainer the pending future registered by
-   * {@code handleCreateContainer}, if any, is completed with the response.
-   * All other commands are run directly on the calling thread.
-   *
-   * @param trx transaction whose log entry data holds the command.
-   * @return a future with the command response. The future is failed if the
-   *     command cannot be parsed, or if no state machine data write is
-   *     recorded for a WriteChunk log index.
-   */
-  @Override
-  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    try {
-      ContainerCommandRequestProto requestProto =
-          getRequestProto(trx.getSMLogEntry().getData());
-      ContainerProtos.Type cmdType = requestProto.getCmdType();
-
-      if (cmdType == ContainerProtos.Type.WriteChunk) {
-        WriteChunkRequestProto write = requestProto.getWriteChunk();
-        // the data field has already been removed in start Transaction
-        Preconditions.checkArgument(!write.hasData());
-        final long index = trx.getLogEntry().getIndex();
-        CompletableFuture<Message> stateMachineFuture =
-            writeChunkFutureMap.remove(index);
-        if (stateMachineFuture == null) {
-          // Report a clear failure instead of a bare NullPointerException.
-          return completeExceptionally(new IllegalStateException(
-              "No pending state machine data write for log index " + index));
-        }
-        return stateMachineFuture
-            .thenComposeAsync(v ->
-                CompletableFuture.completedFuture(runCommand(requestProto)));
-      } else {
-        Message message = runCommand(requestProto);
-        if (cmdType == ContainerProtos.Type.CreateContainer) {
-          String containerName =
-              requestProto.getCreateContainer().getContainerData().getName();
-          // The command has already run; a missing placeholder only means
-          // there is no waiting future to release.
-          CompletableFuture<Message> createFuture =
-              createContainerFutureMap.remove(containerName);
-          if (createFuture != null) {
-            createFuture.complete(message);
-          }
-        }
-        return CompletableFuture.completedFuture(message);
-      }
-    } catch (IOException e) {
-      return completeExceptionally(e);
-    }
-  }
-
-  /**
-   * Builds a future that is already completed exceptionally.
-   *
-   * @param e the exception the returned future fails with.
-   * @param <T> result type of the returned future.
-   * @return a new future completed exceptionally with {@code e}.
-   */
-  private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
-    final CompletableFuture<T> failed = new CompletableFuture<>();
-    failed.completeExceptionally(e);
-    return failed;
-  }
-
-  /**
-   * Closes this state machine. This implementation does nothing.
-   *
-   * @throws IOException declared in the signature; not thrown by this
-   *     implementation.
-   */
-  @Override
-  public void close() throws IOException {
-    // No-op.
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
 
b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
deleted file mode 100644
index 7aee5bb..0000000
--- 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server
-    .XceiverServerSpi;
-
-import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.RatisHelper;
-import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-import java.util.Objects;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Creates a ratis server endpoint that acts as the communication layer for
- * Ozone containers.
- * <p>
- * The constructor is private; instances are created through
- * {@code newXceiverServerRatis}. Each instance holds a {@link RaftServer}
- * and the thread pool that is handed to the {@link ContainerStateMachine}.
- */
-public final class XceiverServerRatis implements XceiverServerSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
-  // Port passed to the constructor; returned by getIPCPort().
-  private final int port;
-  private final RaftServer server;
-  // Pool passed to the ContainerStateMachine; prestarted in start() and shut
-  // down in stop().
-  private ThreadPoolExecutor writeChunkExecutor;
-
-  /**
-   * Reads the Ratis settings from the configuration, creates the write-chunk
-   * thread pool and builds (but does not start) the raft server.
-   *
-   * @param dd datanode details; must not be null, used to derive the raft
-   *     peer id.
-   * @param port port the server is configured with.
-   * @param storageDir directory used as the raft storage directory.
-   * @param dispatcher dispatcher handed to the container state machine.
-   * @param conf configuration to read the Ratis settings from.
-   * @throws IOException declared by the constructor; NOTE(review): presumably
-   *     raised while building the raft server -- confirm.
-   */
-  private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
-      ContainerDispatcher dispatcher, Configuration conf) throws IOException {
-
-    final String rpcType = conf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
-    final int raftSegmentSize = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
-    final int raftSegmentPreallocatedSize = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
-    // Taken from a constant, not from the configuration.
-    final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
-    final int numWriteChunkThreads = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
-
-    Objects.requireNonNull(dd, "id == null");
-    this.port = port;
-    RaftProperties serverProperties = newRaftProperties(rpc, port,
-        storageDir, maxChunkSize, raftSegmentSize, 
raftSegmentPreallocatedSize);
-
-    // Core and maximum pool size are both numWriteChunkThreads; the work
-    // queue holds at most 1024 tasks and CallerRunsPolicy is the rejection
-    // handler.
-    writeChunkExecutor =
-        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
-            100, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(1024),
-            new ThreadPoolExecutor.CallerRunsPolicy());
-    ContainerStateMachine stateMachine =
-        new ContainerStateMachine(dispatcher, writeChunkExecutor);
-    // The server is built with an empty raft group.
-    this.server = RaftServer.newBuilder()
-        .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setGroup(RatisHelper.emptyRaftGroup())
-        .setProperties(serverProperties)
-        .setStateMachine(stateMachine)
-        .build();
-  }
-
-  /**
-   * Builds the RaftProperties used by the raft server.
-   *
-   * @param rpc RPC type to configure; the port is only set for GRPC and
-   *     NETTY.
-   * @param port server port for the selected RPC type.
-   * @param storageDir raft storage directory.
-   * @param scmChunkSize chunk size in bytes; used as the log write buffer
-   *     size and as part of the maximum gRPC message size.
-   * @param raftSegmentSize maximum log segment size in bytes.
-   * @param raftSegmentPreallocatedSize preallocated log size in bytes; also
-   *     used as the appender buffer capacity.
-   * @return the populated properties.
-   */
-  private static RaftProperties newRaftProperties(
-      RpcType rpc, int port, String storageDir, int scmChunkSize,
-      int raftSegmentSize, int raftSegmentPreallocatedSize) {
-    final RaftProperties properties = new RaftProperties();
-    RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
-    RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-    RaftServerConfigKeys.Log.setWriteBufferSize(properties,
-        SizeInBytes.valueOf(scmChunkSize));
-    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
-    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
-    RaftConfigKeys.Rpc.setType(properties, rpc);
-
-    RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
-    // Maximum message size is the chunk size plus the preallocated size.
-    GrpcConfigKeys.setMessageSizeMax(properties,
-        SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize));
-    // NOTE(review): presumably the raft RPC/election timeout range
-    // (800-1000 ms) -- confirm against the Ratis documentation.
-    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
-        TimeDuration.valueOf(800, TimeUnit.MILLISECONDS));
-    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
-        TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS));
-    if (rpc == SupportedRpcType.GRPC) {
-      GrpcConfigKeys.Server.setPort(properties, port);
-    } else if (rpc == SupportedRpcType.NETTY) {
-      NettyConfigKeys.Server.setPort(properties, port);
-    }
-    return properties;
-  }
-
-  /**
-   * Creates a server from the configuration.
-   * <p>
-   * The storage directory is read from
-   * DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR; when that is unset, a "ratis"
-   * sub-directory of OZONE_METADATA_DIRS is used. When random ports are
-   * enabled, a free port is looked up and the datanode UUID is appended to
-   * the storage directory. The chosen port is also stored in the datanode
-   * details.
-   *
-   * @param datanodeDetails details of this datanode; its ratis port is
-   *     updated.
-   * @param ozoneConf configuration to read from.
-   * @param dispatcher dispatcher handed to the server.
-   * @return the new, not yet started, server.
-   * @throws IOException if the server cannot be constructed.
-   */
-  public static XceiverServerRatis newXceiverServerRatis(
-      DatanodeDetails datanodeDetails, Configuration ozoneConf,
-      ContainerDispatcher dispatcher) throws IOException {
-    final String ratisDir = File.separator + "ratis";
-    int localPort = ozoneConf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
-    String storageDir = ozoneConf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
-
-    if (Strings.isNullOrEmpty(storageDir)) {
-      storageDir = ozoneConf.get(OzoneConfigKeys
-          .OZONE_METADATA_DIRS);
-      Preconditions.checkNotNull(storageDir, "ozone.metadata.dirs " +
-          "cannot be null, Please check your configs.");
-      storageDir = storageDir.concat(ratisDir);
-      LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " 
+
-              "storage under {}. It is a good idea to map this to an SSD 
disk.",
-          storageDir);
-    }
-
-    // Get an available port on current node and
-    // use that as the container port
-    if (ozoneConf.getBoolean(OzoneConfigKeys
-            .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
-      // The probe socket is closed at the end of the try block; only the
-      // port number it was bound to is kept.
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        localPort = socket.getLocalPort();
-        LOG.info("Found a free port for the server : {}", localPort);
-        // If we have random local ports configured this means that it
-        // probably running under MiniOzoneCluster. Ratis locks the storage
-        // directories, so we need to pass different local directory for each
-        // local instance. So we map ratis directories under datanode ID.
-        storageDir =
-            storageDir.concat(File.separator +
-                datanodeDetails.getUuidString());
-      } catch (IOException e) {
-        // Keep the configured port; the storage directory is left without
-        // the datanode UUID suffix in this case.
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", localPort, e);
-      }
-    }
-    datanodeDetails.setRatisPort(localPort);
-    return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
-        dispatcher, ozoneConf);
-  }
-
-  /**
-   * Prestarts the core threads of the write-chunk pool and starts the raft
-   * server.
-   *
-   * @throws IOException if the raft server fails to start.
-   */
-  @Override
-  public void start() throws IOException {
-    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
-        server.getId(), getIPCPort());
-    writeChunkExecutor.prestartAllCoreThreads();
-    server.start();
-  }
-
-  /**
-   * Shuts down the write-chunk pool and closes the raft server. An
-   * IOException is rethrown wrapped in a RuntimeException.
-   */
-  @Override
-  public void stop() {
-    try {
-      writeChunkExecutor.shutdown();
-      server.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Returns the port this server was created with.
-   *
-   * @return the port.
-   */
-  @Override
-  public int getIPCPort() {
-    return port;
-  }
-
-  /**
-   * Returns the Replication type supported by this end-point.
-   *
-   * @return enum -- {Stand_Alone, Ratis, Chained}
-   */
-  @Override
-  public HdslProtos.ReplicationType getServerType() {
-    return HdslProtos.ReplicationType.RATIS;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
 
b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
deleted file mode 100644
index 8debfe0..0000000
--- 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-/**
- * This package contains classes for the server implementation
- * using Apache Ratis.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
 
b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
deleted file mode 100644
index 6ae45b6..0000000
--- 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.utils;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections.MapIterator;
-import org.apache.commons.collections.map.LRUMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * container cache is a LRUMap that maintains the DB handles.
- */
-public final class ContainerCache extends LRUMap {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerCache.class);
-  private final Lock lock = new ReentrantLock();
-  private static ContainerCache cache;
-  private static final float LOAD_FACTOR = 0.75f;
-  /**
-   * Constructs a cache that holds DBHandle references.
-   */
-  private ContainerCache(int maxSize, float loadFactor, boolean
-      scanUntilRemovable) {
-    super(maxSize, loadFactor, scanUntilRemovable);
-  }
-
-  /**
-   * Return a singleton instance of {@link ContainerCache}
-   * that holds the DB handlers.
-   *
-   * @param conf - Configuration.
-   * @return A instance of {@link ContainerCache}.
-   */
-  public synchronized static ContainerCache getInstance(Configuration conf) {
-    if (cache == null) {
-      int cacheSize = conf.getInt(OzoneConfigKeys.OZONE_CONTAINER_CACHE_SIZE,
-          OzoneConfigKeys.OZONE_CONTAINER_CACHE_DEFAULT);
-      cache = new ContainerCache(cacheSize, LOAD_FACTOR, true);
-    }
-    return cache;
-  }
-
-  /**
-   * Closes a db instance.
-   *
-   * @param container - name of the container to be closed.
-   * @param db - db instance to close.
-   */
-  private void closeDB(String container, MetadataStore db) {
-    if (db != null) {
-      try {
-        db.close();
-      } catch (IOException e) {
-        LOG.error("Error closing DB. Container: " + container, e);
-      }
-    }
-  }
-
-  /**
-   * Closes all the db instances and resets the cache.
-   */
-  public void shutdownCache() {
-    lock.lock();
-    try {
-      // iterate the cache and close each db
-      MapIterator iterator = cache.mapIterator();
-      while (iterator.hasNext()) {
-        iterator.next();
-        MetadataStore db = (MetadataStore) iterator.getValue();
-        closeDB(iterator.getKey().toString(), db);
-      }
-      // reset the cache
-      cache.clear();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  protected boolean removeLRU(LinkEntry entry) {
-    lock.lock();
-    try {
-      MetadataStore db = (MetadataStore) entry.getValue();
-      closeDB(entry.getKey().toString(), db);
-    } finally {
-      lock.unlock();
-    }
-    return true;
-  }
-
-  /**
-   * Returns a DB handle if available, create the handler otherwise.
-   *
-   * @param containerName - Name of the container.
-   * @return MetadataStore.
-   */
-  public MetadataStore getDB(String containerName, String containerDBPath)
-      throws IOException {
-    Preconditions.checkNotNull(containerName);
-    Preconditions.checkState(!containerName.isEmpty());
-    lock.lock();
-    try {
-      MetadataStore db = (MetadataStore) this.get(containerName);
-
-      if (db == null) {
-        db = MetadataStoreBuilder.newBuilder()
-            .setDbFile(new File(containerDBPath))
-            .setCreateIfMissing(false)
-            .build();
-        this.put(containerName, db);
-      }
-      return db;
-    } catch (Exception e) {
-      LOG.error("Error opening DB. Container:{} ContainerPath:{}",
-          containerName, containerDBPath, e);
-      throw e;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Remove a DB handler from cache.
-   *
-   * @param containerName - Name of the container.
-   */
-  public void removeDB(String containerName) {
-    Preconditions.checkNotNull(containerName);
-    Preconditions.checkState(!containerName.isEmpty());
-    lock.lock();
-    try {
-      MetadataStore db = (MetadataStore)this.get(containerName);
-      closeDB(containerName, db);
-      this.remove(containerName);
-    } finally {
-      lock.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
 
b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
deleted file mode 100644
index 08264f0..0000000
--- 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.utils;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to