Repository: giraph Updated Branches: refs/heads/trunk 81d5badf7 -> c32f465d4
[GIRAPH-1037] Surface worker index information to computations Summary: It can be useful for applications to surface: number of workers index of a worker particular VertexId is assigned to index of a current worker. Test Plan: mvn clean install Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D49401 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c32f465d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c32f465d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c32f465d Branch: refs/heads/trunk Commit: c32f465d442c52edb8aba4e7fb78c153639d8cd7 Parents: 81d5bad Author: Igor Kabiljo <[email protected]> Authored: Fri Oct 23 17:34:07 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Fri Nov 13 12:52:27 2015 -0800 ---------------------------------------------------------------------- .../block_app/framework/api/BlockApi.java | 8 +- .../block_app/framework/api/BlockWorkerApi.java | 3 +- .../framework/api/BlockWorkerContextApi.java | 24 ++--- .../api/BlockWorkerContextReceiveApi.java | 8 +- .../api/BlockWorkerContextSendApi.java | 8 +- .../api/giraph/BlockMasterApiWrapper.java | 5 + .../api/giraph/BlockWorkerApiWrapper.java | 15 +++ .../api/giraph/BlockWorkerContext.java | 3 +- .../giraph/BlockWorkerContextApiWrapper.java | 12 ++- .../framework/api/local/InternalApi.java | 15 ++- .../framework/piece/AbstractPiece.java | 2 +- .../giraph/block_app/framework/piece/Piece.java | 2 +- .../framework/piece/delegate/DelegatePiece.java | 2 +- .../block_app/library/gc/WorkerGCPiece.java | 2 +- .../block_app/migration/MigrationPiece.java | 2 +- .../migration/MigrationWorkerContext.java | 5 +- .../block_app/framework/TestWorkerMessages.java | 3 +- .../giraph/graph/AbstractComputation.java | 44 ++++++-- .../org/apache/giraph/graph/Computation.java | 11 +- .../apache/giraph/graph/ComputeCallable.java | 4 +- .../org/apache/giraph/master/MasterCompute.java | 12 +++ .../apache/giraph/worker/AllWorkersInfo.java | 103 +++++++++++++++++++ .../org/apache/giraph/worker/WorkerContext.java | 51 +++++---- .../apache/giraph/worker/WorkerIndexUsage.java | 48 +++++++++ .../java/org/apache/giraph/utils/MockUtils.java | 4 +- 25 files changed, 315 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java index a0c92ad..d914b9a 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java @@ -17,12 +17,18 @@ */ package org.apache.giraph.block_app.framework.api; - /** * Basic block computation API for accessing items * present on both workers and master. */ public interface BlockApi extends BlockConfApi { + /** + * Get number of workers + * + * @return Number of workers + */ + int getWorkerCount(); + /** * Get the total (all workers) number of vertices that * existed at the start of the current piece. http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java index 727bf08..76898f6 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java @@ -19,6 +19,7 @@ package org.apache.giraph.block_app.framework.api; import org.apache.giraph.aggregators.AggregatorUsage; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.worker.WorkerIndexUsage; import org.apache.hadoop.io.WritableComparable; /** @@ -29,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable; */ @SuppressWarnings("rawtypes") public interface BlockWorkerApi<I extends WritableComparable> - extends AggregatorUsage, BlockApi { + extends BlockApi, AggregatorUsage, WorkerIndexUsage<I> { @Override ImmutableClassesGiraphConfiguration<I, ?, ?> getConf(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java index d5918b5..699f76a 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java @@ -18,28 +18,16 @@ package org.apache.giraph.block_app.framework.api; import org.apache.giraph.aggregators.AggregatorUsage; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.worker.WorkerIndexUsage; +import org.apache.hadoop.io.WritableComparable; /** * Block computation API available for worker context methods. * * Interface to the WorkerContext methods. + * @param <I> vertex Id type. */ -public interface BlockWorkerContextApi extends AggregatorUsage, BlockApi { - @Override - ImmutableClassesGiraphConfiguration<?, ?, ?> getConf(); - - /** - * Get number of workers - * - * @return Number of workers - */ - int getWorkerCount(); - - /** - * Get index for this worker - * - * @return Index of this worker - */ - int getMyWorkerIndex(); +@SuppressWarnings("rawtypes") +public interface BlockWorkerContextApi<I extends WritableComparable> + extends AggregatorUsage, BlockApi, WorkerIndexUsage<I> { } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java index a8242b2..40a1e7d 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java @@ -18,12 +18,16 @@ package org.apache.giraph.block_app.framework.api; import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.WritableComparable; /** * Block computation API available for worker context receive methods. * * Interface to the WorkerContext methods. + * + * @param <I> vertex Id type. */ -public interface BlockWorkerContextReceiveApi - extends BlockWorkerContextApi, WorkerBroadcastUsage { +@SuppressWarnings("rawtypes") +public interface BlockWorkerContextReceiveApi<I extends WritableComparable> + extends BlockWorkerContextApi<I>, WorkerBroadcastUsage { } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java index 769562d..386c6bb 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java @@ -19,16 +19,20 @@ package org.apache.giraph.block_app.framework.api; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; /** * Block computation API available for worker send methods. * * Interface to the WorkerContext methods. * + * @param <I> vertex Id type. * @param <WM> Worker message type */ -public interface BlockWorkerContextSendApi<WM extends Writable> - extends BlockWorkerContextApi, WorkerAggregatorUsage { +@SuppressWarnings("rawtypes") +public interface BlockWorkerContextSendApi + <I extends WritableComparable, WM extends Writable> + extends BlockWorkerContextApi<I>, WorkerAggregatorUsage { /** * Send message to another worker * http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java index ede6005..55d3693 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java @@ -167,4 +167,9 @@ final class BlockMasterApiWrapper implements BlockMasterApi, public BlockOutputHandle getBlockOutputHandle() { return outputHandle; } + + @Override + public int getWorkerCount() { + return master.getWorkerInfoList().size(); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java index 6e839f9..7bea80a 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java @@ -177,4 +177,19 @@ final class BlockWorkerApiWrapper<I extends WritableComparable, public <OW extends BlockOutputWriter> OW getWriter(String confOption) { return getBlockWorkerContext().getOutputHandle().getWriter(confOption); } + + @Override + public int getMyWorkerIndex() { + return worker.getMyWorkerIndex(); + } + + @Override + public int getWorkerCount() { + return worker.getWorkerCount(); + } + + @Override + public int getWorkerForVertex(I vertexId) { + return worker.getWorkerForVertex(vertexId); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java index 1a4f8d8..9618822 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java @@ -29,6 +29,7 @@ import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.writable.kryo.HadoopKryo; import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; /** @@ -53,7 +54,7 @@ public final class BlockWorkerContext extends WorkerContext @Override public void preSuperstep() { List<Writable> messages = getAndClearMessagesFromOtherWorkers(); - BlockWorkerContextApiWrapper<Writable> workerApi = + BlockWorkerContextApiWrapper<WritableComparable, Writable> workerApi = new BlockWorkerContextApiWrapper<>(this); BlockWorkerPieces<Object> workerPieces = BlockWorkerPieces.getNextWorkerPieces(this); http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java index c52b6a5..7336bd4 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java @@ -22,15 +22,18 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.worker.WorkerContext; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; /** * Giraph implementation of BlockWorkerContextReceiveApi and * BlockWorkerContextSendApi, passing all calls to WorkerContext. * + * @param <I> vertex Id type. * @param <WM> Worker message type */ -final class BlockWorkerContextApiWrapper<WM extends Writable> - implements BlockWorkerContextReceiveApi, BlockWorkerContextSendApi<WM> { +final class BlockWorkerContextApiWrapper + <I extends WritableComparable, WM extends Writable> implements + BlockWorkerContextReceiveApi<I>, BlockWorkerContextSendApi<I, WM> { private final WorkerContext workerContext; public BlockWorkerContextApiWrapper(WorkerContext workerContext) { @@ -53,6 +56,11 @@ final class BlockWorkerContextApiWrapper<WM extends Writable> } @Override + public int getWorkerForVertex(I vertexId) { + return workerContext.getWorkerForVertex(vertexId); + } + + @Override public <A extends Writable> A getAggregatedValue(String name) { return workerContext.getAggregatedValue(name); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java index 99e9e24..3ca8b1c 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java @@ -117,8 +117,8 @@ class InternalApi<I extends WritableComparable, V extends Writable, */ class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E> implements BlockWorkerSendApi<I, V, E, Writable>, - BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<Writable>, - BlockWorkerContextReceiveApi, BlockWorkerValueAccessor, + BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<I, Writable>, + BlockWorkerContextReceiveApi<I>, BlockWorkerValueAccessor, WorkerGlobalCommUsage { @Override @@ -178,6 +178,11 @@ class InternalApi<I extends WritableComparable, V extends Writable, } @Override + public int getWorkerForVertex(I vertexId) { + return 0; + } + + @Override public void sendMessageToWorker(Writable message, int workerIndex) { Preconditions.checkArgument(workerIndex == getMyWorkerIndex(), "With just one worker you can only send worker message to itself, " + @@ -425,8 +430,12 @@ class InternalApi<I extends WritableComparable, V extends Writable, return workerContextLogic.getOutputHandle().getWriter(confOption); } - public BlockWorkerContextLogic getWorkerContextLogic() { return workerContextLogic; } + + @Override + public int getWorkerCount() { + return 1; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java index 882f4f1..f4e3678 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/AbstractPiece.java @@ -159,7 +159,7 @@ public abstract class AbstractPiece<I extends WritableComparable, * getVertexSender. */ public void workerContextSend( - BlockWorkerContextSendApi<WM> workerContextApi, S executionStage, + BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage, WV workerValue) { } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java index 3ad66d1..0ef0701 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/Piece.java @@ -47,7 +47,7 @@ public class Piece<I extends WritableComparable, V extends Writable, // Disallowing use of Worker Context functions: @Override public final void workerContextSend( - BlockWorkerContextSendApi<NoMessage> workerContextApi, + BlockWorkerContextSendApi<I, NoMessage> workerContextApi, S executionStage, Object workerValue) { } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java index 23c2d29..095ea74 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/delegate/DelegatePiece.java @@ -194,7 +194,7 @@ public class DelegatePiece<I extends WritableComparable, V extends Writable, @Override public void workerContextSend( - BlockWorkerContextSendApi<WM> workerContextApi, S executionStage, + BlockWorkerContextSendApi<I, WM> workerContextApi, S executionStage, WV workerValue) { for (AbstractPiece<I, V, E, M, WV, WM, S> piece : innerPieces) { piece.workerContextSend(workerContextApi, executionStage, workerValue); http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java index 83ef3fd..5912b9a 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java @@ -34,7 +34,7 @@ public class WorkerGCPiece extends PieceWithWorkerContext<WritableComparable, @Override @SuppressFBWarnings(value = "DM_GC") public void workerContextSend( - BlockWorkerContextSendApi<NoMessage> workerContextApi, + BlockWorkerContextSendApi<WritableComparable, NoMessage> workerContextApi, Object executionStage, Object workerValue) { System.gc(); http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java index c60ea2c..7a935c0 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationPiece.java @@ -258,7 +258,7 @@ public final class MigrationPiece<I extends WritableComparable, @Override public void workerContextSend( - BlockWorkerContextSendApi<Writable> workerContextApi, + BlockWorkerContextSendApi<I, Writable> workerContextApi, MigrationSuperstepStage executionStage, MigrationWorkerContext workerValue) { if (workerValue != null && !isFirstStep) { http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java index 014c269..5cc4925 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/migration/MigrationWorkerContext.java @@ -26,6 +26,7 @@ import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi; import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; /** * Replacement for WorkerContext when migrating to Blocks Framework, @@ -84,8 +85,8 @@ public class MigrationWorkerContext } public final void sendMessageToWorker(Writable message, int workerIndex) { - ((BlockWorkerContextSendApi<Writable>) api).sendMessageToWorker( - message, workerIndex); + ((BlockWorkerContextSendApi<WritableComparable, Writable>) api) + .sendMessageToWorker(message, workerIndex); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java index a68fca2..05f81b6 100644 --- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java @@ -91,7 +91,8 @@ public class TestWorkerMessages { } @Override - public void workerContextSend(BlockWorkerContextSendApi<LongWritable> workerContextApi, + public void workerContextSend( + BlockWorkerContextSendApi<LongWritable, LongWritable> workerContextApi, Object executionStage, Object workerValue) { for (long value : values) { workerContextApi.sendMessageToWorker(new LongWritable(value), http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java index 1ea6603..086bc48 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java @@ -21,16 +21,17 @@ package org.apache.giraph.graph; import java.io.IOException; import java.util.Iterator; +import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.OutEdges; +import org.apache.giraph.worker.AllWorkersInfo; import org.apache.giraph.worker.WorkerAggregatorDelegator; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerGlobalCommUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.log4j.Logger; /** * See {@link Computation} for explanation of the interface. @@ -54,17 +55,16 @@ public abstract class AbstractComputation<I extends WritableComparable, M2 extends Writable> extends WorkerAggregatorDelegator<I, V, E> implements Computation<I, V, E, M1, M2> { - /** Logger */ - private static final Logger LOG = Logger.getLogger(AbstractComputation.class); - /** Global graph state **/ private GraphState graphState; /** Handles requests */ private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor; - /** Graph-wide BSP Mapper for this Computation */ - private GraphTaskManager<I, V, E> graphTaskManager; + /** Service worker */ + private CentralizedServiceWorker<I, V, E> serviceWorker; /** Worker context */ private WorkerContext workerContext; + /** All workers info */ + private AllWorkersInfo allWorkersInfo; /** * Must be defined by user to do computation on a single Vertex. @@ -109,14 +109,20 @@ public abstract class AbstractComputation<I extends WritableComparable, public void initialize( GraphState graphState, WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor, - GraphTaskManager<I, V, E> graphTaskManager, - WorkerGlobalCommUsage workerGlobalCommUsage, - WorkerContext workerContext) { + CentralizedServiceWorker<I, V, E> serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage) { this.graphState = graphState; this.workerClientRequestProcessor = workerClientRequestProcessor; - this.graphTaskManager = graphTaskManager; this.setWorkerGlobalCommUsage(workerGlobalCommUsage); - this.workerContext = workerContext; + this.serviceWorker = serviceWorker; + if (serviceWorker != null) { + this.workerContext = serviceWorker.getWorkerContext(); + this.allWorkersInfo = new AllWorkersInfo( + serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo()); + } else { + this.workerContext = null; + this.allWorkersInfo = null; + } } /** @@ -273,4 +279,20 @@ public abstract class AbstractComputation<I extends WritableComparable, public <W extends WorkerContext> W getWorkerContext() { return (W) workerContext; } + + @Override + public final int getWorkerCount() { + return allWorkersInfo.getWorkerCount(); + } + + @Override + public final int getMyWorkerIndex() { + return allWorkersInfo.getMyWorkerIndex(); + } + + @Override + public final int getWorkerForVertex(I vertexId) { + return allWorkersInfo.getWorkerIndex( + serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java index d310da9..1ac6f43 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java @@ -20,6 +20,7 @@ package org.apache.giraph.graph; import java.io.IOException; import java.util.Iterator; +import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.TypesHolder; @@ -28,6 +29,7 @@ import org.apache.giraph.edge.OutEdges; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.giraph.worker.WorkerIndexUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -56,7 +58,7 @@ public interface Computation<I extends WritableComparable, M2 extends Writable> extends TypesHolder<I, V, E, M1, M2>, ImmutableClassesGiraphConfigurable<I, V, E>, - WorkerGlobalCommUsage, WorkerAggregatorUsage { + WorkerGlobalCommUsage, WorkerAggregatorUsage, WorkerIndexUsage<I> { /** * Must be defined by user to do computation on a single Vertex. * @@ -87,14 +89,13 @@ public interface Computation<I extends WritableComparable, * * @param graphState Graph state * @param workerClientRequestProcessor Processor for handling requests - * @param graphTaskManager Graph-wide BSP Mapper for this Vertex + * @param serviceWorker Centralized service worker * @param workerGlobalCommUsage Worker global communication usage - * @param workerContext Worker context */ void initialize(GraphState graphState, WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor, - GraphTaskManager<I, V, E> graphTaskManager, - WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext); + CentralizedServiceWorker<I, V, E> serviceWorker, + WorkerGlobalCommUsage workerGlobalCommUsage); /** * Retrieves the current superstep. http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index fa268da..d59d044 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -40,7 +40,6 @@ import org.apache.giraph.time.Times; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.TimedLogger; import org.apache.giraph.utils.Trimmable; -import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerProgress; import org.apache.giraph.worker.WorkerThreadGlobalCommUsage; import org.apache.hadoop.io.Writable; @@ -138,14 +137,13 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, useOneMessageToManyIdsEncoding()); WorkerThreadGlobalCommUsage aggregatorUsage = serviceWorker.getAggregatorHandler().newThreadAggregatorUsage(); - WorkerContext workerContext = serviceWorker.getWorkerContext(); vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter(); Computation<I, V, E, M1, M2> computation = (Computation<I, V, E, M1, M2>) configuration.createComputation(); computation.initialize(graphState, workerClientRequestProcessor, - serviceWorker.getGraphTaskManager(), aggregatorUsage, workerContext); + serviceWorker, aggregatorUsage); computation.preSuperstep(); List<PartitionStats> partitionStatsList = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index 50e3b36..dad766b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -18,6 +18,8 @@ package org.apache.giraph.master; +import java.util.List; + import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.combiner.MessageCombiner; @@ -26,6 +28,7 @@ import org.apache.giraph.conf.MessageClasses; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.GraphState; import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -126,6 +129,15 @@ public abstract class MasterCompute } /** + * Get list of workers + * + * @return List of workers + */ + public final List<WorkerInfo> getWorkerInfoList() { + return serviceMaster.getWorkerInfoList(); + } + + /** * Set Computation class to be used * * @param computationClass Computation class http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java b/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java new file mode 100644 index 0000000..0112134 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/AllWorkersInfo.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.worker; + +import java.util.ArrayList; +import java.util.List; + +import org.python.google.common.base.Preconditions; + +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; + +/** + * Information about all workers, their WorkerInfo values, and indices. + */ +public class AllWorkersInfo { + /** List of workers in current superstep, sorted by task id */ + private final List<WorkerInfo> workerInfos; + /** My worker index */ + private final int myWorkerIndex; + /** Map of taskId to worker index */ + private final Int2IntOpenHashMap taskIdToIndex; + + /** + * Constructor + * @param workers Ordered list of workers + * @param myWorker My worker + */ + public AllWorkersInfo(List<WorkerInfo> workers, WorkerInfo myWorker) { + workerInfos = new ArrayList<>(workers); + + taskIdToIndex = new Int2IntOpenHashMap(workerInfos.size()); + taskIdToIndex.defaultReturnValue(-1); + for (int i = 0; i < workerInfos.size(); i++) { + int task = workerInfos.get(i).getTaskId(); + if (i > 0) { + Preconditions.checkState( + task > workerInfos.get(i - 1).getTaskId(), "Tasks not ordered"); + } + Preconditions.checkState(task >= 0, "Task not specified, %d", task); + int old = taskIdToIndex.put(task, i); + Preconditions.checkState(old == -1, + "Task with %d id found twice (positions %d and %d)", + task, i, old); + } + + myWorkerIndex = getWorkerIndex(myWorker); + } + + /** + * List of WorkerInfos + * + * @return List of WorkerInfos + */ + public List<WorkerInfo> getWorkerList() { + return workerInfos; + } + + /** + * Get number of workers + * + * @return Number of workers + */ + public int getWorkerCount() { + return workerInfos.size(); + } + + /** + * Get index for this worker + * + * @return Index of this worker + */ + public int getMyWorkerIndex() { + return myWorkerIndex; + } + + /** + * For every worker this method returns unique number + * between 0 and N, where N is the total number of workers. + * This number stays the same throughout the computation. + * TaskID may be different from this number and task ID + * is not necessarily continuous + * @param workerInfo worker info object + * @return worker number + */ + public int getWorkerIndex(WorkerInfo workerInfo) { + return taskIdToIndex.get(workerInfo.getTaskId()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java index b977ea1..dd909d3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java @@ -37,23 +37,21 @@ import org.apache.hadoop.mapreduce.Mapper; @SuppressWarnings("rawtypes") public abstract class WorkerContext extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable> - implements Writable { + implements Writable, WorkerIndexUsage<WritableComparable> { /** Global graph state */ private GraphState graphState; /** Service worker */ private CentralizedServiceWorker serviceWorker; - /** Sorted list of other participating workers */ - private List<WorkerInfo> workerList; - /** Index of this worker within workerList */ - private int myWorkerIndex; + /** All workers info */ + private AllWorkersInfo allWorkersInfo; /** * Set the graph state. * * @param graphState Used to set the graph state. */ - public void setGraphState(GraphState graphState) { + public final void setGraphState(GraphState graphState) { this.graphState = graphState; } @@ -62,10 +60,11 @@ public abstract class WorkerContext * * @param serviceWorker Service worker containing all the information */ - public void setupSuperstep(CentralizedServiceWorker<?, ?, ?> serviceWorker) { + public final void setupSuperstep( + CentralizedServiceWorker<?, ?, ?> serviceWorker) { this.serviceWorker = serviceWorker; - workerList = serviceWorker.getWorkerInfoList(); - myWorkerIndex = workerList.indexOf(serviceWorker.getWorkerInfo()); + allWorkersInfo = new AllWorkersInfo( + serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo()); } /** @@ -98,8 +97,9 @@ public abstract class WorkerContext * * @return Number of workers */ - public int getWorkerCount() { - return workerList.size(); + @Override + public final int getWorkerCount() { + return allWorkersInfo.getWorkerCount(); } /** @@ -107,8 +107,15 @@ public abstract class WorkerContext * * @return Index of this worker */ - public int getMyWorkerIndex() { - return myWorkerIndex; + @Override + public final int getMyWorkerIndex() { + return allWorkersInfo.getMyWorkerIndex(); + } + + @Override + public final int getWorkerForVertex(WritableComparable vertexId) { + return allWorkersInfo.getWorkerIndex( + serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo()); } /** @@ -117,7 +124,7 @@ public abstract class WorkerContext * * @return Messages received */ - public List<Writable> getAndClearMessagesFromOtherWorkers() { + public final List<Writable> getAndClearMessagesFromOtherWorkers() { return serviceWorker.getServerData(). getAndClearCurrentWorkerToWorkerMessages(); } @@ -128,14 +135,14 @@ public abstract class WorkerContext * @param message Message to send * @param workerIndex Index of the worker to send the message to */ - public void sendMessageToWorker(Writable message, int workerIndex) { + public final void sendMessageToWorker(Writable message, int workerIndex) { SendWorkerToWorkerMessageRequest request = new SendWorkerToWorkerMessageRequest(message); - if (workerIndex == myWorkerIndex) { + if (workerIndex == getMyWorkerIndex()) { request.doRequest(serviceWorker.getServerData()); } else { serviceWorker.getWorkerClient().sendWritableRequest( - workerList.get(workerIndex).getTaskId(), request); + allWorkersInfo.getWorkerList().get(workerIndex).getTaskId(), request); } } @@ -151,7 +158,7 @@ public abstract class WorkerContext * * @return Current superstep */ - public long getSuperstep() { + public final long getSuperstep() { return graphState.getSuperstep(); } @@ -161,7 +168,7 @@ public abstract class WorkerContext * * @return Total number of vertices (-1 if first superstep) */ - public long getTotalNumVertices() { + public final long getTotalNumVertices() { return graphState.getTotalNumVertices(); } @@ -171,7 +178,7 @@ public abstract class WorkerContext * * @return Total number of edges (-1 if first superstep) */ - public long getTotalNumEdges() { + public final long getTotalNumEdges() { return graphState.getTotalNumEdges(); } @@ -180,7 +187,7 @@ public abstract class WorkerContext * * @return Mapper context */ - public Mapper.Context getContext() { + public final Mapper.Context getContext() { return graphState.getContext(); } @@ -190,7 +197,7 @@ public abstract class WorkerContext * * @param line Line to print */ - public void logToCommandLine(String line) { + public final void logToCommandLine(String line) { serviceWorker.getJobProgressTracker().logInfo(line); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java new file mode 100644 index 0000000..0af70a5 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerIndexUsage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.worker; + +/** + * Interface providing utilities for using worker index. + * + * @param <I> Vertex id type + */ +public interface WorkerIndexUsage<I> { + /** + * Get number of workers + * + * @return Number of workers + */ + int getWorkerCount(); + + /** + * Get index for this worker + * + * @return Index of this worker + */ + int getMyWorkerIndex(); + + /** + * Get worker index which will contain vertex with given id, + * if such vertex exists. + * + * @param vertexId vertex id + * @return worker index + */ + int getWorkerForVertex(I vertexId); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c32f465d/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java index 0a1da49..63403ab 100644 --- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java +++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java @@ -96,7 +96,7 @@ public class MockUtils { Mockito.verify(workerClientRequestProcessor).sendMessageRequest (targetVertexId, message); } - + public void verifyMessageSentToAllEdges(Vertex<I, V, E> vertex, M message) { Mockito.verify(workerClientRequestProcessor).sendMessageToAllRequest(vertex, message); } @@ -137,7 +137,7 @@ public class MockUtils { Mockito.when(env.getContext().getConfiguration()) .thenReturn(env.getConfiguration()); computation.initialize(env.getGraphState(), - env.getWorkerClientRequestProcessor(), null, null, null); + env.getWorkerClientRequestProcessor(), null, null); GiraphConfiguration giraphConf = new GiraphConfiguration(); giraphConf.setComputationClass(computation.getClass());
