Repository: hadoop
Updated Branches:
  refs/heads/trunk c9077a9f5 -> 7db3bb3ac


HDDS-544. Unconditional wait findbug warning from ReplicationSupervisor.
Contributed by Arpit Agarwal.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7db3bb3a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7db3bb3a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7db3bb3a

Branch: refs/heads/trunk
Commit: 7db3bb3ac185929fbe8f5b9b59cdd157eec2ac64
Parents: c9077a9
Author: Anu Engineer <aengin...@apache.org>
Authored: Sun Oct 21 23:18:38 2018 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Sun Oct 21 23:18:38 2018 -0700

----------------------------------------------------------------------
 .../statemachine/DatanodeStateMachine.java      |   1 -
 .../replication/ReplicationSupervisor.java      | 137 +++++++++----------
 .../replication/TestReplicationSupervisor.java  |  31 ++---
 3 files changed, 73 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7db3bb3a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 1bade8e..85fa304 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -308,7 +308,6 @@ public class DatanodeStateMachine implements Closeable {
   public void startDaemon() {
     Runnable startStateMachineTask = () -> {
       try {
-        supervisor.start();
         start();
         LOG.info("Ozone container server started.");
       } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7db3bb3a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 1d8d5f6..c59d643 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentHashMap.KeySetView;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
@@ -37,106 +39,91 @@ public class ReplicationSupervisor {
   private static final Logger LOG =
       LoggerFactory.getLogger(ReplicationSupervisor.class);
 
-  private final Set<Worker> threadPool = new HashSet<>();
-
-  private final Map<Long, ReplicationTask> queue = new TreeMap();
-
   private final ContainerSet containerSet;
-
   private final ContainerReplicator replicator;
+  private final ThreadPoolExecutor executor;
 
-  private final int poolSize;
+  /**
+   * A set of container IDs that are currently being downloaded
+   * or queued for download. Tracked so we don't schedule > 1
+   * concurrent download for the same container.
+   */
+  private final KeySetView<Object, Boolean> containersInFlight;
 
   public ReplicationSupervisor(
       ContainerSet containerSet,
       ContainerReplicator replicator, int poolSize) {
     this.containerSet = containerSet;
     this.replicator = replicator;
-    this.poolSize = poolSize;
-  }
-
-  public synchronized void addTask(ReplicationTask task) {
-    queue.putIfAbsent(task.getContainerId(), task);
-    synchronized (threadPool) {
-      threadPool.notify();
-    }
+    this.containersInFlight = ConcurrentHashMap.newKeySet();
+    this.executor = new ThreadPoolExecutor(
+        0, poolSize, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("ContainerReplicationThread-%d")
+            .build());
   }
 
-  public void start() {
-    for (int i = 0; i < poolSize; i++) {
-      Worker worker = new Worker();
-      Thread thread = new Thread(worker, "ContainerReplication-" + i);
-      thread.setDaemon(true);
-      thread.start();
-      threadPool.add(worker);
-    }
-  }
-
-  public synchronized ReplicationTask selectTask() {
-    for (ReplicationTask task : queue.values()) {
-      if (task.getStatus() == Status.QUEUED) {
-        if (containerSet.getContainer(task.getContainerId()) == null) {
-          task.setStatus(Status.DOWNLOADING);
-          return task;
-        } else {
-          LOG.debug("Container {} has already been downloaded.",
-              task.getContainerId());
-          queue.remove(task.getContainerId());
-        }
-      } else if (task.getStatus() == Status.FAILED) {
-        LOG.error(
-            "Container {} can't be downloaded from any of the datanodes.",
-            task.getContainerId());
-        queue.remove(task.getContainerId());
-      } else if (task.getStatus() == Status.DONE) {
-        queue.remove(task.getContainerId());
-        LOG.info("Container {} is replicated.", task.getContainerId());
-      }
+  /**
+   * Queue an asynchronous download of the given container.
+   */
+  public void addTask(ReplicationTask task) {
+    if (containersInFlight.add(task.getContainerId())) {
+      executor.submit(new TaskRunner(task));
     }
-    //no available task.
-    return null;
   }
 
   public void stop() {
-    for (Worker worker : threadPool) {
-      worker.stop();
+    try {
+      executor.shutdown();
+      if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+      }
+    } catch (InterruptedException ie) {
+      // Ignore, we don't really care about the failure.
+      Thread.currentThread().interrupt();
     }
   }
 
+  /**
+   * Get the number of containers currently being downloaded
+   * or scheduled for download.
+   * @return Count of in-flight replications.
+   */
   @VisibleForTesting
-  public int getQueueSize() {
-    return queue.size();
+  public int getInFlightReplications() {
+    return containersInFlight.size();
   }
 
-  private class Worker implements Runnable {
+  private final class TaskRunner implements Runnable {
+    private final ReplicationTask task;
 
-    private boolean running = true;
+    private TaskRunner(ReplicationTask task) {
+      this.task = task;
+    }
 
     @Override
     public void run() {
       try {
-        while (running) {
-          ReplicationTask task = selectTask();
-          if (task == null) {
-            synchronized (threadPool) {
-              threadPool.wait();
-            }
-          } else {
-            replicator.replicate(task);
-          }
+        if (containerSet.getContainer(task.getContainerId()) != null) {
+          LOG.debug("Container {} has already been downloaded.",
+              task.getContainerId());
+          return;
         }
-      } catch (Exception ex) {
-        LOG.error("Error on doing replication", ex);
-        try {
-          Thread.sleep(200);
-        } catch (InterruptedException e) {
-          LOG.error("Error on waiting after failed replication task", e);
+
+        task.setStatus(Status.DOWNLOADING);
+        replicator.replicate(task);
+
+        if (task.getStatus() == Status.FAILED) {
+          LOG.error(
+              "Container {} can't be downloaded from any of the datanodes.",
+              task.getContainerId());
+        } else if (task.getStatus() == Status.DONE) {
+          LOG.info("Container {} is replicated.", task.getContainerId());
         }
+      } finally {
+        containersInFlight.remove(task.getContainerId());
       }
     }
-
-    public void stop() {
-      running = false;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7db3bb3a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index d433319..030412c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -41,7 +42,7 @@ public class TestReplicationSupervisor {
   private OzoneConfiguration conf = new OzoneConfiguration();
 
   @Test
-  public void normal() {
+  public void normal() throws Exception {
     //GIVEN
     ContainerSet set = new ContainerSet();
 
@@ -54,7 +55,6 @@ public class TestReplicationSupervisor {
         .collect(Collectors.toList());
 
     try {
-      supervisor.start();
       //WHEN
       supervisor.addTask(new ReplicationTask(1L, datanodes));
       supervisor.addTask(new ReplicationTask(1L, datanodes));
@@ -62,16 +62,11 @@ public class TestReplicationSupervisor {
       supervisor.addTask(new ReplicationTask(2L, datanodes));
       supervisor.addTask(new ReplicationTask(2L, datanodes));
       supervisor.addTask(new ReplicationTask(3L, datanodes));
-      try {
-        Thread.sleep(300);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
       //THEN
-      System.out.println(replicator.replicated.get(0));
+      LambdaTestUtils.await(200_000, 1000,
+          () -> supervisor.getInFlightReplications() == 0);
 
-      Assert
-          .assertEquals(3, replicator.replicated.size());
+      Assert.assertEquals(3, replicator.replicated.size());
 
     } finally {
       supervisor.stop();
@@ -79,7 +74,7 @@ public class TestReplicationSupervisor {
   }
 
   @Test
-  public void duplicateMessageAfterAWhile() throws InterruptedException {
+  public void duplicateMessageAfterAWhile() throws Exception {
     //GIVEN
     ContainerSet set = new ContainerSet();
 
@@ -92,22 +87,18 @@ public class TestReplicationSupervisor {
         .collect(Collectors.toList());
 
     try {
-      supervisor.start();
       //WHEN
       supervisor.addTask(new ReplicationTask(1L, datanodes));
-      Thread.sleep(400);
+      LambdaTestUtils.await(200_000, 1000,
+          () -> supervisor.getInFlightReplications() == 0);
       supervisor.addTask(new ReplicationTask(1L, datanodes));
-      Thread.sleep(300);
+      LambdaTestUtils.await(200_000, 1000,
+          () -> supervisor.getInFlightReplications() == 0);
 
       //THEN
       System.out.println(replicator.replicated.get(0));
 
-      Assert
-          .assertEquals(1, replicator.replicated.size());
-
-      //the last item is still in the queue as we cleanup the queue during the
-      // selection
-      Assert.assertEquals(1, supervisor.getQueueSize());
+      Assert.assertEquals(1, replicator.replicated.size());
 
     } finally {
       supervisor.stop();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to