Repository: giraph
Updated Branches:
  refs/heads/trunk 81d5badf7 -> c32f465d4


[GIRAPH-1037] Surface worker index information to computations

Summary:
It can be useful to surface the following information to applications:
- the number of workers
- the index of the worker that a particular VertexId is assigned to
- the index of the current worker.

Test Plan: mvn clean install

Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D49401


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c32f465d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c32f465d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c32f465d

Branch: refs/heads/trunk
Commit: c32f465d442c52edb8aba4e7fb78c153639d8cd7
Parents: 81d5bad
Author: Igor Kabiljo <[email protected]>
Authored: Fri Oct 23 17:34:07 2015 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Fri Nov 13 12:52:27 2015 -0800

----------------------------------------------------------------------
 .../block_app/framework/api/BlockApi.java       |   8 +-
 .../block_app/framework/api/BlockWorkerApi.java |   3 +-
 .../framework/api/BlockWorkerContextApi.java    |  24 ++---
 .../api/BlockWorkerContextReceiveApi.java       |   8 +-
 .../api/BlockWorkerContextSendApi.java          |   8 +-
 .../api/giraph/BlockMasterApiWrapper.java       |   5 +
 .../api/giraph/BlockWorkerApiWrapper.java       |  15 +++
 .../api/giraph/BlockWorkerContext.java          |   3 +-
 .../giraph/BlockWorkerContextApiWrapper.java    |  12 ++-
 .../framework/api/local/InternalApi.java        |  15 ++-
 .../framework/piece/AbstractPiece.java          |   2 +-
 .../giraph/block_app/framework/piece/Piece.java |   2 +-
 .../framework/piece/delegate/DelegatePiece.java |   2 +-
 .../block_app/library/gc/WorkerGCPiece.java     |   2 +-
 .../block_app/migration/MigrationPiece.java     |   2 +-
 .../migration/MigrationWorkerContext.java       |   5 +-
 .../block_app/framework/TestWorkerMessages.java |   3 +-
 .../giraph/graph/AbstractComputation.java       |  44 ++++++--
 .../org/apache/giraph/graph/Computation.java    |  11 +-
 .../apache/giraph/graph/ComputeCallable.java    |   4 +-
 .../org/apache/giraph/master/MasterCompute.java |  12 +++
 .../apache/giraph/worker/AllWorkersInfo.java    | 103 +++++++++++++++++++
 .../org/apache/giraph/worker/WorkerContext.java |  51 +++++----
 .../apache/giraph/worker/WorkerIndexUsage.java  |  48 +++++++++
 .../java/org/apache/giraph/utils/MockUtils.java |   4 +-
 25 files changed, 315 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java
index a0c92ad..d914b9a 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java
@@ -17,12 +17,18 @@
  */
 package org.apache.giraph.block_app.framework.api;
 
-
 /**
  * Basic block computation API for accessing items
  * present on both workers and master.
  */
 public interface BlockApi extends BlockConfApi {
+ /**
+   * Get number of workers
+   *
+   * @return Number of workers
+   */
+  int getWorkerCount();
+
   /**
    * Get the total (all workers) number of vertices that
    * existed at the start of the current piece.

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
index 727bf08..76898f6 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
@@ -19,6 +19,7 @@ package org.apache.giraph.block_app.framework.api;
 
 import org.apache.giraph.aggregators.AggregatorUsage;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerIndexUsage;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -29,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public interface BlockWorkerApi<I extends WritableComparable>
-    extends AggregatorUsage, BlockApi {
+    extends BlockApi, AggregatorUsage, WorkerIndexUsage<I> {
   @Override
   ImmutableClassesGiraphConfiguration<I, ?, ?> getConf();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
index d5918b5..699f76a 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
@@ -18,28 +18,16 @@
 package org.apache.giraph.block_app.framework.api;
 
 import org.apache.giraph.aggregators.AggregatorUsage;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerIndexUsage;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Block computation API available for worker context methods.
  *
  * Interface to the WorkerContext methods.
+ * @param <I> vertex Id type.
  */
-public interface BlockWorkerContextApi extends AggregatorUsage, BlockApi {
-  @Override
-  ImmutableClassesGiraphConfiguration<?, ?, ?> getConf();
-
-  /**
-   * Get number of workers
-   *
-   * @return Number of workers
-   */
-  int getWorkerCount();
-
-  /**
-   * Get index for this worker
-   *
-   * @return Index of this worker
-   */
-  int getMyWorkerIndex();
+@SuppressWarnings("rawtypes")
+public interface BlockWorkerContextApi<I extends WritableComparable>
+    extends AggregatorUsage, BlockApi, WorkerIndexUsage<I> {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java
index a8242b2..40a1e7d 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java
@@ -18,12 +18,16 @@
 package org.apache.giraph.block_app.framework.api;
 
 import org.apache.giraph.worker.WorkerBroadcastUsage;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Block computation API available for worker context receive methods.
  *
  * Interface to the WorkerContext methods.
+ *
+ * @param <I> vertex Id type.
  */
-public interface BlockWorkerContextReceiveApi
-    extends BlockWorkerContextApi, WorkerBroadcastUsage {
+@SuppressWarnings("rawtypes")
+public interface BlockWorkerContextReceiveApi<I extends WritableComparable>
+    extends BlockWorkerContextApi<I>, WorkerBroadcastUsage {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java
index 769562d..386c6bb 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java
@@ -19,16 +19,20 @@ package org.apache.giraph.block_app.framework.api;
 
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Block computation API available for worker send methods.
  *
  * Interface to the WorkerContext methods.
  *
+ * @param <I> vertex Id type.
  * @param <WM> Worker message type
  */
-public interface BlockWorkerContextSendApi<WM extends Writable>
-    extends BlockWorkerContextApi, WorkerAggregatorUsage {
+@SuppressWarnings("rawtypes")
+public interface BlockWorkerContextSendApi
+    <I extends WritableComparable, WM extends Writable>
+    extends BlockWorkerContextApi<I>, WorkerAggregatorUsage {
   /**
    * Send message to another worker
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
index ede6005..55d3693 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
@@ -167,4 +167,9 @@ final class BlockMasterApiWrapper implements BlockMasterApi,
   public BlockOutputHandle getBlockOutputHandle() {
     return outputHandle;
   }
+
+  @Override
+  public int getWorkerCount() {
+    return master.getWorkerInfoList().size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
index 6e839f9..7bea80a 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
@@ -177,4 +177,19 @@ final class BlockWorkerApiWrapper<I extends 
WritableComparable,
   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
     return getBlockWorkerContext().getOutputHandle().getWriter(confOption);
   }
+
+  @Override
+  public int getMyWorkerIndex() {
+    return worker.getMyWorkerIndex();
+  }
+
+  @Override
+  public int getWorkerCount() {
+    return worker.getWorkerCount();
+  }
+
+  @Override
+  public int getWorkerForVertex(I vertexId) {
+    return worker.getWorkerForVertex(vertexId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
index 1a4f8d8..9618822 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java
@@ -29,6 +29,7 @@ import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.writable.kryo.HadoopKryo;
 import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
 /**
@@ -53,7 +54,7 @@ public final class BlockWorkerContext extends WorkerContext
   @Override
   public void preSuperstep() {
     List<Writable> messages = getAndClearMessagesFromOtherWorkers();
-    BlockWorkerContextApiWrapper<Writable> workerApi =
+    BlockWorkerContextApiWrapper<WritableComparable, Writable> workerApi =
         new BlockWorkerContextApiWrapper<>(this);
     BlockWorkerPieces<Object> workerPieces =
         BlockWorkerPieces.getNextWorkerPieces(this);

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
index c52b6a5..7336bd4 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
@@ -22,15 +22,18 @@ import 
org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Giraph implementation of BlockWorkerContextReceiveApi and
  * BlockWorkerContextSendApi, passing all calls to WorkerContext.
  *
+ * @param <I> vertex Id type.
  * @param <WM> Worker message type
  */
-final class BlockWorkerContextApiWrapper<WM extends Writable>
-    implements BlockWorkerContextReceiveApi, BlockWorkerContextSendApi<WM> {
+final class BlockWorkerContextApiWrapper
+    <I extends WritableComparable, WM extends Writable> implements
+    BlockWorkerContextReceiveApi<I>, BlockWorkerContextSendApi<I, WM> {
   private final WorkerContext workerContext;
 
   public BlockWorkerContextApiWrapper(WorkerContext workerContext) {
@@ -53,6 +56,11 @@ final class BlockWorkerContextApiWrapper<WM extends Writable>
   }
 
   @Override
+  public int getWorkerForVertex(I vertexId) {
+    return workerContext.getWorkerForVertex(vertexId);
+  }
+
+  @Override
   public <A extends Writable> A getAggregatedValue(String name) {
     return workerContext.getAggregatedValue(name);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
index 99e9e24..3ca8b1c 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
@@ -117,8 +117,8 @@ class InternalApi<I extends WritableComparable, V extends 
Writable,
    */
   class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E>
       implements BlockWorkerSendApi<I, V, E, Writable>,
-      BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<Writable>,
-      BlockWorkerContextReceiveApi, BlockWorkerValueAccessor,
+      BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<I, Writable>,
+      BlockWorkerContextReceiveApi<I>, BlockWorkerValueAccessor,
       WorkerGlobalCommUsage {
 
     @Override
@@ -178,6 +178,11 @@ class InternalApi<I extends WritableComparable, V extends 
Writable,
     }
 
     @Override
+    public int getWorkerForVertex(I vertexId) {
+      return 0;
+    }
+
+    @Override
     public void sendMessageToWorker(Writable message, int workerIndex) {
       Preconditions.checkArgument(workerIndex == getMyWorkerIndex(),
           "With just one worker you can only send worker message to itself, " +
@@ -425,8 +430,12 @@ class InternalApi<I extends WritableComparable, V extends 
Writable,
     return workerContextLogic.getOutputHandle().getWriter(confOption);
   }
 
-
   public BlockWorkerContextLogic getWorkerContextLogic() {
     return workerContextLogic;
   }
+
+  @Override
+  public int getWorkerCount() {
+    return 1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
index 882f4f1..f4e3678 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java
@@ -159,7 +159,7 @@ public abstract class AbstractPiece<I extends 
WritableComparable,
    * getVertexSender.
    */
   public void workerContextSend(
-      BlockWorkerContextSendApi<WM> workerContextApi, S executionStage,
+      BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
       WV workerValue) {
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
index 3ad66d1..0ef0701 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java
@@ -47,7 +47,7 @@ public class Piece<I extends WritableComparable, V extends 
Writable,
   // Disallowing use of Worker Context functions:
   @Override
   public final void workerContextSend(
-      BlockWorkerContextSendApi<NoMessage> workerContextApi,
+      BlockWorkerContextSendApi<I, NoMessage> workerContextApi,
       S executionStage, Object workerValue) {
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
index 23c2d29..095ea74 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java
@@ -194,7 +194,7 @@ public class DelegatePiece<I extends WritableComparable, V 
extends Writable,
 
   @Override
   public void workerContextSend(
-      BlockWorkerContextSendApi<WM> workerContextApi, S executionStage,
+      BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage,
       WV workerValue) {
     for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) {
       piece.workerContextSend(workerContextApi, executionStage, workerValue);

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
index 83ef3fd..5912b9a 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java
@@ -34,7 +34,7 @@ public class WorkerGCPiece extends 
PieceWithWorkerContext<WritableComparable,
   @Override
   @SuppressFBWarnings(value = "DM_GC")
   public void workerContextSend(
-      BlockWorkerContextSendApi<NoMessage> workerContextApi,
+      BlockWorkerContextSendApi<WritableComparable, NoMessage> 
workerContextApi,
       Object executionStage,
       Object workerValue) {
     System.gc();

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
index c60ea2c..7a935c0 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java
@@ -258,7 +258,7 @@ public final class MigrationPiece<I extends 
WritableComparable,
 
   @Override
   public void workerContextSend(
-      BlockWorkerContextSendApi<Writable> workerContextApi,
+      BlockWorkerContextSendApi<I, Writable> workerContextApi,
       MigrationSuperstepStage executionStage,
       MigrationWorkerContext workerValue) {
     if (workerValue != null && !isFirstStep) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
index 014c269..5cc4925 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java
@@ -26,6 +26,7 @@ import 
org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Replacement for WorkerContext when migrating to Blocks Framework,
@@ -84,8 +85,8 @@ public class MigrationWorkerContext
   }
 
   public final void sendMessageToWorker(Writable message, int workerIndex) {
-    ((BlockWorkerContextSendApi<Writable>) api).sendMessageToWorker(
-        message, workerIndex);
+    ((BlockWorkerContextSendApi<WritableComparable, Writable>) api)
+      .sendMessageToWorker(message, workerIndex);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java
 
b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java
index a68fca2..05f81b6 100644
--- 
a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java
+++ 
b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java
@@ -91,7 +91,8 @@ public class TestWorkerMessages {
     }
 
     @Override
-    public void workerContextSend(BlockWorkerContextSendApi<LongWritable> 
workerContextApi,
+    public void workerContextSend(
+        BlockWorkerContextSendApi<LongWritable, LongWritable> workerContextApi,
         Object executionStage, Object workerValue) {
       for (long value : values) {
         workerContextApi.sendMessageToWorker(new LongWritable(value),

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
index 1ea6603..086bc48 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java
@@ -21,16 +21,17 @@ package org.apache.giraph.graph;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.worker.AllWorkersInfo;
 import org.apache.giraph.worker.WorkerAggregatorDelegator;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
 
 /**
  * See {@link Computation} for explanation of the interface.
@@ -54,17 +55,16 @@ public abstract class AbstractComputation<I extends 
WritableComparable,
     M2 extends Writable>
     extends WorkerAggregatorDelegator<I, V, E>
     implements Computation<I, V, E, M1, M2> {
-  /** Logger */
-  private static final Logger LOG = 
Logger.getLogger(AbstractComputation.class);
-
   /** Global graph state **/
   private GraphState graphState;
   /** Handles requests */
   private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
-  /** Graph-wide BSP Mapper for this Computation */
-  private GraphTaskManager<I, V, E> graphTaskManager;
+  /** Service worker */
+  private CentralizedServiceWorker<I, V, E> serviceWorker;
   /** Worker context */
   private WorkerContext workerContext;
+  /** All workers info */
+  private AllWorkersInfo allWorkersInfo;
 
   /**
    * Must be defined by user to do computation on a single Vertex.
@@ -109,14 +109,20 @@ public abstract class AbstractComputation<I extends 
WritableComparable,
   public void initialize(
       GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
-      GraphTaskManager<I, V, E> graphTaskManager,
-      WorkerGlobalCommUsage workerGlobalCommUsage,
-      WorkerContext workerContext) {
+      CentralizedServiceWorker<I, V, E> serviceWorker,
+      WorkerGlobalCommUsage workerGlobalCommUsage) {
     this.graphState = graphState;
     this.workerClientRequestProcessor = workerClientRequestProcessor;
-    this.graphTaskManager = graphTaskManager;
     this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
-    this.workerContext = workerContext;
+    this.serviceWorker = serviceWorker;
+    if (serviceWorker != null) {
+      this.workerContext = serviceWorker.getWorkerContext();
+      this.allWorkersInfo = new AllWorkersInfo(
+          serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
+    } else {
+      this.workerContext = null;
+      this.allWorkersInfo = null;
+    }
   }
 
   /**
@@ -273,4 +279,20 @@ public abstract class AbstractComputation<I extends 
WritableComparable,
   public <W extends WorkerContext> W getWorkerContext() {
     return (W) workerContext;
   }
+
+  @Override
+  public final int getWorkerCount() {
+    return allWorkersInfo.getWorkerCount();
+  }
+
+  @Override
+  public final int getMyWorkerIndex() {
+    return allWorkersInfo.getMyWorkerIndex();
+  }
+
+  @Override
+  public final int getWorkerForVertex(I vertexId) {
+    return allWorkersInfo.getWorkerIndex(
+        serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index d310da9..1ac6f43 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.TypesHolder;
@@ -28,6 +29,7 @@ import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerGlobalCommUsage;
+import org.apache.giraph.worker.WorkerIndexUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -56,7 +58,7 @@ public interface Computation<I extends WritableComparable,
     M2 extends Writable>
     extends TypesHolder<I, V, E, M1, M2>,
     ImmutableClassesGiraphConfigurable<I, V, E>,
-    WorkerGlobalCommUsage, WorkerAggregatorUsage {
+    WorkerGlobalCommUsage, WorkerAggregatorUsage, WorkerIndexUsage<I> {
   /**
    * Must be defined by user to do computation on a single Vertex.
    *
@@ -87,14 +89,13 @@ public interface Computation<I extends WritableComparable,
    *
    * @param graphState Graph state
    * @param workerClientRequestProcessor Processor for handling requests
-   * @param graphTaskManager Graph-wide BSP Mapper for this Vertex
+   * @param serviceWorker Centralized service worker
    * @param workerGlobalCommUsage Worker global communication usage
-   * @param workerContext Worker context
    */
   void initialize(GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
-      GraphTaskManager<I, V, E> graphTaskManager,
-      WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext 
workerContext);
+      CentralizedServiceWorker<I, V, E> serviceWorker,
+      WorkerGlobalCommUsage workerGlobalCommUsage);
 
   /**
    * Retrieves the current superstep.

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index fa268da..d59d044 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -40,7 +40,6 @@ import org.apache.giraph.time.Times;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.utils.Trimmable;
-import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
 import org.apache.hadoop.io.Writable;
@@ -138,14 +137,13 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
               useOneMessageToManyIdsEncoding());
     WorkerThreadGlobalCommUsage aggregatorUsage =
         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
-    WorkerContext workerContext = serviceWorker.getWorkerContext();
 
     vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
 
     Computation<I, V, E, M1, M2> computation =
         (Computation<I, V, E, M1, M2>) configuration.createComputation();
     computation.initialize(graphState, workerClientRequestProcessor,
-        serviceWorker.getGraphTaskManager(), aggregatorUsage, workerContext);
+        serviceWorker, aggregatorUsage);
     computation.preSuperstep();
 
     List<PartitionStats> partitionStatsList = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 50e3b36..dad766b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.master;
 
+import java.util.List;
+
 import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.combiner.MessageCombiner;
@@ -26,6 +28,7 @@ import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -126,6 +129,15 @@ public abstract class MasterCompute
   }
 
   /**
+   * Get list of workers
+   *
+   * @return List of workers
+   */
+  public final List<WorkerInfo> getWorkerInfoList() {
+    return serviceMaster.getWorkerInfoList();
+  }
+
+  /**
    * Set Computation class to be used
    *
    * @param computationClass Computation class

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java b/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java
new file mode 100644
index 0000000..0112134
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.python.google.common.base.Preconditions;
+
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
+/**
+ * Information about all workers, their WorkerInfo values, and indices.
+ */
+public class AllWorkersInfo {
+  /** List of workers in current superstep, sorted by task id */
+  private final List<WorkerInfo> workerInfos;
+  /** Index of this worker in the list, -1 if it is not present */
+  private final int myWorkerIndex;
+  /** Map of taskId to worker index, returns -1 for unknown task ids */
+  private final Int2IntOpenHashMap taskIdToIndex;
+
+  /**
+   * Constructor, fails with IllegalStateException on invalid task ids
+   * @param workers List of workers, in strictly increasing task id order
+   * @param myWorker My worker
+   */
+  public AllWorkersInfo(List<WorkerInfo> workers, WorkerInfo myWorker) {
+    workerInfos = new ArrayList<>(workers);
+    // Validate while indexing; Guava message templates only expand %s
+    taskIdToIndex = new Int2IntOpenHashMap(workerInfos.size());
+    taskIdToIndex.defaultReturnValue(-1);
+    for (int i = 0; i < workerInfos.size(); i++) {
+      int task = workerInfos.get(i).getTaskId();
+      if (i > 0) {
+        Preconditions.checkState(
+            task > workerInfos.get(i - 1).getTaskId(), "Tasks not ordered");
+      }
+      Preconditions.checkState(task >= 0, "Task not specified, %s", task);
+      int old = taskIdToIndex.put(task, i);
+      Preconditions.checkState(old == -1,
+          "Task with %s id found twice (positions %s and %s)",
+          task, i, old);
+    }
+
+    myWorkerIndex = getWorkerIndex(myWorker);
+  }
+
+  /**
+   * List of WorkerInfos, returned as the internal list (not a copy)
+   *
+   * @return List of WorkerInfos
+   */
+  public List<WorkerInfo> getWorkerList() {
+    return workerInfos;
+  }
+
+  /**
+   * Get number of workers
+   *
+   * @return Number of workers
+   */
+  public int getWorkerCount() {
+    return workerInfos.size();
+  }
+
+  /**
+   * Get index for this worker
+   *
+   * @return Index of this worker, -1 if it was not in the worker list
+   */
+  public int getMyWorkerIndex() {
+    return myWorkerIndex;
+  }
+
+  /**
+   * For every worker this method returns unique number
+   * between 0 and N - 1, where N is the total number of workers.
+   * This number stays the same throughout the computation.
+   * TaskID may be different from this number and task ID
+   * is not necessarily continuous
+   * @param workerInfo worker info object
+   * @return worker number, or -1 if the worker's task id is unknown
+   */
+  public int getWorkerIndex(WorkerInfo workerInfo) {
+    return taskIdToIndex.get(workerInfo.getTaskId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
index b977ea1..dd909d3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java
@@ -37,23 +37,21 @@ import org.apache.hadoop.mapreduce.Mapper;
 @SuppressWarnings("rawtypes")
 public abstract class WorkerContext
   extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
-  implements Writable {
+  implements Writable, WorkerIndexUsage<WritableComparable> {
   /** Global graph state */
   private GraphState graphState;
 
   /** Service worker */
   private CentralizedServiceWorker serviceWorker;
-  /** Sorted list of other participating workers */
-  private List<WorkerInfo> workerList;
-  /** Index of this worker within workerList */
-  private int myWorkerIndex;
+  /** All workers info */
+  private AllWorkersInfo allWorkersInfo;
 
   /**
    * Set the graph state.
    *
    * @param graphState Used to set the graph state.
    */
-  public void setGraphState(GraphState graphState) {
+  public final void setGraphState(GraphState graphState) {
     this.graphState = graphState;
   }
 
@@ -62,10 +60,11 @@ public abstract class WorkerContext
    *
    * @param serviceWorker Service worker containing all the information
    */
-  public void setupSuperstep(CentralizedServiceWorker<?, ?, ?> serviceWorker) {
+  public final void setupSuperstep(
+      CentralizedServiceWorker<?, ?, ?> serviceWorker) {
     this.serviceWorker = serviceWorker;
-    workerList = serviceWorker.getWorkerInfoList();
-    myWorkerIndex = workerList.indexOf(serviceWorker.getWorkerInfo());
+    allWorkersInfo = new AllWorkersInfo(
+        serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
   }
 
   /**
@@ -98,8 +97,9 @@ public abstract class WorkerContext
    *
    * @return Number of workers
    */
-  public int getWorkerCount() {
-    return workerList.size();
+  @Override
+  public final int getWorkerCount() {
+    return allWorkersInfo.getWorkerCount();
   }
 
   /**
@@ -107,8 +107,15 @@ public abstract class WorkerContext
    *
    * @return Index of this worker
    */
-  public int getMyWorkerIndex() {
-    return myWorkerIndex;
+  @Override
+  public final int getMyWorkerIndex() {
+    return allWorkersInfo.getMyWorkerIndex();
+  }
+
+  @Override
+  public final int getWorkerForVertex(WritableComparable vertexId) {
+    return allWorkersInfo.getWorkerIndex(
+        serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
   }
 
   /**
@@ -117,7 +124,7 @@ public abstract class WorkerContext
    *
    * @return Messages received
    */
-  public List<Writable> getAndClearMessagesFromOtherWorkers() {
+  public final List<Writable> getAndClearMessagesFromOtherWorkers() {
     return serviceWorker.getServerData().
         getAndClearCurrentWorkerToWorkerMessages();
   }
@@ -128,14 +135,14 @@ public abstract class WorkerContext
    * @param message Message to send
    * @param workerIndex Index of the worker to send the message to
    */
-  public void sendMessageToWorker(Writable message, int workerIndex) {
+  public final void sendMessageToWorker(Writable message, int workerIndex) {
     SendWorkerToWorkerMessageRequest request =
         new SendWorkerToWorkerMessageRequest(message);
-    if (workerIndex == myWorkerIndex) {
+    if (workerIndex == getMyWorkerIndex()) {
       request.doRequest(serviceWorker.getServerData());
     } else {
       serviceWorker.getWorkerClient().sendWritableRequest(
-          workerList.get(workerIndex).getTaskId(), request);
+          allWorkersInfo.getWorkerList().get(workerIndex).getTaskId(), request);
     }
   }
 
@@ -151,7 +158,7 @@ public abstract class WorkerContext
    *
    * @return Current superstep
    */
-  public long getSuperstep() {
+  public final long getSuperstep() {
     return graphState.getSuperstep();
   }
 
@@ -161,7 +168,7 @@ public abstract class WorkerContext
    *
    * @return Total number of vertices (-1 if first superstep)
    */
-  public long getTotalNumVertices() {
+  public final long getTotalNumVertices() {
     return graphState.getTotalNumVertices();
   }
 
@@ -171,7 +178,7 @@ public abstract class WorkerContext
    *
    * @return Total number of edges (-1 if first superstep)
    */
-  public long getTotalNumEdges() {
+  public final long getTotalNumEdges() {
     return graphState.getTotalNumEdges();
   }
 
@@ -180,7 +187,7 @@ public abstract class WorkerContext
    *
    * @return Mapper context
    */
-  public Mapper.Context getContext() {
+  public final Mapper.Context getContext() {
     return graphState.getContext();
   }
 
@@ -190,7 +197,7 @@ public abstract class WorkerContext
    *
    * @param line Line to print
    */
-  public void logToCommandLine(String line) {
+  public final void logToCommandLine(String line) {
     serviceWorker.getJobProgressTracker().logInfo(line);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java
new file mode 100644
index 0000000..0af70a5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.worker;
+
+/**
+ * Interface for querying the worker count and worker indices.
+ *
+ * @param <I> Vertex id type
+ */
+public interface WorkerIndexUsage<I> {
+  /**
+   * Get number of workers
+   *
+   * @return Number of workers
+   */
+  int getWorkerCount();
+
+  /**
+   * Get index for this worker
+   *
+   * @return Index of this worker
+   */
+  int getMyWorkerIndex();
+
+  /**
+   * Get index of the worker that the given vertex id is assigned to,
+   * i.e. the worker that holds the vertex if such a vertex exists.
+   *
+   * @param vertexId vertex id
+   * @return worker index
+   */
+  int getWorkerForVertex(I vertexId);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 0a1da49..63403ab 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -96,7 +96,7 @@ public class MockUtils {
             Mockito.verify(workerClientRequestProcessor).sendMessageRequest
                 (targetVertexId, message);
         }
-        
+
         public void verifyMessageSentToAllEdges(Vertex<I, V, E> vertex, M message) {
           Mockito.verify(workerClientRequestProcessor).sendMessageToAllRequest(vertex, message);
       }
       }
@@ -137,7 +137,7 @@ public class MockUtils {
     Mockito.when(env.getContext().getConfiguration())
         .thenReturn(env.getConfiguration());
     computation.initialize(env.getGraphState(),
-        env.getWorkerClientRequestProcessor(), null, null, null);
+        env.getWorkerClientRequestProcessor(), null, null);
 
     GiraphConfiguration giraphConf = new GiraphConfiguration();
     giraphConf.setComputationClass(computation.getClass());

Reply via email to