Repository: hadoop
Updated Branches:
  refs/heads/ozone-0.2 1e1721507 -> 7f495ee08


HDDS-460. Replication manager failed to import container data. Contributed by 
Elek, Marton.

(cherry picked from commit 042bf74d5eb3dc627658b8c4c027628569169513)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f495ee0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f495ee0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f495ee0

Branch: refs/heads/ozone-0.2
Commit: 7f495ee0804dee729df03440b7eda7c4a0a98d4c
Parents: 1e17215
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 19 23:51:50 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 19 23:52:46 2018 +0530

----------------------------------------------------------------------
 .../statemachine/DatanodeStateMachine.java      |  19 ++-
 .../ReplicateContainerCommandHandler.java       | 120 ++------------
 .../replication/ContainerReplicator.java        |  27 +++
 .../DownloadAndImportReplicator.java            | 136 ++++++++++++++++
 .../replication/GrpcReplicationClient.java      |   2 +-
 .../replication/ReplicationSupervisor.java      | 142 ++++++++++++++++
 .../container/replication/ReplicationTask.java  | 102 ++++++++++++
 .../TestReplicateContainerCommandHandler.java   | 163 -------------------
 .../replication/TestReplicationSupervisor.java  | 143 ++++++++++++++++
 .../container/replication/package-info.java     |  22 +++
 10 files changed, 602 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 875d063..1bade8e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -40,7 +40,12 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .ReplicateContainerCommandHandler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
+import 
org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -69,6 +74,7 @@ public class DatanodeStateMachine implements Closeable {
   private AtomicLong nextHB;
   private Thread stateMachineThread = null;
   private Thread cmdProcessThread = null;
+  private final ReplicationSupervisor supervisor;
 
   /**
    * Constructs a a datanode state machine.
@@ -89,14 +95,21 @@ public class DatanodeStateMachine implements Closeable {
         new OzoneConfiguration(conf), context);
     nextHB = new AtomicLong(Time.monotonicNow());
 
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(container.getContainerSet(),
+            container.getDispatcher(),
+            new SimpleContainerDownloader(conf), new TarContainerPacker());
+
+    supervisor =
+        new ReplicationSupervisor(container.getContainerSet(), replicator, 10);
+
     // When we add new handlers just adding a new handler here should do the
      // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
         .addHandler(new CloseContainerCommandHandler())
         .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
             conf))
-        .addHandler(new ReplicateContainerCommandHandler(conf,
-            container.getContainerSet(), container.getDispatcher()))
+        .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
@@ -295,6 +308,7 @@ public class DatanodeStateMachine implements Closeable {
   public void startDaemon() {
     Runnable startStateMachineTask = () -> {
       try {
+        supervisor.start();
         start();
         LOG.info("Ozone container server started.");
       } catch (Exception ex) {
@@ -323,6 +337,7 @@ public class DatanodeStateMachine implements Closeable {
    */
   public synchronized void stopDaemon() {
     try {
+      supervisor.stop();
       context.setState(DatanodeStates.SHUTDOWN);
       reportManager.shutdown();
       this.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index cb677c2..09c379f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -16,33 +16,17 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
-import java.io.FileInputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
-import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
@@ -58,39 +42,19 @@ public class ReplicateContainerCommandHandler implements 
CommandHandler {
   static final Logger LOG =
       LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
 
-  private ContainerDispatcher containerDispatcher;
-
   private int invocationCount;
 
   private long totalTime;
 
-  private ContainerDownloader downloader;
-
   private Configuration conf;
 
-  private TarContainerPacker packer = new TarContainerPacker();
-
-  private ContainerSet containerSet;
-
-  private Lock lock = new ReentrantLock();
+  private ReplicationSupervisor supervisor;
 
   public ReplicateContainerCommandHandler(
       Configuration conf,
-      ContainerSet containerSet,
-      ContainerDispatcher containerDispatcher,
-      ContainerDownloader downloader) {
+      ReplicationSupervisor supervisor) {
     this.conf = conf;
-    this.containerSet = containerSet;
-    this.downloader = downloader;
-    this.containerDispatcher = containerDispatcher;
-  }
-
-  public ReplicateContainerCommandHandler(
-      Configuration conf,
-      ContainerSet containerSet,
-      ContainerDispatcher containerDispatcher) {
-    this(conf, containerSet, containerDispatcher,
-        new SimpleContainerDownloader(conf));
+    this.supervisor = supervisor;
   }
 
   @Override
@@ -108,72 +72,12 @@ public class ReplicateContainerCommandHandler implements 
CommandHandler {
           String.format("Replication command is received for container %d "
               + "but the size of source datanodes was 0.", containerID));
 
-      LOG.info("Starting replication of container {} from {}", containerID,
-          sourceDatanodes);
-      CompletableFuture<Path> tempTarFile = downloader
-          .getContainerDataFromReplicas(containerID,
-              sourceDatanodes);
-
-      CompletableFuture<Void> result =
-          tempTarFile.thenAccept(path -> {
-            LOG.info("Container {} is downloaded, starting to import.",
-                containerID);
-            importContainer(containerID, path);
-          });
-
-      result.whenComplete((aVoid, throwable) -> {
-        if (throwable != null) {
-          LOG.error("Container replication was unsuccessful .", throwable);
-        } else {
-          LOG.info("Container {} is replicated successfully", containerID);
-        }
-      });
-    } finally {
-      updateCommandStatus(context, command, true, LOG);
+      ReplicationTask replicationTask =
+          new ReplicationTask(containerID, sourceDatanodes);
+      supervisor.addTask(replicationTask);
 
-    }
-  }
-
-  protected void importContainer(long containerID, Path tarFilePath) {
-    lock.lock();
-    try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Handler handler = containerDispatcher.getHandler(
-            originalContainerData.getContainerType());
-
-        Container container = handler.importContainer(containerID,
-            originalContainerData.getMaxSize(),
-            tempContainerTarStream,
-            packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } catch (Exception e) {
-      LOG.error(
-          "Can't import the downloaded container data id=" + containerID,
-          e);
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error(
-            "Container import is failed and the downloaded file can't be "
-                + "deleted: "
-                + tarFilePath.toAbsolutePath().toString());
-      }
     } finally {
-      lock.unlock();
+      updateCommandStatus(context, command, true, LOG);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
new file mode 100644
index 0000000..827b9d6
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+/**
+ * Service to do the real replication task.
+ *
+ * An implementation should download the container and import it.
+ */
+public interface ContainerReplicator {
+  void replicate(ReplicationTask task);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
new file mode 100644
index 0000000..5ef5841
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default replication implementation.
+ * <p>
+ * This class does the real job: it downloads the container and imports it
+ * into the container set.
+ */
+public class DownloadAndImportReplicator implements ContainerReplicator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DownloadAndImportReplicator.class);
+
+  private final ContainerSet containerSet;
+
+  private final ContainerDispatcher containerDispatcher;
+
+  private final ContainerDownloader downloader;
+
+  private final TarContainerPacker packer;
+
+  public DownloadAndImportReplicator(
+      ContainerSet containerSet,
+      ContainerDispatcher containerDispatcher,
+      ContainerDownloader downloader,
+      TarContainerPacker packer) {
+    this.containerSet = containerSet;
+    this.containerDispatcher = containerDispatcher;
+    this.downloader = downloader;
+    this.packer = packer;
+  }
+
+  public void importContainer(long containerID, Path tarFilePath) {
+    try {
+      ContainerData originalContainerData;
+      try (FileInputStream tempContainerTarStream = new FileInputStream(
+          tarFilePath.toFile())) {
+        byte[] containerDescriptorYaml =
+            packer.unpackContainerDescriptor(tempContainerTarStream);
+        originalContainerData = ContainerDataYaml.readContainer(
+            containerDescriptorYaml);
+      }
+
+      try (FileInputStream tempContainerTarStream = new FileInputStream(
+          tarFilePath.toFile())) {
+
+        Handler handler = containerDispatcher.getHandler(
+            originalContainerData.getContainerType());
+
+        Container container = handler.importContainer(containerID,
+            originalContainerData.getMaxSize(),
+            tempContainerTarStream,
+            packer);
+
+        containerSet.addContainer(container);
+      }
+
+    } catch (Exception e) {
+      LOG.error(
+          "Can't import the downloaded container data id=" + containerID,
+          e);
+      try {
+        Files.delete(tarFilePath);
+      } catch (Exception ex) {
+        LOG.error(
+            "Container import is failed and the downloaded file can't be "
+                + "deleted: "
+                + tarFilePath.toAbsolutePath().toString());
+      }
+    }
+  }
+
+  @Override
+  public void replicate(ReplicationTask task) {
+    long containerID = task.getContainerId();
+
+    List<DatanodeDetails> sourceDatanodes = task.getSources();
+
+    LOG.info("Starting replication of container {} from {}", containerID,
+        sourceDatanodes);
+
+    CompletableFuture<Path> tempTarFile = downloader
+        .getContainerDataFromReplicas(containerID,
+            sourceDatanodes);
+
+    try {
+      //wait for the download. This thread pool is limiting the parallel
+      //downloads, so it's ok to block here and wait for the full download.
+      Path path = tempTarFile.get();
+      LOG.info("Container {} is downloaded, starting to import.",
+          containerID);
+      importContainer(containerID, path);
+      LOG.info("Container {} is replicated successfully", containerID);
+      task.setStatus(Status.DONE);
+    } catch (Exception e) {
+      LOG.error("Container replication was unsuccessful .", e);
+      task.setStatus(Status.FAILED);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 91d098f..3aafb0c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -157,8 +157,8 @@ public class GrpcReplicationClient {
     public void onCompleted() {
       try {
         stream.close();
-        response.complete(outputPath);
         LOG.info("Container is downloaded to {}", outputPath);
+        response.complete(outputPath);
       } catch (IOException e) {
         response.completeExceptionally(e);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
new file mode 100644
index 0000000..1d8d5f6
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Single point to schedule the downloading tasks based on priorities.
+ */
+public class ReplicationSupervisor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationSupervisor.class);
+
+  private final Set<Worker> threadPool = new HashSet<>();
+
+  private final Map<Long, ReplicationTask> queue = new TreeMap();
+
+  private final ContainerSet containerSet;
+
+  private final ContainerReplicator replicator;
+
+  private final int poolSize;
+
+  public ReplicationSupervisor(
+      ContainerSet containerSet,
+      ContainerReplicator replicator, int poolSize) {
+    this.containerSet = containerSet;
+    this.replicator = replicator;
+    this.poolSize = poolSize;
+  }
+
+  public synchronized void addTask(ReplicationTask task) {
+    queue.putIfAbsent(task.getContainerId(), task);
+    synchronized (threadPool) {
+      threadPool.notify();
+    }
+  }
+
+  public void start() {
+    for (int i = 0; i < poolSize; i++) {
+      Worker worker = new Worker();
+      Thread thread = new Thread(worker, "ContainerReplication-" + i);
+      thread.setDaemon(true);
+      thread.start();
+      threadPool.add(worker);
+    }
+  }
+
+  public synchronized ReplicationTask selectTask() {
+    for (ReplicationTask task : queue.values()) {
+      if (task.getStatus() == Status.QUEUED) {
+        if (containerSet.getContainer(task.getContainerId()) == null) {
+          task.setStatus(Status.DOWNLOADING);
+          return task;
+        } else {
+          LOG.debug("Container {} has already been downloaded.",
+              task.getContainerId());
+          queue.remove(task.getContainerId());
+        }
+      } else if (task.getStatus() == Status.FAILED) {
+        LOG.error(
+            "Container {} can't be downloaded from any of the datanodes.",
+            task.getContainerId());
+        queue.remove(task.getContainerId());
+      } else if (task.getStatus() == Status.DONE) {
+        queue.remove(task.getContainerId());
+        LOG.info("Container {} is replicated.", task.getContainerId());
+      }
+    }
+    //no available task.
+    return null;
+  }
+
+  public void stop() {
+    for (Worker worker : threadPool) {
+      worker.stop();
+    }
+  }
+
+  @VisibleForTesting
+  public int getQueueSize() {
+    return queue.size();
+  }
+
+  private class Worker implements Runnable {
+
+    private boolean running = true;
+
+    @Override
+    public void run() {
+      try {
+        while (running) {
+          ReplicationTask task = selectTask();
+          if (task == null) {
+            synchronized (threadPool) {
+              threadPool.wait();
+            }
+          } else {
+            replicator.replicate(task);
+          }
+        }
+      } catch (Exception ex) {
+        LOG.error("Error on doing replication", ex);
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          LOG.error("Error on waiting after failed replication task", e);
+        }
+      }
+    }
+
+    public void stop() {
+      running = false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
new file mode 100644
index 0000000..9019811
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+/**
+ * The task to download a container from the sources.
+ */
+public class ReplicationTask {
+
+  private volatile Status status = Status.QUEUED;
+
+  private final long containerId;
+
+  private List<DatanodeDetails> sources;
+
+  private final Instant queued = Instant.now();
+
+  public ReplicationTask(long containerId,
+      List<DatanodeDetails> sources) {
+    this.containerId = containerId;
+    this.sources = sources;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ReplicationTask that = (ReplicationTask) o;
+    return containerId == that.containerId;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(containerId);
+  }
+
+  public long getContainerId() {
+    return containerId;
+  }
+
+  public List<DatanodeDetails> getSources() {
+    return sources;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public void setStatus(
+      Status status) {
+    this.status = status;
+  }
+
+  @Override
+  public String toString() {
+    return "ReplicationTask{" +
+        "status=" + status +
+        ", containerId=" + containerId +
+        ", sources=" + sources +
+        ", queued=" + queued +
+        '}';
+  }
+
+  public Instant getQueued() {
+    return queued;
+  }
+
+  /**
+   * Status of the replication.
+   */
+  public enum Status {
+    QUEUED,
+    DOWNLOADING,
+    FAILED,
+    DONE
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
deleted file mode 100644
index 6529922..0000000
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.TestGenericTestUtils;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Test replication command handler.
- */
-public class TestReplicateContainerCommandHandler {
-
-  private static final String EXCEPTION_MESSAGE = "Oh my god";
-  private ReplicateContainerCommandHandler handler;
-  private StubDownloader downloader;
-  private ReplicateContainerCommand command;
-  private List<Long> importedContainerIds;
-
-  @Before
-  public void init() {
-    importedContainerIds = new ArrayList<>();
-
-    OzoneConfiguration conf = new OzoneConfiguration();
-    ContainerSet containerSet = Mockito.mock(ContainerSet.class);
-    ContainerDispatcher containerDispatcher =
-        Mockito.mock(ContainerDispatcher.class);
-
-    downloader = new StubDownloader();
-
-    handler = new ReplicateContainerCommandHandler(conf, containerSet,
-        containerDispatcher, downloader) {
-      @Override
-      protected void importContainer(long containerID, Path tarFilePath) {
-        importedContainerIds.add(containerID);
-      }
-    };
-
-    //the command
-    ArrayList<DatanodeDetails> datanodeDetails = new ArrayList<>();
-    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
-    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
-
-    command = new ReplicateContainerCommand(1L, datanodeDetails);
-  }
-
-  @Test
-  public void handle() throws TimeoutException, InterruptedException {
-    //GIVEN
-
-    //WHEN
-    handler.handle(command, null, Mockito.mock(StateContext.class), null);
-
-    TestGenericTestUtils
-        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
-
-    Assert.assertNotNull(downloader.futureByContainers.get(1L));
-    downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));
-
-    TestGenericTestUtils
-        .waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
-  }
-
-  @Test
-  public void handleWithErrors() throws TimeoutException, InterruptedException {
-    //GIVEN
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(ReplicateContainerCommandHandler.LOG);
-
-    //WHEN
-    handler.handle(command, null, Mockito.mock(StateContext.class), null);
-
-    //THEN
-    TestGenericTestUtils
-        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
-
-    Assert.assertNotNull(downloader.futureByContainers.get(1L));
-    downloader.futureByContainers.get(1L)
-        .completeExceptionally(new IllegalArgumentException(
-            EXCEPTION_MESSAGE));
-
-    TestGenericTestUtils
-        .waitFor(() -> {
-          String output = logCapturer.getOutput();
-          return output.contains("unsuccessful") && output
-            .contains(EXCEPTION_MESSAGE); },
-            100,
-            2000);
-  }
-
-  /**
-   * Can't handle a command if there are no source replicas.
-   */
-  @Test(expected = IllegalArgumentException.class)
-  public void handleWithoutReplicas()
-      throws TimeoutException, InterruptedException {
-    //GIVEN
-    ReplicateContainerCommand commandWithoutReplicas =
-        new ReplicateContainerCommand(1L, new ArrayList<>());
-
-    //WHEN
-    handler
-        .handle(commandWithoutReplicas,
-            null,
-            Mockito.mock(StateContext.class),
-            null);
-
-  }
-  private static class StubDownloader implements ContainerDownloader {
-
-    private Map<Long, CompletableFuture<Path>> futureByContainers =
-        new HashMap<>();
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public CompletableFuture<Path> getContainerDataFromReplicas(
-        long containerId, List<DatanodeDetails> sources) {
-      CompletableFuture<Path> future = new CompletableFuture<>();
-      futureByContainers.put(containerId, future);
-      return future;
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
new file mode 100644
index 0000000..d433319
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test the replication supervisor.
+ *
+ * <p>Each test feeds {@link ReplicationTask}s to a started
+ * {@link ReplicationSupervisor} that is backed by a {@link FakeReplicator}
+ * and, after a fixed pause, checks how many tasks reached the replicator.
+ */
+public class TestReplicationSupervisor {
+
+  private final OzoneConfiguration conf = new OzoneConfiguration();
+
+  /**
+   * Six tasks covering three distinct container ids must result in exactly
+   * three replications.
+   */
+  @Test
+  public void normal() throws InterruptedException {
+    //GIVEN
+    ContainerSet set = new ContainerSet();
+
+    FakeReplicator replicator = new FakeReplicator(set);
+    ReplicationSupervisor supervisor =
+        new ReplicationSupervisor(set, replicator, 5);
+
+    List<DatanodeDetails> datanodes = mockDatanodes();
+
+    try {
+      supervisor.start();
+      //WHEN
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      supervisor.addTask(new ReplicationTask(2L, datanodes));
+      supervisor.addTask(new ReplicationTask(2L, datanodes));
+      supervisor.addTask(new ReplicationTask(3L, datanodes));
+      // Fixed pause to let the supervisor work through the queue.
+      Thread.sleep(300);
+
+      //THEN
+      Assert.assertEquals(3, replicator.replicated.size());
+
+    } finally {
+      supervisor.stop();
+    }
+  }
+
+  /**
+   * A second task for a container that an earlier task already replicated
+   * must not be replicated again.
+   */
+  @Test
+  public void duplicateMessageAfterAWhile() throws InterruptedException {
+    //GIVEN
+    ContainerSet set = new ContainerSet();
+
+    FakeReplicator replicator = new FakeReplicator(set);
+    ReplicationSupervisor supervisor =
+        new ReplicationSupervisor(set, replicator, 2);
+
+    List<DatanodeDetails> datanodes = mockDatanodes();
+
+    try {
+      supervisor.start();
+      //WHEN
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      Thread.sleep(400);
+      supervisor.addTask(new ReplicationTask(1L, datanodes));
+      Thread.sleep(300);
+
+      //THEN
+      Assert.assertEquals(1, replicator.replicated.size());
+
+      //the last item is still in the queue as we cleanup the queue during the
+      // selection
+      Assert.assertEquals(1, supervisor.getQueueSize());
+
+    } finally {
+      supervisor.stop();
+    }
+  }
+
+  /**
+   * Creates the two mocked source datanodes handed to every task.
+   *
+   * @return a list with two {@link DatanodeDetails} mocks
+   */
+  private static List<DatanodeDetails> mockDatanodes() {
+    return IntStream.range(1, 3)
+        .mapToObj(v -> Mockito.mock(DatanodeDetails.class))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Replicator stub: instead of downloading anything it pauses briefly,
+   * records the task and adds a new container with the task's container id
+   * to the container set.
+   */
+  private class FakeReplicator implements ContainerReplicator {
+
+    // Read by the test thread while replicate() is presumably running on the
+    // supervisor's own threads (the tests wait for it with sleeps), hence
+    // the synchronized wrapper.
+    private final List<ReplicationTask> replicated =
+        Collections.synchronizedList(new ArrayList<>());
+
+    private final ContainerSet containerSet;
+
+    FakeReplicator(ContainerSet set) {
+      this.containerSet = set;
+    }
+
+    /**
+     * Records the task and registers a container for it after a short pause.
+     *
+     * @param task the task to "replicate"
+     * @throws RuntimeException wrapping any failure, including interruption
+     */
+    @Override
+    public void replicate(ReplicationTask task) {
+      KeyValueContainerData kvcd =
+          new KeyValueContainerData(task.getContainerId(), 100L);
+      KeyValueContainer kvc =
+          new KeyValueContainer(kvcd, conf);
+      try {
+        //download is slow
+        Thread.sleep(100);
+        replicated.add(task);
+        containerSet.addContainer(kvc);
+      } catch (InterruptedException e) {
+        // Restore the flag so the calling thread can still see the interrupt.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f495ee0/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..5c905e0
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Tests for the container replication.
+ */
+package org.apache.hadoop.ozone.container.replication;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to