[GIRAPH-1013] Adding more libraries, algos and examples

Summary:
Adding more libraries, algos and examples

Only changes from our internal state:

New classes:
PairReduce
MaxMessageCombiner
PartitioningStats
TestMessageChain

Change to:
Pieces
SendMessageChain

Test Plan: mvn clean install -Phadoop_facebook

Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov

Reviewed By: sergey.edunov

Differential Revision: https://reviews.facebook.net/D40935


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/3b7c68e5
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/3b7c68e5
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/3b7c68e5

Branch: refs/heads/trunk
Commit: 3b7c68e54a76d02ae721161b1b81c15c2e22e44a
Parents: d7e4bde
Author: Igor Kabiljo <[email protected]>
Authored: Mon Jun 29 23:20:58 2015 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Thu Jul 2 16:32:41 2015 -0700

----------------------------------------------------------------------
 .../library/algo/BreadthFirstSearch.java        | 202 +++++++++++
 .../library/algo/DistributedIndependentSet.java | 337 ++++++++++++++++++
 .../block_app/library/algo/package-info.java    |  21 ++
 .../library/coarsening/CoarseningUtils.java     | 347 +++++++++++++++++++
 .../library/coarsening/package-info.java        |  21 ++
 .../library/stats/DirectedGraphStats.java       | 187 ++++++++++
 .../library/stats/PartitioningStats.java        | 125 +++++++
 .../block_app/library/stats/package-info.java   |  21 ++
 .../AbstractPageRankExampleBlockFactory.java    |  54 +++
 .../pagerank/PageRankExampleBlockFactory.java   |  47 +++
 ...eRankWithConvergenceExampleBlockFactory.java |  70 ++++
 ...PiecesAndConvergenceExampleBlockFactory.java | 146 ++++++++
 .../PageRankWithPiecesExampleBlockFactory.java  |  83 +++++
 ...ansferAndConvergenceExampleBlockFactory.java |  74 ++++
 .../examples/pagerank/TestPageRankExample.java  | 127 +++++++
 .../block_app/library/TestMessageChain.java     | 224 ++++++++++++
 .../algo/BreadthFirstSearchBlockFactory.java    |  79 +++++
 .../algo/BreadthFirstSearchVertexValue.java     |  61 ++++
 .../DistributedIndependentSetBlockFactory.java  |  60 ++++
 .../DistributedIndependentSetVertexValue.java   |  56 +++
 .../library/algo/TestBreadthFirstSearch.java    | 165 +++++++++
 .../algo/TestDistributedIndependentSet.java     | 220 ++++++++++++
 .../library/coarsening/TestCoarseningUtils.java | 133 +++++++
 .../kryo/KryoWritableWrapperJava8Test.java      | 174 ++++++++++
 .../framework/api/local/LocalBlockRunner.java   |   2 +
 .../apache/giraph/block_app/library/Pieces.java |  33 +-
 .../block_app/library/SendMessageChain.java     |  72 +++-
 .../block_app/library/gc/WorkerGCPiece.java     |  42 +++
 .../block_app/library/gc/package-info.java      |  21 ++
 .../function/primitive/Double2ObjFunction.java  |  36 ++
 .../function/primitive/DoubleConsumer.java      |  32 ++
 .../giraph/combiner/MaxMessageCombiner.java     |  74 ++++
 .../apache/giraph/reducers/impl/PairReduce.java | 104 ++++++
 33 files changed, 3433 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
new file mode 100644
index 0000000..fe290fb
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Class for running breadth-first search on the graph.
+ *
+ * Graph is expected to be symmetric before calling any of the methods here.
+ */
+public class BreadthFirstSearch {
+  private static final Logger LOG = Logger.getLogger(BreadthFirstSearch.class);
+  private static final IntWritable NOT_REACHABLE_VERTEX_VALUE =
+      new IntWritable(-1);
+
+  private BreadthFirstSearch() {
+  }
+
+  /**
+   * Default block, which calculates connected components using the vertex's
+   * default edges.
+   */
+  public static <I extends WritableComparable, V extends Writable>
+  Block bfs(
+    SupplierFromVertex<I, V, Writable, Boolean> isVertexInSeedSet,
+    SupplierFromVertex<I, V, Writable, IntWritable> getDistance,
+    ConsumerWithVertex<I, V, Writable, IntWritable> setDistance
+  ) {
+    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
+    ObjectTransfer<Boolean> vertexUpdatedDistance = new ObjectTransfer<>();
+
+    return new SequenceBlock(
+      createInitializePiece(
+        vertexUpdatedDistance,
+        isVertexInSeedSet,
+        getDistance,
+        setDistance,
+        VertexSuppliers.vertexEdgesSupplier()
+      ),
+      RepeatUntilBlock.unlimited(
+        createPropagateConnectedComponentsPiece(
+          vertexUpdatedDistance,
+          vertexUpdatedDistance,
+          converged,
+          getDistance,
+          setDistance,
+          VertexSuppliers.vertexEdgesSupplier()
+        ),
+        converged
+      )
+    );
+  }
+
+  /**
+   * Initialize vertex values for connected components calculation
+   */
+  private static <I extends WritableComparable, V extends Writable>
+  Piece<I, V, Writable, NoMessage, Object> createInitializePiece(
+    Consumer<Boolean> vertexUpdatedDistance,
+    SupplierFromVertex<I, V, Writable, Boolean> isVertexInSeedSet,
+    SupplierFromVertex<I, V, Writable, IntWritable> getDistance,
+    ConsumerWithVertex<I, V, Writable, IntWritable> setDistance,
+    SupplierFromVertex<I, V, Writable, ? extends Iterable<? extends Edge<I, 
?>>>
+      edgeSupplier
+  ) {
+    IntWritable zero = new IntWritable(0);
+    return Pieces.forAllVerticesOnReceive(
+      "InitializeBFS",
+      (vertex) -> {
+        if (isVertexInSeedSet.get(vertex)) {
+          setDistance.apply(vertex, zero);
+          vertexUpdatedDistance.apply(true);
+        } else {
+          setDistance.apply(vertex, NOT_REACHABLE_VERTEX_VALUE);
+          vertexUpdatedDistance.apply(false);
+        }
+      }
+    );
+  }
+
+  /**
+   * Propagate connected components to neighbor pieces
+   */
+  private static <I extends WritableComparable, V extends Writable>
+  Block createPropagateConnectedComponentsPiece(
+      Supplier<Boolean> vertexToPropagate,
+      Consumer<Boolean> vertexUpdatedDistance,
+      Consumer<Boolean> converged,
+      SupplierFromVertex<I, V, Writable, IntWritable> getDistance,
+      ConsumerWithVertex<I, V, Writable, IntWritable> setDistance,
+      SupplierFromVertex<I, V, Writable, ? extends Iterable<?
+        extends Edge<I, ?>>> edgeSupplier) {
+    return new Piece<I, V, Writable, IntWritable, Object>() {
+      private ReducerHandle<IntWritable, IntWritable> propagatedAggregator;
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        propagatedAggregator = reduceApi.createLocalReducer(SumReduce.INT);
+      }
+
+      @Override
+      public VertexSender<I, V, Writable> getVertexSender(
+        BlockWorkerSendApi<I, V, Writable, IntWritable> workerApi,
+        Object executionStage
+      ) {
+        return (vertex) -> {
+          if (vertexToPropagate.get()) {
+            workerApi.sendMessageToMultipleEdges(
+              Iterators.transform(
+                edgeSupplier.get(vertex).iterator(),
+                Edge::getTargetVertexId
+              ),
+              getDistance.get(vertex)
+            );
+            reduceInt(propagatedAggregator, 1);
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        converged.apply(
+            propagatedAggregator.getReducedValue(master).get() == 0);
+        LOG.info("BFS: " + propagatedAggregator.getReducedValue(master).get() +
+                 " many vertices sent in this iteration");
+      }
+
+      @Override
+      public VertexReceiver<I, V, Writable, IntWritable> getVertexReceiver(
+        BlockWorkerReceiveApi<I> workerApi,
+        Object executionStage
+      ) {
+        IntWritable next = new IntWritable();
+        return (vertex, messages) -> {
+          vertexUpdatedDistance.apply(false);
+          for (IntWritable receivedValue : messages) {
+            IntWritable currentValue = getDistance.get(vertex);
+            next.set(receivedValue.get() + 1);
+            if (currentValue.compareTo(NOT_REACHABLE_VERTEX_VALUE) == 0 ||
+                currentValue.compareTo(next) > 0) {
+              setDistance.apply(vertex, next);
+              vertexUpdatedDistance.apply(true);
+            }
+          }
+        };
+      }
+
+      @Override
+      public Class<IntWritable> getMessageClass() {
+        return IntWritable.class;
+      }
+
+      @Override
+      public String toString() {
+        return "PropagateConnectedComponentsPiece";
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java
new file mode 100644
index 0000000..8beef59
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.algo;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Class for computing maximal independent sets of a graph.
+ *
+ * Graph is expected to be symmetric before calling methods here.
+ */
+public class DistributedIndependentSet {
+  private static final Logger LOG =
+      Logger.getLogger(DistributedIndependentSet.class);
+  private static final IntWritable UNKNOWN = new IntWritable(-1);
+  private static final IntWritable NOT_IN_SET = new IntWritable(-2);
+  private static final IntWritable IN_SET = new IntWritable(-3);
+
+  private DistributedIndependentSet() {
+  }
+
+  /**
+   * Default block which decomposes the input graph into independent sets of
+   * vertices. An independent set of vertices is defined as a set of vertices
+   * that do not have edge to each other, i.e. the sub-graph induced by an
+   * independent set of vertices is an empty graph.
+   *
+   * The algorithm finds independent sets as follows:
+   *   1) Find the maximal independent set of vertices amongst unassigned
+   *      vertices.
+   *   2) Assign found vertices to a new independent set.
+   *   3) If all vertices of the graph are assigned, go to step 1. Otherwise
+   *      terminate.
+   *
+   * @param numberClass Independent set id type
+   * @param chooseNumber Process that returns a deterministic id of type
+   *                     <code>numberClass</code> for each vertex
+   * @param getIndependentSet Getter for independent set id of each vertex
+   * @param setIndependentSet Setter for independent set id of each vertex
+   */
+  public static
+  <I extends WritableComparable, V extends Writable,
+  N extends WritableComparable>
+  Block independentSets(
+      Class<N> numberClass,
+      SupplierFromVertex<I, V, Writable, N> chooseNumber,
+      SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet,
+      ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet
+  ) {
+    ObjectTransfer<Boolean> done = new ObjectTransfer<>();
+    IntRef iteration = new IntRef(-1);
+
+    return new SequenceBlock(
+        Pieces.<I, V, Writable>forAllVerticesOnReceive(
+            "InitializeIndependentSet",
+            (vertex) -> setIndependentSet.apply(vertex, UNKNOWN)),
+        RepeatUntilBlock.unlimited(
+            // find maximal independent sets amongst remaining un-assigned
+            // vertices, one after another, until all vertices of the graph
+            // are assigned to independent sets.
+            findMaximalIndependentSet(
+                numberClass,
+                chooseNumber,
+                getIndependentSet,
+                setIndependentSet,
+                iteration, done
+            ),
+            done
+        )
+    );
+  }
+
+  /**
+   * Independent set calculation with vertex ids as messages to choose the
+   * vertices in a set.
+   */
+  public static <I extends WritableComparable, V extends Writable>
+  Block independentSets(
+      Class<I> vertexIdClass,
+      SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet,
+      ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet
+  ) {
+    return independentSets(
+        vertexIdClass,
+        VertexSuppliers.vertexIdSupplier(),
+        getIndependentSet,
+        setIndependentSet
+    );
+  }
+
+  /**
+     * Finds a maximal independent set, amongst un-assigned vertices of the
+     * graph. The algorithm is as follows:
+     *   1) Normalize the state of all un-assigned vertices of the graph to
+     *      UNKNOWN.
+     *   2) Each UNKNOWN vertex sends a number (output of <code>chooseNumber
+     *      </code>) to all its neighbors.
+     *   3) Each UNKNOWN vertex finds the maximum value of all incoming
+     *      messages. If the max value is less than the chosen number for the
+     *      vertex (output of <code>chooseNumber</code> in step 2), the vertex
+     *      assigns itself to the independent set (changes its state to 
IN_SET),
+     *      and sends 'ack' messages to all its neighbors.
+     *   4) Each UNKNOWN vertex that receives an 'ack' message, changes its
+     *      state to NOT_IN_SET.
+     *   5) If there are any UNKNOWN vertex, go to step 2. Otherwise, we found
+     *      the maximal independent set, and terminate.
+     */
+  private static
+  <I extends WritableComparable, V extends Writable,
+  N extends WritableComparable>
+  Block findMaximalIndependentSet(
+      Class<N> numberClass,
+      SupplierFromVertex<I, V, Writable, N> chooseNumber,
+      SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet,
+      ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet,
+      IntRef iteration, ObjectTransfer<Boolean> done
+  ) {
+    ObjectTransfer<Boolean> foundMIS = new ObjectTransfer<>();
+    return new SequenceBlock(
+        createInitializePiece(getIndependentSet, setIndependentSet, iteration),
+        RepeatUntilBlock.unlimited(
+            new SequenceBlock(
+                // Announce the number to all the neighbors.
+                createAnnounceBlock(
+                    numberClass,
+                    chooseNumber,
+                    getIndependentSet,
+                    setIndependentSet
+                ),
+                // Select the vertices in the independent set, and refine the
+                // state of neighboring vertices so not to consider them for
+                // this independent set.
+                createSelectAndRefinePiece(
+                    getIndependentSet,
+                    setIndependentSet,
+                    iteration,
+                    foundMIS,
+                    done
+                )
+            ),
+            foundMIS
+        )
+    );
+  }
+
+  /**
+   * Piece to initialize vertex values so they are either assigned to a
+   * previously found independent set, or UNKNOWN to be considered for the
+   * discovery of the current independent set.
+   */
+  private static <I extends WritableComparable, V extends Writable>
+  Block createInitializePiece(
+      SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet,
+      ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet,
+      IntRef iteration) {
+    return new Piece<I, V, Writable, NoMessage, Object>() {
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        iteration.value++;
+        LOG.info("Start finding independent set with ID " + iteration.value);
+      }
+
+      @Override
+      public VertexReceiver<I, V, Writable, NoMessage> getVertexReceiver(
+          final BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        return (vertex, messages) -> {
+          if (getIndependentSet.get(vertex).equals(NOT_IN_SET)) {
+            setIndependentSet.apply(vertex, UNKNOWN);
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return "InitializePiece";
+      }
+    };
+  }
+
+  /**
+   * A Block that assigns some of the UNKNOWN vertices to the new independent
+   * set based on numbers each vertex broadcast to all its neighbors.
+   */
+  private static
+  <I extends WritableComparable, V extends Writable,
+  N extends WritableComparable>
+  Block createAnnounceBlock(
+      Class<N> numberClass,
+      SupplierFromVertex<I, V, Writable, N> chooseNumber,
+      SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet,
+      ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet) {
+    return Pieces.<I, V, Writable, N>sendMessageToNeighbors(
+        "AnnounceNumber",
+        numberClass,
+        (vertex) -> {
+          if (getIndependentSet.get(vertex).equals(UNKNOWN)) {
+            return chooseNumber.get(vertex);
+          } else {
+            return null;
+          }
+        },
+        (vertex, messages) -> {
+          if (getIndependentSet.get(vertex).equals(UNKNOWN)) {
+            N curVal = chooseNumber.get(vertex);
+            boolean myValIsMax = true;
+            for (N receivedValue : messages) {
+              if (receivedValue.compareTo(curVal) > 0) {
+                myValIsMax = false;
+                break;
+              }
+            }
+            if (myValIsMax) {
+              setIndependentSet.apply(vertex, IN_SET);
+            }
+          }
+        }
+    );
+  }
+
+  /**
+   * Piece to confirm selection of some vertices for the independent set. Also,
+   * changes the state of neighboring vertices of newly assigned vertices to
+   * NOT_IN_SET, so not to consider them for the discovery of the current
+   * independent set.
+   *
+   * @param foundMIS Specifies the end of discovery for current independent 
set.
+   * @param done Specifies the end of whole computation of decomposing to
+   *             independent sets.
+   */
+  private static <I extends WritableComparable, V extends Writable>
+  Block createSelectAndRefinePiece(
+      SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet,
+      ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet,
+      IntRef iteration,
+      Consumer<Boolean> foundMIS,
+      Consumer<Boolean> done) {
+    return new Piece<I, V, Writable, BooleanWritable, Object>() {
+      private ReducerHandle<IntWritable, IntWritable> numVerticesUnknown;
+      private ReducerHandle<IntWritable, IntWritable> numVerticesNotAssigned;
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        numVerticesUnknown = reduceApi.createLocalReducer(SumReduce.INT);
+        numVerticesNotAssigned = reduceApi.createLocalReducer(SumReduce.INT);
+      }
+
+      @Override
+      public VertexSender<I, V, Writable> getVertexSender(
+          final BlockWorkerSendApi<I, V, Writable, BooleanWritable> workerApi,
+          Object executionStage) {
+        BooleanWritable ack = new BooleanWritable(true);
+        IntWritable one = new IntWritable(1);
+        return (vertex) -> {
+          IntWritable vertexState = getIndependentSet.get(vertex);
+          if (vertexState.equals(IN_SET)) {
+            setIndependentSet.apply(vertex, new IntWritable(iteration.value));
+            workerApi.sendMessageToAllEdges(vertex, ack);
+          } else if (vertexState.equals(UNKNOWN)) {
+            numVerticesUnknown.reduce(one);
+            numVerticesNotAssigned.reduce(one);
+          } else if (vertexState.equals(NOT_IN_SET)) {
+            numVerticesNotAssigned.reduce(one);
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        done.apply(numVerticesNotAssigned.getReducedValue(master).get() == 0);
+        foundMIS.apply(numVerticesUnknown.getReducedValue(master).get() == 0);
+      }
+
+      @Override
+      public VertexReceiver<I, V, Writable, BooleanWritable> getVertexReceiver(
+          final BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        return (vertex, messages) -> {
+          if (getIndependentSet.get(vertex).equals(UNKNOWN) &&
+              Iterables.size(messages) > 0) {
+            setIndependentSet.apply(vertex, NOT_IN_SET);
+          }
+        };
+      }
+
+      @Override
+      public Class<BooleanWritable> getMessageClass() {
+        return BooleanWritable.class;
+      }
+
+      @Override
+      public String toString() {
+        return "SelectAndRefinePiece";
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java
new file mode 100644
index 0000000..03a550a
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utility algorithms
+ */
+package org.apache.giraph.block_app.library.algo;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java
new file mode 100644
index 0000000..c74a056
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.coarsening;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.WritableWriter;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.writable.tuple.PairWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Utilities class that creates a new coarsened graph, while keeping the
+ * original graph, allowing us to execute things on either representation
+ * of the graph.
+ */
+public class CoarseningUtils {
+
+  private CoarseningUtils() { }
+
+  //CHECKSTYLE: stop ParameterNumberCheck
+
+  /**
+   * Create block that creates coarsened graph from given set of vertices
+   * (original vertices, that return true for originalNodesChecker).
+   *
+   * Old graph is not deleted, if needed, you can trivially do that by calling
+   * Pieces.removeVertices(originalNodesChecker) after this block.
+   *
+   *
+   * @param idTypeOps Type ops for vertex ids
+   * @param edgeTypeOps Type ops for edge values
+   * @param coarsenedVertexIdSupplier Supplier called on original nodes,
+   *                                  that should return it's coarsened vertex
+   *                                  id
+   * @param vertexInfoClass Part of the vertex value that is needed to create
+   *                        coarsened vertices
+   * @param vertexInfoSupplier Supplier providing part of the vertex value that
+   *                           is needed to create coarsened vertices
+   * @param vertexInfoCombiner Combiner that aggregates individual vertex info
+   *                           values into coarsened value
+   * @param coarsenedVertexValueInitializer Function that initializes coarsened
+   *                                        vertex value, from an aggregated
+   *                                        vertex info of original vertex
+   *                                        values that are coarsened into this
+   *                                        vertex
+   * @param edgeCoarseningCombiner Combiner that aggregates individual edge
+   *                               values into coarsened edge value
+   * @param originalNodesChecker Vertices with id that this gives true for will
+   *                             be coarsened
+   * @param clusterNodesChecker Should return true only for coarsened ids
+   *                            (ones returned by coarsenedVertexIdSupplier)
+   * @return Block that does coarsening
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  VI extends Writable>
+  Block createCustomCoarseningBlock(
+      PrimitiveIdTypeOps<I> idTypeOps,
+      TypeOps<E> edgeTypeOps,
+      SupplierFromVertex<I, V, E, I> coarsenedVertexIdSupplier,
+      Class<VI> vertexInfoClass,
+      SupplierFromVertex<I, V, E, VI> vertexInfoSupplier,
+      MessageCombiner<? super I, VI> vertexInfoCombiner,
+      ConsumerWithVertex<I, V, E, VI> coarsenedVertexValueInitializer,
+      MessageCombiner<? super I, E> edgeCoarseningCombiner,
+      Function<I, Boolean> originalNodesChecker,
+      Function<I, Boolean> clusterNodesChecker) {
+
+  //CHECKSTYLE: resume ParameterNumberCheck
+
+    ObjectTransfer<Iterable<PairWritable<I, E>>> edgesHolder =
+        new ObjectTransfer<>();
+
+    MessageValueFactory<PairWritable<I, E>> pairMessageFactory =
+        () -> new PairWritable<I, E>(
+            idTypeOps.create(), edgeTypeOps.create());
+    Piece<I, V, E, PairWritable<I, E>, Object> calcWeightsOnEdgesPiece =
+        new Piece<I, V, E, PairWritable<I, E>, Object>() {
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, PairWritable<I, E>> workerApi,
+          Object executionStage) {
+        PairWritable<I, E> message = pairMessageFactory.newInstance();
+        return (vertex) -> {
+          idTypeOps.set(
+              message.getLeft(), coarsenedVertexIdSupplier.get(vertex));
+          for (Edge<I, E> edge : vertex.getEdges()) {
+            edgeTypeOps.set(message.getRight(), edge.getValue());
+            workerApi.sendMessage(edge.getTargetVertexId(), message);
+          }
+        };
+      }
+
+      @Override
+      public VertexReceiver<I, V, E, PairWritable<I, E>> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        return (vertex, messages) -> {
+          edgesHolder.apply(messages);
+        };
+      }
+
+      @Override
+      protected MessageValueFactory<PairWritable<I, E>> getMessageFactory(
+          ImmutableClassesGiraphConfiguration conf) {
+        return pairMessageFactory;
+      }
+
+      @Override
+      public String toString() {
+        return "CoarseningCalcWeightsOnEdges";
+      }
+    };
+
+    WritableWriter<E> edgeValueWriter = new WritableWriter<E>() {
+      @Override
+      public void write(DataOutput out, E value) throws IOException {
+        value.write(out);
+      }
+
+      @Override
+      public E readFields(DataInput in) throws IOException {
+        E edge = edgeTypeOps.create();
+        edge.readFields(in);
+        return edge;
+      }
+    };
+
+    Piece<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>, Object>
+    createNewVerticesPiece =
+        new Piece<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>, Object>() {
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>>
+            workerApi,
+          Object executionStage) {
+        return (vertex) -> {
+          Basic2ObjectMap<I, E> map =
+              idTypeOps.create2ObjectOpenHashMap(edgeValueWriter);
+          for (PairWritable<I, E> message : edgesHolder.get()) {
+
+            E value = map.get(message.getLeft());
+            if (value == null) {
+              value = edgeCoarseningCombiner.createInitialMessage();
+              map.put(message.getLeft(), value);
+            }
+
+            edgeCoarseningCombiner.combine(
+                message.getLeft(), value, message.getRight());
+          }
+
+          workerApi.sendMessage(
+              coarsenedVertexIdSupplier.get(vertex),
+              new PairWritable<>(vertexInfoSupplier.get(vertex), map));
+        };
+      }
+
+      @Override
+      public
+      VertexReceiver<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>>
+      getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        return (vertex, messages) -> {
+          VI vertexInfo = vertexInfoCombiner.createInitialMessage();
+          Basic2ObjectMap<I, E> map = idTypeOps.create2ObjectOpenHashMap(null);
+          for (PairWritable<VI, Basic2ObjectMap<I, E>> message : messages) {
+            vertexInfoCombiner.combine(
+                vertex.getId(), vertexInfo, message.getLeft());
+
+
+            for (Iterator<I> iter = message.getRight().fastKeyIterator();
+                iter.hasNext();) {
+              I key = iter.next();
+              E value = map.get(key);
+              if (value == null) {
+                value = edgeCoarseningCombiner.createInitialMessage();
+                map.put(key, value);
+              }
+              edgeCoarseningCombiner.combine(
+                  vertex.getId(), value, message.getRight().get(key));
+            }
+          }
+
+          for (Iterator<I> iter = map.fastKeyIterator(); iter.hasNext();) {
+            I key = iter.next();
+            Edge<I, E> edge = EdgeFactory.create(
+                idTypeOps.createCopy(key), map.get(key));
+            vertex.addEdge(edge);
+          }
+
+          coarsenedVertexValueInitializer.apply(vertex, vertexInfo);
+        };
+      }
+
+      @Override
+      protected MessageValueFactory<PairWritable<VI, Basic2ObjectMap<I, E>>>
+      getMessageFactory(
+          ImmutableClassesGiraphConfiguration conf) {
+        return () -> new PairWritable<>(
+            ReflectionUtils.newInstance(vertexInfoClass),
+            idTypeOps.create2ObjectOpenHashMap(edgeValueWriter));
+      }
+
+      @Override
+      public String toString() {
+        return "CoarseningCreateNewVertices";
+      }
+    };
+
+    return new SequenceBlock(
+        new FilteringPiece<>(
+            (vertex) -> originalNodesChecker.apply(vertex.getId()),
+            calcWeightsOnEdgesPiece),
+        new FilteringPiece<>(
+            (vertex) -> originalNodesChecker.apply(vertex.getId()),
+            (vertex) -> clusterNodesChecker.apply(vertex.getId()),
+            createNewVerticesPiece));
+  }
+
+  /**
+   * Create block that creates coarsened graph from given set of vertices
+   * (original vertices, that return true for originalNodesChecker), when
+   * vertex and edge values are primitives (or TypeOps exist for them).
+   * Coarsening vertex values and edge values are computed as sum of their
+   * individual values.
+   *
+   * Old graph is not deleted, if needed, you can trivially do that by calling
+   * Pieces.removeVertices(originalNodesChecker) after this block.
+   *
+   * @param idTypeOps Vertex id TypeOps
+   * @param valueTypeOps Vertex value TypeOps
+   * @param edgeTypeOps Edge value TypeOps
+   * @param coarsenedVertexIdSupplier Supplier called on original nodes,
+   *                                  that should return it's coarsened vertex
+   *                                  id
+   * @param originalNodesChecker Vertices with id that this gives true for will
+   *                             be coarsened
+   * @param clusterNodesChecker Should return true only for coarsened ids
+   *                            (ones returned by coarsenedVertexIdSupplier)
+   * @return Block that does coarsening
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Block createCoarseningBlock(
+      PrimitiveIdTypeOps<I> idTypeOps,
+      NumericTypeOps<V> valueTypeOps,
+      NumericTypeOps<E> edgeTypeOps,
+      SupplierFromVertex<I, V, E, I> coarsenedVertexIdSupplier,
+      Function<I, Boolean> originalNodesChecker,
+      Function<I, Boolean> clusterNodesChecker) {
+    return CoarseningUtils.<I, V, E, V>createCustomCoarseningBlock(
+        idTypeOps,
+        edgeTypeOps,
+        coarsenedVertexIdSupplier,
+        valueTypeOps.getTypeClass(),
+        VertexSuppliers.vertexValueSupplier(),
+        new MessageCombiner<I, V>() {
+          @Override
+          public void combine(
+              I vertexIndex, V originalMessage, V messageToCombine) {
+            valueTypeOps.plusInto(originalMessage, messageToCombine);
+          }
+
+          @Override
+          public V createInitialMessage() {
+            return valueTypeOps.createZero();
+          }
+        },
+        (vertex, value) -> valueTypeOps.set(vertex.getValue(), value),
+        new MessageCombiner<I, E>() {
+          @Override
+          public void combine(
+              I vertexIndex, E originalMessage, E messageToCombine) {
+            edgeTypeOps.plusInto(originalMessage, messageToCombine);
+          }
+
+          @Override
+          public E createInitialMessage() {
+            return edgeTypeOps.createZero();
+          }
+        },
+        originalNodesChecker,
+        clusterNodesChecker);
+  }
+
+  /**
+   * Uses configuration to figure out vertex id, value and edge value types,
+   * and calls above createCoarseningBlock with it, look at it's
+   * documentation for more details.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Block createCoarseningBlock(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      SupplierFromVertex<I, V, E, I> coarsenedVertexIdSupplier,
+      Function<I, Boolean> originalNodesChecker,
+      Function<I, Boolean> clusterNodesChecker) {
+    return createCoarseningBlock(
+        TypeOpsUtils.getPrimitiveIdTypeOps(conf.getVertexIdClass()),
+        (NumericTypeOps<V>)
+          TypeOpsUtils.getTypeOps(conf.getVertexValueClass()),
+        (NumericTypeOps<E>) TypeOpsUtils.getTypeOps(conf.getEdgeValueClass()),
+        coarsenedVertexIdSupplier, originalNodesChecker, clusterNodesChecker);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java
new file mode 100644
index 0000000..16693fa
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Coarsening utils
+ */
+package org.apache.giraph.block_app.library.coarsening;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java
new file mode 100644
index 0000000..1739718
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.stats;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import 
org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.block_app.reducers.map.BasicMapReduce;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.impl.MaxReduce;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.ops.IntTypeOps;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+
+/** Utility class for calculating stats of a directed graph */
+public class DirectedGraphStats {
+  private static final Logger LOG = Logger.getLogger(DirectedGraphStats.class);
+  private static final double LOG_2 = Math.log(2);
+  private static final int MAX_LOG_DEGREE = 20;
+
+  private DirectedGraphStats() { }
+
+  /**
+   * Calculate and print on master statistics about in and out degrees
+   * of all vertices.
+   */
+  public static <I extends WritableComparable>
+  Block createInAndOutDegreeStatsBlock(Class<I> idClass) {
+    ObjectTransfer<Iterable<I>> inEdges = new ObjectTransfer<>();
+
+    Block announceToNeighbors = Pieces.sendMessageToNeighbors(
+        "AnnounceToNeighbors",
+        idClass,
+        VertexSuppliers.<I, Writable, Writable>vertexIdSupplier(),
+        inEdges.<I, Writable, Writable>castToConsumer());
+
+    return new SequenceBlock(
+        announceToNeighbors,
+        new AggregateInAndOutDegreeStatsPiece<>(
+            inEdges.<I, Writable, Writable>castToSupplier()));
+  }
+
+  /** Aggregating in and out degree statistics */
+  private static class AggregateInAndOutDegreeStatsPiece
+      <I extends WritableComparable>
+      extends Piece<I, Writable, Writable, Writable, Object> {
+    private final
+    SupplierFromVertex<I, Writable, Writable, Iterable<I>> inEdges;
+
+    private ReducerHandle<IntWritable, IntWritable> maxDegreeAgg;
+
+    private ReducerMapHandle<IntWritable, LongWritable, LongWritable>
+    inHistograms;
+    private ReducerMapHandle<IntWritable, LongWritable, LongWritable>
+    outHistograms;
+
+    private ReducerMapHandle<IntWritable, LongWritable, LongWritable>
+    inVsOutHistograms;
+
+
+    public AggregateInAndOutDegreeStatsPiece(
+        SupplierFromVertex<I, Writable, Writable, Iterable<I>> inEdges) {
+      this.inEdges = inEdges;
+    }
+
+    @Override
+    public void registerReducers(
+        CreateReducersApi reduceApi, Object executionStage) {
+      inHistograms = BasicMapReduce.createLocalMapHandles(
+        IntTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG, reduceApi);
+      outHistograms = BasicMapReduce.createLocalMapHandles(
+        IntTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG, reduceApi);
+
+      inVsOutHistograms = BasicMapReduce.createLocalMapHandles(
+        IntTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG, reduceApi);
+
+      maxDegreeAgg =
+          reduceApi.createLocalReducer(new MaxReduce<>(IntTypeOps.INSTANCE));
+    }
+
+    @Override
+    public VertexSender<I, Writable, Writable> getVertexSender(
+        BlockWorkerSendApi<I, Writable, Writable, Writable> workerApi,
+        Object executionStage) {
+      final IntWritable indexWrap = new IntWritable();
+
+      return new InnerVertexSender() {
+        @Override
+        public void vertexSend(Vertex<I, Writable, Writable> vertex) {
+          Iterable<I> in = inEdges.get(vertex);
+
+          int inCount = Iterables.size(in);
+          int outCount = vertex.getNumEdges();
+
+          reduceInt(maxDegreeAgg, Math.max(inCount, outCount));
+          increment(inHistograms, inCount);
+          increment(outHistograms, outCount);
+          increment(inVsOutHistograms,
+              log2(inCount + 1) * MAX_LOG_DEGREE + log2(outCount + 1));
+
+          // TODO add count for common edges.
+        }
+
+        private int log2(int value) {
+          return (int) (Math.log(value) / LOG_2);
+        }
+
+        private void increment(
+            ReducerMapHandle<IntWritable, LongWritable, LongWritable>
+              reduceHandle,
+            int index) {
+          indexWrap.set(index);
+          reduceLong(inHistograms.get(indexWrap), 1);
+        }
+      };
+    }
+
+    @Override
+    public void masterCompute(BlockMasterApi master, Object executionStage) {
+
+      int maxDegree = maxDegreeAgg.getReducedValue(master).get();
+      LOG.info("Max degree : " + maxDegree);
+
+      StringBuilder sb = new StringBuilder("In and out degree histogram:\n");
+      sb.append("degree\tnumIn\tnumOut\n");
+
+      final IntWritable index = new IntWritable();
+
+      for (int i = 0; i <= maxDegree; i++) {
+        index.set(i);
+        long numIn = inHistograms.get(index).getReducedValue(master).get();
+        long numOut = outHistograms.get(index).getReducedValue(master).get();
+        if (numIn > 0 || numOut > 0) {
+          sb.append(i + "\t" + numIn + "\t" + numOut + "\n");
+        }
+      }
+      LOG.info(sb);
+
+      sb = new StringBuilder("In vs out degree log/log histogram:\n");
+      sb.append("<inDeg\t<outDeg\tnum\n");
+
+      for (int in = 0; in < MAX_LOG_DEGREE; in++) {
+        for (int out = 0; out < MAX_LOG_DEGREE; out++) {
+          index.set(in * MAX_LOG_DEGREE + out);
+          long num = 
inVsOutHistograms.get(index).getReducedValue(master).get();
+          if (num > 0) {
+            sb.append(Math.pow(2, in) + "\t" + Math.pow(2, out) +
+                "\t" + num + "\n");
+          }
+        }
+      }
+      LOG.info(sb);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java
new file mode 100644
index 0000000..950c144
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.stats;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.library.SendMessageChain;
+import org.apache.giraph.function.primitive.DoubleConsumer;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.reducers.impl.PairReduce;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility blocks for calculating stats for a given partitioning - an
+ * assignment of vertices to buckets.
+ */
+public class PartitioningStats {
+  private static final Logger LOG = Logger.getLogger(PartitioningStats.class);
+
+  private PartitioningStats() { }
+
+  /**
+   * Calculate edge locality - ratio of edges that are within a same bucket.
+   */
+  public static <V extends Writable> Block calculateEdgeLocality(
+      SupplierFromVertex<WritableComparable, V, Writable, LongWritable>
+        bucketSupplier,
+      DoubleConsumer edgeLocalityConsumer) {
+    final Pair<LongWritable, LongWritable> pair =
+        Pair.of(new LongWritable(), new LongWritable());
+    return SendMessageChain.<WritableComparable, V, Writable, LongWritable>
+    startSendToNeighbors(
+        "CalcLocalEdgesPiece",
+        LongWritable.class,
+        bucketSupplier
+    ).endReduceWithMaster(
+        "AggregateEdgeLocalityPiece",
+        new PairReduce<>(SumReduce.LONG, SumReduce.LONG),
+        (vertex, messages) -> {
+          long bucket = bucketSupplier.get(vertex).get();
+          int local = 0;
+          int total = 0;
+          for (LongWritable otherCluster : messages) {
+            total++;
+            if (bucket == otherCluster.get()) {
+              local++;
+            }
+          }
+          pair.getLeft().set(local);
+          pair.getRight().set(total);
+          return pair;
+        },
+        (reducedPair, master) -> {
+          long localEdges = reducedPair.getLeft().get();
+          long totalEdges = reducedPair.getRight().get();
+          double edgeLocality = (double) localEdges / totalEdges;
+          LOG.info("locality ratio = " + edgeLocality);
+          master.getCounter(
+              "Edge locality stats", "edge locality (in percent * 1000)")
+            .setValue((long) (edgeLocality * 100000));
+          edgeLocalityConsumer.apply(edgeLocality);
+        }
+    );
+  }
+
+  /**
+   * Calculates average fanout - average number of distinct buckets that vertex
+   * has neighbors in.
+   */
+  public static <V extends Writable> Block calculateFanout(
+      SupplierFromVertex<WritableComparable, V, Writable, LongWritable>
+        bucketSupplier,
+      DoubleConsumer averageFanoutConsumer) {
+    final Pair<LongWritable, LongWritable> pair =
+        Pair.of(new LongWritable(), new LongWritable(1));
+    return SendMessageChain.<WritableComparable, V, Writable, LongWritable>
+    startSendToNeighbors(
+        "CalcFanoutPiece",
+        LongWritable.class,
+        bucketSupplier
+    ).endReduceWithMaster(
+        "AggregateFanoutPiece",
+        new PairReduce<>(SumReduce.LONG, SumReduce.LONG),
+        (vertex, messages) -> {
+          LongSet setOfNeighborBuckets = new LongOpenHashSet();
+          for (LongWritable neighborBucket : messages) {
+            setOfNeighborBuckets.add(neighborBucket.get());
+          }
+          pair.getLeft().set(setOfNeighborBuckets.size());
+          return pair;
+        },
+        (reducedPair, master) -> {
+          long fanout = reducedPair.getLeft().get();
+          long numVertices = reducedPair.getRight().get();
+          double avgFanout = (double) fanout / numVertices;
+          LOG.info("fanout ratio = " + avgFanout);
+          master.getCounter("Fanout stats", "fanout (in percent * 1000)")
+            .setValue((long) (avgFanout * 100000));
+          averageFanoutConsumer.apply(avgFanout);
+        }
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java
new file mode 100644
index 0000000..4240dd7
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for calculating graph statistics.
+ */
+package org.apache.giraph.block_app.library.stats;

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java
new file mode 100644
index 0000000..13dfbdb
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.examples.pagerank;
+
+import org.apache.giraph.block_app.framework.AbstractBlockFactory;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Parent class for PageRank example of using BlockFactory. Contains same 
typing information needed
+ * for all examples.
+ */
+public abstract class AbstractPageRankExampleBlockFactory extends 
AbstractBlockFactory<Object> {
+  public static final IntConfOption NUM_ITERATIONS = new IntConfOption(
+      "page_rank_example.num_iterations", 10, "num iterations");
+
+  @Override
+  public Object createExecutionStage(GiraphConfiguration conf) {
+    return new Object();
+  }
+
+  @Override
+  protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) {
+    return LongWritable.class;
+  }
+
+  @Override
+  protected Class<DoubleWritable> getVertexValueClass(GiraphConfiguration 
conf) {
+    return DoubleWritable.class;
+  }
+
+  @Override
+  protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) {
+    return NullWritable.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java
new file mode 100644
index 0000000..dd70c0e
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.examples.pagerank;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatBlock;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * PageRank example of using BlockFactory - in it's simplest form, using 
functional primitives.
+ */
+public class PageRankExampleBlockFactory extends 
AbstractPageRankExampleBlockFactory {
+  @Override
+  @SuppressWarnings("rawtypes")
+  public Block createBlock(GiraphConfiguration conf) {
+    Block iter = Pieces.<WritableComparable, DoubleWritable, Writable, 
DoubleWritable>
+      sendMessageToNeighbors(
+        "IterationPiece",
+        SumMessageCombiner.DOUBLE,
+        (vertex) -> new DoubleWritable(vertex.getValue().get() / 
vertex.getNumEdges()),
+        (vertex, value) -> {
+          double sum = value != null ? value.get() : 0;
+          vertex.getValue().set(0.15f + 0.85f * sum);
+        });
+    return new RepeatBlock(NUM_ITERATIONS.get(conf), iter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java
new file mode 100644
index 0000000..1585557
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.examples.pagerank;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.library.SendMessageChain;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ *  PageRank example with convergence check, using functional primitives and 
SendMessageChain
+ *  for send/receive/reply logic.
+ */
+public class PageRankWithConvergenceExampleBlockFactory
+    extends AbstractPageRankExampleBlockFactory {
+  private static final double EPS = 1e-3;
+  private static final LongWritable ONE = new LongWritable(1);
+  private static final LongWritable ZERO = new LongWritable(0);
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public Block createBlock(GiraphConfiguration conf) {
+    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
+
+    Block iter = SendMessageChain.<WritableComparable, DoubleWritable, 
Writable, DoubleWritable>
+      startSendToNeighbors(
+        "PageRankUpdate",
+        SumMessageCombiner.DOUBLE,
+        (vertex) -> new DoubleWritable(vertex.getValue().get() / 
vertex.getNumEdges())
+      ).endReduce(
+        "PageRankCheckConvergence",
+        SumReduce.LONG,
+        (vertex, value) -> {
+          double sum = value != null ? value.get() : 0;
+          double newValue = 0.15f + 0.85f * sum;
+          double change = Math.abs(newValue - vertex.getValue().get());
+          vertex.getValue().set(newValue);
+          return (change > EPS) ? ONE : ZERO;
+        },
+        (changingCount) -> converged.apply(changingCount.get() == 0)
+      );
+
+    return new RepeatUntilBlock(
+        NUM_ITERATIONS.get(conf),
+        iter,
+        converged);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java
new file mode 100644
index 0000000..878bf1f
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.examples.pagerank;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * PageRank example with convergence check.
+ */
+@SuppressWarnings("rawtypes")
+public class PageRankWithPiecesAndConvergenceExampleBlockFactory extends 
AbstractPageRankExampleBlockFactory {
+  private static final double EPS = 1e-3;
+
+  @Override
+  public Block createBlock(GiraphConfiguration conf) {
+    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
+    ObjectTransfer<Double> vertexValueChange = new ObjectTransfer<>();
+    return new RepeatUntilBlock(
+      NUM_ITERATIONS.get(conf),
+      new SequenceBlock(
+        new PageRankUpdatePiece(vertexValueChange),
+        new PageRankConvergencePiece(vertexValueChange, converged)
+      ),
+      converged);
+  }
+
+  /** One PageRank iteration */
+  public static class PageRankUpdatePiece
+      extends Piece<WritableComparable, DoubleWritable, Writable, 
DoubleWritable, Object> {
+    private final Consumer<Double> changeConsumer;
+
+    public PageRankUpdatePiece(Consumer<Double> changeConsumer) {
+      this.changeConsumer = changeConsumer;
+    }
+
+    @Override
+    public VertexSender<WritableComparable, DoubleWritable, Writable> 
getVertexSender(
+        final BlockWorkerSendApi<WritableComparable, DoubleWritable, Writable,
+          DoubleWritable> workerApi,
+        Object executionStage) {
+      DoubleWritable message = new DoubleWritable();
+      return (vertex) -> {
+        message.set(vertex.getValue().get() / vertex.getNumEdges());
+        workerApi.sendMessageToAllEdges(vertex, message);
+      };
+    }
+
+    @Override
+    public VertexReceiver<WritableComparable, DoubleWritable, Writable, 
DoubleWritable>
+    getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi, 
Object executionStage) {
+      return (vertex, messages) -> {
+        double sum = 0;
+        for (DoubleWritable value : messages) {
+          sum += value.get();
+        }
+        double newValue = 0.15f + 0.85f * sum;
+        changeConsumer.apply(Math.abs(newValue - vertex.getValue().get()));
+        vertex.getValue().set(newValue);
+      };
+    }
+
+    @Override
+    public MessageCombiner<? super WritableComparable, DoubleWritable> 
getMessageCombiner(
+        ImmutableClassesGiraphConfiguration conf) {
+      return SumMessageCombiner.DOUBLE;
+    }
+  }
+
+  /** PageRank convergence check */
+  public static class PageRankConvergencePiece
+      extends Piece<WritableComparable, DoubleWritable, Writable, NoMessage, 
Object> {
+    private ReducerHandle<LongWritable, LongWritable> countModified;
+
+    private final Supplier<Double> changeSupplier;
+    private final Consumer<Boolean> converged;
+
+    public PageRankConvergencePiece(
+        Supplier<Double> changeSupplier,
+        Consumer<Boolean> converged) {
+      this.changeSupplier = changeSupplier;
+      this.converged = converged;
+    }
+
+    @Override
+    public void registerReducers(CreateReducersApi reduceApi, Object 
executionStage) {
+      countModified = reduceApi.createLocalReducer(SumReduce.LONG);
+    }
+
+    @Override
+    public VertexSender<WritableComparable, DoubleWritable, Writable>
+    getVertexSender(
+        BlockWorkerSendApi<WritableComparable, DoubleWritable, Writable, 
NoMessage> workerApi,
+        Object executionStage) {
+      return (vertex) -> {
+        double change = changeSupplier.get();
+        if (change > EPS) {
+          reduceLong(countModified, 1);
+        }
+      };
+    }
+
+    @Override
+    public void masterCompute(BlockMasterApi master, Object executionStage) {
+      LongWritable count = countModified.getReducedValue(master);
+      converged.apply(count.get() == 0);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java
new file mode 100644
index 0000000..8881752
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.examples.pagerank;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * PageRank example of using BlockFactory - in it's simplest form.
+ * This single class represents everything needed for the application.
+ * To use it - set BlockUtils.BLOCK_FACTORY_CLASS property to this class.
+ *
+ * Note, as a general practice, for iteration Piece to be reusable, it should 
not
+ * assume fixed vertex value class, but specify interface it needs, and use it 
instead.
+ */
+public class PageRankWithPiecesExampleBlockFactory extends 
AbstractPageRankExampleBlockFactory {
+  @Override
+  public Block createBlock(GiraphConfiguration conf) {
+    return new RepeatBlock(NUM_ITERATIONS.get(conf), new 
PageRankUpdatePiece());
+  }
+
+  /** One PageRank iteration */
+  @SuppressWarnings("rawtypes")
+  public static class PageRankUpdatePiece
+      extends Piece<WritableComparable, DoubleWritable, Writable, 
DoubleWritable, Object> {
+    @Override
+    public VertexSender<WritableComparable, DoubleWritable, Writable>
+    getVertexSender(BlockWorkerSendApi<WritableComparable, DoubleWritable, 
Writable,
+          DoubleWritable> workerApi,
+        Object executionStage) {
+      DoubleWritable message = new DoubleWritable();
+      return (vertex) -> {
+        message.set(vertex.getValue().get() / vertex.getNumEdges());
+        workerApi.sendMessageToAllEdges(vertex, message);
+      };
+    }
+
+    @Override
+    public VertexReceiver<WritableComparable, DoubleWritable, Writable, 
DoubleWritable>
+    getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi, 
Object executionStage) {
+      return (vertex, messages) -> {
+        double sum = 0;
+        for (DoubleWritable value : messages) {
+          sum += value.get();
+        }
+        vertex.getValue().set(0.15f + 0.85f * sum);
+      };
+    }
+
+    @Override
+    public MessageCombiner<? super WritableComparable, DoubleWritable> 
getMessageCombiner(
+        ImmutableClassesGiraphConfiguration conf) {
+      return SumMessageCombiner.DOUBLE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java
new file mode 100644
index 0000000..c939701
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.examples.pagerank;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ *  PageRank example with convergence check, using functional primitives
+ */
+public class PageRankWithTransferAndConvergenceExampleBlockFactory
+    extends AbstractPageRankExampleBlockFactory {
+  private static final double EPS = 1e-3;
+  private static final LongWritable ONE = new LongWritable(1);
+  private static final LongWritable ZERO = new LongWritable(0);
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public Block createBlock(GiraphConfiguration conf) {
+    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
+    ObjectTransfer<Double> vertexValueChange = new ObjectTransfer<>();
+
+    Block iter = Pieces.<WritableComparable, DoubleWritable, Writable, 
DoubleWritable>
+    sendMessageToNeighbors(
+        "IterationPiece",
+        SumMessageCombiner.DOUBLE,
+        (vertex) -> new DoubleWritable(vertex.getValue().get() / 
vertex.getNumEdges()),
+        (vertex, value) -> {
+          double sum = value != null ? value.get() : 0;
+          double newValue = 0.15f + 0.85f * sum;
+          vertexValueChange.apply(Math.abs(newValue - 
vertex.getValue().get()));
+          vertex.getValue().set(newValue);
+        });
+
+    Block checkConverged = Pieces.reduce(
+        "CheckConvergedPiece",
+        SumReduce.LONG,
+        (vertex) -> {
+          double change = vertexValueChange.get();
+          return (change > EPS) ? ONE : ZERO;
+        },
+        (changingCount) -> converged.apply(changingCount.get() == 0));
+
+    return new RepeatUntilBlock(
+        NUM_ITERATIONS.get(conf),
+        new SequenceBlock(iter, checkConverged),
+        converged);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java
 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java
new file mode 100644
index 0000000..b6b65ba
--- /dev/null
+++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.examples.pagerank;
+
+import org.apache.giraph.block_app.framework.BlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPageRankExample {
+  private <I extends WritableComparable>
+  void testTenIterations(final Class<I> type,
+      final Class<? extends BlockFactory<?>> factory) throws Exception {
+    TestGraphUtils.runTest(
+        TestGraphUtils.chainModifiers(
+            new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
+            new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))),
+        (graph) -> {
+          float outside = 0.8759f;
+          float inside = 1.2481f;
+          float isolated = 0.15f;
+
+          for (int i : new int[] {0, 1, 4, 5}) {
+            Assert.assertEquals(outside, graph.getValue(i).get(), 0.001);
+          }
+          for (int i : new int[] {2, 3}) {
+            Assert.assertEquals(inside, graph.getValue(i).get(), 0.001);
+          }
+          for (int i : new int[] {6}) {
+            Assert.assertEquals(isolated, graph.getValue(i).get(), 0.001);
+          }
+        },
+        (GiraphConfiguration conf) -> {
+          GiraphConstants.VERTEX_ID_CLASS.set(conf, type);
+          BlockUtils.setBlockFactoryClass(conf, factory);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 10);
+        });
+  }
+
+  private void testTenIterationsOnAllTypes(Class<? extends BlockFactory<?>> 
factory)
+      throws Exception {
+    testTenIterations(
+        IntWritable.class,
+        factory);
+    testTenIterations(
+        LongWritable.class,
+        factory);
+  }
+
+  private <I extends WritableComparable>
+  void testConverging(
+      final Class<I> type, Class<? extends BlockFactory<?>> factory) throws 
Exception {
+    TestGraphUtils.runTest(
+        new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
+        (graph) -> {
+        },
+        (GiraphConfiguration conf) -> {
+          GiraphConstants.VERTEX_ID_CLASS.set(conf, type);
+          BlockUtils.setBlockFactoryClass(conf, factory);
+          AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 
Integer.MAX_VALUE);
+        });
+    // Test is that this doesn't loop forever, even though 
num_iterations=Integer.MAX_VALUE
+  }
+
+  private <I extends WritableComparable>
+  void testConvergingOnAllTypes(Class<? extends BlockFactory<?>> factory) 
throws Exception {
+    testConverging(
+        IntWritable.class,
+        factory);
+    testConverging(
+        LongWritable.class,
+        factory);
+  }
+
+  @Test
+  public void testExample() throws Exception {
+    testTenIterationsOnAllTypes(PageRankWithPiecesExampleBlockFactory.class);
+  }
+
+  @Test
+  public void testConvergenceExample() throws Exception {
+    
testTenIterationsOnAllTypes(PageRankWithPiecesAndConvergenceExampleBlockFactory.class);
+    
testConvergingOnAllTypes(PageRankWithPiecesAndConvergenceExampleBlockFactory.class);
+  }
+
+  @Test
+  public void testFunctionalExample() throws Exception {
+    testTenIterationsOnAllTypes(PageRankExampleBlockFactory.class);
+  }
+
+  @Test
+  public void testFunctionalConvergenceExample() throws Exception {
+    
testTenIterationsOnAllTypes(PageRankWithTransferAndConvergenceExampleBlockFactory.class);
+    
testConvergingOnAllTypes(PageRankWithTransferAndConvergenceExampleBlockFactory.class);
+  }
+
+  @Test
+  public void testFunctionalChainConvergenceExample() throws Exception {
+    
testTenIterationsOnAllTypes(PageRankWithConvergenceExampleBlockFactory.class);
+    testConvergingOnAllTypes(PageRankWithConvergenceExampleBlockFactory.class);
+  }
+}

Reply via email to