AMBARI-13248: Parallel library should process all futures even if one of them throws an exception (jluniya)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/474e4086 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/474e4086 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/474e4086 Branch: refs/heads/branch-dev-patch-upgrade Commit: 474e40862f0e66d12d333d83509453ab5c8728e6 Parents: 156afda Author: Jayush Luniya <[email protected]> Authored: Sat Sep 26 11:27:32 2015 -0700 Committer: Jayush Luniya <[email protected]> Committed: Sat Sep 26 11:27:32 2015 -0700 ---------------------------------------------------------------------- .../apache/ambari/server/utils/Parallel.java | 30 ++++++++++------ .../ambari/server/utils/TestParallel.java | 37 +++++++++++++++++++- 2 files changed, 56 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/474e4086/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java index c6e2156..0a3e6c4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -185,11 +186,16 @@ public class Parallel { boolean completed = true; R[] result = (R[]) new Object[futures.size()]; - try { - for (int i = 0; i < futures.size(); i++) { - Future<ResultWrapper<R>> futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS); + for (int i = 0; i < futures.size(); i++) { + try { + Future<ResultWrapper<R>> futureResult = null; + try { + futureResult = completionService.poll(POLL_DURATION_MILLISECONDS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.error("Caught InterruptedException in Parallel.forLoop", e); + } if (futureResult == null) { - // Time out! no progress was made during the last poll duration. Abort the threads and cancel the threads. + // Timed out! no progress was made during the last poll duration. Abort the threads and cancel the threads. LOG.error("Completion service in Parallel.forLoop timed out!"); completed = false; for(int fIndex = 0; fIndex < futures.size(); fIndex++) { @@ -204,6 +210,7 @@ public class Parallel { LOG.debug(" Task - {} successfully cancelled", fIndex); } } + // Finished processing all futures break; } else { ResultWrapper<R> res = futureResult.get(); @@ -214,13 +221,16 @@ public class Parallel { completed = false; } } + } catch (InterruptedException e) { + LOG.error("Caught InterruptedException in Parallel.forLoop", e); + completed = false; + } catch (ExecutionException e) { + LOG.error("Caught ExecutionException in Parallel.forLoop", e); + completed = false; + } catch (CancellationException e) { + LOG.error("Caught CancellationException in Parallel.forLoop", e); + completed = false; } - } catch (InterruptedException e) { - LOG.error("Caught InterruptedException in Parallel.forLoop", e); - completed = false; - } catch (ExecutionException e) { - LOG.error("Caught ExecutionException in Parallel.forLoop", e); - completed = false; } // Return parallel loop result return new ParallelLoopResult<R>(completed, Arrays.asList(result)); http://git-wip-us.apache.org/repos/asf/ambari/blob/474e4086/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java index 0628f20..bfeb446 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java @@ -109,7 +109,7 @@ public class TestParallel { * @throws Exception */ @Test - public void testNestedParallelForLoopIterationFailures() throws Exception { + public void testNestedParallelForLoop() throws Exception { final List<Integer> input = new LinkedList<Integer>(); for(int i = 0; i < 10; i++) { input.add(i); @@ -185,4 +185,39 @@ public class TestParallel { } } } + + /** + * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop iteration exceptions + * @throws Exception + */ + @Test + public void testParallelForLoopIterationExceptions() throws Exception { + final List<Integer> input = new LinkedList<Integer>(); + for(int i = 0; i < 10; i++) { + input.add(i); + } + final List<Integer> failForList = Arrays.asList(new Integer[] { 2, 5, 7}); + ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new LoopBody<Integer, Integer>() { + @Override + public Integer run(Integer in1) { + if(failForList.contains(in1)) { + throw new RuntimeException("Ignore this exception"); + } + return in1 * in1; + } + }); + Assert.assertFalse(loopResult.getIsCompleted()); + Assert.assertNotNull(loopResult.getResult()); + List<Integer> output = loopResult.getResult(); + Assert.assertEquals(input.size(), output.size()); + + for(int i = 0; i < input.size(); i++) { + if(failForList.contains(i)) { + Assert.assertNull(output.get(i)); + output.set(i, i * i); + } else { + Assert.assertEquals(i * i, (int) output.get(i)); + } + } + } }
