Repository: giraph
Updated Branches:
  refs/heads/trunk 27f234f1f -> a1a236fa6


[GIRAPH-1013] Add library of common pieces and functions

Summary:
StripingUtils has been modified, to be compiled with Java7, and to
have snippet of MIT lincense for used hash algorithm.

Test Plan: mvn clean install

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D39915


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a1a236fa
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a1a236fa
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a1a236fa

Branch: refs/heads/trunk
Commit: a1a236fa6f8bd2c9e3ba479e8cc1aa94a7f1f402
Parents: 27f234f
Author: Igor Kabiljo <[email protected]>
Authored: Wed Jun 10 15:33:35 2015 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Tue Jun 16 11:13:37 2015 -0700

----------------------------------------------------------------------
 .../apache/giraph/block_app/library/Pieces.java | 368 +++++++++++++++++++
 .../block_app/library/SendMessageChain.java     | 288 +++++++++++++++
 .../block_app/library/VertexSuppliers.java      | 176 +++++++++
 .../library/internal/SendMessagePiece.java      | 158 ++++++++
 .../internal/SendMessageWithCombinerPiece.java  | 167 +++++++++
 .../library/internal/package-info.java          |  21 ++
 .../iteration/IterationCounterPiece.java        |  35 ++
 .../library/iteration/IterationStage.java       |  28 ++
 .../library/iteration/IterationStageImpl.java   |  48 +++
 .../library/iteration/package-info.java         |  21 ++
 .../giraph/block_app/library/package-info.java  |  21 ++
 .../library/striping/StripingUtils.java         | 225 ++++++++++++
 .../library/striping/package-info.java          |  21 ++
 .../apache/giraph/function/PairPredicate.java   |  35 ++
 .../org/apache/giraph/function/Predicate.java   |  36 ++
 .../function/primitive/Int2ObjFunction.java     |  36 ++
 .../function/primitive/Obj2IntFunction.java     |  33 ++
 17 files changed, 1717 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
new file mode 100644
index 0000000..88b78a3
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import 
org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.internal.SendMessagePiece;
+import 
org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility class for creating common Pieces and computations for processing
+ * graphs.
+ */
+public class Pieces {
+  private static final Logger LOG = Logger.getLogger(Pieces.class);
+
+  private Pieces() { }
+
+  /**
+   * For each vertex execute given process function.
+   * Computation is happening in send phase of the returned Piece.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Piece<I, V, E, NoMessage, Object> forAllVertices(
+      final String pieceName, final Consumer<Vertex<I, V, E>> process) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            process.apply(vertex);
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return pieceName;
+      }
+    };
+  }
+
+  /**
+   * For each vertex execute given process function.
+   * Computation is happening in the receive phase of the returned Piece.
+   * This function should be used if you need returned Piece to interact with
+   * subsequent Piece, as that requires passed function to be executed
+   * during receive phase,
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Piece<I, V, E, NoMessage, Object> forAllVerticesOnReceive(
+      final String pieceName, final Consumer<Vertex<I, V, E>> process) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      @Override
+      public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        return new InnerVertexReceiver() {
+          @Override
+          public void vertexReceive(
+              Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
+            process.apply(vertex);
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return pieceName;
+      }
+    };
+  }
+
+  /**
+   * Creates Piece which removes vertices for which supplier returns true.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Piece<I, V, E, NoMessage, Object> removeVertices(
+      final String pieceName,
+      final SupplierFromVertex<I, V, E, Boolean> shouldRemoveVertex) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      private ReducerHandle<LongWritable, LongWritable> countRemovedAgg;
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        countRemovedAgg = reduceApi.createLocalReducer(SumReduce.LONG);
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          final BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            if (shouldRemoveVertex.get(vertex)) {
+              workerApi.removeVertexRequest(vertex.getId());
+              reduceLong(countRemovedAgg, 1);
+            }
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        LOG.info("Removed " + countRemovedAgg.getReducedValue(master) +
+            " vertices from the graph, during stage " + executionStage);
+      }
+
+      @Override
+      public String toString() {
+        return pieceName;
+      }
+    };
+  }
+
+  /**
+   * Creates single reducer piece - given reduce class, supplier of values on
+   * worker, reduces and passes the result to given consumer on master.
+   *
+   * @param <S> Single value type, objects passed on workers
+   * @param <R> Reduced value type
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   */
+  public static
+  <S, R extends Writable, I extends WritableComparable, V extends Writable,
+  E extends Writable>
+  Piece<I, V, E, NoMessage, Object> reduce(
+      final String name,
+      final ReduceOperation<S, R> reduceOp,
+      final SupplierFromVertex<I, V, E, S> valueSupplier,
+      final Consumer<R> reducedValueConsumer) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      private ReducerHandle<S, R> handle;
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        handle = reduceApi.createLocalReducer(reduceOp);
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            handle.reduce(valueSupplier.get(vertex));
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        reducedValueConsumer.apply(handle.getReducedValue(master));
+      }
+
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  /**
+   * Creates single reducer and broadcast piece - given reduce class, supplier
+   * of values on worker, reduces and broadcasts the value, passing it to the
+   * consumer on worker for each vertex.
+   *
+   * @param <S> Single value type, objects passed on workers
+   * @param <R> Reduced value type
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   */
+  public static
+  <S, R extends Writable, I extends WritableComparable, V extends Writable,
+  E extends Writable>
+  Piece<I, V, E, NoMessage, Object> reduceAndBroadcast(
+      final String name,
+      final ReduceOperation<S, R> reduceOp,
+      final SupplierFromVertex<I, V, E, S> valueSupplier,
+      final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      private final ReducerAndBroadcastWrapperHandle<S, R> handle =
+          new ReducerAndBroadcastWrapperHandle<>();
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        handle.registeredReducer(reduceApi.createLocalReducer(reduceOp));
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            handle.reduce(valueSupplier.get(vertex));
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        handle.broadcastValue(master);
+      }
+
+      @Override
+      public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        final R value = handle.getBroadcast(workerApi);
+        return new InnerVertexReceiver() {
+          @Override
+          public void vertexReceive(
+              Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
+            reducedValueConsumer.apply(vertex, value);
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all targets provided by targetsSupplier.
+   * Received messages are then passed to and processed by provided
+   * messagesConsumer.
+   *
+   * If messageSupplier or targetsSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessagePiece<I, V, E, M> sendMessage(
+      String name,
+      Class<M> messageClass,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
+      ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+    return new SendMessagePiece<>(
+        name, messageClass, messageSupplier, targetsSupplier, 
messagesConsumer);
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all neighbors of current vertex.
+   * Received messages are then passed to and processed by provided
+   * messagesConsumer.
+   *
+   * If messageSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessagePiece<I, V, E, M> sendMessageToNeighbors(
+      String name,
+      Class<M> messageClass,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+    return sendMessage(
+        name, messageClass, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
+        messagesConsumer);
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all targets provided by targetsSupplier,
+   * and uses given messageCombiner to combine messages together.
+   * Received combined message is then passed to and processed by provided
+   * messageConsumer. (null is passed to it, if vertex received no messages)
+   *
+   * If messageSupplier or targetsSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessageWithCombinerPiece<I, V, E, M> sendMessage(
+      String name,
+      MessageCombiner<? super I, M> messageCombiner,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
+      ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+    return new SendMessageWithCombinerPiece<>(
+        name, messageCombiner,
+        messageSupplier, targetsSupplier, messagesConsumer);
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all neighbors of current vertex,
+   * and uses given messageCombiner to combine messages together.
+   * Received combined message is then passed to and processed by provided
+   * messageConsumer. (null is passed to it, if vertex received no messages)
+   *
+   * If messageSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessageWithCombinerPiece<I, V, E, M> sendMessageToNeighbors(
+      String name,
+      MessageCombiner<? super I, M> messageCombiner,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+    return sendMessage(
+        name, messageCombiner, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
+        messagesConsumer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
new file mode 100644
index 0000000..b606a34
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.FunctionWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Utility class for creating sequences of sending replies to received
+ * messages. Current instance of this object represents partial chain,
+ * where we have specified which messages will be send at the lastly defined
+ * link in the chain thus far, but we haven't specified yet what to do when
+ * vertices receive them.
+ *
+ * Contains set of:
+ * - static startX methods, used to create the chain
+ * - thenX methods, used to add one more Piece to the chain, can be
+ *   "chained" arbitrary number of times.
+ * - endX methods, used to finish the chain, returning
+ *   the Block representing the whole chain
+ *
+ * If messageSupplier or targetsSupplier returns null, current vertex
+ * is not going to send any messages.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <P> Previous value
+ */
+public class SendMessageChain<I extends WritableComparable, V extends Writable,
+    E extends Writable, P> {
+  /**
+   * Represent current partial chain. Given a way to consume messages
+   * received in lastly defined link in this chain, it will produce block
+   * representing a chain created thus far.
+   */
+  private final Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator;
+
+  private SendMessageChain(
+      Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator) {
+    this.blockCreator = blockCreator;
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * targets provided by targetsSupplier.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> startSend(
+      final String name,
+      final Class<M> messageClass,
+      final SupplierFromVertex<I, V, E, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
+          @Override
+          public Block apply(
+              ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+            return Pieces.sendMessage(
+                name, messageClass, messageSupplier,
+                targetsSupplier, messagesConsumer);
+          }
+        });
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * targets provided by targetsSupplier, and use given messageCombiner to
+   * combine messages together.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, M> startSend(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final SupplierFromVertex<I, V, E, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
+          @Override
+          public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+            return Pieces.sendMessage(
+                name, messageCombiner, messageSupplier,
+                targetsSupplier, messagesConsumer);
+          }
+        });
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * neighbors of a current vertex.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> startSendToNeighbors(
+      final String name,
+      final Class<M> messageClass,
+      final SupplierFromVertex<I, V, E, M> messageSupplier) {
+    return startSend(name, messageClass, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * neighbors of a current vertex, and use given messageCombiner to
+   * combine messages together.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, M> startSendToNeighbors(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final SupplierFromVertex<I, V, E, M> messageSupplier) {
+    return startSend(name, messageCombiner, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all targets provided by targetsSupplier.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> thenSend(
+      final String name,
+      final Class<M> messageClass,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
+          @Override
+          public Block apply(
+              ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+            return new SequenceBlock(
+              blockCreator.apply(
+                  prevMessagesTransfer.<I, V, E>castToConsumer()),
+              Pieces.sendMessage(
+                name, messageClass,
+                new SupplierFromVertex<I, V, E, M>() {
+                  @Override
+                  public M get(Vertex<I, V, E> vertex) {
+                    return messageSupplier.apply(
+                        vertex, prevMessagesTransfer.get());
+                  }
+                },
+                targetsSupplier, messagesConsumer));
+          }
+        });
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all neighbors of current vertex.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> thenSendToNeighbors(
+      final String name,
+      final Class<M> messageClass,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
+    return thenSend(name, messageClass, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all targets provided by targetsSupplier, and use given
+   * messageCombiner to combine messages together.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, M> thenSend(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
+          @Override
+          public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+            return new SequenceBlock(
+              blockCreator.apply(
+                  prevMessagesTransfer.<I, V, E>castToConsumer()),
+              Pieces.sendMessage(
+                name, messageCombiner,
+                new SupplierFromVertex<I, V, E, M>() {
+                  @Override
+                  public M get(Vertex<I, V, E> vertex) {
+                    return messageSupplier.apply(
+                        vertex, prevMessagesTransfer.get());
+                  }
+                },
+                targetsSupplier, messagesConsumer));
+          }
+        });
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all neighbors of current vertex, and use given
+   * messageCombiner to combine messages together.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, M> thenSendToNeighbors(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
+    return thenSend(name, messageCombiner, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * End chain by giving received messages to valueSupplier,
+   * to produce value that should be reduced, and consumed on master
+   * by reducedValueConsumer.
+   */
+  public <S, R extends Writable>
+  Block endReduce(String name, ReduceOperation<S, R> reduceOp,
+      final FunctionWithVertex<I, V, E, P, S> valueSupplier,
+      Consumer<R> reducedValueConsumer) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+
+    return new SequenceBlock(
+      blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
+      Pieces.reduce(
+        name,
+        reduceOp,
+        new SupplierFromVertex<I, V, E, S>() {
+          @Override
+          public S get(Vertex<I, V, E> vertex) {
+            return valueSupplier.apply(vertex, prevMessagesTransfer.get());
+          }
+        },
+        reducedValueConsumer));
+  }
+
+  /**
+   * End chain by processing messages received within the last link
+   * in the chain.
+   */
+  public Block endConsume(ConsumerWithVertex<I, V, E, P> messagesConsumer) {
+    return blockCreator.apply(messagesConsumer);
+  }
+
+  /**
+   * End chain by providing a function that will produce Block to be attached
+   * to the end of current chain, given a supplier of messages received
+   * within the last link in the chain.
+   */
+  public Block endCustom(
+      Function<SupplierFromVertex<I, V, E, P>, Block> createBlockToAttach) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+    return new SequenceBlock(
+        blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
+        createBlockToAttach.apply(
+            prevMessagesTransfer.<I, V, E>castToSupplier()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java
new file mode 100644
index 0000000..321ed3a
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+
+import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.function.PairPredicate;
+import org.apache.giraph.function.Predicate;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * SupplierFromVertex that extract common information from
+ * vertex itself.
+ */
+@SuppressWarnings("rawtypes")
+public class VertexSuppliers {
+  /** Hide constructor */
+  private VertexSuppliers() { }
+
+  /**
+   * Supplier which extracts and returns vertex ID.
+   * (note - do not modify the object, as it is not returning a copy)
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, I> vertexIdSupplier() {
+    return new SupplierFromVertex<I, V, E, I>() {
+      @Override
+      public I get(Vertex<I, V, E> vertex) {
+        return vertex.getId();
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns vertex value.
+   * (note - doesn't create a copy of vertex value)
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, V> vertexValueSupplier() {
+    return new SupplierFromVertex<I, V, E, V>() {
+      @Override
+      public V get(Vertex<I, V, E> vertex) {
+        return vertex.getValue();
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns edges object.
+   * (note - doesn't create a copy of vertex value)
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterable<Edge<I, E>>> vertexEdgesSupplier() {
+    return new SupplierFromVertex<I, V, E, Iterable<Edge<I, E>>>() {
+      @Override
+      public Iterable<Edge<I, E>> get(Vertex<I, V, E> vertex) {
+        return vertex.getEdges();
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns Iterator over all neighbor IDs.
+   * Note - iterator returns reused object, so you need to "use" them,
+   * before calling next() again.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplier() {
+    return new SupplierFromVertex<I, V, E, Iterator<I>>() {
+      @Override
+      public Iterator<I> get(final Vertex<I, V, E> vertex) {
+        return new TargetVertexIdIterator<>(vertex);
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns Iterator over neighbor IDs
+   * that return true for given predicate.
+   * Note - iterator returns reused object, so you need to "use" them,
+   * before calling next() again.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplier(
+      final Predicate<I> toSupply) {
+    return new SupplierFromVertex<I, V, E, Iterator<I>>() {
+      @Override
+      public Iterator<I> get(final Vertex<I, V, E> vertex) {
+        return Iterators.filter(
+            new TargetVertexIdIterator<>(vertex),
+            new com.google.common.base.Predicate<I>() {
+              @Override
+              public boolean apply(I input) {
+                return toSupply.apply(input);
+              }
+            });
+      }
+    };
+  }
+
+  /**
+   * Supplier which gives Iterator over neighbor IDs that return true for given
+   * predicate over (index, target)
+   * Note - iterator returns reused object, so you need to "use" them,
+   * before calling next() again.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplierWithIndex(
+      final PairPredicate<IntRef, I> toSupply) {
+    return new SupplierFromVertex<I, V, E, Iterator<I>>() {
+      @Override
+      public Iterator<I> get(final Vertex<I, V, E> vertex) {
+        // Every time we return an iterator, we return with a fresh (0) index.
+        return filterWithIndex(
+            new TargetVertexIdIterator<>(vertex), toSupply);
+      }
+    };
+  }
+
+  /**
+   * Returns the elements of {@code unfiltered} that satisfy a
+   * predicate over (index, t).
+   */
+  private static <T> UnmodifiableIterator<T> filterWithIndex(
+      final Iterator<T> unfiltered, final PairPredicate<IntRef, T> predicate) {
+    checkNotNull(unfiltered);
+    checkNotNull(predicate);
+    return new AbstractIterator<T>() {
+      private final IntRef index = new IntRef(0);
+      @Override protected T computeNext() {
+        while (unfiltered.hasNext()) {
+          T element = unfiltered.next();
+          boolean res = predicate.apply(index, element);
+          index.value++;
+          if (res) {
+            return element;
+          }
+        }
+        return endOfData();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java
new file mode 100644
index 0000000..e8bf569
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.internal;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.striping.StripingUtils;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.function.Predicate;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+/**
+ * Piece that sends a message provided through messageProducer to given set of
+ * neighbors, and passes them to messagesConsumer.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ */
+@SuppressWarnings("rawtypes")
+public class SendMessagePiece<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> extends Piece<I, V, E, M, Object> {
+  private final String name;
+  private final Class<M> messageClass;
+  private final SupplierFromVertex<I, V, E, M> messageSupplier;
+  private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier;
+  private final ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer;
+
+  public SendMessagePiece(String name,
+      Class<M> messageClass,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
+      ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+    Preconditions.checkNotNull(messageClass);
+    this.name = name;
+    this.messageClass = messageClass;
+    this.messageSupplier = messageSupplier;
+    this.targetsSupplier = targetsSupplier;
+    this.messagesConsumer = messagesConsumer;
+  }
+
+  /**
+   * Stripe message sending computation across multiple stripes, in
+   * each stripe only part of the vertices will receive messages.
+   *
+   * @param stripes Number of stripes
+   * @param stripeSupplier Stripe supplier function, if IDs are Longs, you can
+   *                       use StripingUtils::fastHashStripingPredicate
+   * @return Resulting block
+   */
+  public Block stripeByReceiver(
+      int stripes,
+      Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
+    return StripingUtils.generateStripedBlock(
+        stripes,
+        new Function<Predicate<I>, Block>() {
+          @Override
+          public Block apply(final Predicate<I> stripePredicate) {
+            return FilteringPiece.createReceiveFiltering(
+                new SupplierFromVertex<I, V, E, Boolean>() {
+                  @Override
+                  public Boolean get(Vertex<I, V, E> vertex) {
+                    return stripePredicate.apply(vertex.getId());
+                  }
+                },
+                new SendMessagePiece<>(
+                  name,
+                  messageClass,
+                  messageSupplier,
+                  new SupplierFromVertex<I, V, E, Iterator<I>>() {
+                    @Override
+                    public Iterator<I> get(Vertex<I, V, E> vertex) {
+                      return Iterators.filter(
+                          targetsSupplier.get(vertex),
+                          new com.google.common.base.Predicate<I>() {
+                            @Override
+                            public boolean apply(I targetId) {
+                              return stripePredicate.apply(targetId);
+                            }
+                          });
+                    }
+                  },
+                  messagesConsumer));
+          }
+        },
+        stripeSupplier);
+  }
+
+
+  @Override
+  public VertexSender<I, V, E> getVertexSender(
+      final BlockWorkerSendApi<I, V, E, M> workerApi,
+      Object executionStage) {
+    return new InnerVertexSender() {
+      @Override
+      public void vertexSend(Vertex<I, V, E> vertex) {
+        Iterator<I> targets = targetsSupplier.get(vertex);
+        M message = messageSupplier.get(vertex);
+        if (message != null && targets != null && targets.hasNext()) {
+          workerApi.sendMessageToMultipleEdges(targets, message);
+        }
+      }
+    };
+  }
+
+  @Override
+  public VertexReceiver<I, V, E, M> getVertexReceiver(
+      BlockWorkerReceiveApi<I> workerApi,
+      Object executionStage) {
+    return new InnerVertexReceiver() {
+      @Override
+      public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
+        messagesConsumer.apply(vertex, messages);
+      }
+    };
+  }
+
+  @Override
+  public Class<M> getMessageClass() {
+    return messageClass;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java
new file mode 100644
index 0000000..c44ef8d
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.internal;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.striping.StripingUtils;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.function.Predicate;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+/**
+ * Piece that sends a message provided through messageProducer to given set of
+ * neighbors, uses a message combiner and passes them to messagesConsumer.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ */
+public class SendMessageWithCombinerPiece<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends Piece<I, V, E, M, Object> {
+  private final String name;
+  private final MessageCombiner<? super I, M> messageCombiner;
+  private final SupplierFromVertex<I, V, E, M> messageSupplier;
+  private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier;
+  private final ConsumerWithVertex<I, V, E, M> messagesConsumer;
+
+  public SendMessageWithCombinerPiece(String name,
+      MessageCombiner<? super I, M> messageCombiner,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
+      ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+    Preconditions.checkNotNull(messageCombiner);
+    this.name = name;
+    this.messageCombiner = messageCombiner;
+    this.messageSupplier = messageSupplier;
+    this.targetsSupplier = targetsSupplier;
+    this.messagesConsumer = messagesConsumer;
+  }
+
+  /**
+   * Stripe message sending computation across multiple stripes, in
+   * each stripe only part of the vertices will receive messages.
+   *
+   * @param stripes Number of stripes
+   * @param stripeSupplier Stripe supplier function, if IDs are Longs, you can
+   *                       use StripingUtils::fastHashStripingPredicate
+   * @return Resulting block
+   */
+  public Block stripeByReceiver(
+      int stripes,
+      Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
+    return StripingUtils.generateStripedBlock(
+        stripes,
+        new Function<Predicate<I>, Block>() {
+          @Override
+          public Block apply(final Predicate<I> stripePredicate) {
+            return FilteringPiece.createReceiveFiltering(
+                new SupplierFromVertex<I, V, E, Boolean>() {
+                  @Override
+                  public Boolean get(Vertex<I, V, E> vertex) {
+                    return stripePredicate.apply(vertex.getId());
+                  }
+                },
+                new SendMessageWithCombinerPiece<>(
+                  name,
+                  messageCombiner,
+                  messageSupplier,
+                  new SupplierFromVertex<I, V, E, Iterator<I>>() {
+                    @Override
+                    public Iterator<I> get(Vertex<I, V, E> vertex) {
+                      return Iterators.filter(
+                          targetsSupplier.get(vertex),
+                          new com.google.common.base.Predicate<I>() {
+                            @Override
+                            public boolean apply(I targetId) {
+                              return stripePredicate.apply(targetId);
+                            }
+                          });
+                    }
+                  },
+                  messagesConsumer));
+          }
+        },
+        stripeSupplier);
+  }
+
+  @Override
+  public VertexSender<I, V, E> getVertexSender(
+      final BlockWorkerSendApi<I, V, E, M> workerApi,
+      Object executionStage) {
+    return new InnerVertexSender() {
+      @Override
+      public void vertexSend(Vertex<I, V, E> vertex) {
+        Iterator<I> targets = targetsSupplier.get(vertex);
+        M message = messageSupplier.get(vertex);
+        if (message != null && targets != null && targets.hasNext()) {
+          workerApi.sendMessageToMultipleEdges(targets, message);
+        }
+      }
+    };
+  }
+
+  @Override
+  public VertexReceiver<I, V, E, M> getVertexReceiver(
+      BlockWorkerReceiveApi<I> workerApi,
+      Object executionStage) {
+    return new InnerVertexReceiver() {
+      @Override
+      public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
+        Iterator<M> iter = messages.iterator();
+        M combinedMessage = null;
+        if (iter.hasNext()) {
+          combinedMessage = iter.next();
+          // When message combiner is used, there is never more then one 
message
+          Preconditions.checkArgument(!iter.hasNext());
+        }
+        messagesConsumer.apply(vertex, combinedMessage);
+      }
+    };
+  }
+
+  @Override
+  public MessageCombiner<? super I, M> getMessageCombiner(
+      ImmutableClassesGiraphConfiguration conf) {
+    return messageCombiner;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java
new file mode 100644
index 0000000..6e8b3eb
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Internal implementation of Pieces needed for Pieces utility class.
+ */
+package org.apache.giraph.block_app.library.internal;

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java
new file mode 100644
index 0000000..70e65da
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.iteration;
+
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Piece that increments execution stage iteration.
+ */
+@SuppressWarnings("rawtypes")
+public class IterationCounterPiece extends Piece<WritableComparable,
+    Writable, Writable, Writable, IterationStage> {
+  @Override
+  public IterationStage nextExecutionStage(IterationStage executionStage) {
+    return executionStage.changedIteration(executionStage.getIteration() + 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java
new file mode 100644
index 0000000..ccac323
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.iteration;
+
+
+/**
+ * Execution stage that contains iteration information.
+ * Iteration can be incremented via IterationCounterPiece.
+ */
+public interface IterationStage {
+  int getIteration();
+  IterationStage changedIteration(int iteration);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java
new file mode 100644
index 0000000..2f94b25
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.iteration;
+
+/**
+ * Implementation of IterationStage
+ */
+public class IterationStageImpl implements IterationStage {
+  private final int iteration;
+
+  public IterationStageImpl() {
+    this.iteration = 0;
+  }
+
+  public IterationStageImpl(int iteration) {
+    this.iteration = iteration;
+  }
+
+  @Override
+  public int getIteration() {
+    return iteration;
+  }
+
+  @Override
+  public IterationStage changedIteration(int iteration) {
+    return new IterationStageImpl(iteration);
+  }
+
+  @Override
+  public String toString() {
+    return "IterationStage[" + iteration + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java
new file mode 100644
index 0000000..c33a18f
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for having iteration within execution stage object.
+ */
+package org.apache.giraph.block_app.library.iteration;

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java
new file mode 100644
index 0000000..801150c
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Core library of Pieces and Suppliers, providing most common usages.
+ */
+package org.apache.giraph.block_app.library;

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java
new file mode 100644
index 0000000..34e5fec
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.striping;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.FilteringBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.function.Predicate;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.function.primitive.Obj2IntFunction;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility functions for doing superstep striping.
+ *
+ * We need to make sure that partitioning (which uses mod for distributing
+ * data across workers) is independent from striping itself. So we are using
+ * fastHash function below, taken from https://code.google.com/p/fast-hash/.
+ */
+public class StripingUtils {
+  private StripingUtils() { }
+
+  /* The MIT License
+
+  Copyright (C) 2012 Zilong Tan ([email protected])
+
+  Permission is hereby granted, free of charge, to any person
+  obtaining a copy of this software and associated documentation
+  files (the "Software"), to deal in the Software without
+  restriction, including without limitation the rights to use, copy,
+  modify, merge, publish, distribute, sublicense, and/or sell copies
+  of the Software, and to permit persons to whom the Software is
+  furnished to do so, subject to the following conditions:
+
+  The above copyright notice and this permission notice shall be
+  included in all copies or substantial portions of the Software.
+
+  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+  BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+  ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+  CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+  SOFTWARE.
+  */
+  /**
+   * Returns 32-bit hash of a given value.
+   *
+   * Fast and generally good hashing function, adapted from C++ implementation:
+   * https://code.google.com/p/fast-hash/
+   */
+  public static int fastHash(long h) {
+    h ^= h >> 23;
+    h *= 0x2127599bf4325c37L;
+    h ^= h >> 47;
+    return ((int) (h - (h >> 32))) & 0x7fffffff;
+  }
+
+  /**
+   * Returns number in [0, stripes) range, from given input {@code value}.
+   */
+  public static int fastStripe(long value, int stripes) {
+    return fastHash(value) % stripes;
+  }
+
+  /**
+   * Fast hash-based striping for LongWritable IDs, returns a function
+   * that for a given ID returns it's stripe index.
+   */
+  public static
+  Obj2IntFunction<LongWritable> fastHashStriping(final int stripes) {
+    return new Obj2IntFunction<LongWritable>() {
+      @Override
+      public int apply(LongWritable id) {
+        return fastStripe(id.get(), stripes);
+      }
+    };
+  }
+
+  /**
+   * Fast hash-based striping for LongWritable IDs, returns a function
+   * that for a given stripe index returns a predicate checking whether ID is
+   * in that stripe.
+   */
+  public static
+  Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate(
+      final int stripes) {
+    return new Int2ObjFunction<Predicate<LongWritable>>() {
+      @Override
+      public Predicate<LongWritable> apply(final int stripe) {
+        return new Predicate<LongWritable>() {
+          @Override
+          public boolean apply(LongWritable id) {
+            return fastStripe(id.get(), stripes) == stripe;
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Generate striped block, with given number of {@code stripes},
+   * using given {@code blockGenerator} to generate block for each stripe.
+   *
+   * @param stripes Number of stripes
+   * @param blockGenerator Function given predicate representing whether
+   *                       ID is in current stripe, should return Block
+   *                       for current stripe
+   * @return Resulting block
+   */
+  public static Block generateStripedBlock(
+      int stripes,
+      Function<Predicate<LongWritable>, Block> blockGenerator) {
+    return generateStripedBlockImpl(
+        stripes, blockGenerator,
+        StripingUtils.fastHashStripingPredicate(stripes));
+  }
+
+  /**
+   * Generate striped block, with given number of {@code stripes},
+   * using given {@code blockGenerator} to generate block for each stripe,
+   * and using striping based on given {@code stripeSupplier}.
+   *
+   * @param stripes Number of stripes
+   * @param blockGenerator Function given predicate representing whether
+   *                       ID is in current stripe, should return Block
+   *                       for current stripe
+   * @param stripeSupplier Function given number of stripes,
+   *                       generates a function that given stripe index,
+   *                       returns predicate checking whether ID is in that
+   *                       stripe.
+   * @return Resulting block
+   */
+  public static <I extends WritableComparable>
+  Block generateStripedBlock(
+      int stripes,
+      Function<Predicate<I>, Block> blockGenerator,
+      Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
+    return generateStripedBlockImpl(
+        stripes, blockGenerator, stripeSupplier.apply(stripes));
+  }
+
+  /**
+   * Stripe given block, by calling vertexSend only in it's corresponding
+   * stripe. All other methods are called number of stripes times.
+   *
+   * @param stripes Number of stripes
+   * @param block Block to stripe
+   * @return Resulting block
+   */
+  public static Block stripeBlockBySenders(
+      int stripes,
+      Block block) {
+    return generateStripedBlockImpl(
+        stripes,
+        StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block),
+        StripingUtils.fastHashStripingPredicate(stripes));
+  }
+
+  /**
+   * Given a block, creates a function that will given a predicate filter
+   * calls to vertexSend function based on that predicate.
+   *
+   * Useful to be combined with generateStripedBlock to stripe blocks.
+   */
+  public static <I extends WritableComparable> Function<Predicate<I>, Block>
+      createSingleStripeBySendersFunction(final Block block) {
+    return new Function<Predicate<I>, Block>() {
+      @Override
+      public Block apply(final Predicate<I> stripePredicate) {
+        return FilteringBlock.createSendFiltering(
+            new SupplierFromVertex<I, Writable, Writable, Boolean>() {
+              @Override
+              public Boolean get(Vertex<I, Writable, Writable> vertex) {
+                return stripePredicate.apply(vertex.getId());
+              }
+            }, block);
+      }
+    };
+  }
+
+  private static <I extends WritableComparable>
+  Block generateStripedBlockImpl(
+      int stripes,
+      Function<Predicate<I>, Block> blockGenerator,
+      Int2ObjFunction<Predicate<I>> stripeSupplier) {
+    Preconditions.checkArgument(stripes >= 1);
+    if (stripes == 1) {
+      return blockGenerator.apply(new Predicate<I>() {
+        @Override
+        public boolean apply(I input) {
+          return true;
+        }
+      });
+    }
+    Block[] blocks = new Block[stripes];
+    for (int i = 0; i < stripes; i++) {
+      blocks[i] = blockGenerator.apply(stripeSupplier.apply(i));
+    }
+    return new SequenceBlock(blocks);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java
new file mode 100644
index 0000000..6a313a7
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for striping.
+ */
+package org.apache.giraph.block_app.library.striping;

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java 
b/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java
new file mode 100644
index 0000000..6d85e93
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+/**
+ * Function:
+ * (T) -> boolean
+ *
+ * @param <T1> First argument type
+ * @param <T2> Second argument type
+ */
+public interface PairPredicate<T1, T2> extends Serializable {
+  /**
+   * Returns the result of applying this predicate to
+   * {@code input1} and {@code input2}.
+   */
+  boolean apply(T1 input1, T2 input2);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java 
b/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java
new file mode 100644
index 0000000..c515ca0
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function;
+
+import java.io.Serializable;
+
+/**
+ * Function:
+ * (T) -> boolean
+ * <br>
+ * Specialization of com.google.common.base.Predicate, that is also
+ * Serializable.
+ *
+ * @param <T> Argument type
+ */
+public interface Predicate<T> extends Serializable {
+  /**
+   * Returns the result of applying this predicate to {@code input}.
+   */
+  boolean apply(T input);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java
new file mode 100644
index 0000000..d082072
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.primitive;
+
+import java.io.Serializable;
+
+/**
+ * Primitive specialization of Function:
+ * (int) -> T
+ *
+ * @param <T> Result type
+ */
+public interface Int2ObjFunction<T> extends Serializable {
+  /**
+   * Returns the result of applying this function to given {@code input}.
+   *
+   * The returned object may or may not be a new instance,
+   * depending on the implementation.
+   */
+  T apply(int input);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java
new file mode 100644
index 0000000..6d3d739
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.function.primitive;
+
+import java.io.Serializable;
+
+/**
+ * Primitive specialization of Function:
+ * (F) -> int
+ *
+ * @param <T> Argument type
+ */
+public interface Obj2IntFunction<T> extends Serializable {
+  /**
+   * Returns the result of applying this function to given {@code input}.
+   */
+  int apply(T input);
+}

Reply via email to