Repository: hadoop
Updated Branches:
  refs/heads/trunk c9077a9f5 -> 7db3bb3ac
HDDS-544. Unconditional wait findbug warning from ReplicationSupervisor. Contributed by Arpit Agarwal. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7db3bb3a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7db3bb3a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7db3bb3a Branch: refs/heads/trunk Commit: 7db3bb3ac185929fbe8f5b9b59cdd157eec2ac64 Parents: c9077a9 Author: Anu Engineer <aengin...@apache.org> Authored: Sun Oct 21 23:18:38 2018 -0700 Committer: Anu Engineer <aengin...@apache.org> Committed: Sun Oct 21 23:18:38 2018 -0700 ---------------------------------------------------------------------- .../statemachine/DatanodeStateMachine.java | 1 - .../replication/ReplicationSupervisor.java | 137 +++++++++---------- .../replication/TestReplicationSupervisor.java | 31 ++--- 3 files changed, 73 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7db3bb3a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 1bade8e..85fa304 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -308,7 +308,6 @@ public class DatanodeStateMachine implements Closeable { public void startDaemon() { Runnable startStateMachineTask = () -> { try { - supervisor.start(); start(); 
LOG.info("Ozone container server started."); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7db3bb3a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 1d8d5f6..c59d643 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.ozone.container.replication; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentHashMap.KeySetView; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; @@ -37,106 +39,91 @@ public class ReplicationSupervisor { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSupervisor.class); - private final Set<Worker> threadPool = new HashSet<>(); - - private final Map<Long, ReplicationTask> queue = new TreeMap(); - private final ContainerSet containerSet; - private final ContainerReplicator replicator; + private final ThreadPoolExecutor executor; - private final int poolSize; + /** + * A set of container IDs that are currently being downloaded + * or queued for 
download. Tracked so we don't schedule > 1 + * concurrent download for the same container. + */ + private final KeySetView<Object, Boolean> containersInFlight; public ReplicationSupervisor( ContainerSet containerSet, ContainerReplicator replicator, int poolSize) { this.containerSet = containerSet; this.replicator = replicator; - this.poolSize = poolSize; - } - - public synchronized void addTask(ReplicationTask task) { - queue.putIfAbsent(task.getContainerId(), task); - synchronized (threadPool) { - threadPool.notify(); - } + this.containersInFlight = ConcurrentHashMap.newKeySet(); + this.executor = new ThreadPoolExecutor( + 0, poolSize, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("ContainerReplicationThread-%d") + .build()); } - public void start() { - for (int i = 0; i < poolSize; i++) { - Worker worker = new Worker(); - Thread thread = new Thread(worker, "ContainerReplication-" + i); - thread.setDaemon(true); - thread.start(); - threadPool.add(worker); - } - } - - public synchronized ReplicationTask selectTask() { - for (ReplicationTask task : queue.values()) { - if (task.getStatus() == Status.QUEUED) { - if (containerSet.getContainer(task.getContainerId()) == null) { - task.setStatus(Status.DOWNLOADING); - return task; - } else { - LOG.debug("Container {} has already been downloaded.", - task.getContainerId()); - queue.remove(task.getContainerId()); - } - } else if (task.getStatus() == Status.FAILED) { - LOG.error( - "Container {} can't be downloaded from any of the datanodes.", - task.getContainerId()); - queue.remove(task.getContainerId()); - } else if (task.getStatus() == Status.DONE) { - queue.remove(task.getContainerId()); - LOG.info("Container {} is replicated.", task.getContainerId()); - } + /** + * Queue an asynchronous download of the given container. 
+ */ + public void addTask(ReplicationTask task) { + if (containersInFlight.add(task.getContainerId())) { + executor.submit(new TaskRunner(task)); } - //no available task. - return null; } public void stop() { - for (Worker worker : threadPool) { - worker.stop(); + try { + executor.shutdown(); + if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // Ignore, we don't really care about the failure. + Thread.currentThread().interrupt(); } } + /** + * Get the number of containers currently being downloaded + * or scheduled for download. + * @return Count of in-flight replications. + */ @VisibleForTesting - public int getQueueSize() { - return queue.size(); + public int getInFlightReplications() { + return containersInFlight.size(); } - private class Worker implements Runnable { + private final class TaskRunner implements Runnable { + private final ReplicationTask task; - private boolean running = true; + private TaskRunner(ReplicationTask task) { + this.task = task; + } @Override public void run() { try { - while (running) { - ReplicationTask task = selectTask(); - if (task == null) { - synchronized (threadPool) { - threadPool.wait(); - } - } else { - replicator.replicate(task); - } + if (containerSet.getContainer(task.getContainerId()) != null) { + LOG.debug("Container {} has already been downloaded.", + task.getContainerId()); + return; } - } catch (Exception ex) { - LOG.error("Error on doing replication", ex); - try { - Thread.sleep(200); - } catch (InterruptedException e) { - LOG.error("Error on waiting after failed replication task", e); + + task.setStatus(Status.DOWNLOADING); + replicator.replicate(task); + + if (task.getStatus() == Status.FAILED) { + LOG.error( + "Container {} can't be downloaded from any of the datanodes.", + task.getContainerId()); + } else if (task.getStatus() == Status.DONE) { + LOG.info("Container {} is replicated.", task.getContainerId()); } + } finally { + 
containersInFlight.remove(task.getContainerId()); } } - - public void stop() { - running = false; - } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7db3bb3a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index d433319..030412c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -41,7 +42,7 @@ public class TestReplicationSupervisor { private OzoneConfiguration conf = new OzoneConfiguration(); @Test - public void normal() { + public void normal() throws Exception { //GIVEN ContainerSet set = new ContainerSet(); @@ -54,7 +55,6 @@ public class TestReplicationSupervisor { .collect(Collectors.toList()); try { - supervisor.start(); //WHEN supervisor.addTask(new ReplicationTask(1L, datanodes)); supervisor.addTask(new ReplicationTask(1L, datanodes)); @@ -62,16 +62,11 @@ public class TestReplicationSupervisor { supervisor.addTask(new ReplicationTask(2L, datanodes)); supervisor.addTask(new ReplicationTask(2L, datanodes)); supervisor.addTask(new ReplicationTask(3L, datanodes)); - try { - Thread.sleep(300); - } catch 
(InterruptedException e) { - e.printStackTrace(); - } //THEN - System.out.println(replicator.replicated.get(0)); + LambdaTestUtils.await(200_000, 1000, + () -> supervisor.getInFlightReplications() == 0); - Assert - .assertEquals(3, replicator.replicated.size()); + Assert.assertEquals(3, replicator.replicated.size()); } finally { supervisor.stop(); @@ -79,7 +74,7 @@ public class TestReplicationSupervisor { } @Test - public void duplicateMessageAfterAWhile() throws InterruptedException { + public void duplicateMessageAfterAWhile() throws Exception { //GIVEN ContainerSet set = new ContainerSet(); @@ -92,22 +87,18 @@ public class TestReplicationSupervisor { .collect(Collectors.toList()); try { - supervisor.start(); //WHEN supervisor.addTask(new ReplicationTask(1L, datanodes)); - Thread.sleep(400); + LambdaTestUtils.await(200_000, 1000, + () -> supervisor.getInFlightReplications() == 0); supervisor.addTask(new ReplicationTask(1L, datanodes)); - Thread.sleep(300); + LambdaTestUtils.await(200_000, 1000, + () -> supervisor.getInFlightReplications() == 0); //THEN System.out.println(replicator.replicated.get(0)); - Assert - .assertEquals(1, replicator.replicated.size()); - - //the last item is still in the queue as we cleanup the queue during the - // selection - Assert.assertEquals(1, supervisor.getQueueSize()); + Assert.assertEquals(1, replicator.replicated.size()); } finally { supervisor.stop(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org