Repository: giraph Updated Branches: refs/heads/trunk 43909035c -> 6ee97e77e
GIRAPH-998: Close writers in parallel (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6ee97e77 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6ee97e77 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6ee97e77 Branch: refs/heads/trunk Commit: 6ee97e77eb1f45699d2f34fc2e28c89b6fd6c1b3 Parents: 4390903 Author: Maja Kabiljo <[email protected]> Authored: Tue Mar 24 15:47:44 2015 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Mar 24 15:47:44 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../MultiThreadedSuperstepOutput.java | 32 ++++++++++++++++++-- 2 files changed, 31 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/6ee97e77/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d2e74e2..61cb3ce 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-998: Close writers in parallel (majakabiljo) + GIRAPH-999: Add support for Mapping multi-input formats (dlogothetis via majakabiljo) GIRAPH-997: Upgrade findbugs to 3.0.0 (dlogothetis via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/6ee97e77/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java index 452d93a..b54ee64 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java @@ -22,6 +22,8 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.io.SimpleVertexWriter; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.VertexWriter; +import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.ProgressableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -32,6 +34,7 @@ import com.google.common.collect.Sets; import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; /** * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is @@ -116,8 +119,31 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable, occupiedVertexWriters.size() + " vertex writers were not returned!"); } - for (VertexWriter<I, V, E> vertexWriter : availableVertexWriters) { - vertexWriter.close(context); - } + + // Closing writers can take time - use multiple threads and call progress + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + while (true) { + VertexWriter<I, V, E> vertexWriter; + synchronized (availableVertexWriters) { + if (availableVertexWriters.isEmpty()) { + return null; + } + vertexWriter = availableVertexWriters.remove( + availableVertexWriters.size() - 1); + } + vertexWriter.close(context); + } + } + }; + } + }; + ProgressableUtils.getResultsWithNCallables(callableFactory, + Math.min(configuration.getNumOutputThreads(), + availableVertexWriters.size()), "close-writers-%d", context); } }
