http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
deleted file mode 100644
index 690aa01..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-
-/**
- * Register the datanode with SCM.
- */
-public final class RegisterEndpointTask implements
-    Callable<EndpointStateMachine.EndPointStates> {
-  static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class);
-
-  private final EndpointStateMachine rpcEndPoint;
-  private final Configuration conf;
-  private Future<EndpointStateMachine.EndPointStates> result;
-  private DatanodeDetails datanodeDetails;
-  private final OzoneContainer datanodeContainerManager;
-  private StateContext stateContext;
-
-  /**
-   * Creates a register endpoint task.
-   *
-   * @param rpcEndPoint - endpoint
-   * @param conf - conf
-   * @param ozoneContainer - container
-   * @param context - state context
-   */
-  @VisibleForTesting
-  public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
-      Configuration conf, OzoneContainer ozoneContainer,
-      StateContext context) {
-    this.rpcEndPoint = rpcEndPoint;
-    this.conf = conf;
-    this.datanodeContainerManager = ozoneContainer;
-    this.stateContext = context;
-
-  }
-
-  /**
-   * Get the DatanodeDetails.
-   *
-   * @return DatanodeDetails
-   */
-  public DatanodeDetails getDatanodeDetails() {
-    return datanodeDetails;
-  }
-
-  /**
-   * Set the DatanodeDetails.
-   *
-   * @param datanodeDetails - DatanodeDetails of the datanode.
-   */
-  public void setDatanodeDetails(
-      DatanodeDetails datanodeDetails) {
-    this.datanodeDetails = datanodeDetails;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public EndpointStateMachine.EndPointStates call() throws Exception {
-
-    if (getDatanodeDetails() == null) {
-      LOG.error("DatanodeDetails cannot be null in RegisterEndpoint task, " +
-          "shutting down the endpoint.");
-      return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
-    }
-
-    rpcEndPoint.lock();
-    try {
-
-      ContainerReportsProto containerReport = datanodeContainerManager
-          .getContainerReport();
-      NodeReportProto nodeReport = datanodeContainerManager.getNodeReport();
-      PipelineReportsProto pipelineReportsProto =
-              datanodeContainerManager.getPipelineReport();
-      // TODO : Add responses to the command Queue.
-      SCMRegisteredResponseProto response = rpcEndPoint.getEndPoint()
-          .register(datanodeDetails.getProtoBufMessage(), nodeReport,
-                  containerReport, pipelineReportsProto);
-      Preconditions.checkState(UUID.fromString(response.getDatanodeUUID())
-              .equals(datanodeDetails.getUuid()),
-          "Unexpected datanode ID in the response.");
-      Preconditions.checkState(!StringUtils.isBlank(response.getClusterID()),
-          "Invalid cluster ID in the response.");
-      if (response.hasHostname() && response.hasIpAddress()) {
-        datanodeDetails.setHostName(response.getHostname());
-        datanodeDetails.setIpAddress(response.getIpAddress());
-      }
-      EndpointStateMachine.EndPointStates nextState =
-          rpcEndPoint.getState().getNextState();
-      rpcEndPoint.setState(nextState);
-      rpcEndPoint.zeroMissedCount();
-      this.stateContext.configureHeartbeatFrequency();
-    } catch (IOException ex) {
-      rpcEndPoint.logIfNeeded(ex);
-    } finally {
-      rpcEndPoint.unlock();
-    }
-
-    return rpcEndPoint.getState();
-  }
-
-  /**
-   * Returns a builder class for RegisterEndPoint task.
-   *
-   * @return Builder.
-   */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Builder class for RegisterEndPoint task.
-   */
-  public static class Builder {
-    private EndpointStateMachine endPointStateMachine;
-    private Configuration conf;
-    private DatanodeDetails datanodeDetails;
-    private OzoneContainer container;
-    private StateContext context;
-
-    /**
-     * Constructs the builder class.
-     */
-    public Builder() {
-    }
-
-    /**
-     * Sets the endpoint state machine.
-     *
-     * @param rpcEndPoint - Endpoint state machine.
-     * @return Builder
-     */
-    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
-      this.endPointStateMachine = rpcEndPoint;
-      return this;
-    }
-
-    /**
-     * Sets the Config.
-     *
-     * @param config - config
-     * @return Builder.
-     */
-    public Builder setConfig(Configuration config) {
-      this.conf = config;
-      return this;
-    }
-
-    /**
-     * Sets the DatanodeDetails.
-     *
-     * @param dnDetails - DatanodeDetails
-     * @return Builder
-     */
-    public Builder setDatanodeDetails(DatanodeDetails dnDetails) {
-      this.datanodeDetails = dnDetails;
-      return this;
-    }
-
-    /**
-     * Sets the OzoneContainer.
-     * @param ozoneContainer - ozone container
-     * @return Builder
-     */
-    public Builder setOzoneContainer(OzoneContainer ozoneContainer) {
-      this.container = ozoneContainer;
-      return this;
-    }
-
-    public Builder setContext(StateContext stateContext) {
-      this.context = stateContext;
-      return this;
-    }
-
-    public RegisterEndpointTask build() {
-      if (endPointStateMachine == null) {
-        LOG.error("No endpoint specified.");
-        throw new IllegalArgumentException("A valid endpoint state machine is" +
-            " needed to construct RegisterEndPoint task");
-      }
-
-      if (conf == null) {
-        LOG.error("No config specified.");
-        throw new IllegalArgumentException(
-            "A valid configuration is needed to construct RegisterEndpoint "
-                + "task");
-      }
-
-      if (datanodeDetails == null) {
-        LOG.error("No datanode specified.");
-        throw new IllegalArgumentException("A valid Node ID is needed to " +
-            "construct RegisterEndpoint task");
-      }
-
-      if (container == null) {
-        LOG.error("Container is not specified");
-        throw new IllegalArgumentException("Container is not specified to " +
-            "construct RegisterEndpoint task");
-      }
-
-      if (context == null) {
-        LOG.error("StateContext is not specified");
-        throw new IllegalArgumentException("StateContext is not specified to " +
-            "construct RegisterEndpoint task");
-      }
-
-      RegisterEndpointTask task = new RegisterEndpointTask(this
-          .endPointStateMachine, this.conf, this.container, this.context);
-      task.setDatanodeDetails(datanodeDetails);
-      return task;
-    }
-
-  }
-}
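
For reference, newBuilder() above is the intended construction path; a minimal
usage sketch, assuming endpoint, conf, dnDetails, container and context
instances already exist:

    RegisterEndpointTask task = RegisterEndpointTask.newBuilder()
        .setEndpointStateMachine(endpoint)
        .setConfig(conf)
        .setDatanodeDetails(dnDetails)
        .setOzoneContainer(container)
        .setContext(context)
        .build();
    // The task is a Callable; call() returns the endpoint state reached
    // after the registration attempt (and may throw).
    EndpointStateMachine.EndPointStates state = task.call();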

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
deleted file mode 100644
index 64e078d..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.VersionResponse;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-/**
- * Task that fetches the version from SCM and validates the datanode volumes.
- */
-public class VersionEndpointTask implements
-    Callable<EndpointStateMachine.EndPointStates> {
-  public static final Logger LOG = LoggerFactory.getLogger(VersionEndpointTask
-      .class);
-  private final EndpointStateMachine rpcEndPoint;
-  private final Configuration configuration;
-  private final OzoneContainer ozoneContainer;
-
-  public VersionEndpointTask(EndpointStateMachine rpcEndPoint,
-                             Configuration conf, OzoneContainer container) {
-    this.rpcEndPoint = rpcEndPoint;
-    this.configuration = conf;
-    this.ozoneContainer = container;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public EndpointStateMachine.EndPointStates call() throws Exception {
-    rpcEndPoint.lock();
-    try{
-      SCMVersionResponseProto versionResponse =
-          rpcEndPoint.getEndPoint().getVersion(null);
-      VersionResponse response = VersionResponse.getFromProtobuf(
-          versionResponse);
-      rpcEndPoint.setVersion(response);
-      VolumeSet volumeSet = ozoneContainer.getVolumeSet();
-      Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
-
-      String scmId = response.getValue(OzoneConsts.SCM_ID);
-      String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
-
-      Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
-          "null");
-      Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
-          "cannot be null");
-
-      // If version file does not exist create version file and also set scmId
-      for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
-        HddsVolume hddsVolume = entry.getValue();
-        boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
-            clusterId, LOG);
-        if (!result) {
-          volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
-        }
-      }
-      if (volumeSet.getVolumesList().size() == 0) {
-        // All volumes are in an inconsistent state
-        throw new DiskOutOfSpaceException("All configured Volumes are in " +
-            "Inconsistent State");
-      }
-      ozoneContainer.getDispatcher().setScmId(scmId);
-
-      EndpointStateMachine.EndPointStates nextState =
-          rpcEndPoint.getState().getNextState();
-      rpcEndPoint.setState(nextState);
-      rpcEndPoint.zeroMissedCount();
-    } catch (DiskOutOfSpaceException ex) {
-      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
-    } catch(IOException ex) {
-      rpcEndPoint.logIfNeeded(ex);
-    } finally {
-      rpcEndPoint.unlock();
-    }
-    return rpcEndPoint.getState();
-  }
-}
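
Since the task implements Callable<EndpointStateMachine.EndPointStates>, it is
normally handed to an executor rather than invoked inline; a minimal sketch,
assuming rpcEndPoint, conf and ozoneContainer already exist and the usual
java.util.concurrent imports:

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<EndpointStateMachine.EndPointStates> future = executor.submit(
        new VersionEndpointTask(rpcEndPoint, conf, ozoneContainer));
    EndpointStateMachine.EndPointStates state = future.get(); // blocks until done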

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
deleted file mode 100644
index 1122598..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-/**
- This package contains code for RPC endpoint state transitions.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
deleted file mode 100644
index 92c953f..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.states;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
deleted file mode 100644
index db4a86a..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto
-    .XceiverClientProtocolServiceGrpc;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Grpc Service for handling Container Commands on datanode.
- */
-public class GrpcXceiverService extends
-    XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase {
-  public static final Logger
-      LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
-
-  private final ContainerDispatcher dispatcher;
-
-  public GrpcXceiverService(ContainerDispatcher dispatcher) {
-    this.dispatcher = dispatcher;
-  }
-
-  @Override
-  public StreamObserver<ContainerCommandRequestProto> send(
-      StreamObserver<ContainerCommandResponseProto> responseObserver) {
-    return new StreamObserver<ContainerCommandRequestProto>() {
-      private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-      @Override
-      public void onNext(ContainerCommandRequestProto request) {
-        try {
-          ContainerCommandResponseProto resp = dispatcher.dispatch(request);
-          responseObserver.onNext(resp);
-        } catch (Throwable e) {
-          LOG.error("{} got exception when processing"
-                    + " ContainerCommandRequestProto {}: {}", request, e);
-          responseObserver.onError(e);
-        }
-      }
-
-      @Override
-      public void onError(Throwable t) {
-        // for now we just log a msg
-        LOG.error("{}: ContainerCommand send on error. Exception: {}", t);
-      }
-
-      @Override
-      public void onCompleted() {
-        if (isClosed.compareAndSet(false, true)) {
-          LOG.debug("ContainerCommand send completed");
-          responseObserver.onCompleted();
-        }
-      }
-    };
-  }
-}
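
send() above is a bidirectional stream, so a client obtains a request observer
from the generated async stub and feeds requests into it; a client-side sketch,
assuming an existing channel and a pre-built request proto, and reusing the
shaded gRPC imports above (the stub call follows standard gRPC codegen for a
bidi method named send):

    StreamObserver<ContainerCommandRequestProto> requestObserver =
        XceiverClientProtocolServiceGrpc.newStub(channel).send(
            new StreamObserver<ContainerCommandResponseProto>() {
              @Override
              public void onNext(ContainerCommandResponseProto response) {
                // one response arrives per dispatched request
              }
              @Override
              public void onError(Throwable t) {
                // the server aborted the stream
              }
              @Override
              public void onCompleted() {
                // the server closed the stream
              }
            });
    requestObserver.onNext(request);
    requestObserver.onCompleted();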

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
deleted file mode 100644
index c51da98..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
-    StorageContainerException;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-
-import org.apache.ratis.shaded.io.grpc.BindableService;
-import org.apache.ratis.shaded.io.grpc.Server;
-import org.apache.ratis.shaded.io.grpc.ServerBuilder;
-import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Creates a Grpc server endpoint that acts as the communication layer for
- * Ozone containers.
- */
-public final class XceiverServerGrpc implements XceiverServerSpi {
-  private static final Logger
-      LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
-  private int port;
-  private UUID id;
-  private Server server;
-  private final ContainerDispatcher storageContainer;
-
-  /**
-   * Constructs a Grpc server class.
-   *
-   * @param datanodeDetails - datanode details
-   * @param conf - Configuration
-   * @param dispatcher - container dispatcher
-   * @param additionalServices - any additional gRPC services to register
-   */
-  public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
-      ContainerDispatcher dispatcher, BindableService... additionalServices) {
-    Preconditions.checkNotNull(conf);
-
-    this.id = datanodeDetails.getUuid();
-    this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
-    // Get an available port on current node and
-    // use that as the container port
-    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        this.port = socket.getLocalPort();
-        LOG.info("Found a free port for the server : {}", this.port);
-      } catch (IOException e) {
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", this.port, e);
-      }
-    }
-    datanodeDetails.setPort(
-        DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
-    NettyServerBuilder nettyServerBuilder =
-        ((NettyServerBuilder) ServerBuilder.forPort(port))
-            .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
-            .addService(new GrpcXceiverService(dispatcher));
-    for (BindableService service : additionalServices) {
-      nettyServerBuilder.addService(service);
-    }
-    server = nettyServerBuilder.build();
-    storageContainer = dispatcher;
-  }
-
-  @Override
-  public int getIPCPort() {
-    return this.port;
-  }
-
-  /**
-   * Returns the Replication type supported by this end-point.
-   *
-   * @return enum -- {Stand_Alone, Ratis, Chained}
-   */
-  @Override
-  public HddsProtos.ReplicationType getServerType() {
-    return HddsProtos.ReplicationType.STAND_ALONE;
-  }
-
-  @Override
-  public void start() throws IOException {
-    server.start();
-  }
-
-  @Override
-  public void stop() {
-    server.shutdown();
-  }
-
-  @Override
-  public void submitRequest(ContainerCommandRequestProto request,
-      HddsProtos.PipelineID pipelineID) throws IOException {
-    ContainerProtos.ContainerCommandResponseProto response =
-        storageContainer.dispatch(request);
-    if (response.getResult() != ContainerProtos.Result.SUCCESS) {
-      throw new StorageContainerException(response.getMessage(),
-          response.getResult());
-    }
-  }
-
-  @Override
-  public List<PipelineReport> getPipelineReport() {
-    return Collections.singletonList(
-            PipelineReport.newBuilder()
-                    .setPipelineID(PipelineID.valueOf(id).getProtobuf())
-                    .build());
-  }
-}
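
The random-port branch in the constructor above is the usual bind-to-port-zero
idiom; in isolation it looks like the sketch below. Note the inherent race:
another process can grab the port between close() and the later gRPC bind,
which the error-and-fallback handling above tolerates.

    int port;
    try (ServerSocket socket = new ServerSocket()) {
      socket.setReuseAddress(true);
      socket.bind(new InetSocketAddress(0)); // port 0: the OS picks a free port
      port = socket.getLocalPort();          // record it before close()
    }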

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
deleted file mode 100644
index 8c3fa5c..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReport;
-
-import java.io.IOException;
-import java.util.List;
-
-/** A server endpoint that acts as the communication layer for Ozone
- * containers. */
-public interface XceiverServerSpi {
-  /** Starts the server. */
-  void start() throws IOException;
-
-  /** Stops a running server. */
-  void stop();
-
-  /** Get server IPC port. */
-  int getIPCPort();
-
-  /**
-   * Returns the Replication type supported by this end-point.
-   * @return enum -- {Stand_Alone, Ratis, Chained}
-   */
-  HddsProtos.ReplicationType getServerType();
-
-  /**
-   * Submits a containerRequest to be performed by the replication pipeline.
-   * @param request ContainerCommandRequest
-   */
-  void submitRequest(ContainerCommandRequestProto request,
-      HddsProtos.PipelineID pipelineID)
-      throws IOException;
-
-  /**
-   * Get pipeline report for the XceiverServer instance.
-   * @return list of report for each pipeline.
-   */
-  List<PipelineReport> getPipelineReport();
-}
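
For tests, the interface is small enough to stub with an in-process loopback
that dispatches locally instead of over the wire; an illustrative sketch (not
part of this patch), reusing the imports already shown in XceiverServerGrpc:

    final class LoopbackXceiverServer implements XceiverServerSpi {
      private final ContainerDispatcher dispatcher;

      LoopbackXceiverServer(ContainerDispatcher dispatcher) {
        this.dispatcher = dispatcher;
      }

      @Override public void start() { }
      @Override public void stop() { }
      @Override public int getIPCPort() { return 0; }

      @Override public HddsProtos.ReplicationType getServerType() {
        return HddsProtos.ReplicationType.STAND_ALONE;
      }

      @Override public void submitRequest(ContainerCommandRequestProto request,
          HddsProtos.PipelineID pipelineID) throws IOException {
        dispatcher.dispatch(request); // dispatch in-process; pipeline ignored
      }

      @Override public List<PipelineReport> getPipelineReport() {
        return Collections.emptyList();
      }
    }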

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
deleted file mode 100644
index 59c96f1..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-/**
- * This package contains classes for the server of the storage container
- * protocol.
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
deleted file mode 100644
index b6aed60..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-
-/**
- * This class is for maintaining Container State Machine statistics.
- */
-@InterfaceAudience.Private
-@Metrics(about="Container State Machine Metrics", context="dfs")
-public class CSMMetrics {
-  public static final String SOURCE_NAME =
-      CSMMetrics.class.getSimpleName();
-
-  // Ratis op metrics
-  private @Metric MutableCounterLong numWriteStateMachineOps;
-  private @Metric MutableCounterLong numReadStateMachineOps;
-  private @Metric MutableCounterLong numApplyTransactionOps;
-
-  // Failure Metrics
-  private @Metric MutableCounterLong numWriteStateMachineFails;
-  private @Metric MutableCounterLong numReadStateMachineFails;
-  private @Metric MutableCounterLong numApplyTransactionFails;
-
-  public CSMMetrics() {
-  }
-
-  public static CSMMetrics create() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register(SOURCE_NAME,
-        "Container State Machine",
-        new CSMMetrics());
-  }
-
-  public void incNumWriteStateMachineOps() {
-    numWriteStateMachineOps.incr();
-  }
-
-  public void incNumReadStateMachineOps() {
-    numReadStateMachineOps.incr();
-  }
-
-  public void incNumApplyTransactionsOps() {
-    numApplyTransactionOps.incr();
-  }
-
-  public void incNumWriteStateMachineFails() {
-    numWriteStateMachineFails.incr();
-  }
-
-  public void incNumReadStateMachineFails() {
-    numReadStateMachineFails.incr();
-  }
-
-  public void incNumApplyTransactionsFails() {
-    numApplyTransactionFails.incr();
-  }
-
-  @VisibleForTesting
-  public long getNumWriteStateMachineOps() {
-    return numWriteStateMachineOps.value();
-  }
-
-  @VisibleForTesting
-  public long getNumReadStateMachineOps() {
-    return numReadStateMachineOps.value();
-  }
-
-  @VisibleForTesting
-  public long getNumApplyTransactionsOps() {
-    return numApplyTransactionOps.value();
-  }
-
-  @VisibleForTesting
-  public long getNumWriteStateMachineFails() {
-    return numWriteStateMachineFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumReadStateMachineFails() {
-    return numReadStateMachineFails.value();
-  }
-
-  @VisibleForTesting
-  public long getNumApplyTransactionsFails() {
-    return numApplyTransactionFails.value();
-  }
-
-  public void unRegister() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    ms.unregisterSource(SOURCE_NAME);
-  }
-}
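
Because create() above registers the source under SOURCE_NAME with the default
metrics system, an instance must be unregistered before another can be created
with the same name; a typical test-style lifecycle:

    CSMMetrics metrics = CSMMetrics.create();  // registers "CSMMetrics"
    metrics.incNumWriteStateMachineOps();
    assert metrics.getNumWriteStateMachineOps() == 1;
    metrics.unRegister();                      // frees the source name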

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
deleted file mode 100644
index a7bef86..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ /dev/null
@@ -1,656 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.shaded.com.google.protobuf
-    .InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .WriteChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ReadChunkResponseProto;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.statemachine.StateMachineStorage;
-import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.stream.Collectors;
-
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
- *
- * The stateMachine is responsible for handling different types of container
- * requests. The container requests can be divided into readonly and write
- * requests.
- *
- * Read only requests are classified in
- * {@link org.apache.hadoop.hdds.HddsUtils#isReadOnly}
- * and these readonly requests are replied from the {@link #query(Message)}.
- *
- * The write requests can be divided into requests with user data
- * (WriteChunkRequest) and other request without user data.
- *
- * In order to optimize the write throughput, the writeChunk request is
- * processed in 2 phases. The 2 phases are divided in
- * {@link #startTransaction(RaftClientRequest)}, in the first phase the user
- * data is written directly into the state machine via
- * {@link #writeStateMachineData} and in the second phase the
- * transaction is committed via {@link #applyTransaction(TransactionContext)}
- *
- * For the requests with no stateMachine data, the transaction is directly
- * committed through
- * {@link #applyTransaction(TransactionContext)}
- *
- * There are two ordering operations which are enforced right now in the code:
- * 1) Write chunk operations are executed after the create container operation;
- * the write chunk operation will fail otherwise as the container still hasn't
- * been created. Hence the create container operation has been split in
- * {@link #startTransaction(RaftClientRequest)}; this will help in synchronizing
- * the calls in {@link #writeStateMachineData}.
- *
- * 2) Write chunk commit operation is executed after write chunk state machine
- * operation. This will ensure that commit operation is sync'd with the state
- * machine operation.
- *
- * Synchronization between {@link #writeStateMachineData} and
- * {@link #applyTransaction} need to be enforced in the StateMachine
- * implementation. For example, synchronization between writeChunk and
- * createContainer in {@link ContainerStateMachine}.
- *
- * PutBlock is synchronized with WriteChunk operations, PutBlock for a block is
- * executed only after all the WriteChunk preceding the PutBlock have finished.
- *
- * CloseContainer is synchronized with WriteChunk and PutBlock operations,
- * CloseContainer for a container is processed after all the preceding write
- * operations for the container have finished.
- * */
-public class ContainerStateMachine extends BaseStateMachine {
-  static final Logger LOG = LoggerFactory.getLogger(
-      ContainerStateMachine.class);
-  private final SimpleStateMachineStorage storage
-      = new SimpleStateMachineStorage();
-  private final ContainerDispatcher dispatcher;
-  private ThreadPoolExecutor chunkExecutor;
-  private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-      writeChunkFutureMap;
-  private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
-  /**
-   * CSM metrics.
-   */
-  private final CSMMetrics metrics;
-
-  public ContainerStateMachine(ContainerDispatcher dispatcher,
-      ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) {
-    this.dispatcher = dispatcher;
-    this.chunkExecutor = chunkExecutor;
-    this.ratisServer = ratisServer;
-    this.writeChunkFutureMap = new ConcurrentHashMap<>();
-    this.stateMachineMap = new ConcurrentHashMap<>();
-    metrics = CSMMetrics.create();
-  }
-
-  @Override
-  public StateMachineStorage getStateMachineStorage() {
-    return storage;
-  }
-
-  public CSMMetrics getMetrics() {
-    return metrics;
-  }
-
-  @Override
-  public void initialize(
-      RaftServer server, RaftGroupId id, RaftStorage raftStorage)
-      throws IOException {
-    super.initialize(server, id, raftStorage);
-    storage.init(raftStorage);
-    //  TODO handle snapshots
-
-    // TODO: Add a flag that tells you that initialize has been called.
-    // Check with Ratis if this feature is done in Ratis.
-  }
-
-  @Override
-  public TransactionContext startTransaction(RaftClientRequest request)
-      throws IOException {
-    final ContainerCommandRequestProto proto =
-        getRequestProto(request.getMessage().getContent());
-
-    final SMLogEntryProto log;
-    if (proto.getCmdType() == Type.WriteChunk) {
-      final WriteChunkRequestProto write = proto.getWriteChunk();
-      // create the state machine data proto
-      final WriteChunkRequestProto dataWriteChunkProto =
-          WriteChunkRequestProto
-              .newBuilder(write)
-              .setStage(Stage.WRITE_DATA)
-              .build();
-      ContainerCommandRequestProto dataContainerCommandProto =
-          ContainerCommandRequestProto
-              .newBuilder(proto)
-              .setWriteChunk(dataWriteChunkProto)
-              .build();
-
-      // create the log entry proto
-      final WriteChunkRequestProto commitWriteChunkProto =
-          WriteChunkRequestProto.newBuilder()
-              .setBlockID(write.getBlockID())
-              .setChunkData(write.getChunkData())
-              // skipping the data field as it is
-              // already set in statemachine data proto
-              .setStage(Stage.COMMIT_DATA)
-              .build();
-      ContainerCommandRequestProto commitContainerCommandProto =
-          ContainerCommandRequestProto
-              .newBuilder(proto)
-              .setWriteChunk(commitWriteChunkProto)
-              .build();
-
-      log = SMLogEntryProto.newBuilder()
-          .setData(commitContainerCommandProto.toByteString())
-          .setStateMachineData(dataContainerCommandProto.toByteString())
-          .build();
-    } else if (proto.getCmdType() == Type.CreateContainer) {
-      log = SMLogEntryProto.newBuilder()
-          .setData(request.getMessage().getContent())
-          .setStateMachineData(request.getMessage().getContent())
-          .build();
-    } else {
-      log = SMLogEntryProto.newBuilder()
-          .setData(request.getMessage().getContent())
-          .build();
-    }
-    return new TransactionContextImpl(this, request, log);
-  }
-
-  private ContainerCommandRequestProto getRequestProto(ByteString request)
-      throws InvalidProtocolBufferException {
-    return ContainerCommandRequestProto.parseFrom(request);
-  }
-
-  private ContainerCommandResponseProto dispatchCommand(
-      ContainerCommandRequestProto requestProto) {
-    LOG.trace("dispatch {}", requestProto);
-    ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
-    LOG.trace("response {}", response);
-    return response;
-  }
-
-  private Message runCommand(ContainerCommandRequestProto requestProto) {
-    return dispatchCommand(requestProto)::toByteString;
-  }
-
-  /*
-   * writeStateMachineData calls are not synchronized with each other,
-   * nor with applyTransaction.
-   */
-  @Override
-  public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) 
{
-    try {
-      metrics.incNumWriteStateMachineOps();
-      final ContainerCommandRequestProto requestProto =
-          getRequestProto(entry.getSmLogEntry().getStateMachineData());
-      Type cmdType = requestProto.getCmdType();
-      long containerId = requestProto.getContainerID();
-      stateMachineMap
-          .computeIfAbsent(containerId, k -> new StateMachineHelper());
-      CompletableFuture<Message> stateMachineFuture =
-          stateMachineMap.get(containerId)
-              .handleStateMachineData(requestProto, entry.getIndex());
-      if (stateMachineFuture == null) {
-        throw new IllegalStateException(
-            "Cmd Type:" + cmdType + " should not have state machine data");
-      }
-      return stateMachineFuture;
-    } catch (IOException e) {
-      metrics.incNumWriteStateMachineFails();
-      return completeExceptionally(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Message> query(Message request) {
-    try {
-      metrics.incNumReadStateMachineOps();
-      final ContainerCommandRequestProto requestProto =
-          getRequestProto(request.getContent());
-      return CompletableFuture.completedFuture(runCommand(requestProto));
-    } catch (IOException e) {
-      metrics.incNumReadStateMachineFails();
-      return completeExceptionally(e);
-    }
-  }
-
-  private LogEntryProto readStateMachineData(LogEntryProto entry,
-      ContainerCommandRequestProto requestProto) {
-    WriteChunkRequestProto writeChunkRequestProto =
-        requestProto.getWriteChunk();
-    // Assert that the stored log entry is for COMMIT_DATA; the WRITE_DATA is
-    // written through writeStateMachineData.
-    Preconditions.checkArgument(writeChunkRequestProto.getStage()
-        == Stage.COMMIT_DATA);
-
-    // prepare the chunk to be read
-    ReadChunkRequestProto.Builder readChunkRequestProto =
-        ReadChunkRequestProto.newBuilder()
-            .setBlockID(writeChunkRequestProto.getBlockID())
-            .setChunkData(writeChunkRequestProto.getChunkData());
-    ContainerCommandRequestProto dataContainerCommandProto =
-        ContainerCommandRequestProto.newBuilder(requestProto)
-            .setCmdType(Type.ReadChunk)
-            .setReadChunk(readChunkRequestProto)
-            .build();
-
-    // read the chunk
-    ContainerCommandResponseProto response =
-        dispatchCommand(dataContainerCommandProto);
-    ReadChunkResponseProto responseProto = response.getReadChunk();
-
-    // assert that the response has data in it.
-    Preconditions.checkNotNull(responseProto.getData());
-
-    // reconstruct the write chunk request
-    final WriteChunkRequestProto.Builder dataWriteChunkProto =
-        WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
-            // adding the state machine data
-            .setData(responseProto.getData())
-            .setStage(Stage.WRITE_DATA);
-
-    ContainerCommandRequestProto.Builder newStateMachineProto =
-        ContainerCommandRequestProto.newBuilder(requestProto)
-            .setWriteChunk(dataWriteChunkProto);
-
-    return recreateLogEntryProto(entry,
-        newStateMachineProto.build().toByteString());
-  }
-
-  private LogEntryProto recreateLogEntryProto(LogEntryProto entry,
-      ByteString stateMachineData) {
-    // recreate the log entry
-    final SMLogEntryProto log =
-        SMLogEntryProto.newBuilder(entry.getSmLogEntry())
-            .setStateMachineData(stateMachineData)
-            .build();
-    return LogEntryProto.newBuilder(entry).setSmLogEntry(log).build();
-  }
-
-  /**
-   * Returns the combined future of all the writeChunks till the given log
-   * index. The Raft log worker will wait for the stateMachineData to complete
-   * flush as well.
-   *
-   * @param index log index till which the stateMachine data needs to be flushed
-   * @return Combined future of all writeChunks till the log index given.
-   */
-  @Override
-  public CompletableFuture<Void> flushStateMachineData(long index) {
-    List<CompletableFuture<Message>> futureList =
-        writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
-            .map(x -> x.getValue()).collect(Collectors.toList());
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        futureList.toArray(new CompletableFuture[futureList.size()]));
-    return combinedFuture;
-  }
-  /*
-   * This api is used by the leader while appending logs to the follower.
-   * This allows the leader to read the state machine data from the
-   * state machine implementation in case cached state machine data has been
-   * evicted.
-   */
-  @Override
-  public CompletableFuture<LogEntryProto> readStateMachineData(
-      LogEntryProto entry) {
-    SMLogEntryProto smLogEntryProto = entry.getSmLogEntry();
-    if (!smLogEntryProto.getStateMachineData().isEmpty()) {
-      return CompletableFuture.completedFuture(entry);
-    }
-
-    try {
-      final ContainerCommandRequestProto requestProto =
-          getRequestProto(entry.getSmLogEntry().getData());
-      // readStateMachineData should only be called for "write" to Ratis.
-      Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
-
-      if (requestProto.getCmdType() == Type.WriteChunk) {
-        return CompletableFuture.supplyAsync(() ->
-                readStateMachineData(entry, requestProto),
-            chunkExecutor);
-      } else if (requestProto.getCmdType() == Type.CreateContainer) {
-        LogEntryProto log =
-            recreateLogEntryProto(entry, requestProto.toByteString());
-        return CompletableFuture.completedFuture(log);
-      } else {
-        throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
-            + " cannot have state machine data");
-      }
-    } catch (Exception e) {
-      LOG.error("unable to read stateMachineData:" + e);
-      return completeExceptionally(e);
-    }
-  }
-
-  /*
-   * ApplyTransaction calls in Ratis are sequential.
-   */
-  @Override
-  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    try {
-      metrics.incNumApplyTransactionsOps();
-      ContainerCommandRequestProto requestProto =
-          getRequestProto(trx.getSMLogEntry().getData());
-      Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
-      stateMachineMap.computeIfAbsent(requestProto.getContainerID(),
-          k -> new StateMachineHelper());
-      long index =
-          trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex();
-      return stateMachineMap.get(requestProto.getContainerID())
-          .executeContainerCommand(requestProto, index);
-    } catch (IOException e) {
-      metrics.incNumApplyTransactionsFails();
-      return completeExceptionally(e);
-    }
-  }
-
-  private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
-    final CompletableFuture<T> future = new CompletableFuture<>();
-    future.completeExceptionally(e);
-    return future;
-  }
-
-  @Override
-  public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
-    ratisServer.handleNodeSlowness(group, roleInfoProto);
-  }
-
-  @Override
-  public void notifyExtendedNoLeader(RaftGroup group,
-      RoleInfoProto roleInfoProto) {
-    ratisServer.handleNoLeader(group, roleInfoProto);
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  /**
-   * Class to manage the future tasks for writeChunks.
-   */
-  static class CommitChunkFutureMap {
-    private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-        block2ChunkMap = new ConcurrentHashMap<>();
-
-    synchronized int removeAndGetSize(long index) {
-      block2ChunkMap.remove(index);
-      return block2ChunkMap.size();
-    }
-
-    synchronized CompletableFuture<Message> add(long index,
-        CompletableFuture<Message> future) {
-      return block2ChunkMap.put(index, future);
-    }
-
-    synchronized List<CompletableFuture<Message>> getAll() {
-      return new ArrayList<>(block2ChunkMap.values());
-    }
-  }
-
-  /**
-   * This class maintains maps and provide utilities to enforce synchronization
-   * among createContainer, writeChunk, putBlock and closeContainer.
-   */
-  private class StateMachineHelper {
-
-    private CompletableFuture<Message> createContainerFuture;
-
-    // Map for maintaining all writeChunk futures mapped to blockId
-    private final ConcurrentHashMap<Long, CommitChunkFutureMap>
-        block2ChunkMap;
-
-    // Map for putBlock futures
-    private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-        blockCommitMap;
-
-    StateMachineHelper() {
-      createContainerFuture = null;
-      block2ChunkMap = new ConcurrentHashMap<>();
-      blockCommitMap = new ConcurrentHashMap<>();
-    }
-
-    // The following section handles writeStateMachineData transactions
-    // on a container
-
-    // Enqueue the create container future during writeStateMachineData
-    // so that the writeStateMachineData phase of writeChunk waits for
-    // create container to finish.
-    private CompletableFuture<Message> handleCreateContainer() {
-      createContainerFuture = new CompletableFuture<>();
-      return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
-    }
-
-    // Wait for the create container future, if any, to finish first.
-    private CompletableFuture<Message> handleWriteChunk(
-        ContainerCommandRequestProto requestProto, long entryIndex) {
-      CompletableFuture<Message> containerOpFuture;
-
-      if (createContainerFuture != null) {
-        containerOpFuture = createContainerFuture
-            .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor);
-      } else {
-        containerOpFuture = CompletableFuture
-            .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
-      }
-      writeChunkFutureMap.put(entryIndex, containerOpFuture);
-      return containerOpFuture;
-    }
-
-    CompletableFuture<Message> handleStateMachineData(
-        final ContainerCommandRequestProto requestProto, long index) {
-      Type cmdType = requestProto.getCmdType();
-      if (cmdType == Type.CreateContainer) {
-        return handleCreateContainer();
-      } else if (cmdType == Type.WriteChunk) {
-        return handleWriteChunk(requestProto, index);
-      } else {
-        return null;
-      }
-    }
-
-    // The following section handles applyTransaction transactions
-    // on a container
-
-    private CompletableFuture<Message> handlePutBlock(
-        ContainerCommandRequestProto requestProto) {
-      List<CompletableFuture<Message>> futureList = new ArrayList<>();
-      long localId =
-          requestProto.getPutBlock().getBlockData().getBlockID().getLocalID();
-      // No need to wait for the create container future here, as it has
-      // already finished.
-      if (block2ChunkMap.get(localId) != null) {
-        futureList.addAll(block2ChunkMap.get(localId).getAll());
-      }
-      CompletableFuture<Message> effectiveFuture =
-          runCommandAfterFutures(futureList, requestProto);
-
-      CompletableFuture<Message> putBlockFuture =
-          effectiveFuture.thenApply(message -> {
-            blockCommitMap.remove(localId);
-            return message;
-          });
-      blockCommitMap.put(localId, putBlockFuture);
-      return putBlockFuture;
-    }
-
-    // CloseContainer should be executed only after all pending write-type
-    // container commands have been executed. Transactions which can return
-    // a future are WriteChunk and PutBlock.
-    private CompletableFuture<Message> handleCloseContainer(
-        ContainerCommandRequestProto requestProto) {
-      List<CompletableFuture<Message>> futureList = new ArrayList<>();
-
-      // No need to wait for create container future here as it should have
-      // already finished.
-      block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
-      futureList.addAll(blockCommitMap.values());
-
-      // There may be pending WriteChunk/PutBlock requests; queue this
-      // closeContainer request behind all of them.
-      CompletableFuture<Message> closeContainerFuture =
-          runCommandAfterFutures(futureList, requestProto);
-
-      return closeContainerFuture.thenApply(message -> {
-        stateMachineMap.remove(requestProto.getContainerID());
-        return message;
-      });
-    }
-
-    private CompletableFuture<Message> handleChunkCommit(
-        ContainerCommandRequestProto requestProto, long index) {
-      WriteChunkRequestProto write = requestProto.getWriteChunk();
-      // The data field has already been removed in startTransaction.
-      Preconditions.checkArgument(!write.hasData());
-      CompletableFuture<Message> stateMachineFuture =
-          writeChunkFutureMap.remove(index);
-      CompletableFuture<Message> commitChunkFuture = stateMachineFuture
-          .thenComposeAsync(v -> CompletableFuture
-              .completedFuture(runCommand(requestProto)));
-
-      long localId = requestProto.getWriteChunk().getBlockID().getLocalID();
-      // Put the applyTransaction future back into the map so that
-      // closeContainer can synchronize with it.
-      block2ChunkMap
-          .computeIfAbsent(localId, id -> new CommitChunkFutureMap())
-          .add(index, commitChunkFuture);
-      return commitChunkFuture.thenApply(message -> {
-        block2ChunkMap.computeIfPresent(localId, (containerId, chunks)
-            -> chunks.removeAndGetSize(index) == 0 ? null : chunks);
-        return message;
-      });
-    }
-
-    private CompletableFuture<Message> runCommandAfterFutures(
-        List<CompletableFuture<Message>> futureList,
-        ContainerCommandRequestProto requestProto) {
-      CompletableFuture<Message> effectiveFuture;
-      if (futureList.isEmpty()) {
-        effectiveFuture = CompletableFuture
-            .supplyAsync(() -> runCommand(requestProto));
-
-      } else {
-        CompletableFuture<Void> allFuture = CompletableFuture.allOf(
-            futureList.toArray(new CompletableFuture[futureList.size()]));
-        effectiveFuture = allFuture
-            .thenApplyAsync(v -> runCommand(requestProto));
-      }
-      return effectiveFuture;
-    }
-
-    CompletableFuture<Message> handleCreateContainer(
-        ContainerCommandRequestProto requestProto) {
-      CompletableFuture<Message> future =
-          CompletableFuture.completedFuture(runCommand(requestProto));
-      future.thenAccept(m -> {
-        createContainerFuture.complete(m);
-        createContainerFuture = null;
-      });
-      return future;
-    }
-
-    CompletableFuture<Message> handleOtherCommands(
-        ContainerCommandRequestProto requestProto) {
-      return CompletableFuture.completedFuture(runCommand(requestProto));
-    }
-
-    CompletableFuture<Message> executeContainerCommand(
-        ContainerCommandRequestProto requestProto, long index) {
-      Type cmdType = requestProto.getCmdType();
-      switch (cmdType) {
-      case WriteChunk:
-        return handleChunkCommit(requestProto, index);
-      case CloseContainer:
-        return handleCloseContainer(requestProto);
-      case PutBlock:
-        return handlePutBlock(requestProto);
-      case CreateContainer:
-        return handleCreateContainer(requestProto);
-      default:
-        return handleOtherCommands(requestProto);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() {
-    return stateMachineMap;
-  }
-
-  @VisibleForTesting
-  public CompletableFuture<Message> getCreateContainerFuture(
-      long containerId) {
-    StateMachineHelper helper = stateMachineMap.get(containerId);
-    return helper == null ? null : helper.createContainerFuture;
-  }
-
-  @VisibleForTesting
-  public List<CompletableFuture<Message>> getCommitChunkFutureMap(
-      long containerId) {
-    StateMachineHelper helper = stateMachineMap.get(containerId);
-    if (helper != null) {
-      List<CompletableFuture<Message>> futureList = new ArrayList<>();
-      stateMachineMap.get(containerId).block2ChunkMap.values()
-          .forEach(b -> futureList.addAll(b.getAll()));
-      return futureList;
-    }
-    return null;
-  }
-
-  @VisibleForTesting
-  public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() {
-    return writeChunkFutureMap.values();
-  }
-}

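Reviewer note on the file above: the removed StateMachineHelper enforces
ordering between writeChunk, putBlock and closeContainer by queuing each
command behind the pending futures it tracks, with runCommandAfterFutures(...)
at the heart of it. A minimal, self-contained sketch of the same pattern
(illustrative class and method names, not the removed code itself):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class OrderedCommandRunner {
  private OrderedCommandRunner() {
  }

  // Run "command" only after every future in "pending" has completed,
  // mirroring runCommandAfterFutures(...) in the deleted StateMachineHelper.
  static <T> CompletableFuture<T> runAfter(
      List<CompletableFuture<?>> pending, Supplier<T> command) {
    if (pending.isEmpty()) {
      // Nothing outstanding: run the command on a worker thread right away.
      return CompletableFuture.supplyAsync(command);
    }
    // allOf(...) completes once every pending write has committed; the
    // command is then applied asynchronously, strictly after all of them.
    return CompletableFuture
        .allOf(pending.toArray(new CompletableFuture<?>[0]))
        .thenApplyAsync(v -> command.get());
  }
}

With this shape, closeContainer only has to collect every outstanding
writeChunk/putBlock future and call runAfter(...), so it can never overtake
an in-flight write.
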
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
deleted file mode 100644
index c2ef504..0000000
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
-import org.apache.hadoop.hdds.scm.HddsServerUtil;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.transport.server
-    .XceiverServerSpi;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClientConfigKeys;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.netty.NettyConfigKeys;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.NotLeaderException;
-import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.shaded.proto.RaftProtos;
-import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
-import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Creates a ratis server endpoint that acts as the communication layer for
- * Ozone containers.
- */
-public final class XceiverServerRatis implements XceiverServerSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
-  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
-  private static long nextCallId() {
-    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
-  }
-
-  private final int port;
-  private final RaftServer server;
-  private ThreadPoolExecutor chunkExecutor;
-  private ClientId clientId = ClientId.randomId();
-  private final StateContext context;
-  private final ReplicationLevel replicationLevel;
-  private long nodeFailureTimeoutMs;
-
-  private XceiverServerRatis(DatanodeDetails dd, int port,
-      ContainerDispatcher dispatcher, Configuration conf, StateContext context)
-      throws IOException {
-    Objects.requireNonNull(dd, "datanodeDetails == null");
-    this.port = port;
-    RaftProperties serverProperties = newRaftProperties(conf);
-    final int numWriteChunkThreads = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
-    chunkExecutor =
-        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
-            100, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(1024),
-            new ThreadPoolExecutor.CallerRunsPolicy());
-    this.context = context;
-    this.replicationLevel =
-        conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
-            OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
-    ContainerStateMachine stateMachine =
-        new ContainerStateMachine(dispatcher, chunkExecutor, this);
-    this.server = RaftServer.newBuilder()
-        .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setProperties(serverProperties)
-        .setStateMachine(stateMachine)
-        .build();
-  }
-
-
-  private RaftProperties newRaftProperties(Configuration conf) {
-    final RaftProperties properties = new RaftProperties();
-
-    // Set rpc type
-    final String rpcType = conf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
-    RaftConfigKeys.Rpc.setType(properties, rpc);
-
-    // set raft segment size
-    final int raftSegmentSize = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
-    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
-
-    // set raft segment pre-allocated size
-    final int raftSegmentPreallocatedSize = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
-    RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-
-    // Set max write buffer size, which is the scm chunk size
-    final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
-    RaftServerConfigKeys.Log.setWriteBufferSize(properties,
-        SizeInBytes.valueOf(maxChunkSize));
-
-    // Set the client requestTimeout
-    TimeUnit timeUnit =
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
-            .getUnit();
-    long duration = conf.getTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
-            .getDuration(), timeUnit);
-    final TimeDuration clientRequestTimeout =
-        TimeDuration.valueOf(duration, timeUnit);
-    RaftClientConfigKeys.Rpc
-        .setRequestTimeout(properties, clientRequestTimeout);
-
-    // Set the server Request timeout
-    timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT
-        .getUnit();
-    duration = conf.getTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY,
-        OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT
-            .getDuration(), timeUnit);
-    final TimeDuration serverRequestTimeout =
-        TimeDuration.valueOf(duration, timeUnit);
-    RaftServerConfigKeys.Rpc
-        .setRequestTimeout(properties, serverRequestTimeout);
-
-    // set timeout for a retry cache entry
-    timeUnit =
-        OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT
-            .getUnit();
-    duration = conf.getTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY,
-        OzoneConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_DEFAULT
-            .getDuration(), timeUnit);
-    final TimeDuration retryCacheTimeout =
-        TimeDuration.valueOf(duration, timeUnit);
-    RaftServerConfigKeys.RetryCache
-        .setExpiryTime(properties, retryCacheTimeout);
-
-    // Set the ratis leader election timeout
-    TimeUnit leaderElectionMinTimeoutUnit =
-        OzoneConfigKeys.
-            DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
-            .getUnit();
-    duration = conf.getTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
-        OzoneConfigKeys.
-            DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
-            .getDuration(), leaderElectionMinTimeoutUnit);
-    final TimeDuration leaderElectionMinTimeout =
-        TimeDuration.valueOf(duration, leaderElectionMinTimeoutUnit);
-    RaftServerConfigKeys.Rpc
-        .setTimeoutMin(properties, leaderElectionMinTimeout);
-    long leaderElectionMaxTimeout =
-        leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
-    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
-        TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
-    // Enable batch append on raft server
-    RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
-
-    // Set the maximum cache segments
-    RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
-
-    // set the node failure timeout
-    timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
-        .getUnit();
-    duration = conf.getTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
-        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
-            .getDuration(), timeUnit);
-    final TimeDuration nodeFailureTimeout =
-        TimeDuration.valueOf(duration, timeUnit);
-    RaftServerConfigKeys.setLeaderElectionTimeout(properties,
-        nodeFailureTimeout);
-    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
-        nodeFailureTimeout);
-    nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
-
-    // Set the ratis storage directory
-    String storageDir = HddsServerUtil.getOzoneDatanodeRatisDirectory(conf);
-    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
-
-    // For gRPC, set the maximum message size
-    GrpcConfigKeys.setMessageSizeMax(properties,
-        SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize));
-
-    // Set the ratis port number
-    if (rpc == SupportedRpcType.GRPC) {
-      GrpcConfigKeys.Server.setPort(properties, port);
-    } else if (rpc == SupportedRpcType.NETTY) {
-      NettyConfigKeys.Server.setPort(properties, port);
-    }
-    return properties;
-  }
-
-  public static XceiverServerRatis newXceiverServerRatis(
-      DatanodeDetails datanodeDetails, Configuration ozoneConf,
-      ContainerDispatcher dispatcher, StateContext context)
-      throws IOException {
-    int localPort = ozoneConf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
-
-    // Get an available port on the current node and
-    // use that as the container port.
-    if (ozoneConf.getBoolean(OzoneConfigKeys
-            .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        localPort = socket.getLocalPort();
-        LOG.info("Found a free port for the server : {}", localPort);
-      } catch (IOException e) {
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", localPort, e);
-      }
-    }
-    datanodeDetails.setPort(
-        DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
-    return new XceiverServerRatis(datanodeDetails, localPort,
-        dispatcher, ozoneConf, context);
-  }
-
-  @Override
-  public void start() throws IOException {
-    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
-        server.getId(), getIPCPort());
-    chunkExecutor.prestartAllCoreThreads();
-    server.start();
-  }
-
-  @Override
-  public void stop() {
-    try {
-      chunkExecutor.shutdown();
-      server.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public int getIPCPort() {
-    return port;
-  }
-
-  /**
-   * Returns the Replication type supported by this end-point.
-   *
-   * @return enum -- {Stand_Alone, Ratis, Chained}
-   */
-  @Override
-  public HddsProtos.ReplicationType getServerType() {
-    return HddsProtos.ReplicationType.RATIS;
-  }
-
-  @VisibleForTesting
-  public RaftServer getServer() {
-    return server;
-  }
-
-  private void processReply(RaftClientReply reply) throws IOException {
-    // NotLeaderException is thrown only when the raft server to which the
-    // request is submitted is not the leader. The request is rejected here
-    // and will eventually be executed once it is resubmitted via the leader
-    // node.
-    NotLeaderException notLeaderException = reply.getNotLeaderException();
-    if (notLeaderException != null) {
-      throw notLeaderException;
-    }
-    StateMachineException stateMachineException =
-        reply.getStateMachineException();
-    if (stateMachineException != null) {
-      throw stateMachineException;
-    }
-  }
-
-  @Override
-  public void submitRequest(ContainerCommandRequestProto request,
-      HddsProtos.PipelineID pipelineID) throws IOException {
-    RaftClientReply reply;
-    RaftClientRequest raftClientRequest =
-        createRaftClientRequest(request, pipelineID,
-            RaftClientRequest.writeRequestType(replicationLevel));
-    try {
-      reply = server.submitClientRequestAsync(raftClientRequest).get();
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-    processReply(reply);
-  }
-
-  private RaftClientRequest createRaftClientRequest(
-      ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID,
-      RaftClientRequest.Type type) {
-    return new RaftClientRequest(clientId, server.getId(),
-        PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
-        nextCallId(), 0, Message.valueOf(request.toByteString()), type);
-  }
-
-  private void handlePipelineFailure(RaftGroupId groupId,
-      RoleInfoProto roleInfoProto) {
-    String msg;
-    UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
-    RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
-    switch (roleInfoProto.getRole()) {
-    case CANDIDATE:
-      msg = datanode + " is in candidate state for " +
-          roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
-      break;
-    case LEADER:
-      StringBuilder sb = new StringBuilder();
-      sb.append(datanode).append(" has not seen follower/s");
-      for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
-          .getFollowerInfoList()) {
-        if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
-          sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
-              .append(" for ").append(follower.getLastRpcElapsedTimeMs())
-              .append("ms");
-        }
-      }
-      msg = sb.toString();
-      break;
-    default:
-      LOG.error("unknown state:" + roleInfoProto.getRole());
-      throw new IllegalStateException("node" + id + " is in illegal role "
-          + roleInfoProto.getRole());
-    }
-
-    PipelineID pipelineID = PipelineID.valueOf(groupId);
-    ClosePipelineInfo.Builder closePipelineInfo =
-        ClosePipelineInfo.newBuilder()
-            .setPipelineID(pipelineID.getProtobuf())
-            .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
-            .setDetailedReason(msg);
-
-    PipelineAction action = PipelineAction.newBuilder()
-        .setClosePipeline(closePipelineInfo)
-        .setAction(PipelineAction.Action.CLOSE)
-        .build();
-    context.addPipelineActionIfAbsent(action);
-    LOG.debug("Pipeline action {} on pipeline {}. Reason: {}",
-        action.getAction(), pipelineID,
-        action.getClosePipeline().getDetailedReason());
-  }
-
-  @Override
-  public List<PipelineReport> getPipelineReport() {
-    try {
-      Iterable<RaftGroupId> gids = server.getGroupIds();
-      List<PipelineReport> reports = new ArrayList<>();
-      for (RaftGroupId groupId : gids) {
-        reports.add(PipelineReport.newBuilder()
-                .setPipelineID(PipelineID.valueOf(groupId).getProtobuf())
-                .build());
-      }
-      return reports;
-    } catch (Exception e) {
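-      // Swallow the failure; callers treat a null list as "no reports".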
-      return null;
-    }
-  }
-
-  void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(group.getGroupId(), roleInfoProto);
-  }
-
-  void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(group.getGroupId(), roleInfoProto);
-  }
-}
\ No newline at end of file

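Reviewer note: when DFS_CONTAINER_RATIS_IPC_RANDOM_PORT is enabled, the
removed newXceiverServerRatis(...) probes for a free port by binding an
ephemeral ServerSocket. A standalone sketch of that probe, with a
hypothetical FreePortProbe helper standing in for the inlined logic:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

final class FreePortProbe {
  private FreePortProbe() {
  }

  // Bind port 0 so the kernel assigns a free port, record it, then close
  // the socket so the Ratis server can claim that port afterwards.
  static int findFreePort(int fallback) {
    try (ServerSocket socket = new ServerSocket()) {
      socket.setReuseAddress(true);           // tolerate sockets in TIME_WAIT
      socket.bind(new InetSocketAddress(0));  // port 0: OS picks a free one
      return socket.getLocalPort();
    } catch (IOException e) {
      return fallback;                        // same fallback as the deleted code
    }
  }
}

Note the inherent race: another process can grab the port between the probe
and server start, which is fine for tests but not guaranteed in production.
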
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
deleted file mode 100644
index 8debfe0..0000000
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-
-/**
- * This package contains classes for the server implementation
- * using Apache Ratis.
- */

