Repository: giraph
Updated Branches:
  refs/heads/trunk 43909035c -> 6ee97e77e


GIRAPH-998: Close writers in parallel (majaakbiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6ee97e77
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6ee97e77
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6ee97e77

Branch: refs/heads/trunk
Commit: 6ee97e77eb1f45699d2f34fc2e28c89b6fd6c1b3
Parents: 4390903
Author: Maja Kabiljo <[email protected]>
Authored: Tue Mar 24 15:47:44 2015 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Tue Mar 24 15:47:44 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../MultiThreadedSuperstepOutput.java           | 32 ++++++++++++++++++--
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/6ee97e77/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d2e74e2..61cb3ce 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-998: Close writers in parallel (majaakbiljo)
+
   GIRAPH-999: Add support for Mapping multi-input formats (dlogothetis via 
majakabiljo)
 
   GIRAPH-997: Upgrade findbugs to 3.0.0 (dlogothetis via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/6ee97e77/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
 
b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
index 452d93a..b54ee64 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -22,6 +22,8 @@ import 
org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.SimpleVertexWriter;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -32,6 +34,7 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 /**
  * Class to use as {@link SuperstepOutput} when chosen VertexOutputFormat is
@@ -116,8 +119,31 @@ public class MultiThreadedSuperstepOutput<I extends 
WritableComparable,
           occupiedVertexWriters.size() +
           " vertex writers were not returned!");
     }
-    for (VertexWriter<I, V, E> vertexWriter : availableVertexWriters) {
-      vertexWriter.close(context);
-    }
+
+    // Closing writers can take time - use multiple threads and call progress
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            while (true) {
+              VertexWriter<I, V, E> vertexWriter;
+              synchronized (availableVertexWriters) {
+                if (availableVertexWriters.isEmpty()) {
+                  return null;
+                }
+                vertexWriter = availableVertexWriters.remove(
+                    availableVertexWriters.size() - 1);
+              }
+              vertexWriter.close(context);
+            }
+          }
+        };
+      }
+    };
+    ProgressableUtils.getResultsWithNCallables(callableFactory,
+        Math.min(configuration.getNumOutputThreads(),
+            availableVertexWriters.size()), "close-writers-%d", context);
   }
 }

Reply via email to