http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
new file mode 100644
index 0000000..ede6005
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.giraph;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockOutputApi;
+import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
+import org.apache.giraph.block_app.framework.api.Counter;
+import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
+import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
+import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import 
org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * {@link BlockMasterApi} implementation backed by Giraph. Graph system
+ * calls are forwarded to the wrapped {@link MasterCompute}, output related
+ * calls are answered by the wrapped {@link BlockOutputHandle}.
+ */
+final class BlockMasterApiWrapper implements BlockMasterApi,
+    BlockOutputApi, BlockOutputHandleAccessor {
+  /** Master compute that graph system calls are forwarded to */
+  private final MasterCompute masterCompute;
+  /** Handle serving output descriptions and writers */
+  private final BlockOutputHandle blockOutputHandle;
+
+  /**
+   * Constructor
+   *
+   * @param master Master compute to forward calls to
+   * @param outputHandle Handle used to answer output related calls
+   */
+  public BlockMasterApiWrapper(MasterCompute master,
+                               BlockOutputHandle outputHandle) {
+    this.masterCompute = master;
+    this.blockOutputHandle = outputHandle;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() {
+    return masterCompute.getConf();
+  }
+
+  // Status and progress are reported through the master's context.
+
+  @Override
+  public void setStatus(String status) {
+    masterCompute.getContext().setStatus(status);
+  }
+
+  @Override
+  public void progress() {
+    masterCompute.getContext().progress();
+  }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    // Look the Hadoop counter up once, then expose it through the
+    // framework's own Counter interface.
+    final org.apache.hadoop.mapreduce.Counter hadoopCounter =
+        masterCompute.getContext().getCounter(group, name);
+    return new Counter() {
+      @Override
+      public void increment(long incr) {
+        hadoopCounter.increment(incr);
+      }
+
+      @Override
+      public void setValue(long value) {
+        hadoopCounter.setValue(value);
+      }
+    };
+  }
+
+  // Reducers and named broadcasts
+
+  @Override
+  public <R extends Writable> R getReduced(String name) {
+    return masterCompute.getReduced(name);
+  }
+
+  @Override
+  public void broadcast(String name, Writable value) {
+    masterCompute.broadcast(name, value);
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(String name,
+      ReduceOperation<S, R> reduceOp) {
+    masterCompute.registerReducer(name, reduceOp);
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(String name,
+      ReduceOperation<S, R> reduceOp, R globalInitialValue) {
+    masterCompute.registerReducer(name, reduceOp, globalInitialValue);
+  }
+
+  // Aggregators
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return masterCompute.getAggregatedValue(name);
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    return masterCompute.registerAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public <A extends Writable> boolean registerPersistentAggregator(
+      String name, Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    return masterCompute.registerPersistentAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    masterCompute.setAggregatedValue(name, value);
+  }
+
+  @Override
+  public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
+    // The newly created handle supplies the name under which the value
+    // is broadcast; the same handle is handed back to the caller.
+    BroadcastHandleImpl<T> newHandle = new BroadcastHandleImpl<>();
+    masterCompute.broadcast(newHandle.getName(), object);
+    return newHandle;
+  }
+
+  @Override
+  @Deprecated
+  public long getTotalNumEdges() {
+    return masterCompute.getTotalNumEdges();
+  }
+
+  @Override
+  @Deprecated
+  public long getTotalNumVertices() {
+    return masterCompute.getTotalNumVertices();
+  }
+
+  @Override
+  public void logToCommandLine(String line) {
+    masterCompute.logToCommandLine(line);
+  }
+
+  // Output related calls are answered by the output handle.
+
+  @Override
+  public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
+  OD getOutputDesc(String confOption) {
+    return blockOutputHandle.<OW, OD>getOutputDesc(confOption);
+  }
+
+  @Override
+  public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
+    return blockOutputHandle.getWriter(confOption);
+  }
+
+  @Override
+  public BlockOutputHandle getBlockOutputHandle() {
+    return blockOutputHandle;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java
new file mode 100644
index 0000000..69cf9f8
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.giraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
+import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
+import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+
+/**
+ * {@link MasterCompute} implementation running a block computation by
+ * handing its calls over to {@link BlockMasterLogic}.
+ *
+ * @param <S> Execution stage type
+ */
+public final class BlockMasterCompute<S> extends MasterCompute {
+  /** Logic doing the actual work; replaced by the copy read in readFields */
+  private BlockMasterLogic<S> blockMasterLogic = new BlockMasterLogic<>();
+
+  @Override
+  public void initialize() throws InstantiationException,
+      IllegalAccessException {
+    blockMasterLogic.initialize(getConf(), createMasterApi());
+  }
+
+  @Override
+  public void compute() {
+    BlockWorkerPieces<S> nextPieces =
+        blockMasterLogic.computeNext(getSuperstep());
+    // A null result halts the computation, anything else is passed on
+    // through BlockWorkerPieces.setNextWorkerPieces.
+    if (nextPieces != null) {
+      BlockWorkerPieces.setNextWorkerPieces(this, nextPieces);
+    } else {
+      haltComputation();
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    KryoWritableWrapper<BlockMasterLogic<S>> wrapped =
+        new KryoWritableWrapper<>(blockMasterLogic);
+    wrapped.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    KryoWritableWrapper<BlockMasterLogic<S>> wrapped =
+        new KryoWritableWrapper<>();
+    wrapped.readFields(in);
+    blockMasterLogic = wrapped.get();
+    // The logic that was just read is given a newly created master api
+    blockMasterLogic.initializeAfterRead(createMasterApi());
+  }
+
+  /**
+   * Create the master api wrapper around this master compute, together
+   * with a new output handle built from the job id, the configuration
+   * and the context.
+   *
+   * @return Newly created master api wrapper
+   */
+  private BlockMasterApiWrapper createMasterApi() {
+    String jobId = getContext().getJobID().toString();
+    BlockOutputHandle outputHandle =
+        new BlockOutputHandle(jobId, getConf(), getContext());
+    return new BlockMasterApiWrapper(this, outputHandle);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
new file mode 100644
index 0000000..6e839f9
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.giraph;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockOutputApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
+import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
+import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Giraph backed implementation of the worker side apis
+ * (BlockWorkerReceiveApi and BlockWorkerSendApi among others).
+ * Calls are forwarded to the wrapped {@link Computation}; worker value and
+ * output related calls go through the BlockWorkerContext obtained from it.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message type
+ */
+@SuppressWarnings("rawtypes")
+final class BlockWorkerApiWrapper<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements BlockWorkerReceiveApi<I>, BlockWorkerSendApi<I, V, E, M>,
+    BlockWorkerValueAccessor, WorkerGlobalCommUsage, BlockOutputApi {
+  /** Computation that calls are forwarded to */
+  private final Computation<I, V, E, NoMessage, M> computation;
+
+  /**
+   * Constructor
+   *
+   * @param worker Computation to forward calls to
+   */
+  public BlockWorkerApiWrapper(Computation<I, V, E, NoMessage, M> worker) {
+    this.computation = worker;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
+    return computation.getConf();
+  }
+
+  // Aggregators, broadcasts and reducers
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    computation.aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return computation.getAggregatedValue(name);
+  }
+
+  @Override
+  public <B extends Writable> B getBroadcast(String name) {
+    return computation.getBroadcast(name);
+  }
+
+  @Override
+  public void reduce(String name, Object value) {
+    computation.reduce(name, value);
+  }
+
+  @Override
+  public void reduceMerge(String name, Writable value) {
+    computation.reduceMerge(name, value);
+  }
+
+  // Message sending
+
+  @Override
+  public void sendMessage(I id, M message) {
+    computation.sendMessage(id, message);
+  }
+
+  @Override
+  public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M message) {
+    computation.sendMessageToAllEdges(vertex, message);
+  }
+
+  @Override
+  public void sendMessageToMultipleEdges(Iterator<I> vertexIdIterator,
+      M message) {
+    computation.sendMessageToMultipleEdges(vertexIdIterator, message);
+  }
+
+  // Graph mutation requests. An IOException thrown by the computation is
+  // rethrown wrapped in a RuntimeException, keeping it as the cause.
+
+  @Override
+  public void addVertexRequest(I id, V value) {
+    try {
+      computation.addVertexRequest(id, value);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
+    try {
+      computation.addVertexRequest(id, value, edges);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public void removeVertexRequest(I vertexId) {
+    try {
+      computation.removeVertexRequest(vertexId);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
+    try {
+      computation.addEdgeRequest(sourceVertexId, edge);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
+    try {
+      computation.removeEdgesRequest(sourceVertexId, targetVertexId);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Worker context of the wrapped computation, cast to BlockWorkerContext.
+   *
+   * @return Worker context as BlockWorkerContext
+   */
+  private BlockWorkerContext blockWorkerContext() {
+    return (BlockWorkerContext) computation.getWorkerContext();
+  }
+
+  @Override
+  public Object getWorkerValue() {
+    return blockWorkerContext().getWorkerValue();
+  }
+
+  @Override
+  public long getTotalNumEdges() {
+    return computation.getTotalNumEdges();
+  }
+
+  @Override
+  public long getTotalNumVertices() {
+    return computation.getTotalNumVertices();
+  }
+
+  // Output related calls use the output handle of the worker context.
+
+  @Override
+  public <OW extends BlockOutputWriter,
+      OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
+    return blockWorkerContext().getOutputHandle()
+        .<OW, OD>getOutputDesc(confOption);
+  }
+
+  @Override
+  public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
+    return blockWorkerContext().getOutputHandle().getWriter(confOption);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
new file mode 100644
index 0000000..1a4f8d8
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.giraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
+import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
+import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.writable.kryo.HadoopKryo;
+import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link WorkerContext} of a block computation. All the work is handed
+ * over to {@link BlockWorkerContextLogic}, which before each superstep
+ * receives the {@link BlockWorkerPieces} to execute.
+ */
+public final class BlockWorkerContext extends WorkerContext
+    implements KryoIgnoreWritable {
+  /** Logger */
+  public static final Logger LOG = Logger.getLogger(BlockWorkerContext.class);
+
+  /** Logic doing the actual work; created anew or read in readFields */
+  private BlockWorkerContextLogic workerLogic;
+
+  @Override
+  public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+    workerLogic = new BlockWorkerContextLogic();
+    // Output handle is built from the job id, configuration and context
+    String jobId = getContext().getJobID().toString();
+    BlockOutputHandle outputHandle =
+        new BlockOutputHandle(jobId, getConf(), getContext());
+    workerLogic.preApplication(
+        new BlockWorkerContextApiWrapper<>(this), outputHandle);
+  }
+
+  @Override
+  public void preSuperstep() {
+    List<Writable> receivedMessages = getAndClearMessagesFromOtherWorkers();
+    // A single wrapper instance is passed for both api arguments below
+    BlockWorkerContextApiWrapper<Writable> contextApi =
+        new BlockWorkerContextApiWrapper<>(this);
+    BlockWorkerPieces<Object> piecesToExecute =
+        BlockWorkerPieces.getNextWorkerPieces(this);
+
+    LOG.info("PassedComputation in " + getSuperstep() +
+        " superstep executing " + piecesToExecute);
+
+    workerLogic.preSuperstep(contextApi, contextApi, piecesToExecute,
+        getSuperstep(), receivedMessages);
+  }
+
+  @Override
+  public void postSuperstep() {
+    workerLogic.postSuperstep();
+  }
+
+  @Override
+  public void postApplication() {
+    workerLogic.postApplication();
+  }
+
+  /**
+   * Get the worker value kept by the worker logic.
+   *
+   * @return Worker value
+   */
+  public Object getWorkerValue() {
+    return workerLogic.getWorkerValue();
+  }
+
+  /**
+   * Get the output handle kept by the worker logic.
+   *
+   * @return Output handle
+   */
+  public BlockOutputHandle getOutputHandle() {
+    return workerLogic.getOutputHandle();
+  }
+
+  // KryoWritable cannot be extended directly, because WorkerContext is an
+  // abstract class and not an interface, and the conf held by the parent
+  // class cannot be made transient.
+  // Serialization is therefore written by hand: the worker logic is
+  // written and read through HadoopKryo.
+  // (KryoIgnoreWritable is added to avoid wrapping it twice)
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    HadoopKryo.writeClassAndObject(out, workerLogic);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    workerLogic = HadoopKryo.readClassAndObject(in);
+    // Output handle of the logic that was read is initialized again with
+    // the configuration and context of this worker context.
+    workerLogic.getOutputHandle().initialize(getConf(), getContext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
new file mode 100644
index 0000000..c52b6a5
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.giraph;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Giraph backed implementation of both BlockWorkerContextReceiveApi and
+ * BlockWorkerContextSendApi, forwarding every call to the wrapped
+ * {@link WorkerContext}.
+ *
+ * @param <WM> Worker message type
+ */
+final class BlockWorkerContextApiWrapper<WM extends Writable>
+    implements BlockWorkerContextReceiveApi, BlockWorkerContextSendApi<WM> {
+  /** Worker context that calls are forwarded to */
+  private final WorkerContext context;
+
+  /**
+   * Constructor
+   *
+   * @param workerContext Worker context to forward calls to
+   */
+  public BlockWorkerContextApiWrapper(WorkerContext workerContext) {
+    this.context = workerContext;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() {
+    return context.getConf();
+  }
+
+  // Worker information
+
+  @Override
+  public int getWorkerCount() {
+    return context.getWorkerCount();
+  }
+
+  @Override
+  public int getMyWorkerIndex() {
+    return context.getMyWorkerIndex();
+  }
+
+  // Aggregators, worker messages and broadcasts
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return context.getAggregatedValue(name);
+  }
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    context.aggregate(name, value);
+  }
+
+  @Override
+  public void sendMessageToWorker(WM message, int workerIndex) {
+    context.sendMessageToWorker(message, workerIndex);
+  }
+
+  @Override
+  public <B extends Writable> B getBroadcast(String name) {
+    return context.getBroadcast(name);
+  }
+
+  // Graph totals
+
+  @Override
+  public long getTotalNumEdges() {
+    return context.getTotalNumEdges();
+  }
+
+  @Override
+  public long getTotalNumVertices() {
+    return context.getTotalNumVertices();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java
new file mode 100644
index 0000000..e20fb8e
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Giraph implementation of graph processing system API used by
+ * Blocks Framework.
+ */
+package org.apache.giraph.block_app.framework.api.giraph;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java
new file mode 100644
index 0000000..c10e5d2
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Interfaces representing full API to the underlying graph processing system.
+ *
+ * Framework implementation is fully contained within package
+ * org.apache.giraph.block_app.framework, given implementation of interfaces
+ * defined here.
+ *
+ * We have two such implementations:
+ * - one relying on Giraph, distributed graph processing system,
+ *   connecting all methods to its internals
+ * - one having a fully contained local implementation, executing applications
+ *   on a single machine. Avoiding overheads of Giraph being distributed,
+ *   it allows very efficient evaluation on small graphs, especially useful for
+ *   fast unit tests.
+ *
+ * You could potentially use a different graph processing system, to execute
+ * any Block Application, by implementing these interfaces.
+ */
+package org.apache.giraph.block_app.framework.api;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
new file mode 100644
index 0000000..6d5287c
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.function.Consumer;
+
+/**
+ * Composable unit of execution. Used to combine other Blocks into
+ * bigger units. Each Piece represents a Block itself.
+ *
+ * Execution is represented as an iterator across Pieces.
+ *
+ * The whole application run is represented by a single block at the end.
+ */
+@SuppressWarnings("rawtypes")
+public interface Block extends Iterable<AbstractPiece> {
+  /**
+   * Create iterator representing all pieces needed to be executed
+   * in this block.
+   *
+   * After Iterator.next call returns, master compute of returned Piece is
+   * guaranteed to be called before calling hasNext/next on the iterator.
+   * (allows for iterators logic to depend on the execution dynamically,
+   * and not be only static)
+   *
+   * @return Iterator over the pieces to be executed in this block
+   */
+  @Override
+  Iterator<AbstractPiece> iterator();
+
+  /**
+   * Calls consumer for each Piece:
+   * - in no particular order
+   * - potentially calling multiple times on same Piece
+   * - even if Piece might never be returned in the iterator
+   * - it will be called at least once for every piece that is
+   *   going to be returned by iterator
+   *
+   * Can be used for static analysis/introspection of the block,
+   * without actually executing its pieces.
+   *
+   * @param consumer Consumer to be called for each possible Piece
+   */
+  void forAllPossiblePieces(Consumer<AbstractPiece> consumer);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
new file mode 100644
index 0000000..1a57402
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.function.Consumer;
+
+/**
+ * Block that contains no pieces: its iterator is empty, and the consumer
+ * passed to forAllPossiblePieces is never called.
+ */
+@SuppressWarnings("rawtypes")
+public final class EmptyBlock implements Block {
+  @Override
+  public Iterator<AbstractPiece> iterator() {
+    return Collections.<AbstractPiece>emptyIterator();
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    // Intentionally empty, there are no pieces to pass to the consumer.
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
new file mode 100644
index 0000000..5631417
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * Block which filters out calls to vertexSend/vertexReceive functions
+ * of all pieces in a given block.
+ * Filtering happens based on toCallSend and toCallReceive suppliers
+ * that are passed in, as every piece returned while iterating is wrapped
+ * with a {@link FilteringPiece}.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public final class FilteringBlock<I extends WritableComparable,
+        V extends Writable, E extends Writable>
+    implements Block {
+  private final SupplierFromVertex<I, V, E, Boolean> toCallSend;
+  private final SupplierFromVertex<I, V, E, Boolean> toCallReceive;
+  private final Block block;
+
+  /**
+   * Creates filtering block which uses passed {@code toCallSend} to filter
+   * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter
+   * calls to {@code vertexReceive}, on all pieces within passed {@code block}.
+   *
+   * The static factories of this class pass {@code null} for the supplier
+   * of the side that should not be filtered.
+   *
+   * @param toCallSend Supplier deciding whether vertexSend is called
+   * @param toCallReceive Supplier deciding whether vertexReceive is called
+   * @param block Block whose pieces are wrapped
+   */
+  public FilteringBlock(
+      SupplierFromVertex<I, V, E, Boolean> toCallSend,
+      SupplierFromVertex<I, V, E, Boolean> toCallReceive,
+      Block block) {
+    this.toCallSend = toCallSend;
+    this.toCallReceive = toCallReceive;
+    this.block = block;
+  }
+
+  /**
+   * Creates filtering block, where both vertexSend and vertexReceive are
+   * filtered based on the same supplier.
+   *
+   * @param toCallSendAndReceive Supplier used for both send and receive
+   * @param block Block whose pieces are wrapped
+   */
+  public FilteringBlock(
+      SupplierFromVertex<I, V, E, Boolean> toCallSendAndReceive, Block block) {
+    this(toCallSendAndReceive, toCallSendAndReceive, block);
+  }
+
+  /**
+   * Creates filtering block, that filters only vertexReceive function,
+   * and always calls vertexSend function.
+   *
+   * @param toCallReceive Supplier deciding whether vertexReceive is called
+   * @param innerBlock Block whose pieces are wrapped
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Block filtering only the receive side
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Block createReceiveFiltering(
+      SupplierFromVertex<I, V, E, Boolean> toCallReceive,
+      Block innerBlock) {
+    return new FilteringBlock<>(null, toCallReceive, innerBlock);
+  }
+
+  /**
+   * Creates filtering block, that filters only vertexSend function,
+   * and always calls vertexReceive function.
+   *
+   * @param toCallSend Supplier deciding whether vertexSend is called
+   * @param innerBlock Block whose pieces are wrapped
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Block filtering only the send side
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Block createSendFiltering(
+      SupplierFromVertex<I, V, E, Boolean> toCallSend,
+      Block innerBlock) {
+    return new FilteringBlock<>(toCallSend, null, innerBlock);
+  }
+
+  @Override
+  public Iterator<AbstractPiece> iterator() {
+    // Wrapping is lazy: a FilteringPiece is created for each piece only
+    // when the returned iterator hands that piece out.
+    return Iterators.transform(
+        block.iterator(),
+        new Function<AbstractPiece, AbstractPiece>() {
+          @Override
+          public AbstractPiece apply(AbstractPiece input) {
+            return new FilteringPiece<>(toCallSend, toCallReceive, input);
+          }
+        });
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    // The consumer is given the pieces of the wrapped block as they are,
+    // without the FilteringPiece wrapper.
+    block.forAllPossiblePieces(consumer);
+  }
+
+  /**
+   * Shows the wrapped block, consistently with the other block
+   * implementations, so it stays visible when the block tree is printed.
+   */
+  @Override
+  public String toString() {
+    return "FilteringBlock(" + block + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
new file mode 100644
index 0000000..e73392d
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.Supplier;
+
+/**
+ * Block that, each time it is iterated, evaluates a condition and then
+ * behaves as one of its two branches.
+ */
+@SuppressWarnings("rawtypes")
+public final class IfBlock implements Block {
+  private final Block thenBlock;
+  private final Block elseBlock;
+  private final Supplier<Boolean> condition;
+
+  /**
+   * Creates a block with both branches.
+   *
+   * @param condition Supplier evaluated when iteration of this block starts
+   * @param thenBlock Block used when condition supplies {@code true}
+   * @param elseBlock Block used otherwise (including when condition
+   *                  supplies {@code null})
+   */
+  public IfBlock(
+      Supplier<Boolean> condition, Block thenBlock, Block elseBlock) {
+    this.condition = condition;
+    this.thenBlock = thenBlock;
+    this.elseBlock = elseBlock;
+  }
+
+  /**
+   * Creates a block without an else branch; an {@link EmptyBlock} is used
+   * in its place.
+   *
+   * @param condition Supplier evaluated when iteration of this block starts
+   * @param thenBlock Block used when condition supplies {@code true}
+   */
+  public IfBlock(Supplier<Boolean> condition, Block thenBlock) {
+    this(condition, thenBlock, new EmptyBlock());
+  }
+
+  @Override
+  public Iterator<AbstractPiece> iterator() {
+    // Boolean.TRUE.equals treats a null result the same as false
+    Block chosenBranch =
+        Boolean.TRUE.equals(condition.get()) ? thenBlock : elseBlock;
+    return chosenBranch.iterator();
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    // Either branch might run, so pieces of both are reported
+    thenBlock.forAllPossiblePieces(consumer);
+    elseBlock.forAllPossiblePieces(consumer);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder description = new StringBuilder("IfBlock(");
+    description.append(thenBlock);
+    // An EmptyBlock else branch is left out of the description
+    if (!(elseBlock instanceof EmptyBlock)) {
+      description.append(" , ").append(elseBlock);
+    }
+    return description.append(")").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
new file mode 100644
index 0000000..9f4f4a0
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.primitive.IntSupplier;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Block that repeats another block given number of times.
+ */
+@SuppressWarnings("rawtypes")
+public final class RepeatBlock implements Block {
+  private final Block block;
+  private final IntSupplier repeatTimes;
+
+  /**
+   * Creates a repeat block with a fixed number of repetitions.
+   *
+   * @param repeatTimes Number of times to repeat the block
+   * @param block Block to repeat
+   */
+  public RepeatBlock(final int repeatTimes, Block block) {
+    this.block = block;
+    this.repeatTimes = new IntSupplier() {
+      @Override
+      public int get() {
+        return repeatTimes;
+      }
+
+      // Without this, RepeatBlock.toString() would show the identity of
+      // this anonymous class instead of the number of repetitions.
+      @Override
+      public String toString() {
+        return Integer.toString(repeatTimes);
+      }
+    };
+  }
+
+  /**
+   * Creates a repeat block, that before starting execution takes number of
+   * iterations from the given supplier.
+   *
+   * This allows number of iterations to be dynamic, and depend on
+   * execution that happens before.
+   * Note - it doesn't allow for number of repetitions to change during the
+   * loop itself - as the supplier is called only when this block gets
+   * its turn.
+   *
+   * @param repeatTimes Supplier of the number of times to repeat the block
+   * @param block Block to repeat
+   */
+  public RepeatBlock(IntSupplier repeatTimes, Block block) {
+    this.block = block;
+    this.repeatTimes = repeatTimes;
+  }
+
+  /**
+   * Create a repeat block that executes unlimited number of times
+   * (implemented as {@code Integer.MAX_VALUE} repetitions).
+   *
+   * Should rarely be used, as it will cause application never to finish,
+   * unless other unconventional ways of termination are used.
+   *
+   * @param block Block to repeat
+   * @return Block repeating the given block
+   */
+  public static Block unlimited(Block block) {
+    return new RepeatBlock(Integer.MAX_VALUE, block);
+  }
+
+  @Override
+  public Iterator<AbstractPiece> iterator() {
+    // The supplier is read once, here; nCopies then describes all the
+    // repetitions without materializing them.
+    return Iterables.concat(
+        Collections.nCopies(repeatTimes.get(), block)).iterator();
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    block.forAllPossiblePieces(consumer);
+  }
+
+  /**
+   * Describes this block without invoking the supplier: a fixed count is
+   * printed as a number, a caller-provided supplier via its own toString.
+   */
+  @Override
+  public String toString() {
+    return "RepeatBlock(" + repeatTimes + " * " + block + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
new file mode 100644
index 0000000..13e8833
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.Supplier;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+
+/**
+ * Block that repeats another block until toQuit supplier returns true,
+ * but at most given number of times.
+ *
+ * If toQuit returns true on first run, block is not going
+ * to be executed at all.
+ */
+@SuppressWarnings("rawtypes")
+public final class RepeatUntilBlock implements Block {
+  private final Block block;
+  private final int repeatTimes;
+  private final Supplier<Boolean> toQuit;
+
+  public RepeatUntilBlock(
+      int repeatTimes, Block block, Supplier<Boolean> toQuit) {
+    this.block = block;
+    this.repeatTimes = repeatTimes;
+    this.toQuit = toQuit;
+  }
+
+  /**
+   * Repeat unlimited number of times, until toQuit supplier returns true.
+   */
+  public static Block unlimited(Block block, Supplier<Boolean> toQuit) {
+    return new RepeatUntilBlock(Integer.MAX_VALUE, block, toQuit);
+  }
+
+  @Override
+  public Iterator<AbstractPiece> iterator() {
+    // nCopies uses constant memory, creating a looped list with single element
+    final Iterator<AbstractPiece> repeatIterator =
+        Iterables.concat(Collections.nCopies(repeatTimes, block)).iterator();
+    return new AbstractIterator<AbstractPiece>() {
+      @Override
+      protected AbstractPiece computeNext() {
+        if (Boolean.TRUE.equals(toQuit.get()) || !repeatIterator.hasNext()) {
+          return endOfData();
+        }
+
+        return repeatIterator.next();
+      }
+    };
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    block.forAllPossiblePieces(consumer);
+  }
+
+  @Override
+  public String toString() {
+    return "RepeatUntilBlock(" + repeatTimes + " * " + block + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
new file mode 100644
index 0000000..d768f0b
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.block;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.function.Consumer;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Block that executes provided blocks sequentially.
+ */
+@SuppressWarnings("rawtypes")
+public final class SequenceBlock implements Block {
+  private final Block[] blocks;
+
+  public SequenceBlock(Block... blocks) {
+    this.blocks = blocks.clone();
+  }
+
+  public SequenceBlock(List<? extends Block> blocks) {
+    this.blocks = blocks.toArray(new Block[blocks.size()]);
+  }
+
+  @Override
+  public Iterator<AbstractPiece> iterator() {
+    return Iterables.concat(Arrays.asList(blocks)).iterator();
+  }
+
+  @Override
+  public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
+    for (Block block : blocks) {
+      block.forAllPossiblePieces(consumer);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "SequenceBlock" + Arrays.toString(blocks);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java
new file mode 100644
index 0000000..64adc35
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Block interface as a composable unit of execution, and its common
+ * implementations.
+ */
+package org.apache.giraph.block_app.framework.block;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
new file mode 100644
index 0000000..6a2fb39
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.internal;
+
+import java.lang.reflect.Field;
+
+import org.apache.giraph.block_app.framework.api.StatusReporter;
+
+/**
+ * Static helpers that publish Blocks Framework information as counters
+ * through a {@link StatusReporter}.
+ */
+public class BlockCounters {
+  public static final String GROUP = "Blocks Framework";
+
+  private BlockCounters() { }
+
+  /**
+   * Publishes fields of the stage object as counters in {@link #GROUP},
+   * named {@code prefix + field name}. Fields declared on the stage class
+   * and on all of its superclasses are visited.
+   *
+   * Only fields that are convertible to long via widening are set
+   * (i.e. long/int/short/byte); all other fields are silently skipped.
+   * Nothing happens if stage or reporter is null.
+   *
+   * @param prefix Prefix of the counter names
+   * @param stage Object whose fields are published
+   * @param reporter Reporter providing the counters
+   */
+  public static void setStageCounters(
+      String prefix, Object stage, StatusReporter reporter) {
+    if (stage == null || reporter == null) {
+      return;
+    }
+
+    for (Class<?> type = stage.getClass(); type != null;
+        type = type.getSuperclass()) {
+      Field[] declared = type.getDeclaredFields();
+      Field.setAccessible(declared, true);
+
+      for (Field declaredField : declared) {
+        try {
+          long counterValue = declaredField.getLong(stage);
+          String counterName = prefix + declaredField.getName();
+          reporter.getCounter(GROUP, counterName).setValue(counterValue);
+
+        // CHECKSTYLE: stop EmptyBlock - ignore any exceptions
+        } catch (IllegalArgumentException | IllegalAccessException e) {
+        }
+        // CHECKSTYLE: resume EmptyBlock
+      }
+    }
+  }
+
+  /**
+   * Sets the master timer counter for the given piece to the elapsed time
+   * in whole seconds (millis / 1000). The counter name carries
+   * {@code superstep - 0.5} and the piece.
+   *
+   * @param masterPiece Piece and stage that were executed on master
+   * @param superstep Superstep number
+   * @param millis Elapsed time in milliseconds
+   * @param reporter Reporter providing the counters
+   */
+  public static void setMasterTimeCounter(
+      PairedPieceAndStage<?> masterPiece, long superstep,
+      long millis, StatusReporter reporter) {
+    String timerName = String.format(
+        "In %6.1f %s (s)", superstep - 0.5, masterPiece.getPiece());
+    long seconds = millis / 1000;
+    reporter.getCounter(GROUP + " Master Timers", timerName).setValue(seconds);
+  }
+
+  /**
+   * Sets the worker timer counter for the given worker pieces to the
+   * elapsed time in whole seconds (millis / 1000). The counter name
+   * carries the superstep and the short description of the pieces.
+   *
+   * @param workerPieces Pieces that were executed on workers
+   * @param superstep Superstep number
+   * @param millis Elapsed time in milliseconds
+   * @param reporter Reporter providing the counters
+   */
+  public static void setWorkerTimeCounter(
+      BlockWorkerPieces<?> workerPieces, long superstep,
+      long millis, StatusReporter reporter) {
+    String timerName = String.format(
+        "In %6d %s (s)", superstep, workerPieces.toStringShort());
+    long seconds = millis / 1000;
+    reporter.getCounter(GROUP + " Worker Timers", timerName).setValue(seconds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
new file mode 100644
index 0000000..3b87372
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.internal;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.BlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.function.Consumer;
+import org.apache.log4j.Logger;
+import org.python.google.common.base.Preconditions;
+
+/**
+ * Block execution logic on master, iterating over Pieces of the
+ * application Block, executing master logic, and providing what needs to be
+ * executed on the workers.
+ *
+ * {@code initialize} creates the application Block through the configured
+ * BlockFactory, registers aggregators of every distinct piece the block
+ * reports, and prepares the piece iterator. {@code computeNext} is then
+ * called with the superstep number and advances the iterator by one piece
+ * per call, until it returns null.
+ *
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings("rawtypes")
+public class BlockMasterLogic<S> {
+  private static final Logger LOG = Logger.getLogger(BlockMasterLogic.class);
+
+  private Iterator<AbstractPiece> pieceIterator;
+  private PairedPieceAndStage<S> previousPiece;
+  private transient BlockMasterApi masterApi;
+  private long lastTimestamp = -1;
+  private BlockWorkerPieces previousWorkerPieces;
+  private boolean computationDone;
+
+  public void initialize(
+      GiraphConfiguration conf, final BlockMasterApi masterApi)
+    throws InstantiationException, IllegalAccessException {
+    this.masterApi = masterApi;
+    this.computationDone = false;
+
+    BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
+    Block executionBlock = factory.createBlock(conf);
+    LOG.info("Executing application - " + executionBlock);
+
+    // We register all possible aggregators at the beginning
+    executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
+      private final HashSet<AbstractPiece> registeredPieces = new HashSet<>();
+      @SuppressWarnings("deprecation")
+      @Override
+      public void apply(AbstractPiece piece) {
+        // no need to register the same piece twice.
+        if (registeredPieces.add(piece)) {
+          try {
+            piece.registerAggregators(masterApi);
+          } catch (InstantiationException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    });
+
+    pieceIterator = executionBlock.iterator();
+    // Invariant is that ReceiveWorkerPiece of previousPiece has already been
+    // executed and that previousPiece.nextExecutionStage() should be used for
+    // iterating. So passing piece as null, and initial state as current state,
+    // so that nothing gets executed in first half, and calculateNextState
+    // returns initial state.
+    previousPiece = new PairedPieceAndStage<>(
+        null, factory.createExecutionStage(conf));
+  }
+
+  /**
+   * Initialize object after deserializing it.
+   * BlockMasterApi is not serializable, so it is transient, and set via this
+   * method afterwards.
+   *
+   * @param masterApi Master api to use from now on
+   */
+  public void initializeAfterRead(BlockMasterApi masterApi) {
+    this.masterApi = masterApi;
+  }
+
+  /**
+   * Executes operations on master (master compute and registering reducers),
+   * and calculates next pieces to be executed on workers.
+   *
+   * Also updates the timer counters: the worker timer for the previous
+   * superstep (when there was one), and the master timer for the piece
+   * whose master compute has just been executed. When there is nothing
+   * left to execute, post-application cleanup is done before returning.
+   *
+   * @param superstep Current superstep
+   * @return Next BlockWorkerPieces to be executed on workers, or null
+   *         if computation should be halted.
+   */
+  public BlockWorkerPieces<S> computeNext(long superstep) {
+    long beforeMaster = System.currentTimeMillis();
+    if (lastTimestamp != -1) {
+      BlockCounters.setWorkerTimeCounter(
+          previousWorkerPieces, superstep - 1,
+          beforeMaster - lastTimestamp, masterApi);
+    }
+
+    if (previousPiece == null) {
+      postApplication();
+      return null;
+    } else {
+      LOG.info(
+          "Master executing " + previousPiece + ", in superstep " + superstep);
+      previousPiece.masterCompute(masterApi);
+      ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
+          returnAllWriters();
+      long afterMaster = System.currentTimeMillis();
+
+      if (previousPiece.getPiece() != null) {
+        BlockCounters.setMasterTimeCounter(
+            previousPiece, superstep, afterMaster - beforeMaster, masterApi);
+      }
+
+      PairedPieceAndStage<S> nextPiece;
+      if (pieceIterator.hasNext()) {
+        nextPiece = new PairedPieceAndStage<S>(
+            pieceIterator.next(), previousPiece.nextExecutionStage());
+        nextPiece.registerReducers(masterApi);
+      } else {
+        nextPiece = null;
+      }
+      BlockCounters.setStageCounters(
+          "Master finished stage: ", previousPiece.getExecutionStage(),
+          masterApi);
+      LOG.info(
+          "Master passing next " + nextPiece + ", in superstep " + superstep);
+
+      // if there is nothing more to compute, no need for additional superstep
+      // this can only happen if application uses no pieces.
+      BlockWorkerPieces<S> result;
+      if (previousPiece.getPiece() == null && nextPiece == null) {
+        postApplication();
+        result = null;
+      } else {
+        result = new BlockWorkerPieces<>(previousPiece, nextPiece);
+        LOG.info("Master in " + superstep + " superstep passing " +
+            result + " to be executed");
+      }
+
+      previousPiece = nextPiece;
+      lastTimestamp = afterMaster;
+      previousWorkerPieces = result;
+      return result;
+    }
+  }
+
+  /**
+   * Clean up any master state, after application has finished.
+   *
+   * Closes all writers of the block output handle, and marks computation
+   * as done. Fails the state check if computation was already marked as
+   * done - note that the check happens after the writers are closed.
+   */
+  private void postApplication() {
+    ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
+        closeAllWriters();
+    Preconditions.checkState(!computationDone);
+    computationDone = true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
new file mode 100644
index 0000000..8b8e174
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.internal;
+
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Worker-context side of block execution: keeps the worker value and the
+ * output handle, and forwards the worker-context receive and send calls of
+ * each superstep to the pieces that are being executed in it.
+ */
+@SuppressWarnings({ "rawtypes" })
+public class BlockWorkerContextLogic {
+  public static final Logger LOG =
+      Logger.getLogger(BlockWorkerContextLogic.class);
+
+  private Object workerValue;
+  private BlockWorkerPieces workerPieces;
+  private BlockOutputHandle outputHandle;
+
+  private transient BlockWorkerContextSendApi sendApi;
+
+  public BlockWorkerContextLogic() {
+  }
+
+  /**
+   * Creates the worker value from the class configured through
+   * {@code BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS}, and remembers the
+   * output handle.
+   *
+   * @param api Worker context api, used for its configuration
+   * @param outputHandle Output handle to use during the application
+   */
+  public void preApplication(BlockWorkerContextApi api,
+      BlockOutputHandle outputHandle) {
+    this.workerValue =
+        BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.newInstance(api.getConf());
+    this.outputHandle = outputHandle;
+  }
+
+  /** @return Worker value created in {@code preApplication} */
+  public Object getWorkerValue() {
+    return workerValue;
+  }
+
+  /** @return Output handle passed to {@code preApplication} */
+  public BlockOutputHandle getOutputHandle() {
+    return outputHandle;
+  }
+
+  /**
+   * Remembers the send api and the pieces of this superstep, and lets the
+   * receiver piece (if there is one) process the received messages.
+   *
+   * @param receiveApi Api passed to the receiver piece
+   * @param sendApi Api kept for the sender piece, used in postSuperstep
+   * @param workerPieces Pieces executed in this superstep
+   * @param superstep Superstep number, used for logging only
+   * @param messages Messages passed to the receiver piece
+   */
+  @SuppressWarnings("unchecked")
+  public void preSuperstep(
+      BlockWorkerContextReceiveApi receiveApi,
+      BlockWorkerContextSendApi sendApi,
+      BlockWorkerPieces workerPieces, long superstep,
+      List<Writable> messages) {
+    String message =
+        "Worker executing " + workerPieces + " in " + superstep + " superstep";
+    LOG.info(message);
+
+    this.workerPieces = workerPieces;
+    this.sendApi = sendApi;
+
+    if (workerPieces.getReceiver() == null) {
+      return;
+    }
+    workerPieces.getReceiver().workerContextReceive(
+        receiveApi, workerValue, messages);
+  }
+
+  /**
+   * Lets the sender piece (if there is one) do its worker-context send,
+   * then drops the per-superstep state and returns all writers of the
+   * output handle.
+   */
+  public void postSuperstep() {
+    if (workerPieces.getSender() != null) {
+      workerPieces.getSender().workerContextSend(sendApi, workerValue);
+    }
+    sendApi = null;
+    workerPieces = null;
+    outputHandle.returnAllWriters();
+  }
+
+  /**
+   * Closes all writers of the output handle.
+   */
+  public void postApplication() {
+    outputHandle.closeAllWriters();
+    // TODO add support through conf for postApplication, if needed.
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java
new file mode 100644
index 0000000..844160c
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.internal;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.graph.Vertex;
+
+/**
+ * Block execution logic on workers.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class BlockWorkerLogic {
+  private final BlockWorkerPieces pieces;
+
+  private transient VertexReceiver receiveFunctions;
+  private transient InnerVertexSender sendFunctions;
+
+  public BlockWorkerLogic(BlockWorkerPieces pieces) {
+    this.pieces = pieces;
+  }
+
+  public void preSuperstep(
+      BlockWorkerReceiveApi receiveApi, BlockWorkerSendApi sendApi) {
+    if (pieces.getReceiver() != null) {
+      receiveFunctions = pieces.getReceiver().getVertexReceiver(receiveApi);
+    }
+    if (pieces.getSender() != null) {
+      sendFunctions = pieces.getSender().getVertexSender(sendApi);
+    }
+  }
+
+  public void compute(Vertex vertex, Iterable messages) {
+    if (receiveFunctions != null) {
+      receiveFunctions.vertexReceive(vertex, messages);
+    }
+    if (sendFunctions != null) {
+      sendFunctions.vertexSend(vertex);
+    }
+  }
+
+  public void postSuperstep() {
+    if (receiveFunctions instanceof VertexPostprocessor) {
+      ((VertexPostprocessor) receiveFunctions).postprocess();
+    }
+    if (sendFunctions != null) {
+      sendFunctions.postprocess();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java
new file mode 100644
index 0000000..3b38cfa
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.internal;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.conf.DefaultMessageClasses;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.factories.DefaultMessageValueFactory;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Pair of pieces to be executed on workers in a superstep, one receiving
+ * and one sending. Static helpers broadcast the pair from master and read
+ * it back on workers.
+ *
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class BlockWorkerPieces<S> {
+  private static final Logger LOG = Logger.getLogger(BlockWorkerPieces.class);
+
+  /**
+   * Name of the broadcast holding the number of parts the next worker
+   * pieces were split into; also the prefix of each part's broadcast name.
+   */
+  private static final String NEXT_WORKER_PIECES =
+      "giraph.blocks.next_worker_pieces";
+
+  /** Piece (with its stage) doing the receiving, may be null */
+  private final PairedPieceAndStage<S> receiver;
+  /** Piece (with its stage) doing the sending, may be null */
+  private final PairedPieceAndStage<S> sender;
+
+  /**
+   * @param receiver Receiving piece with its stage
+   * @param sender Sending piece with its stage
+   */
+  public BlockWorkerPieces(
+      PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender) {
+    this.receiver = receiver;
+    this.sender = sender;
+  }
+
+  public PairedPieceAndStage<S> getReceiver() {
+    return receiver;
+  }
+
+  public PairedPieceAndStage<S> getSender() {
+    return sender;
+  }
+
+  /**
+   * Message classes of the sending piece, or "no message" classes when
+   * there is no sender or the sender holds no piece. The result is verified
+   * against the configuration before being returned.
+   *
+   * @param conf Configuration
+   * @return Outgoing message classes
+   */
+  public MessageClasses getOutgoingMessageClasses(
+      ImmutableClassesGiraphConfiguration conf) {
+    MessageClasses messageClasses;
+    if (sender != null && sender.getPiece() != null) {
+      messageClasses = sender.getPiece().getMessageClasses(conf);
+    } else {
+      messageClasses = new DefaultMessageClasses(
+          NoMessage.class,
+          DefaultMessageValueFactory.class,
+          null,
+          MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION);
+    }
+
+    messageClasses.verifyConsistent(conf);
+    return messageClasses;
+  }
+
+  @Override
+  public String toString() {
+    return "[receiver=" + receiver + ",sender=" + sender + "]";
+  }
+
+  /**
+   * Short description that only mentions the pieces, and collapses
+   * receiver and sender into one entry when their strings are equal.
+   *
+   * @return Short string representation
+   */
+  public String toStringShort() {
+    String receiverString = pieceToString(receiver);
+    String senderString = pieceToString(sender);
+    if (!receiverString.equals(senderString)) {
+      return "[receiver=" + receiverString + ",sender=" + senderString + "]";
+    }
+    return "[receiver&sender=" + receiverString + "]";
+  }
+
+  /**
+   * String form of the piece inside the pair, "null" when either the pair
+   * or its piece is null.
+   *
+   * @param paired Paired piece and stage, may be null
+   * @return String representation of the piece
+   */
+  private static String pieceToString(PairedPieceAndStage<?> paired) {
+    return Objects.toString(paired == null ? null : paired.getPiece());
+  }
+
+  /**
+   * Sets which WorkerComputation is going to be executed in the next
+   * superstep.
+   *
+   * The pieces are serialized, split into parts, and broadcast as a part
+   * count followed by one broadcast per part. Outgoing message classes of
+   * the master are set as well.
+   *
+   * @param master Master compute used for broadcasting
+   * @param nextWorkerPieces Pieces to execute in the next superstep
+   * @param <S> Execution stage type
+   */
+  public static <S> void setNextWorkerPieces(
+      MasterCompute master, BlockWorkerPieces<S> nextWorkerPieces) {
+    Writable toBroadcast = new KryoWritableWrapper<>(nextWorkerPieces);
+    byte[] data = WritableUtils.toByteArrayUnsafe(toBroadcast);
+
+    // Part size is max request size minus the overhead, but never less
+    // than the overhead itself
+    int overhead = 4096;
+    int maxRequestSize =
+        GiraphConstants.MAX_MSG_REQUEST_SIZE.get(master.getConf());
+    int singleSize = Math.max(overhead, maxRequestSize - overhead);
+
+    ArrayList<byte[]> parts = splitIntoParts(data, singleSize);
+    int numParts = parts.size();
+
+    LOG.info("Next worker piece - total serialized size: " + data.length +
+        ", split into " + numParts);
+    master.getContext().getCounter(
+        "PassedWorker Stats", "total serialized size")
+        .increment(data.length);
+    master.getContext().getCounter(
+        "PassedWorker Stats", "split parts")
+        .increment(numParts);
+
+    master.broadcast(NEXT_WORKER_PIECES, new IntWritable(numParts));
+
+    int partIndex = 0;
+    for (byte[] part : parts) {
+      master.broadcast(NEXT_WORKER_PIECES + "_part_" + partIndex,
+          KryoWritableWrapper.wrapIfNeeded(part));
+      partIndex++;
+    }
+
+    master.setOutgoingMessageClasses(
+        nextWorkerPieces.getOutgoingMessageClasses(master.getConf()));
+  }
+
+  /**
+   * Splits data into consecutive parts of at most partSize bytes. Data
+   * shorter than partSize is returned as the single part, without copying.
+   *
+   * TODO: extract splitting logic into common utility
+   *
+   * @param data Bytes to split
+   * @param partSize Maximal size of a single part
+   * @return List of parts, in order
+   */
+  private static ArrayList<byte[]> splitIntoParts(
+      byte[] data, int partSize) {
+    ArrayList<byte[]> parts = new ArrayList<>();
+    if (data.length < partSize) {
+      parts.add(data);
+      return parts;
+    }
+    int start = 0;
+    while (start < data.length) {
+      int end = Math.min(data.length, start + partSize);
+      parts.add(Arrays.copyOfRange(data, start, end));
+      start += partSize;
+    }
+    return parts;
+  }
+
+  /**
+   * Reads back the pieces broadcast by setNextWorkerPieces: fetches the
+   * part count, then every part, joins them and deserializes the result.
+   *
+   * @param worker Worker side access to broadcast values
+   * @param <S> Execution stage type
+   * @return Pieces to execute in the next superstep
+   */
+  public static <S> BlockWorkerPieces<S> getNextWorkerPieces(
+      WorkerGlobalCommUsage worker) {
+    int numParts = worker.<IntWritable>getBroadcast(NEXT_WORKER_PIECES).get();
+
+    ArrayList<byte[]> parts = new ArrayList<>();
+    int totalLength = 0;
+    for (int part = 0; part < numParts; part++) {
+      byte[] bytes = KryoWritableWrapper.<byte[]>unwrapIfNeeded(
+          worker.getBroadcast(NEXT_WORKER_PIECES + "_part_" + part));
+      parts.add(bytes);
+      totalLength += bytes.length;
+    }
+
+    // A single part is used as is, several parts are concatenated
+    byte[] serialized = numParts == 1 ?
+        parts.get(0) : concatenate(parts, totalLength);
+
+    KryoWritableWrapper<BlockWorkerPieces<S>> wrapper =
+        new KryoWritableWrapper<>();
+    WritableUtils.fromByteArrayUnsafe(
+        serialized, wrapper, new UnsafeReusableByteArrayInput());
+    return wrapper.get();
+  }
+
+  /**
+   * Copies all parts, in order, into one new array.
+   *
+   * @param parts Parts to join
+   * @param totalLength Sum of lengths of all parts
+   * @return Joined array
+   */
+  private static byte[] concatenate(
+      ArrayList<byte[]> parts, int totalLength) {
+    byte[] joined = new byte[totalLength];
+    int offset = 0;
+    for (byte[] part : parts) {
+      System.arraycopy(part, 0, joined, offset, part.length);
+      offset += part.length;
+    }
+    return joined;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java
new file mode 100644
index 0000000..90fe83e
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.internal;
+
+import java.util.List;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece;
+import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Object holding piece with it's corresponding execution stage.
+ *
+ * @param <S> Execution stage type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class PairedPieceAndStage<S> {
+  private final AbstractPiece piece;
+  private final S executionStage;
+
+  public PairedPieceAndStage(AbstractPiece piece, S executionStage) {
+    this.piece = piece;
+    this.executionStage = executionStage;
+  }
+
+  public S nextExecutionStage() {
+    // if piece is null, then it cannot change the execution stage
+    return piece != null ?
+      (S) piece.nextExecutionStage(executionStage) : executionStage;
+  }
+
+  public S getExecutionStage() {
+    return executionStage;
+  }
+
+  public void registerReducers(BlockMasterApi masterApi) {
+    if (piece != null) {
+      piece.wrappedRegisterReducers(masterApi, executionStage);
+    }
+  }
+
+  public InnerVertexSender getVertexSender(BlockWorkerSendApi sendApi) {
+    if (piece != null) {
+      return piece.getWrappedVertexSender(sendApi, executionStage);
+    }
+    return null;
+  }
+
+  public void masterCompute(BlockMasterApi masterApi) {
+    if (piece != null) {
+      piece.masterCompute(masterApi, executionStage);
+    }
+  }
+
+  public VertexReceiver getVertexReceiver(
+      BlockWorkerReceiveApi receiveApi) {
+    if (piece != null) {
+      return piece.getVertexReceiver(receiveApi, executionStage);
+    }
+    return null;
+  }
+
+  public void workerContextSend(
+      BlockWorkerContextSendApi workerContextApi, Object workerValue) {
+    if (piece != null) {
+      piece.workerContextSend(workerContextApi, executionStage, workerValue);
+    }
+  }
+
+  public void workerContextReceive(
+      BlockWorkerContextReceiveApi workerContextApi,
+      Object workerValue, List<Writable> workerMessages) {
+    if (piece != null) {
+      piece.workerContextReceive(
+          workerContextApi, executionStage, workerValue, workerMessages);
+    }
+  }
+
+  /**
+   * @return the piece
+   */
+  public AbstractPiece getPiece() {
+    return piece;
+  }
+
+  @Override
+  public String toString() {
+    return "Piece " + piece + " in stage " + executionStage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java
new file mode 100644
index 0000000..3ebe8f7
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementation of execution logic, guiding internal execution of
+ * Block Application.
+ */
+package org.apache.giraph.block_app.framework.internal;

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
new file mode 100644
index 0000000..6f2a3dd
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Output description: a single block output that can be initialized and
+ * checked, asked to create writers, and committed.
+ *
+ * @param <OW> Writer type
+ */
+public interface BlockOutputDesc<OW extends BlockOutputWriter> {
+  /**
+   * Initialize output and perform any necessary checks.
+   * BlockOutputFormat calls this right after reading the description back
+   * from the configuration.
+   *
+   * @param jobIdentifier Unique identifier of the job
+   * @param conf Configuration
+   */
+  void initializeAndCheck(String jobIdentifier, Configuration conf);
+
+  /**
+   * Create writer
+   *
+   * @param conf Configuration
+   * @param hadoopProgressable Progressable to call progress on
+   * @return Writer
+   */
+  OW createOutputWriter(Configuration conf, Progressable hadoopProgressable);
+
+  /**
+   * Commit everything.
+   * The output committer of BlockOutputFormat calls this on every
+   * registered description when the job is committed.
+   */
+  void commit();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java
new file mode 100644
index 0000000..818a311
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.output;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.giraph.bsp.BspOutputFormat;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.utils.ConfigurationObjectUtils;
+import org.apache.giraph.utils.DefaultOutputCommitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Hadoop output format to use with block output.
+ * It keeps track of all registered outputs, and knows how to create them.
+ */
+public class BlockOutputFormat extends BspOutputFormat {
+  /** Comma separated list of conf options holding output descriptions */
+  private static final StrConfOption OUTPUT_CONF_OPTIONS = new StrConfOption(
+      "digraph.outputConfOptions", "",
+      "List of conf options for outputs used");
+
+  /**
+   * Registers an output description: makes this class the hadoop output
+   * format, appends the conf option to the list of used options, and
+   * stores the description itself under that option.
+   *
+   * @param outputDesc Output description to store
+   * @param confOption Conf option to store the description under
+   * @param conf Configuration to modify
+   * @param <OD> Output description type
+   */
+  public static <OD> void addOutputDesc(OD outputDesc, String confOption,
+      GiraphConfiguration conf) {
+    GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.set(conf,
+        BlockOutputFormat.class);
+    String registered = OUTPUT_CONF_OPTIONS.get(conf);
+    // Separator is only needed when something was registered before
+    String prefix = registered.isEmpty() ? "" : registered + ",";
+    OUTPUT_CONF_OPTIONS.set(conf, prefix + confOption);
+    ConfigurationObjectUtils.setObjectKryo(outputDesc, confOption, conf);
+  }
+
+  /**
+   * @param conf Configuration
+   * @return Conf options of all registered outputs, empty array if none
+   */
+  private static String[] getOutputConfOptions(Configuration conf) {
+    String joined = OUTPUT_CONF_OPTIONS.get(conf);
+    if (joined.isEmpty()) {
+      return new String[0];
+    }
+    return joined.split(",");
+  }
+
+  /**
+   * Reads the output description stored under the given conf option and
+   * calls initializeAndCheck on it.
+   *
+   * @param confOption Conf option the description is stored under
+   * @param conf Configuration
+   * @param jobIdentifier Job identifier passed to the description
+   * @param <OW> Writer type
+   * @param <OD> Output description type
+   * @return Initialized output description
+   */
+  public static <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
+  OD createInitAndCheckOutputDesc(String confOption, Configuration conf,
+      String jobIdentifier) {
+    OD outputDesc = ConfigurationObjectUtils.getObjectKryo(confOption, conf);
+    outputDesc.initializeAndCheck(jobIdentifier, conf);
+    return outputDesc;
+  }
+
+  /**
+   * Creates and initializes descriptions of all registered outputs.
+   *
+   * @param conf Configuration
+   * @param jobIdentifier Job identifier passed to every description
+   * @return Map from conf option to its initialized output description
+   */
+  public static Map<String, BlockOutputDesc>
+  createInitAndCheckOutputDescsMap(Configuration conf, String jobIdentifier) {
+    String[] confOptions = getOutputConfOptions(conf);
+    Map<String, BlockOutputDesc> descs = new HashMap<>(confOptions.length);
+    for (String confOption : confOptions) {
+      descs.put(confOption,
+          createInitAndCheckOutputDesc(confOption, conf, jobIdentifier));
+    }
+    return descs;
+  }
+
+  /**
+   * Same as the two argument version, with configuration and job id taken
+   * from the job context.
+   *
+   * @param jobContext Job context
+   * @return Map from conf option to its initialized output description
+   */
+  public static Map<String, BlockOutputDesc> createInitAndCheckOutputDescsMap(
+      JobContext jobContext) {
+    Configuration conf = jobContext.getConfiguration();
+    String jobIdentifier = jobContext.getJobID().toString();
+    return createInitAndCheckOutputDescsMap(conf, jobIdentifier);
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext)
+      throws IOException, InterruptedException {
+    // Creating the descriptions runs initializeAndCheck on each of them
+    createInitAndCheckOutputDescsMap(jobContext);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new DefaultOutputCommitter() {
+      @Override
+      public void commit(JobContext jobContext) throws IOException {
+        // Descriptions are re-created from the conf, then each is committed
+        for (BlockOutputDesc outputDesc :
+            createInitAndCheckOutputDescsMap(jobContext).values()) {
+          outputDesc.commit();
+        }
+      }
+    };
+  }
+}

Reply via email to