Repository: giraph
Updated Branches:
  refs/heads/trunk ad27a2914 -> 79e7f1c98


[GIRAPH-1013] Add local (single machine) implementation

Summary:
This allows you to run application written in Blocks Framework
very efficiently on single machine.

Specifically this is interesting for having fast unit tests.

Test Plan:
mvn clean install -Phadoop_facebook

Making TargetVertexIdIterator public is in addition to just adding classes to 
open source

Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov

Reviewed By: sergey.edunov

Differential Revision: https://reviews.facebook.net/D39717


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/79e7f1c9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/79e7f1c9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/79e7f1c9

Branch: refs/heads/trunk
Commit: 79e7f1c98575a473d12022e198679614b1fe9029
Parents: ad27a29
Author: Igor Kabiljo <[email protected]>
Authored: Mon Jun 8 11:48:28 2015 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Fri Jun 12 20:34:16 2015 -0700

----------------------------------------------------------------------
 .../api/local/InternalAggregators.java          | 142 ++++++
 .../framework/api/local/InternalApi.java        | 432 +++++++++++++++++++
 .../api/local/InternalMessageStore.java         | 423 ++++++++++++++++++
 .../framework/api/local/LocalBlockRunner.java   | 247 +++++++++++
 .../framework/api/local/VertexSaver.java        |  34 ++
 .../framework/api/local/package-info.java       |  26 ++
 .../apache/giraph/comm/SendMessageCache.java    |   7 +-
 .../writable/kryo/KryoWritableWrapper.java      |  13 +
 8 files changed, 1321 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java
new file mode 100644
index 0000000..dbcc9f1
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.local;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.master.MasterGlobalCommUsage;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Internal aggregators implementation
+ */
+@SuppressWarnings("unchecked")
+class InternalAggregators
+    implements MasterGlobalCommUsage, WorkerGlobalCommUsage {
+  private final boolean runAllChecks;
+
+  /** Map of reducers registered for the next worker computation */
+  private final Map<String, Reducer<Object, Writable>> reducerMap =
+      Maps.newHashMap();
+  /** Map of values to be sent to workers for next computation */
+  private final Map<String, Writable> broadcastMap =
+      Maps.newHashMap();
+  /** Values reduced from previous computation */
+  private final Map<String, Writable> reducedMap =
+      Maps.newHashMap();
+
+  public InternalAggregators(boolean runAllChecks) {
+    this.runAllChecks = runAllChecks;
+  }
+
+  private static <T> T getOrThrow(
+      Map<String, T> map, String mapName, String key) {
+    T value = map.get(key);
+    if (value == null) {
+      throw new IllegalArgumentException(
+          key + " not present in " + mapName);
+    }
+    return value;
+  }
+
+  @Override
+  public void broadcast(String name, Writable value) {
+    broadcastMap.put(name, value);
+  }
+
+  @Override
+  public <B extends Writable> B getBroadcast(String name) {
+    return (B) getOrThrow(broadcastMap, "broadcastMap", name);
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp) {
+    registerReducer(name, reduceOp, reduceOp.createInitialValue());
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp,
+      R globalInitialValue) {
+    if (reducerMap.containsKey(name)) {
+      throw new IllegalArgumentException(
+          "Reducer with name " + name + " was already registered, " +
+          " and is " + reducerMap.get(name).getReduceOp() +
+          ", and we are trying to " + " register " + reduceOp);
+    }
+    if (reduceOp == null) {
+      throw new IllegalArgumentException(
+          "null reducer cannot be registered, with name " + name);
+    }
+    if (globalInitialValue == null) {
+      throw new IllegalArgumentException(
+          "global initial value for reducer cannot be null, but is for " +
+          reduceOp + " with naem" + name);
+    }
+
+    Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue);
+    reducerMap.put(name, (Reducer<Object, Writable>) reducer);
+  }
+
+  @Override
+  public void reduce(String name, Object value) {
+    Reducer<Object, Writable> reducer =
+        getOrThrow(reducerMap, "reducerMap", name);
+    synchronized (reducer) {
+      reducer.reduce(value);
+    }
+  }
+
+  @Override
+  public void reduceMerge(String name, Writable value) {
+    Reducer<Object, Writable> reducer =
+        getOrThrow(reducerMap, "reducerMap", name);
+    synchronized (reducer) {
+      reducer.reduceMerge(value);
+    }
+  }
+
+  @Override
+  public <R extends Writable> R getReduced(String name) {
+    return (R) getOrThrow(reducedMap, "reducedMap", name);
+  }
+
+  public synchronized void afterWorkerBeforeMaster() {
+    broadcastMap.clear();
+    reducedMap.clear();
+    for (Entry<String, Reducer<Object, Writable>> entry :
+          reducerMap.entrySet()) {
+      Writable value = entry.getValue().getCurrentValue();
+      if (runAllChecks) {
+        Writable newValue = entry.getValue().createInitialValue();
+        WritableUtils.copyInto(value, newValue);
+        value = newValue;
+      }
+      reducedMap.put(entry.getKey(), value);
+    }
+    reducerMap.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
new file mode 100644
index 0000000..99e9e24
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.local;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
+import org.apache.giraph.block_app.framework.api.Counter;
+import 
org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalConcurrentMessageStore;
+import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
+import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
+import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
+import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
+import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
+import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
+import 
org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
+import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.worker.WorkerAggregatorDelegator;
+import org.apache.giraph.worker.WorkerGlobalCommUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Internal implementation of Block API interfaces - representing an in-memory
+ * giraph instance.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+class InternalApi<I extends WritableComparable, V extends Writable,
+    E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
+  private final TestGraph<I, V, E> graph;
+  private final ImmutableClassesGiraphConfiguration conf;
+  private final boolean runAllChecks;
+  private final InternalAggregators globalComm;
+  private final AggregatorToGlobalCommTranslation aggregators;
+
+  private final boolean createVertexOnMsgs;
+  private final ConcurrentHashMap<I, VertexMutations<I, V, E>> mutations;
+
+  private InternalMessageStore previousMessages;
+  private InternalMessageStore nextMessages;
+
+  private final InternalWorkerApi workerApi;
+  private final BlockWorkerContextLogic workerContextLogic;
+  private List<Writable> previousWorkerMessages;
+  private List<Writable> nextWorkerMessages;
+
+  public InternalApi(
+      TestGraph<I, V, E> graph,
+      ImmutableClassesGiraphConfiguration conf,
+      boolean runAllChecks) {
+    this.graph = graph;
+    this.conf = conf;
+    this.runAllChecks = runAllChecks;
+    this.globalComm = new InternalAggregators(runAllChecks);
+    this.aggregators = new AggregatorToGlobalCommTranslation(conf, globalComm);
+    this.mutations = new ConcurrentHashMap<>();
+    this.workerApi = new InternalWorkerApi();
+    this.workerApi.setConf(conf);
+    this.workerApi.setWorkerGlobalCommUsage(this.globalComm);
+
+    this.createVertexOnMsgs =
+        GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.get(conf);
+    workerContextLogic = new BlockWorkerContextLogic();
+  }
+
+  /**
+   * Wrapper for calling Worker API interface.
+   * Needs to be separate from Master API, since getAggregatedValue
+   * has different implementation on worker and on master.
+   */
+  class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E>
+      implements BlockWorkerSendApi<I, V, E, Writable>,
+      BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<Writable>,
+      BlockWorkerContextReceiveApi, BlockWorkerValueAccessor,
+      WorkerGlobalCommUsage {
+
+    @Override
+    public void addVertexRequest(I id, V value) {
+      addVertexRequest(id, value, conf.createAndInitializeOutEdges());
+    }
+
+    @Override
+    public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
+      Vertex<I, V, E> vertex = conf.createVertex();
+      vertex.initialize(id, value, edges);
+      getMutationFor(id).addVertex(vertex);
+    }
+
+    @Override
+    public void removeVertexRequest(I vertexId) {
+      getMutationFor(vertexId).removeVertex();
+    }
+
+    @Override
+    public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
+      getMutationFor(sourceVertexId).addEdge(edge);
+    }
+
+    @Override
+    public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
+      getMutationFor(sourceVertexId).removeEdge(targetVertexId);
+    }
+
+    @Override
+    public void sendMessage(I id, Writable message) {
+      nextMessages.sendMessage(id, message);
+    }
+
+    @Override
+    public void sendMessageToAllEdges(
+        Vertex<I, V, E> vertex, Writable message) {
+      sendMessageToMultipleEdges(
+          new TargetVertexIdIterator<>(vertex),
+          message);
+    }
+
+    @Override
+    public void sendMessageToMultipleEdges(
+        Iterator<I> vertexIdIterator, Writable message) {
+      nextMessages.sendMessageToMultipleEdges(vertexIdIterator, message);
+    }
+
+    @Override
+    public int getMyWorkerIndex() {
+      return 0;
+    }
+
+    @Override
+    public int getWorkerCount() {
+      return 1;
+    }
+
+    @Override
+    public void sendMessageToWorker(Writable message, int workerIndex) {
+      Preconditions.checkArgument(workerIndex == getMyWorkerIndex(),
+          "With just one worker you can only send worker message to itself, " +
+              "but tried to send to " + workerIndex);
+      nextWorkerMessages.add(message);
+    }
+
+    @Override
+    public Object getWorkerValue() {
+      return workerContextLogic.getWorkerValue();
+    }
+
+    @Override
+    public long getTotalNumVertices() {
+      return InternalApi.this.getTotalNumVertices();
+    }
+
+    @Override
+    public long getTotalNumEdges() {
+      return InternalApi.this.getTotalNumEdges();
+    }
+
+    @Override
+    public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
+    OD getOutputDesc(String confOption) {
+      return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
+          confOption);
+    }
+
+    @Override
+    public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
+      return workerContextLogic.getOutputHandle().getWriter(confOption);
+    }
+  }
+
+  @Override
+  public void broadcast(String name, Writable value) {
+    globalComm.broadcast(name, value);
+  }
+
+  @Override
+  public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
+    BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
+    broadcast(handle.getName(), object);
+    return handle;
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp) {
+    globalComm.registerReducer(name, reduceOp);
+  }
+
+  @Override
+  public <S, R extends Writable> void registerReducer(
+      String name, ReduceOperation<S, R> reduceOp,
+      R globalInitialValue) {
+    globalComm.registerReducer(name, reduceOp, globalInitialValue);
+  }
+
+  @Override
+  public <R extends Writable> R getReduced(String name) {
+    return globalComm.getReduced(name);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return aggregators.getAggregatedValue(name);
+  }
+
+  @Override
+  public <A extends Writable> void setAggregatedValue(String name, A value) {
+    aggregators.setAggregatedValue(name, value);
+  }
+
+  @Override
+  public <A extends Writable>
+  boolean registerAggregator(
+      String name, Class<? extends Aggregator<A>> aggregatorClass)
+      throws InstantiationException, IllegalAccessException {
+    return aggregators.registerAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public <A extends Writable>
+  boolean registerPersistentAggregator(
+      String name, Class<? extends Aggregator<A>> aggregatorClass)
+      throws InstantiationException, IllegalAccessException {
+    return aggregators.registerPersistentAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setStatus(String status) {
+  }
+
+  @Override
+  public void progress() {
+  }
+
+  @Override
+  public Counter getCounter(final String group, final String name) {
+    return new Counter() {
+      @Override
+      public void increment(long incr) {
+      }
+      @Override
+      public void setValue(long value) {
+      }
+    };
+  }
+
+  private VertexMutations<I, V, E> getMutationFor(I vertexId) {
+    VertexMutations<I, V, E> curMutations = new VertexMutations<>();
+    VertexMutations<I, V, E> prevMutations =
+        mutations.putIfAbsent(vertexId, curMutations);
+    if (prevMutations != null) {
+      curMutations = prevMutations;
+    }
+    return curMutations;
+  }
+
+  public Iterable takeMessages(I id) {
+    if (previousMessages != null) {
+      Iterable result = previousMessages.takeMessages(id);
+      if (result != null) {
+        return result;
+      }
+    }
+    return Collections.emptyList();
+  }
+
+  public List<Writable> takeWorkerMessages() {
+    if (previousWorkerMessages != null) {
+      List<Writable> ret = new ArrayList<>(previousWorkerMessages.size());
+      for (Writable message : previousWorkerMessages) {
+        // Use message copies probabilistically, to catch both not serializing
+        // some fields, and storing references from message object itself
+        // (which can be reusable).
+        ret.add(runAllChecks && ThreadLocalRandom.current().nextBoolean() ?
+            WritableUtils.createCopy(message) : message);
+      }
+      previousWorkerMessages = null;
+      if (runAllChecks) {
+        Collections.shuffle(ret);
+      }
+      return ret;
+    }
+    return Collections.emptyList();
+  }
+
+  public void afterWorkerBeforeMaster() {
+    globalComm.afterWorkerBeforeMaster();
+    aggregators.prepareSuperstep();
+  }
+
+  public void afterMasterBeforeWorker() {
+    aggregators.postMasterCompute();
+  }
+
+  public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
+    afterMasterBeforeWorker();
+
+    previousMessages = nextMessages;
+    previousWorkerMessages = nextWorkerMessages;
+
+    nextMessages = InternalConcurrentMessageStore.createMessageStore(
+        conf, computation, runAllChecks);
+    nextWorkerMessages = new ArrayList<>();
+
+    // process mutations:
+    Set<I> targets = previousMessages == null ?
+      Collections.EMPTY_SET : previousMessages.targetsSet();
+    if (createVertexOnMsgs) {
+      for (I target : targets) {
+        if (!graph.getVertices().containsKey(target)) {
+          mutations.put(target, new VertexMutations<I, V, E>());
+        }
+      }
+    }
+
+    VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
+    for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
+      I vertexIndex = entry.getKey();
+      Vertex<I, V, E> originalVertex = graph.getVertex(vertexIndex);
+      VertexMutations<I, V, E> curMutations = entry.getValue();
+      Vertex<I, V, E> vertex = vertexResolver.resolve(
+          vertexIndex, originalVertex, curMutations,
+          targets.contains(vertexIndex));
+
+      if (vertex != null) {
+        graph.addVertex(vertex);
+      } else if (originalVertex != null) {
+        graph.getVertices().remove(originalVertex.getId());
+      }
+    }
+    mutations.clear();
+  }
+
+  public Collection<Vertex<I, V, E>> getAllVertices() {
+    return graph.getVertices().values();
+  }
+
+  public InternalWorkerApi getWorkerApi() {
+    return workerApi;
+  }
+
+  @Override
+  public long getTotalNumEdges() {
+    int numEdges = 0;
+    for (Vertex<I, V, E> vertex : graph.getVertices().values()) {
+      numEdges += vertex.getNumEdges();
+    }
+    return numEdges;
+  }
+
+  @Override
+  public long getTotalNumVertices() {
+    return graph.getVertices().size();
+  }
+
+  @Override
+  public void logToCommandLine(String line) {
+    System.err.println("Command line: " + line);
+  }
+
+  @Override
+  public BlockOutputHandle getBlockOutputHandle() {
+    return workerContextLogic.getOutputHandle();
+  }
+
+  @Override
+  public <OW extends BlockOutputWriter,
+      OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
+    return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
+        confOption);
+  }
+
+  @Override
+  public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
+    return workerContextLogic.getOutputHandle().getWriter(confOption);
+  }
+
+
+  public BlockWorkerContextLogic getWorkerContextLogic() {
+    return workerContextLogic;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
new file mode 100644
index 0000000..6c0cccb
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.local;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.MessageClasses;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.types.ops.TypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.utils.ExtendedByteArrayDataInput;
+import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Interface for internal message store, used by LocalBlockRunner
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message type
+ */
+@SuppressWarnings("rawtypes")
+interface InternalMessageStore
+    <I extends WritableComparable, M extends Writable> {
+  Set<I> targetsSet();
+  Iterable<M> takeMessages(I id);
+  void sendMessage(I id, M message);
+  void sendMessageToMultipleEdges(Iterator<I> idIter, M message);
+
+  /**
+   * Abstract Internal message store implementation that uses
+   * ConcurrentHashMap to store objects received thus far.
+   *
+   * @param <I> Vertex id type
+   * @param <M> Message type
+   * @param <R> Receiver object that particular implementation uses
+   *            (message, array of messages, byte array, etc)
+   */
+  abstract class InternalConcurrentMessageStore
+      <I extends WritableComparable, M extends Writable, R>
+      implements InternalMessageStore<I, M> {
+    protected final ConcurrentHashMap<I, R> received =
+        new ConcurrentHashMap<>();
+
+    private final Class<I> idClass;
+    private final TypeOps<I> idTypeOps;
+
+    InternalConcurrentMessageStore(Class<I> idClass) {
+      this.idClass = idClass;
+      idTypeOps = TypeOpsUtils.getTypeOpsOrNull(idClass);
+    }
+
+    public I copyId(I id) {
+      if (idTypeOps != null) {
+        return idTypeOps.createCopy(id);
+      } else {
+        return WritableUtils.createCopy(id, idClass, null);
+      }
+    }
+
+    R getReceiverFor(I id) {
+      R value = received.get(id);
+
+      if (value == null) {
+        id = copyId(id);
+        value = createNewReceiver();
+        R oldValue = received.putIfAbsent(id, value);
+        if (oldValue != null) {
+          value = oldValue;
+        }
+      }
+      return value;
+    }
+
+    abstract R createNewReceiver();
+
+    @Override
+    public Set<I> targetsSet() {
+      return received.keySet();
+    }
+
+    @Override
+    public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
+      while (idIter.hasNext()) {
+        sendMessage(idIter.next(), message);
+      }
+    }
+
+    public static <I extends WritableComparable, M extends Writable>
+    InternalMessageStore<I, M> createMessageStore(
+        final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+        final MessageClasses<I, M> messageClasses) {
+      MessageCombiner<? super I, M> combiner =
+          messageClasses.createMessageCombiner(conf);
+      if (combiner != null) {
+        return new InternalCombinerMessageStore<>(
+            conf.getVertexIdClass(), combiner);
+      } else if (messageClasses.getMessageEncodeAndStoreType().equals(
+          MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
+        return new InternalSharedByteMessageStore<>(
+            conf.getVertexIdClass(),
+            messageClasses.createMessageValueFactory(conf));
+      } else {
+        return new InternalByteMessageStore<>(
+            conf.getVertexIdClass(),
+            messageClasses.createMessageValueFactory(conf));
+      }
+    }
+
+    public static <I extends WritableComparable, M extends Writable>
+    InternalMessageStore<I, M> createMessageStore(
+        final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+        final BlockWorkerPieces pieces, boolean runAllChecks) {
+      @SuppressWarnings("unchecked")
+      MessageClasses<I, M> messageClasses =
+          pieces.getOutgoingMessageClasses(conf);
+
+      InternalMessageStore<I, M> messageStore =
+          createMessageStore(conf, messageClasses);
+      if (runAllChecks) {
+        return new InternalChecksMessageStore<I, M>(
+            messageStore, conf, 
messageClasses.createMessageValueFactory(conf));
+      } else {
+        return messageStore;
+      }
+    }
+  }
+
+  /**
+   * InternalMessageStore that combines messages as they are received.
+   *
+   * @param <I> Vertex id value type
+   * @param <M> Message type
+   */
+  static class InternalCombinerMessageStore
+      <I extends WritableComparable, M extends Writable>
+      extends InternalConcurrentMessageStore<I, M, M> {
+    private final MessageCombiner<? super I, M> messageCombiner;
+
+    public InternalCombinerMessageStore(Class<I> idClass,
+        MessageCombiner<? super I, M> messageCombiner) {
+      super(idClass);
+      this.messageCombiner = messageCombiner;
+    }
+
+    @Override
+    public Iterable<M> takeMessages(I id) {
+      M message = received.remove(id);
+      if (message != null) {
+        return Collections.singleton(message);
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public void sendMessage(I id, M message) {
+      M mainMessage = getReceiverFor(id);
+      synchronized (mainMessage) {
+        messageCombiner.combine(id, mainMessage, message);
+      }
+    }
+
+    @Override
+    M createNewReceiver() {
+      return messageCombiner.createInitialMessage();
+    }
+  }
+
+  /**
+   * InternalMessageStore that keeps messages for each vertex in byte array.
+   *
+   * @param <I> Vertex id value type
+   * @param <M> Message type
+   */
+  static class InternalByteMessageStore
+      <I extends WritableComparable, M extends Writable>
+      extends InternalConcurrentMessageStore<I, M,
+          ExtendedByteArrayDataOutput> {
+    private final MessageValueFactory<M> messageFactory;
+
+    public InternalByteMessageStore(
+        Class<I> idClass, MessageValueFactory<M> messageFactory) {
+      super(idClass);
+      this.messageFactory = messageFactory;
+    }
+
+    @Override
+    public Iterable<M> takeMessages(I id) {
+      final ExtendedByteArrayDataOutput out = received.remove(id);
+      if (out == null) {
+        return null;
+      }
+
+      return new Iterable<M>() {
+        @Override
+        public Iterator<M> iterator() {
+          final ExtendedByteArrayDataInput in = new ExtendedByteArrayDataInput(
+              out.getByteArray(), 0, out.getPos());
+          final M message = messageFactory.newInstance();
+          return new AbstractIterator<M>() {
+            @Override
+            protected M computeNext() {
+              if (in.available() == 0) {
+                return endOfData();
+              }
+              try {
+                message.readFields(in);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              return message;
+            }
+          };
+        }
+      };
+    }
+
+    @Override
+    public void sendMessage(I id, M message) {
+      ExtendedByteArrayDataOutput out = getReceiverFor(id);
+
+      synchronized (out) {
+        try {
+          message.write(out);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    @Override
+    ExtendedByteArrayDataOutput createNewReceiver() {
+      return new ExtendedByteArrayDataOutput();
+    }
+  }
+
+  /**
+   * InternalMessageStore that creates byte[] for each message, and
+   * all receivers share the same byte[].
+   *
+   * @param <I> Vertex id value type
+   * @param <M> Message type
+   */
+  static class InternalSharedByteMessageStore
+      <I extends WritableComparable, M extends Writable>
+      extends InternalConcurrentMessageStore<I, M, List<byte[]>> {
+    private final MessageValueFactory<M> messageFactory;
+
+    public InternalSharedByteMessageStore(
+        Class<I> idClass, MessageValueFactory<M> messageFactory) {
+      super(idClass);
+      this.messageFactory = messageFactory;
+    }
+
+    @Override
+    public Iterable<M> takeMessages(I id) {
+      final List<byte[]> out = received.remove(id);
+      if (out == null) {
+        return null;
+      }
+
+      return new Iterable<M>() {
+        @Override
+        public Iterator<M> iterator() {
+          final Iterator<byte[]> byteIter = out.iterator();
+          final M message = messageFactory.newInstance();
+          final UnsafeReusableByteArrayInput reusableInput =
+              new UnsafeReusableByteArrayInput();
+
+          return new Iterator<M>() {
+            @Override
+            public boolean hasNext() {
+              return byteIter.hasNext();
+            }
+
+            @Override
+            public M next() {
+              WritableUtils.fromByteArrayUnsafe(
+                  byteIter.next(), message, reusableInput);
+              return message;
+            }
+
+            @Override
+            public void remove() {
+              byteIter.remove();
+            }
+          };
+        }
+      };
+    }
+
+    private void storeMessage(I id, byte[] messageData) {
+      List<byte[]> out = getReceiverFor(id);
+      synchronized (out) {
+        out.add(messageData);
+      }
+    }
+
+    @Override
+    List<byte[]> createNewReceiver() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public void sendMessage(I id, M message) {
+      storeMessage(id, WritableUtils.toByteArrayUnsafe(message));
+    }
+
+    @Override
+    public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
+      byte[] messageData = WritableUtils.toByteArrayUnsafe(message);
+      while (idIter.hasNext()) {
+        storeMessage(idIter.next(), messageData);
+      }
+    }
+  }
+
+  /**
+   * Message store that add checks for whether serialization seems to be
+   * working fine
+   */
+  static class InternalChecksMessageStore
+      <I extends WritableComparable, M extends Writable>
+      implements InternalMessageStore<I, M> {
+    private final InternalMessageStore<I, M> messageStore;
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
+    private final MessageValueFactory<M> messageFactory;
+
+    public InternalChecksMessageStore(InternalMessageStore<I, M> messageStore,
+        ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+        MessageValueFactory<M> messageFactory) {
+      this.messageStore = messageStore;
+      this.conf = conf;
+      this.messageFactory = messageFactory;
+    }
+
+    // Use message copies probabilistically, to catch both not serializing some
+    // fields, and storing references from message object itself
+    // (which can be reusable).
+    private M maybeMessageCopy(M message) {
+      M messageCopy = WritableUtils.createCopy(
+          message, messageFactory, conf);
+      return ThreadLocalRandom.current().nextBoolean() ? messageCopy : message;
+    }
+
+    private void checkIdCopy(I id) {
+      WritableUtils.createCopy(id, conf.getVertexIdFactory(), conf);
+    }
+
+    @Override
+    public void sendMessage(I id, M message) {
+      checkIdCopy(id);
+      messageStore.sendMessage(id, maybeMessageCopy(message));
+    }
+
+    @Override
+    public void sendMessageToMultipleEdges(
+        final Iterator<I> idIter, M message) {
+      messageStore.sendMessageToMultipleEdges(
+          new Iterator<I>() {
+            @Override
+            public boolean hasNext() {
+              return idIter.hasNext();
+            }
+
+            @Override
+            public I next() {
+              I id = idIter.next();
+              checkIdCopy(id);
+              return id;
+            }
+
+            @Override
+            public void remove() {
+              idIter.remove();
+            }
+          },
+          maybeMessageCopy(message));
+    }
+
+    @Override
+    public Iterable<M> takeMessages(I id) {
+      checkIdCopy(id);
+      return messageStore.takeMessages(id);
+    }
+
+    @Override
+    public Set<I> targetsSet() {
+      return messageStore.targetsSet();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
new file mode 100644
index 0000000..bdf3233
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.local;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import 
org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi;
+import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
+import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
+import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
+import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
+import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
+import org.apache.giraph.conf.BooleanConfOption;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+/**
+ * Local in-memory Block application job runner, used for testing.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class LocalBlockRunner {
+  public static final IntConfOption NUM_WORKERS = new IntConfOption(
+      "test.LocalBlockRunner.NUM_WORKERS", 3, "");
+  public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption(
+      "test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
+  // merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working
+  public static final BooleanConfOption SERIALIZE_MASTER =
+      new BooleanConfOption(
+          "test.LocalBlockRunner.SERIALIZE_MASTER", false, "");
+
+  private LocalBlockRunner() { }
+
+  /**
+   * With a boolean flag, you can switch between LocalBlockRunner and
+   * InternalVertexRunner for running the unit test.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  TestGraph<I, V, E> runWithInMemoryOutput(
+      TestGraph<I, V, E> graph, GiraphConfiguration conf,
+      boolean useFullDigraphTests) throws Exception {
+    if (useFullDigraphTests) {
+      return InternalVertexRunner.runWithInMemoryOutput(conf, graph);
+    } else {
+      runWithInMemoryOutput(graph, conf);
+      return graph;
+    }
+  }
+
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  void runWithInMemoryOutput(
+      TestGraph<I, V, E> graph, GiraphConfiguration conf) throws Exception {
+    VertexSaver<I, V, E> noOpVertexSaver = new VertexSaver<I, V, E>() {
+      @Override
+      public void saveVertex(Vertex<I, V, E> vertex) {
+        // No-op
+      }
+    };
+    runWithVertexSaverOutput(graph, noOpVertexSaver, conf);
+  }
+
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  void runWithVertexSaverOutput(
+      TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver,
+      GiraphConfiguration conf) throws Exception {
+    int numWorkers = NUM_WORKERS.get(conf);
+    boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
+    boolean serializeMaster = SERIALIZE_MASTER.get(conf);
+    final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
+
+    ImmutableClassesGiraphConfiguration<I, V, E> immConf =
+        new ImmutableClassesGiraphConfiguration(conf);
+    final InternalApi internalApi =
+        new InternalApi(graph, immConf, runAllChecks);
+    final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
+
+    BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>();
+    blockMasterLogic.initialize(immConf, internalApi);
+
+    BlockWorkerContextLogic workerContextLogic =
+        internalApi.getWorkerContextLogic();
+    workerContextLogic.preApplication(internalWorkerApi,
+        new BlockOutputHandle("", conf, new Progressable() {
+          @Override
+          public void progress() {
+          }
+        }));
+
+    ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
+    Random rand = new Random();
+
+    if (runAllChecks) {
+      for (Vertex<I, V, E> vertex : graph) {
+        V value = immConf.createVertexValue();
+        WritableUtils.copyInto(vertex.getValue(), value);
+        vertex.setValue(value);
+
+        vertex.setEdges((Iterable) WritableUtils.createCopy(
+            (Writable) vertex.getEdges(), immConf.getOutEdgesClass(), 
immConf));
+      }
+    }
+
+    final AtomicBoolean anyVertexAlive = new AtomicBoolean(true);
+
+    for (int superstep = 0;; superstep++) {
+      // serialize master to test continuable computation
+      if (serializeMaster) {
+        blockMasterLogic = (BlockMasterLogic) WritableUtils.createCopy(
+            new KryoWritableWrapper<>(blockMasterLogic),
+            KryoWritableWrapper.class,
+            immConf).get();
+        blockMasterLogic.initializeAfterRead(internalApi);
+      }
+
+      if (!anyVertexAlive.get()) {
+        break;
+      }
+
+      final BlockWorkerPieces workerPieces =
+          blockMasterLogic.computeNext(superstep);
+      if (workerPieces == null) {
+        if (!conf.doOutputDuringComputation()) {
+          Collection<Vertex<I, V, E>> vertices = internalApi.getAllVertices();
+          for (Vertex<I, V, E> vertex : vertices) {
+            vertexSaver.saveVertex(vertex);
+          }
+        }
+        int left = executor.shutdownNow().size();
+        Preconditions.checkState(0 == left, "Some work still left to be 
done?");
+        break;
+      } else {
+        internalApi.afterMasterBeforeWorker(workerPieces);
+        List<List<Vertex<I, V, E>>> verticesPerWorker = new ArrayList<>();
+        for (int i = 0; i < numWorkers; i++) {
+          verticesPerWorker.add(new ArrayList<Vertex<I, V, E>>());
+        }
+        Collection<Vertex<I, V, E>> allVertices = internalApi.getAllVertices();
+        for (Vertex<I, V, E> vertex : allVertices) {
+          verticesPerWorker.get(rand.nextInt(numWorkers)).add(vertex);
+        }
+
+        workerContextLogic.preSuperstep(
+            internalWorkerApi,
+            internalWorkerApi,
+            KryoWritableWrapper.wrapAndCopy(workerPieces), superstep,
+            internalApi.takeWorkerMessages());
+
+        final CountDownLatch latch = new CountDownLatch(numWorkers);
+        final AtomicReference<Throwable> exception = new AtomicReference<>();
+        anyVertexAlive.set(false);
+        for (final List<Vertex<I, V, E>> curVertices : verticesPerWorker) {
+          executor.execute(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                boolean anyCurVertexAlive = false;
+                BlockWorkerPieces localPieces =
+                    KryoWritableWrapper.wrapAndCopy(workerPieces);
+
+                BlockWorkerLogic localLogic = new 
BlockWorkerLogic(localPieces);
+                localLogic.preSuperstep(internalWorkerApi, internalWorkerApi);
+
+                for (Vertex<I, V, E> vertex : curVertices) {
+                  Iterable messages = internalApi.takeMessages(vertex.getId());
+                  if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
+                    vertex.wakeUp();
+                  }
+                  if (!vertex.isHalted()) {
+                    localLogic.compute(vertex, messages);
+                    if (doOutputDuringComputation) {
+                      vertexSaver.saveVertex(vertex);
+                    }
+                  }
+
+                  if (!vertex.isHalted()) {
+                    anyCurVertexAlive = true;
+                  }
+                }
+
+                if (anyCurVertexAlive) {
+                  anyVertexAlive.set(true);
+                }
+                localLogic.postSuperstep();
+              // CHECKSTYLE: stop IllegalCatch
+              // Need to propagate all exceptions within test
+              } catch (Throwable t) {
+              // CHECKSTYLE: resume IllegalCatch
+                t.printStackTrace();
+                exception.set(t);
+              }
+
+              latch.countDown();
+            }
+          });
+        }
+
+        latch.await();
+        if (exception.get() != null) {
+          throw new RuntimeException("Worker failed", exception.get());
+        }
+
+        workerContextLogic.postSuperstep();
+
+        internalApi.afterWorkerBeforeMaster();
+      }
+    }
+
+    workerContextLogic.postApplication();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
new file mode 100644
index 0000000..0053644
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.local;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface to use for saving vertices
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public interface VertexSaver<I extends WritableComparable, V extends Writable,
+    E extends Writable> {
+  void saveVertex(Vertex<I, V, E> vertex);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java
new file mode 100644
index 0000000..c9fe578
--- /dev/null
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Local (single machine) implementation of graph processing system API used by
+ * Blocks Framework.
+ *
+ * Allows efficient execution of Block Applications on small graphs, as well as
+ * comprehensive set of optional checks helping with unit tests.
+ */
+package org.apache.giraph.block_app.framework.api.local;

http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index e101b01..a3af507 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -175,7 +175,8 @@ public class SendMessageCache<I extends WritableComparable, 
M extends Writable>
    * An iterator wrapper on edges to return
    * target vertex ids.
    */
-  private class TargetVertexIdIterator implements Iterator<I> {
+  public static class TargetVertexIdIterator<I extends WritableComparable>
+      implements Iterator<I> {
     /** An edge iterator */
     private final Iterator<Edge<I, Writable>> edgesIterator;
 
@@ -184,7 +185,7 @@ public class SendMessageCache<I extends WritableComparable, 
M extends Writable>
      *
      * @param vertex The source vertex of the out edges
      */
-    private TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
+    public TargetVertexIdIterator(Vertex<I, ?, ?> vertex) {
       edgesIterator =
         ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator();
     }
@@ -201,7 +202,7 @@ public class SendMessageCache<I extends WritableComparable, 
M extends Writable>
 
     @Override
     public void remove() {
-      // No operation.
+      throw new UnsupportedOperationException();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
 
b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
index 0f6e73f..f17955b 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -107,4 +108,16 @@ public class KryoWritableWrapper<T> implements Writable {
       return (T) value;
     }
   }
+
+  /**
+   * Wrap object with KryoWritableWrapper, create a writable copy of it,
+   * and then unwrap it, allowing any object to be copied.
+   *
+   * @param object Object to copy
+   * @return copy of the object
+   * @param <T> Type of the object
+   */
+  public static <T> T wrapAndCopy(T object) {
+    return WritableUtils.createCopy(new KryoWritableWrapper<>(object)).get();
+  }
 }

Reply via email to