http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
deleted file mode 100644
index 994b245..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import 
org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-import static org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReportState.states
-    .noContainerReports;
-
-/**
- * Current Context of State Machine.
- */
-public class StateContext {
-  static final Logger LOG =
-      LoggerFactory.getLogger(StateContext.class);
-  private final Queue<SCMCommand> commandQueue;
-  private final Lock lock;
-  private final DatanodeStateMachine parent;
-  private final AtomicLong stateExecutionCount;
-  private final Configuration conf;
-  private DatanodeStateMachine.DatanodeStates state;
-  private SCMNodeReport nrState;
-  private ReportState  reportState;
-  private static final ReportState DEFAULT_REPORT_STATE =
-      
ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
-
-  /**
-   * Constructs a StateContext.
-   *
-   * @param conf   - Configration
-   * @param state  - State
-   * @param parent Parent State Machine
-   */
-  public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
-      state, DatanodeStateMachine parent) {
-    this.conf = conf;
-    this.state = state;
-    this.parent = parent;
-    commandQueue = new LinkedList<>();
-    lock = new ReentrantLock();
-    stateExecutionCount = new AtomicLong(0);
-    nrState = SCMNodeReport.getDefaultInstance();
-  }
-
-  /**
-   * Returns the ContainerStateMachine class that holds this state.
-   *
-   * @return ContainerStateMachine.
-   */
-  public DatanodeStateMachine getParent() {
-    return parent;
-  }
-
-  /**
-   * Get the container server port.
-   * @return The container server port if available, return -1 if otherwise
-   */
-  public int getContainerPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getContainerServerPort();
-  }
-
-  /**
-   * Gets the Ratis Port.
-   * @return int , return -1 if not valid.
-   */
-  public int getRatisPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
-  }
-
-  /**
-   * Returns true if we are entering a new state.
-   *
-   * @return boolean
-   */
-  boolean isEntering() {
-    return stateExecutionCount.get() == 0;
-  }
-
-  /**
-   * Returns true if we are exiting from the current state.
-   *
-   * @param newState - newState.
-   * @return boolean
-   */
-  boolean isExiting(DatanodeStateMachine.DatanodeStates newState) {
-    boolean isExiting = state != newState && stateExecutionCount.get() > 0;
-    if(isExiting) {
-      stateExecutionCount.set(0);
-    }
-    return isExiting;
-  }
-
-  /**
-   * Returns the current state the machine is in.
-   *
-   * @return state.
-   */
-  public DatanodeStateMachine.DatanodeStates getState() {
-    return state;
-  }
-
-  /**
-   * Sets the current state of the machine.
-   *
-   * @param state state.
-   */
-  public void setState(DatanodeStateMachine.DatanodeStates state) {
-    this.state = state;
-  }
-
-  /**
-   * Returns the node report of the datanode state context.
-   * @return the node report.
-   */
-  public SCMNodeReport getNodeReport() {
-    return nrState;
-  }
-
-  /**
-   * Sets the storage location report of the datanode state context.
-   * @param nrReport - node report
-   */
-  public void setReportState(SCMNodeReport nrReport) {
-    this.nrState = nrReport;
-  }
-
-  /**
-   * Returns the next task to get executed by the datanode state machine.
-   * @return A callable that will be executed by the
-   * {@link DatanodeStateMachine}
-   */
-  @SuppressWarnings("unchecked")
-  public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() {
-    switch (this.state) {
-    case INIT:
-      return new InitDatanodeState(this.conf, parent.getConnectionManager(),
-          this);
-    case RUNNING:
-      return new RunningDatanodeState(this.conf, parent.getConnectionManager(),
-          this);
-    case SHUTDOWN:
-      return null;
-    default:
-      throw new IllegalArgumentException("Not Implemented yet.");
-    }
-  }
-
-  /**
-   * Executes the required state function.
-   *
-   * @param service - Executor Service
-   * @param time    - seconds to wait
-   * @param unit    - Seconds.
-   * @throws InterruptedException
-   * @throws ExecutionException
-   * @throws TimeoutException
-   */
-  public void execute(ExecutorService service, long time, TimeUnit unit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    stateExecutionCount.incrementAndGet();
-    DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask();
-    if (this.isEntering()) {
-      task.onEnter();
-    }
-    task.execute(service);
-    DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
-    if (this.state != newState) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Task {} executed, state transited from {} to {}",
-            task.getClass().getSimpleName(), this.state, newState);
-      }
-      if (isExiting(newState)) {
-        task.onExit();
-      }
-      this.clearReportState();
-      this.setState(newState);
-    }
-  }
-
-  /**
-   * Returns the next command or null if it is empty.
-   *
-   * @return SCMCommand or Null.
-   */
-  public SCMCommand getNextCommand() {
-    lock.lock();
-    try {
-      return commandQueue.poll();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Adds a command to the State Machine queue.
-   *
-   * @param command - SCMCommand.
-   */
-  public void addCommand(SCMCommand command) {
-    lock.lock();
-    try {
-      commandQueue.add(command);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Returns the count of the Execution.
-   * @return long
-   */
-  public long getExecutionCount() {
-    return stateExecutionCount.get();
-  }
-
-
-  /**
-   * Gets the ReportState.
-   * @return ReportState.
-   */
-  public synchronized  ReportState getContainerReportState() {
-    if (reportState == null) {
-      return DEFAULT_REPORT_STATE;
-    }
-    return reportState;
-  }
-
-  /**
-   * Sets the ReportState.
-   * @param rState - ReportState.
-   */
-  public synchronized  void setContainerReportState(ReportState rState) {
-    this.reportState = rState;
-  }
-
-  /**
-   * Clears report state after it has been communicated.
-   */
-  public synchronized void clearReportState() {
-    if(reportState != null) {
-      setContainerReportState(null);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
deleted file mode 100644
index 80844cf..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.background;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import 
org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BackgroundTaskResult;
-import org.apache.hadoop.utils.BackgroundTaskQueue;
-import org.apache.hadoop.utils.BackgroundTask;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
-
-/**
- * A per-datanode container block deleting service takes in charge
- * of deleting staled ozone blocks.
- */
-public class BlockDeletingService extends BackgroundService{
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockDeletingService.class);
-
-  private final ContainerManager containerManager;
-  private final Configuration conf;
-
-  // Throttle number of blocks to delete per task,
-  // set to 1 for testing
-  private final int blockLimitPerTask;
-
-  // Throttle the number of containers to process concurrently at a time,
-  private final int containerLimitPerInterval;
-
-  // Task priority is useful when a to-delete block has weight.
-  private final static int TASK_PRIORITY_DEFAULT = 1;
-  // Core pool size for container tasks
-  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
-
-  public BlockDeletingService(ContainerManager containerManager,
-      long serviceInterval, long serviceTimeout, Configuration conf) {
-    super("BlockDeletingService", serviceInterval,
-        TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
-        serviceTimeout);
-    this.containerManager = containerManager;
-    this.conf = conf;
-    this.blockLimitPerTask = conf.getInt(
-        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
-        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
-    this.containerLimitPerInterval = conf.getInt(
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
-  }
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    List<ContainerData> containers = Lists.newArrayList();
-    try {
-      // We at most list a number of containers a time,
-      // in case there are too many containers and start too many workers.
-      // We must ensure there is no empty container in this result.
-      // The chosen result depends on what container deletion policy is
-      // configured.
-      containers = containerManager.chooseContainerForBlockDeletion(
-          containerLimitPerInterval);
-      LOG.info("Plan to choose {} containers for block deletion, "
-          + "actually returns {} valid containers.",
-          containerLimitPerInterval, containers.size());
-
-      for(ContainerData container : containers) {
-        BlockDeletingTask containerTask =
-            new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
-        queue.add(containerTask);
-      }
-    } catch (StorageContainerException e) {
-      LOG.warn("Failed to initiate block deleting tasks, "
-          + "caused by unable to get containers info. "
-          + "Retry in next interval. ", e);
-    } catch (Exception e) {
-      // In case listContainer call throws any uncaught RuntimeException.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Unexpected error occurs during deleting blocks.", e);
-      }
-    }
-    return queue;
-  }
-
-  private static class ContainerBackgroundTaskResult
-      implements BackgroundTaskResult {
-    private List<String> deletedBlockIds;
-
-    ContainerBackgroundTaskResult() {
-      deletedBlockIds = new LinkedList<>();
-    }
-
-    public void addBlockId(String blockId) {
-      deletedBlockIds.add(blockId);
-    }
-
-    public void addAll(List<String> blockIds) {
-      deletedBlockIds.addAll(blockIds);
-    }
-
-    public List<String> getDeletedBlocks() {
-      return deletedBlockIds;
-    }
-
-    @Override
-    public int getSize() {
-      return deletedBlockIds.size();
-    }
-  }
-
-  private class BlockDeletingTask
-      implements BackgroundTask<BackgroundTaskResult> {
-
-    private final int priority;
-    private final ContainerData containerData;
-
-    BlockDeletingTask(ContainerData containerName, int priority) {
-      this.priority = priority;
-      this.containerData = containerName;
-    }
-
-    @Override
-    public BackgroundTaskResult call() throws Exception {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
-      long startTime = Time.monotonicNow();
-      // Scan container's db and get list of under deletion blocks
-      MetadataStore meta = KeyUtils.getDB(containerData, conf);
-      // # of blocks to delete is throttled
-      KeyPrefixFilter filter = new KeyPrefixFilter(
-          OzoneConsts.DELETING_KEY_PREFIX);
-      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
-          meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
-      if (toDeleteBlocks.isEmpty()) {
-        LOG.debug("No under deletion block found in container : {}",
-            containerData.getContainerName());
-      }
-
-      List<String> succeedBlocks = new LinkedList<>();
-      LOG.debug("Container : {}, To-Delete blocks : {}",
-          containerData.getContainerName(), toDeleteBlocks.size());
-      File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
-      if (!dataDir.exists() || !dataDir.isDirectory()) {
-        LOG.error("Invalid container data dir {} : "
-            + "not exist or not a directory", dataDir.getAbsolutePath());
-        return crr;
-      }
-
-      toDeleteBlocks.forEach(entry -> {
-        String blockName = DFSUtil.bytes2String(entry.getKey());
-        LOG.debug("Deleting block {}", blockName);
-        try {
-          ContainerProtos.KeyData data =
-              ContainerProtos.KeyData.parseFrom(entry.getValue());
-          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
-            File chunkFile = dataDir.toPath()
-                .resolve(chunkInfo.getChunkName()).toFile();
-            if (FileUtils.deleteQuietly(chunkFile)) {
-              LOG.debug("block {} chunk {} deleted", blockName,
-                  chunkFile.getAbsolutePath());
-            }
-          }
-          succeedBlocks.add(blockName);
-        } catch (InvalidProtocolBufferException e) {
-          LOG.error("Failed to parse block info for block {}", blockName, e);
-        }
-      });
-
-      // Once files are deleted ... clean up DB
-      BatchOperation batch = new BatchOperation();
-      succeedBlocks.forEach(entry ->
-          batch.delete(DFSUtil.string2Bytes(entry)));
-      meta.writeBatch(batch);
-      // update count of pending deletion blocks in in-memory container status
-      containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
-          containerData.getContainerName());
-
-      if (!succeedBlocks.isEmpty()) {
-        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
-            containerData.getContainerName(), succeedBlocks.size(),
-            Time.monotonicNow() - startTime);
-      }
-      crr.addAll(succeedBlocks);
-      return crr;
-    }
-
-    @Override
-    public int getPriority() {
-      return priority;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
deleted file mode 100644
index a9e202e..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.background;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
deleted file mode 100644
index f5fbcd7..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.Type;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Container Report handler.
- */
-public class CloseContainerHandler implements CommandHandler {
-  static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerHandler.class);
-  private int invocationCount;
-  private long totalTime;
-
-  /**
-   * Constructs a ContainerReport handler.
-   */
-  public CloseContainerHandler() {
-  }
-
-  /**
-   * Handles a given SCM command.
-   *
-   * @param command           - SCM Command
-   * @param container         - Ozone Container.
-   * @param context           - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Close Container command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    String containerName = "UNKNOWN";
-    try {
-
-      SCMCloseContainerCmdResponseProto
-          closeContainerProto =
-          SCMCloseContainerCmdResponseProto
-              .parseFrom(command.getProtoBufMessage());
-      containerName = closeContainerProto.getContainerName();
-
-      container.getContainerManager().closeContainer(containerName);
-
-    } catch (Exception e) {
-      LOG.error("Can't close container " + containerName, e);
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
-  }
-
-  /**
-   * Returns the command type that this command handler handles.
-   *
-   * @return Type
-   */
-  @Override
-  public Type getCommandType() {
-    return Type.closeContainerCommand;
-  }
-
-  /**
-   * Returns number of times this handler has been invoked.
-   *
-   * @return int
-   */
-  @Override
-  public int getInvocationCount() {
-    return invocationCount;
-  }
-
-  /**
-   * Returns the average time this function takes to run.
-   *
-   * @return long
-   */
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
deleted file mode 100644
index 6670ea9..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import com.google.common.base.Preconditions;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
-import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Dispatches command to the correct handler.
- */
-public final class CommandDispatcher {
-  static final Logger LOG =
-      LoggerFactory.getLogger(CommandDispatcher.class);
-  private final StateContext context;
-  private final Map<Type, CommandHandler> handlerMap;
-  private final OzoneContainer container;
-  private final SCMConnectionManager connectionManager;
-
-  /**
-   * Constructs a command Dispatcher.
-   * @param context - Context.
-   */
-  /**
-   * Constructs a command dispatcher.
-   *
-   * @param container - Ozone Container
-   * @param context - Context
-   * @param handlers - Set of handlers.
-   */
-  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
-      connectionManager, StateContext context,
-      CommandHandler... handlers) {
-    Preconditions.checkNotNull(context);
-    Preconditions.checkNotNull(handlers);
-    Preconditions.checkArgument(handlers.length > 0);
-    Preconditions.checkNotNull(container);
-    Preconditions.checkNotNull(connectionManager);
-    this.context = context;
-    this.container = container;
-    this.connectionManager = connectionManager;
-    handlerMap = new HashMap<>();
-    for (CommandHandler h : handlers) {
-      if(handlerMap.containsKey(h.getCommandType())){
-        LOG.error("Duplicate handler for the same command. Exiting. Handle " +
-            "key : { }", h.getCommandType().getDescriptorForType().getName());
-        throw new IllegalArgumentException("Duplicate handler for the same " +
-            "command.");
-      }
-      handlerMap.put(h.getCommandType(), h);
-    }
-  }
-
-  /**
-   * Dispatch the command to the correct handler.
-   *
-   * @param command - SCM Command.
-   */
-  public void handle(SCMCommand command) {
-    Preconditions.checkNotNull(command);
-    CommandHandler handler = handlerMap.get(command.getType());
-    if (handler != null) {
-      handler.handle(command, container, context, connectionManager);
-    } else {
-      LOG.error("Unknown SCM Command queued. There is no handler for this " +
-          "command. Command: {}", command.getType().getDescriptorForType()
-          .getName());
-    }
-  }
-
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Helper class to construct command dispatcher.
-   */
-  public static class Builder {
-    private final List<CommandHandler> handlerList;
-    private OzoneContainer container;
-    private StateContext context;
-    private SCMConnectionManager connectionManager;
-
-    public Builder() {
-      handlerList = new LinkedList<>();
-    }
-
-    /**
-     * Adds a handler.
-     *
-     * @param handler - handler
-     * @return Builder
-     */
-    public Builder addHandler(CommandHandler handler) {
-      Preconditions.checkNotNull(handler);
-      handlerList.add(handler);
-      return this;
-    }
-
-    /**
-     * Add the OzoneContainer.
-     *
-     * @param ozoneContainer - ozone container.
-     * @return Builder
-     */
-    public Builder setContainer(OzoneContainer ozoneContainer) {
-      Preconditions.checkNotNull(ozoneContainer);
-      this.container = ozoneContainer;
-      return this;
-    }
-
-    /**
-     * Set the Connection Manager.
-     *
-     * @param scmConnectionManager
-     * @return this
-     */
-    public Builder setConnectionManager(SCMConnectionManager
-        scmConnectionManager) {
-      Preconditions.checkNotNull(scmConnectionManager);
-      this.connectionManager = scmConnectionManager;
-      return this;
-    }
-
-    /**
-     * Sets the Context.
-     *
-     * @param stateContext - StateContext
-     * @return this
-     */
-    public Builder setContext(StateContext stateContext) {
-      Preconditions.checkNotNull(stateContext);
-      this.context = stateContext;
-      return this;
-    }
-
-    /**
-     * Builds a command Dispatcher.
-     * @return Command Dispatcher.
-     */
-    public CommandDispatcher build() {
-      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
-          " manager.");
-      Preconditions.checkNotNull(this.container, "Missing container.");
-      Preconditions.checkNotNull(this.context, "Missing context.");
-      Preconditions.checkArgument(this.handlerList.size() > 0);
-      return new CommandDispatcher(this.container, this.connectionManager,
-          this.context, handlerList.toArray(
-              new CommandHandler[handlerList.size()]));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
deleted file mode 100644
index b30d481..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
-import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-
-/**
- * Generic interface for handlers.
- */
-public interface CommandHandler {
-
-  /**
-   * Handles a given SCM command.
-   * @param command - SCM Command
-   * @param container - Ozone Container.
-   * @param context - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager);
-
-  /**
-   * Returns the command type that this command handler handles.
-   * @return Type
-   */
-  Type getCommandType();
-
-  /**
-   * Returns number of times this handler has been invoked.
-   * @return int
-   */
-  int getInvocationCount();
-
-  /**
-   * Returns the average time this function takes to run.
-   * @return  long
-   */
-  long getAverageRunTime();
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
deleted file mode 100644
index 57a1516..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Container Report handler.
- */
-public class ContainerReportHandler implements CommandHandler {
-  static final Logger LOG =
-      LoggerFactory.getLogger(ContainerReportHandler.class);
-  private int invocationCount;
-  private long totalTime;
-
-  /**
-   * Constructs a ContainerReport handler.
-   */
-  public ContainerReportHandler() {
-  }
-
-  /**
-   * Handles a given SCM command.
-   *
-   * @param command - SCM Command
-   * @param container - Ozone Container.
-   * @param context - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Container Report.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    try {
-      ContainerReportsRequestProto contianerReport =
-          container.getContainerReport();
-
-      // TODO : We send this report to all SCMs.Check if it is enough only to
-      // send to the leader once we have RAFT enabled SCMs.
-      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
-        endPoint.getEndPoint().sendContainerReport(contianerReport);
-      }
-    } catch (IOException ex) {
-      LOG.error("Unable to process the Container Report command.", ex);
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
-  }
-
-  /**
-   * Returns the command type that this command handler handles.
-   *
-   * @return Type
-   */
-  @Override
-  public Type getCommandType() {
-    return Type.sendContainerReport;
-  }
-
-  /**
-   * Returns number of times this handler has been invoked.
-   *
-   * @return int
-   */
-  @Override
-  public int getInvocationCount() {
-    return invocationCount;
-  }
-
-  /**
-   * Returns the average time this function takes to run.
-   *
-   * @return long
-   */
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
deleted file mode 100644
index 1c859ea..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import 
org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Handle block deletion commands.
- */
-public class DeleteBlocksCommandHandler implements CommandHandler {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
-
-  private ContainerManager containerManager;
-  private Configuration conf;
-  private int invocationCount;
-  private long totalTime;
-
-  public DeleteBlocksCommandHandler(ContainerManager containerManager,
-      Configuration conf) {
-    this.containerManager = containerManager;
-    this.conf = conf;
-  }
-
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    if (command.getType() != Type.deleteBlocksCommand) {
-      LOG.warn("Skipping handling command, expected command "
-              + "type {} but found {}",
-          Type.deleteBlocksCommand, command.getType());
-      return;
-    }
-    LOG.debug("Processing block deletion command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-
-    // move blocks to deleting state.
-    // this is a metadata update, the actual deletion happens in another
-    // recycling thread.
-    DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
-    List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
-
-
-    DeletedContainerBlocksSummary summary =
-        DeletedContainerBlocksSummary.getFrom(containerBlocks);
-    LOG.info("Start to delete container blocks, TXIDs={}, "
-            + "numOfContainers={}, numOfBlocks={}",
-        summary.getTxIDSummary(),
-        summary.getNumOfContainers(),
-        summary.getNumOfBlocks());
-
-    ContainerBlocksDeletionACKProto.Builder resultBuilder =
-        ContainerBlocksDeletionACKProto.newBuilder();
-    containerBlocks.forEach(entry -> {
-      DeleteBlockTransactionResult.Builder txResultBuilder =
-          DeleteBlockTransactionResult.newBuilder();
-      txResultBuilder.setTxID(entry.getTxID());
-      try {
-        deleteContainerBlocks(entry, conf);
-        txResultBuilder.setSuccess(true);
-      } catch (IOException e) {
-        LOG.warn("Failed to delete blocks for container={}, TXID={}",
-            entry.getContainerName(), entry.getTxID(), e);
-        txResultBuilder.setSuccess(false);
-      }
-      resultBuilder.addResults(txResultBuilder.build());
-    });
-    ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
-
-    // Send ACK back to SCM as long as meta updated
-    // TODO Or we should wait until the blocks are actually deleted?
-    if (!containerBlocks.isEmpty()) {
-      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
-        try {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sending following block deletion ACK to SCM");
-            for (DeleteBlockTransactionResult result :
-                blockDeletionACK.getResultsList()) {
-              LOG.debug(result.getTxID() + " : " + result.getSuccess());
-            }
-          }
-          endPoint.getEndPoint()
-              .sendContainerBlocksDeletionACK(blockDeletionACK);
-        } catch (IOException e) {
-          LOG.error("Unable to send block deletion ACK to SCM {}",
-              endPoint.getAddress().toString(), e);
-        }
-      }
-    }
-
-    long endTime = Time.monotonicNow();
-    totalTime += endTime - startTime;
-  }
-
-  /**
-   * Move a bunch of blocks from a container to deleting state.
-   * This is a meta update, the actual deletes happen in async mode.
-   *
-   * @param delTX a block deletion transaction.
-   * @param config configuration.
-   * @throws IOException if I/O error occurs.
-   */
-  private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
-      Configuration config) throws IOException {
-    String containerId = delTX.getContainerName();
-    ContainerData containerInfo = containerManager.readContainer(containerId);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing Container : {}, DB path : {}", containerId,
-          containerInfo.getDBPath());
-    }
-
-    int newDeletionBlocks = 0;
-    MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
-    for (String blk : delTX.getBlockIDList()) {
-      BatchOperation batch = new BatchOperation();
-      byte[] blkBytes = DFSUtil.string2Bytes(blk);
-      byte[] blkInfo = containerDB.get(blkBytes);
-      if (blkInfo != null) {
-        // Found the block in container db,
-        // use an atomic update to change its state to deleting.
-        batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
-            blkInfo);
-        batch.delete(blkBytes);
-        try {
-          containerDB.writeBatch(batch);
-          newDeletionBlocks++;
-          LOG.debug("Transited Block {} to DELETING state in container {}",
-              blk, containerId);
-        } catch (IOException e) {
-          // if some blocks failed to delete, we fail this TX,
-          // without sending this ACK to SCM, SCM will resend the TX
-          // with a certain number of retries.
-          throw new IOException(
-              "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
-        }
-      } else {
-        LOG.debug("Block {} not found or already under deletion in"
-                + " container {}, skip deleting it.", blk, containerId);
-      }
-    }
-
-    // update pending deletion blocks count in in-memory container status
-    containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId);
-  }
-
-  @Override
-  public Type getCommandType() {
-    return Type.deleteBlocksCommand;
-  }
-
-  @Override
-  public int getInvocationCount() {
-    return this.invocationCount;
-  }
-
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
deleted file mode 100644
index 1e9c8dc..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
deleted file mode 100644
index feb2f81..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-/**
-
- State machine class is used by the container to denote various states a
- container can be in and also is used for command processing.
-
- Container has the following states.
-
- Start - > getVersion -> Register -> Running  -> Shutdown
-
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
deleted file mode 100644
index 75142af..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.states;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * State Interface that allows tasks to maintain states.
- */
-public interface DatanodeState<T> {
-  /**
-   * Called before entering this state.
-   */
-  void onEnter();
-
-  /**
-   * Called After exiting this state.
-   */
-  void onExit();
-
-  /**
-   * Executes one or more tasks that is needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  void execute(ExecutorService executor);
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param time - Time
-   * @param timeUnit - Unit of time.
-   */
-  T await(long time, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException;
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
deleted file mode 100644
index 6ca94bf..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-
-import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Init Datanode State is the task that gets run when we are in Init State.
- */
-public class InitDatanodeState implements DatanodeState,
-    Callable<DatanodeStateMachine.DatanodeStates> {
-  static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class);
-  private final SCMConnectionManager connectionManager;
-  private final Configuration conf;
-  private final StateContext context;
-  private Future<DatanodeStateMachine.DatanodeStates> result;
-
-  /**
-   *  Create InitDatanodeState Task.
-   *
-   * @param conf - Conf
-   * @param connectionManager - Connection Manager
-   * @param context - Current Context
-   */
-  public InitDatanodeState(Configuration conf,
-                           SCMConnectionManager connectionManager,
-                           StateContext context) {
-    this.conf = conf;
-    this.connectionManager = connectionManager;
-    this.context = context;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates call() throws Exception {
-    Collection<InetSocketAddress> addresses = null;
-    try {
-      addresses = OzoneClientUtils.getSCMAddresses(conf);
-    } catch (IllegalArgumentException e) {
-      if(!Strings.isNullOrEmpty(e.getMessage())) {
-        LOG.error("Failed to get SCM addresses: " + e.getMessage());
-      }
-      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-    }
-
-    if (addresses == null || addresses.isEmpty()) {
-      LOG.error("Null or empty SCM address list found.");
-      return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-    } else {
-      for (InetSocketAddress addr : addresses) {
-        connectionManager.addSCMServer(addr);
-      }
-    }
-
-    // If datanode ID is set, persist it to the ID file.
-    persistContainerDatanodeID();
-
-    return this.context.getState().getNextState();
-  }
-
-  /**
-   * Update Ozone container port to the datanode ID,
-   * and persist the ID to a local file.
-   */
-  private void persistContainerDatanodeID() throws IOException {
-    String dataNodeIDPath = OzoneUtils.getDatanodeIDPath(conf);
-    if (Strings.isNullOrEmpty(dataNodeIDPath)) {
-      LOG.error("A valid file path is needed for config setting {}",
-          ScmConfigKeys.OZONE_SCM_DATANODE_ID);
-      this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
-      return;
-    }
-    File idPath = new File(dataNodeIDPath);
-    int containerPort = this.context.getContainerPort();
-    int ratisPort = this.context.getRatisPort();
-    DatanodeID datanodeID = this.context.getParent().getDatanodeID();
-    if (datanodeID != null) {
-      datanodeID.setContainerPort(containerPort);
-      datanodeID.setRatisPort(ratisPort);
-      ContainerUtils.writeDatanodeIDTo(datanodeID, idPath);
-      LOG.info("Datanode ID is persisted to {}", dataNodeIDPath);
-    }
-  }
-
-  /**
-   * Called before entering this state.
-   */
-  @Override
-  public void onEnter() {
-    LOG.trace("Entering init container state");
-  }
-
-  /**
-   * Called After exiting this state.
-   */
-  @Override
-  public void onExit() {
-    LOG.trace("Exiting init container state");
-  }
-
-  /**
-   * Executes one or more tasks that is needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  @Override
-  public void execute(ExecutorService executor) {
-    result = executor.submit(this);
-  }
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param time     - Time
-   * @param timeUnit - Unit of time.
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates await(long time,
-      TimeUnit timeUnit) throws InterruptedException,
-      ExecutionException, TimeoutException {
-    return result.get(time, timeUnit);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
deleted file mode 100644
index fc9af5b..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import 
org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
-import 
org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
-import 
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
-import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Class that implements handshake with SCM.
- */
-public class RunningDatanodeState implements DatanodeState {
-  static final Logger
-      LOG = LoggerFactory.getLogger(RunningDatanodeState.class);
-  private final SCMConnectionManager connectionManager;
-  private final Configuration conf;
-  private final StateContext context;
-  private CompletionService<EndpointStateMachine.EndPointStates> ecs;
-
-  public RunningDatanodeState(Configuration conf,
-      SCMConnectionManager connectionManager,
-      StateContext context) {
-    this.connectionManager = connectionManager;
-    this.conf = conf;
-    this.context = context;
-  }
-
-  /**
-   * Reads a datanode ID from the persisted information.
-   *
-   * @param idPath - Path to the ID File.
-   * @return DatanodeID
-   * @throws IOException
-   */
-  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-      readPersistedDatanodeID(Path idPath) throws IOException {
-    Preconditions.checkNotNull(idPath);
-    DatanodeID datanodeID = null;
-    List<DatanodeID> datanodeIDs =
-        ContainerUtils.readDatanodeIDsFrom(idPath.toFile());
-    int containerPort = this.context.getContainerPort();
-    for(DatanodeID dnId : datanodeIDs) {
-      if(dnId.getContainerPort() == containerPort) {
-        datanodeID = dnId;
-        break;
-      }
-    }
-
-    if (datanodeID == null) {
-      throw new IOException("No valid datanode ID found from "
-          + idPath.toFile().getAbsolutePath()
-          + " that matches container port "
-          + containerPort);
-    } else {
-      StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-          containerIDProto =
-          StorageContainerDatanodeProtocolProtos
-              .ContainerNodeIDProto
-              .newBuilder()
-              .setDatanodeID(datanodeID.getProtoBufMessage())
-              .build();
-      return containerIDProto;
-    }
-  }
-
-  /**
-   * Returns ContainerNodeIDProto or null in case of Error.
-   *
-   * @return ContainerNodeIDProto
-   */
-  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-      getContainerNodeID() {
-    String dataNodeIDPath = OzoneUtils.getDatanodeIDPath(conf);
-    if (dataNodeIDPath == null || dataNodeIDPath.isEmpty()) {
-      LOG.error("A valid file path is needed for config setting {}",
-          ScmConfigKeys.OZONE_SCM_DATANODE_ID);
-
-      // This is an unrecoverable error.
-      this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
-      return null;
-    }
-    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto nodeID;
-    // try to read an existing ContainerNode ID.
-    try {
-      nodeID = readPersistedDatanodeID(Paths.get(dataNodeIDPath));
-      if (nodeID != null) {
-        LOG.trace("Read Node ID :", nodeID.getDatanodeID().getDatanodeUuid());
-        return nodeID;
-      }
-    } catch (IOException ex) {
-      LOG.trace("Not able to find container Node ID, creating it.", ex);
-    }
-    this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
-    return null;
-  }
-
-  /**
-   * Called before entering this state.
-   */
-  @Override
-  public void onEnter() {
-    LOG.trace("Entering handshake task.");
-  }
-
-  /**
-   * Called After exiting this state.
-   */
-  @Override
-  public void onExit() {
-    LOG.trace("Exiting handshake task.");
-  }
-
-  /**
-   * Executes one or more tasks that is needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  @Override
-  public void execute(ExecutorService executor) {
-    ecs = new ExecutorCompletionService<>(executor);
-    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
-      Callable<EndpointStateMachine.EndPointStates> endpointTask
-          = getEndPointTask(endpoint);
-      ecs.submit(endpointTask);
-    }
-  }
-  //TODO : Cache some of these tasks instead of creating them
-  //all the time.
-  private Callable<EndpointStateMachine.EndPointStates>
-      getEndPointTask(EndpointStateMachine endpoint) {
-    switch (endpoint.getState()) {
-    case GETVERSION:
-      return new VersionEndpointTask(endpoint, conf);
-    case REGISTER:
-      return  RegisterEndpointTask.newBuilder()
-          .setConfig(conf)
-          .setEndpointStateMachine(endpoint)
-          .setNodeID(getContainerNodeID())
-          .build();
-    case HEARTBEAT:
-      return HeartbeatEndpointTask.newBuilder()
-          .setConfig(conf)
-          .setEndpointStateMachine(endpoint)
-          .setNodeID(getContainerNodeID())
-          .setContext(context)
-          .build();
-    case SHUTDOWN:
-      break;
-    default:
-      throw new IllegalArgumentException("Illegal Argument.");
-    }
-    return null;
-  }
-
-  /**
-   * Computes the next state the container state machine must move to by 
looking
-   * at all the state of endpoints.
-   * <p>
-   * if any endpoint state has moved to Shutdown, either we have an
-   * unrecoverable error or we have been told to shutdown. Either case the
-   * datanode state machine should move to Shutdown state, otherwise we
-   * remain in the Running state.
-   *
-   * @return next container state.
-   */
-  private DatanodeStateMachine.DatanodeStates
-      computeNextContainerState(
-      List<Future<EndpointStateMachine.EndPointStates>> results) {
-    for (Future<EndpointStateMachine.EndPointStates> state : results) {
-      try {
-        if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) {
-          // if any endpoint tells us to shutdown we move to shutdown state.
-          return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Error in executing end point task.", e);
-      }
-    }
-    return DatanodeStateMachine.DatanodeStates.RUNNING;
-  }
-
-  /**
-   * Wait for execute to finish.
-   *
-   * @param duration - Time
-   * @param timeUnit - Unit of duration.
-   */
-  @Override
-  public DatanodeStateMachine.DatanodeStates
-      await(long duration, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    int count = connectionManager.getValues().size();
-    int returned = 0;
-    long timeLeft = timeUnit.toMillis(duration);
-    long startTime = Time.monotonicNow();
-    List<Future<EndpointStateMachine.EndPointStates>> results = new
-        LinkedList<>();
-
-    while (returned < count && timeLeft > 0) {
-      Future<EndpointStateMachine.EndPointStates> result =
-          ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
-      if (result != null) {
-        results.add(result);
-        returned++;
-      }
-      timeLeft = timeLeft - (Time.monotonicNow() - startTime);
-    }
-    return computeNextContainerState(results);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
deleted file mode 100644
index 6b8d16c..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.states.datanode;
-/**
- This package contians files that guide the state transitions from
- Init->Running->Shutdown for the datanode.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
deleted file mode 100644
index b4cce23..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.container.common.helpers
-    .DeletedContainerBlocksSummary;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine.EndPointStates;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
-import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
-import org.apache.hadoop.ozone.protocol.proto
-     .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.ZonedDateTime;
-import java.util.concurrent.Callable;
-
-/**
- * Heartbeat class for SCMs.
- */
-public class HeartbeatEndpointTask
-    implements Callable<EndpointStateMachine.EndPointStates> {
-  static final Logger LOG =
-      LoggerFactory.getLogger(HeartbeatEndpointTask.class);
-  private final EndpointStateMachine rpcEndpoint;
-  private final Configuration conf;
-  private ContainerNodeIDProto containerNodeIDProto;
-  private StateContext context;
-
-  /**
-   * Constructs a SCM heart beat.
-   *
-   * @param conf Config.
-   */
-  public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
-      Configuration conf, StateContext context) {
-    this.rpcEndpoint = rpcEndpoint;
-    this.conf = conf;
-    this.context = context;
-  }
-
-  /**
-   * Get the container Node ID proto.
-   *
-   * @return ContainerNodeIDProto
-   */
-  public ContainerNodeIDProto getContainerNodeIDProto() {
-    return containerNodeIDProto;
-  }
-
-  /**
-   * Set container node ID proto.
-   *
-   * @param containerNodeIDProto - the node id.
-   */
-  public void setContainerNodeIDProto(ContainerNodeIDProto
-      containerNodeIDProto) {
-    this.containerNodeIDProto = containerNodeIDProto;
-  }
-
-  /**
-   * Computes a result, or throws an exception if unable to do so.
-   *
-   * @return computed result
-   * @throws Exception if unable to compute a result
-   */
-  @Override
-  public EndpointStateMachine.EndPointStates call() throws Exception {
-    rpcEndpoint.lock();
-    try {
-      Preconditions.checkState(this.containerNodeIDProto != null);
-      DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this
-          .containerNodeIDProto.getDatanodeID());
-
-      SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
-          .sendHeartbeat(datanodeID, this.context.getNodeReport(),
-              this.context.getContainerReportState());
-      processResponse(reponse, datanodeID);
-      rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
-      rpcEndpoint.zeroMissedCount();
-    } catch (IOException ex) {
-      rpcEndpoint.logIfNeeded(ex);
-    } finally {
-      rpcEndpoint.unlock();
-    }
-    return rpcEndpoint.getState();
-  }
-
-  /**
-   * Returns a builder class for HeartbeatEndpointTask task.
-   * @return   Builder.
-   */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * Add this command to command processing Queue.
-   *
-   * @param response - SCMHeartbeat response.
-   */
-  private void processResponse(SCMHeartbeatResponseProto response,
-      final DatanodeID datanodeID) {
-    for (SCMCommandResponseProto commandResponseProto : response
-        .getCommandsList()) {
-      // Verify the response is indeed for this datanode.
-      Preconditions.checkState(commandResponseProto.getDatanodeUUID()
-          .equalsIgnoreCase(datanodeID.getDatanodeUuid().toString()),
-          "Unexpected datanode ID in the response.");
-      switch (commandResponseProto.getCmdType()) {
-      case sendContainerReport:
-        this.context.addCommand(SendContainerCommand.getFromProtobuf(
-            commandResponseProto.getSendReport()));
-        break;
-      case reregisterCommand:
-        if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Received SCM notification to register."
-                + " Interrupt HEARTBEAT and transit to REGISTER state.");
-          }
-          rpcEndpoint.setState(EndPointStates.REGISTER);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Illegal state {} found, expecting {}.",
-                rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
-          }
-        }
-        break;
-      case deleteBlocksCommand:
-        DeleteBlocksCommand db = DeleteBlocksCommand
-            .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
-        if (!db.blocksTobeDeleted().isEmpty()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(DeletedContainerBlocksSummary
-                .getFrom(db.blocksTobeDeleted())
-                .toString());
-          }
-          this.context.addCommand(db);
-        }
-        break;
-      case closeContainerCommand:
-        CloseContainerCommand closeContainer =
-            CloseContainerCommand.getFromProtobuf(
-                commandResponseProto.getCloseContainerProto());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Received SCM container close request for container {}",
-              closeContainer.getContainerName());
-        }
-        this.context.addCommand(closeContainer);
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown response : "
-            + commandResponseProto.getCmdType().name());
-      }
-    }
-  }
-
-  /**
-   * Builder class for HeartbeatEndpointTask.
-   */
-  public static class Builder {
-    private EndpointStateMachine endPointStateMachine;
-    private Configuration conf;
-    private ContainerNodeIDProto containerNodeIDProto;
-    private StateContext context;
-
-    /**
-     * Constructs the builder class.
-     */
-    public Builder() {
-    }
-
-    /**
-     * Sets the endpoint state machine.
-     *
-     * @param rpcEndPoint - Endpoint state machine.
-     * @return Builder
-     */
-    public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) {
-      this.endPointStateMachine = rpcEndPoint;
-      return this;
-    }
-
-    /**
-     * Sets the Config.
-     *
-     * @param config - config
-     * @return Builder
-     */
-    public Builder setConfig(Configuration config) {
-      this.conf = config;
-      return this;
-    }
-
-    /**
-     * Sets the NodeID.
-     *
-     * @param nodeID - NodeID proto
-     * @return Builder
-     */
-    public Builder setNodeID(ContainerNodeIDProto nodeID) {
-      this.containerNodeIDProto = nodeID;
-      return this;
-    }
-
-    /**
-     * Sets the context.
-     * @param stateContext - State context.
-     * @return this.
-     */
-    public Builder setContext(StateContext stateContext) {
-      this.context = stateContext;
-      return this;
-    }
-
-    public HeartbeatEndpointTask build() {
-      if (endPointStateMachine == null) {
-        LOG.error("No endpoint specified.");
-        throw new IllegalArgumentException("A valid endpoint state machine is" 
+
-            " needed to construct HeartbeatEndpointTask task");
-      }
-
-      if (conf == null) {
-        LOG.error("No config specified.");
-        throw new IllegalArgumentException("A valid configration is needed to" 
+
-            " construct HeartbeatEndpointTask task");
-      }
-
-      if (containerNodeIDProto == null) {
-        LOG.error("No nodeID specified.");
-        throw new IllegalArgumentException("A vaild Node ID is needed to " +
-            "construct HeartbeatEndpointTask task");
-      }
-
-      HeartbeatEndpointTask task = new HeartbeatEndpointTask(this
-          .endPointStateMachine, this.conf, this.context);
-      task.setContainerNodeIDProto(containerNodeIDProto);
-      return task;
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to