Repository: giraph Updated Branches: refs/heads/trunk 27f234f1f -> a1a236fa6
[GIRAPH-1013] Add library of common pieces and functions Summary: StripingUtils has been modified, to be compiled with Java7, and to have snippet of MIT license for used hash algorithm. Test Plan: mvn clean install Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D39915 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a1a236fa Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a1a236fa Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a1a236fa Branch: refs/heads/trunk Commit: a1a236fa6f8bd2c9e3ba479e8cc1aa94a7f1f402 Parents: 27f234f Author: Igor Kabiljo <[email protected]> Authored: Wed Jun 10 15:33:35 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Tue Jun 16 11:13:37 2015 -0700 ---------------------------------------------------------------------- .../apache/giraph/block_app/library/Pieces.java | 368 +++++++++++++++++++ .../block_app/library/SendMessageChain.java | 288 +++++++++++++++ .../block_app/library/VertexSuppliers.java | 176 +++++++++ .../library/internal/SendMessagePiece.java | 158 ++++++++ .../internal/SendMessageWithCombinerPiece.java | 167 +++++++++ .../library/internal/package-info.java | 21 ++ .../iteration/IterationCounterPiece.java | 35 ++ .../library/iteration/IterationStage.java | 28 ++ .../library/iteration/IterationStageImpl.java | 48 +++ .../library/iteration/package-info.java | 21 ++ .../giraph/block_app/library/package-info.java | 21 ++ .../library/striping/StripingUtils.java | 225 ++++++++++++ .../library/striping/package-info.java | 21 ++ .../apache/giraph/function/PairPredicate.java | 35 ++ .../org/apache/giraph/function/Predicate.java | 36 ++ .../function/primitive/Int2ObjFunction.java | 36 ++ .../function/primitive/Obj2IntFunction.java | 33 ++ 17 files changed, 1717 insertions(+) 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
new file mode 100644
index 0000000..88b78a3
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.internal.SendMessagePiece;
+import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility class for creating common Pieces and computations for processing
+ * graphs.
+ */
+public class Pieces {
+  private static final Logger LOG = Logger.getLogger(Pieces.class);
+
+  private Pieces() { }
+
+  /**
+   * For each vertex, execute the given process function.
+   * Computation happens in the send phase of the returned Piece.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Piece<I, V, E, NoMessage, Object> forAllVertices(
+      final String pieceName, final Consumer<Vertex<I, V, E>> process) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            process.apply(vertex);
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return pieceName;
+      }
+    };
+  }
+
+  /**
+   * For each vertex, execute the given process function.
+   * Computation happens in the receive phase of the returned Piece.
+   * This function should be used if you need the returned Piece to interact
+   * with a subsequent Piece, as that requires the passed function to be
+   * executed during the receive phase.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Piece<I, V, E, NoMessage, Object> forAllVerticesOnReceive(
+      final String pieceName, final Consumer<Vertex<I, V, E>> process) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      @Override
+      public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        return new InnerVertexReceiver() {
+          @Override
+          public void vertexReceive(
+              Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
+            process.apply(vertex);
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return pieceName;
+      }
+    };
+  }
+
+  /**
+   * Creates Piece removing vertices for which shouldRemoveVertex returns true.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  Piece<I, V, E, NoMessage, Object> removeVertices(
+      final String pieceName,
+      final SupplierFromVertex<I, V, E, Boolean> shouldRemoveVertex) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      private ReducerHandle<LongWritable, LongWritable> countRemovedAgg;
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        countRemovedAgg = reduceApi.createLocalReducer(SumReduce.LONG);
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          final BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            if (shouldRemoveVertex.get(vertex)) {
+              workerApi.removeVertexRequest(vertex.getId());
+              reduceLong(countRemovedAgg, 1);
+            }
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        LOG.info("Removed " + countRemovedAgg.getReducedValue(master) +
+            " vertices from the graph, during stage " + executionStage);
+      }
+
+      @Override
+      public String toString() {
+        return pieceName;
+      }
+    };
+  }
+
+  /**
+   * Creates single reducer piece - given reduce operation, supplier of values
+   * on worker, reduces them and passes the result to given consumer on master.
+   *
+   * @param <S> Single value type, objects passed on workers
+   * @param <R> Reduced value type
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   */
+  public static
+  <S, R extends Writable, I extends WritableComparable, V extends Writable,
+  E extends Writable>
+  Piece<I, V, E, NoMessage, Object> reduce(
+      final String name,
+      final ReduceOperation<S, R> reduceOp,
+      final SupplierFromVertex<I, V, E, S> valueSupplier,
+      final Consumer<R> reducedValueConsumer) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      private ReducerHandle<S, R> handle;
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        handle = reduceApi.createLocalReducer(reduceOp);
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            handle.reduce(valueSupplier.get(vertex));
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        reducedValueConsumer.apply(handle.getReducedValue(master));
+      }
+
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  /**
+   * Creates single reducer and broadcast piece - given reduce operation,
+   * supplier of values on worker, reduces and broadcasts the value, passing
+   * it to the consumer on worker for each vertex.
+   *
+   * @param <S> Single value type, objects passed on workers
+   * @param <R> Reduced value type
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   */
+  public static
+  <S, R extends Writable, I extends WritableComparable, V extends Writable,
+  E extends Writable>
+  Piece<I, V, E, NoMessage, Object> reduceAndBroadcast(
+      final String name,
+      final ReduceOperation<S, R> reduceOp,
+      final SupplierFromVertex<I, V, E, S> valueSupplier,
+      final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
+    return new Piece<I, V, E, NoMessage, Object>() {
+      private final ReducerAndBroadcastWrapperHandle<S, R> handle =
+          new ReducerAndBroadcastWrapperHandle<>();
+
+      @Override
+      public void registerReducers(
+          CreateReducersApi reduceApi, Object executionStage) {
+        handle.registeredReducer(reduceApi.createLocalReducer(reduceOp));
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            handle.reduce(valueSupplier.get(vertex));
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        handle.broadcastValue(master);
+      }
+
+      @Override
+      public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        final R value = handle.getBroadcast(workerApi);
+        return new InnerVertexReceiver() {
+          @Override
+          public void vertexReceive(
+              Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
+            reducedValueConsumer.apply(vertex, value);
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all targets provided by targetsSupplier.
+   * Received messages are then passed to and processed by provided
+   * messagesConsumer.
+   *
+   * If messageSupplier or targetsSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessagePiece<I, V, E, M> sendMessage(
+      String name,
+      Class<M> messageClass,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
+      ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+    return new SendMessagePiece<>(
+        name, messageClass, messageSupplier, targetsSupplier, messagesConsumer);
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all neighbors of current vertex.
+   * Received messages are then passed to and processed by provided
+   * messagesConsumer.
+   *
+   * If messageSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessagePiece<I, V, E, M> sendMessageToNeighbors(
+      String name,
+      Class<M> messageClass,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+    return sendMessage(
+        name, messageClass, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
+        messagesConsumer);
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all targets provided by targetsSupplier,
+   * and uses given messageCombiner to combine messages together.
+   * Received combined message is then passed to and processed by provided
+   * messagesConsumer. (null is passed to it if vertex received no messages)
+   *
+   * If messageSupplier or targetsSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessageWithCombinerPiece<I, V, E, M> sendMessage(
+      String name,
+      MessageCombiner<? super I, M> messageCombiner,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier,
+      ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+    return new SendMessageWithCombinerPiece<>(
+        name, messageCombiner,
+        messageSupplier, targetsSupplier, messagesConsumer);
+  }
+
+  /**
+   * Creates Piece that for each vertex, sends message provided by
+   * messageSupplier to all neighbors of current vertex,
+   * and uses given messageCombiner to combine messages together.
+   * Received combined message is then passed to and processed by provided
+   * messagesConsumer. (null is passed to it if vertex received no messages)
+   *
+   * If messageSupplier returns null, current vertex
+   * is not going to send any messages.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  M extends Writable>
+  SendMessageWithCombinerPiece<I, V, E, M> sendMessageToNeighbors(
+      String name,
+      MessageCombiner<? super I, M> messageCombiner,
+      SupplierFromVertex<I, V, E, M> messageSupplier,
+      ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+    return sendMessage(
+        name, messageCombiner, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier(),
+        messagesConsumer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
new file mode 100644
index 0000000..b606a34
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.Function;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.FunctionWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Utility class for creating sequences of sending replies to received
+ * messages. Current instance of this object represents partial chain,
+ * where we have specified which messages will be sent at the lastly defined
+ * link in the chain thus far, but we haven't specified yet what to do when
+ * vertices receive them.
+ *
+ * Contains a set of:
+ * - static startX methods, used to create the chain
+ * - thenX methods, used to add one more Piece to the chain, can be
+ *   "chained" an arbitrary number of times.
+ * - endX methods, used to finish the chain, returning
+ *   the Block representing the whole chain
+ *
+ * If messageSupplier or targetsSupplier returns null, current vertex
+ * is not going to send any messages.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <P> Type of message(s) received in the last link of the chain
+ */
+public class SendMessageChain<I extends WritableComparable, V extends Writable,
+    E extends Writable, P> {
+  /**
+   * Represents current partial chain. Given a way to consume messages
+   * received in lastly defined link in this chain, it will produce block
+   * representing a chain created thus far.
+   */
+  private final Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator;
+
+  private SendMessageChain(
+      Function<ConsumerWithVertex<I, V, E, P>, Block> blockCreator) {
+    this.blockCreator = blockCreator;
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * targets provided by targetsSupplier.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> startSend(
+      final String name,
+      final Class<M> messageClass,
+      final SupplierFromVertex<I, V, E, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
+          @Override
+          public Block apply(
+              ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+            return Pieces.sendMessage(
+                name, messageClass, messageSupplier,
+                targetsSupplier, messagesConsumer);
+          }
+        });
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * targets provided by targetsSupplier, and use given messageCombiner to
+   * combine messages together.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, M> startSend(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final SupplierFromVertex<I, V, E, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
+          @Override
+          public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+            return Pieces.sendMessage(
+                name, messageCombiner, messageSupplier,
+                targetsSupplier, messagesConsumer);
+          }
+        });
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * neighbors of a current vertex.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> startSendToNeighbors(
+      final String name,
+      final Class<M> messageClass,
+      final SupplierFromVertex<I, V, E, M> messageSupplier) {
+    return startSend(name, messageClass, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * Start chain with sending message provided by messageSupplier to all
+   * neighbors of a current vertex, and use given messageCombiner to
+   * combine messages together.
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable>
+  SendMessageChain<I, V, E, M> startSendToNeighbors(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final SupplierFromVertex<I, V, E, M> messageSupplier) {
+    return startSend(name, messageCombiner, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all targets provided by targetsSupplier.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> thenSend(
+      final String name,
+      final Class<M> messageClass,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, Iterable<M>>, Block>() {
+          @Override
+          public Block apply(
+              ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) {
+            return new SequenceBlock(
+              blockCreator.apply(
+                  prevMessagesTransfer.<I, V, E>castToConsumer()),
+              Pieces.sendMessage(
+                name, messageClass,
+                new SupplierFromVertex<I, V, E, M>() {
+                  @Override
+                  public M get(Vertex<I, V, E> vertex) {
+                    return messageSupplier.apply(
+                        vertex, prevMessagesTransfer.get());
+                  }
+                },
+                targetsSupplier, messagesConsumer));
+          }
+        });
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all neighbors of current vertex.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, Iterable<M>> thenSendToNeighbors(
+      final String name,
+      final Class<M> messageClass,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
+    return thenSend(name, messageClass, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all targets provided by targetsSupplier, and use given
+   * messageCombiner to combine messages together.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, M> thenSend(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier,
+      final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+
+    return new SendMessageChain<>(
+        new Function<ConsumerWithVertex<I, V, E, M>, Block>() {
+          @Override
+          public Block apply(ConsumerWithVertex<I, V, E, M> messagesConsumer) {
+            return new SequenceBlock(
+              blockCreator.apply(
+                  prevMessagesTransfer.<I, V, E>castToConsumer()),
+              Pieces.sendMessage(
+                name, messageCombiner,
+                new SupplierFromVertex<I, V, E, M>() {
+                  @Override
+                  public M get(Vertex<I, V, E> vertex) {
+                    return messageSupplier.apply(
+                        vertex, prevMessagesTransfer.get());
+                  }
+                },
+                targetsSupplier, messagesConsumer));
+          }
+        });
+  }
+
+  /**
+   * Give previously received message(s) to messageSupplier, and send message
+   * it returns to all neighbors of current vertex, and use given
+   * messageCombiner to combine messages together.
+   */
+  public <M extends Writable>
+  SendMessageChain<I, V, E, M> thenSendToNeighbors(
+      final String name,
+      final MessageCombiner<? super I, M> messageCombiner,
+      final FunctionWithVertex<I, V, E, P, M> messageSupplier) {
+    return thenSend(name, messageCombiner, messageSupplier,
+        VertexSuppliers.<I, V, E>vertexNeighborsSupplier());
+  }
+
+  /**
+   * End chain by giving received messages to valueSupplier,
+   * to produce value that should be reduced, and consumed on master
+   * by reducedValueConsumer.
+   */
+  public <S, R extends Writable>
+  Block endReduce(String name, ReduceOperation<S, R> reduceOp,
+      final FunctionWithVertex<I, V, E, P, S> valueSupplier,
+      Consumer<R> reducedValueConsumer) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+
+    return new SequenceBlock(
+      blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
+      Pieces.reduce(
+        name,
+        reduceOp,
+        new SupplierFromVertex<I, V, E, S>() {
+          @Override
+          public S get(Vertex<I, V, E> vertex) {
+            return valueSupplier.apply(vertex, prevMessagesTransfer.get());
+          }
+        },
+        reducedValueConsumer));
+  }
+
+  /**
+   * End chain by processing messages received within the last link
+   * in the chain.
+   */
+  public Block endConsume(ConsumerWithVertex<I, V, E, P> messagesConsumer) {
+    return blockCreator.apply(messagesConsumer);
+  }
+
+  /**
+   * End chain by providing a function that will produce Block to be attached
+   * to the end of current chain, given a supplier of messages received
+   * within the last link in the chain.
+   */
+  public Block endCustom(
+      Function<SupplierFromVertex<I, V, E, P>, Block> createBlockToAttach) {
+    final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>();
+    return new SequenceBlock(
+      blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()),
+      createBlockToAttach.apply(
+          prevMessagesTransfer.<I, V, E>castToSupplier()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java
new file mode 100644
index 0000000..321ed3a
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/VertexSuppliers.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+
+import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.function.PairPredicate;
+import org.apache.giraph.function.Predicate;
+import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * SupplierFromVertex factories that extract common information from
+ * vertex itself.
+ */
+@SuppressWarnings("rawtypes")
+public class VertexSuppliers {
+  /** Hide constructor */
+  private VertexSuppliers() { }
+
+  /**
+   * Supplier which extracts and returns vertex ID.
+   * (note - do not modify the object, as it is not returning a copy)
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, I> vertexIdSupplier() {
+    return new SupplierFromVertex<I, V, E, I>() {
+      @Override
+      public I get(Vertex<I, V, E> vertex) {
+        return vertex.getId();
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns vertex value.
+   * (note - doesn't create a copy of vertex value)
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, V> vertexValueSupplier() {
+    return new SupplierFromVertex<I, V, E, V>() {
+      @Override
+      public V get(Vertex<I, V, E> vertex) {
+        return vertex.getValue();
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns edges object.
+   * (note - doesn't create a copy of vertex edges)
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterable<Edge<I, E>>> vertexEdgesSupplier() {
+    return new SupplierFromVertex<I, V, E, Iterable<Edge<I, E>>>() {
+      @Override
+      public Iterable<Edge<I, E>> get(Vertex<I, V, E> vertex) {
+        return vertex.getEdges();
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns Iterator over all neighbor IDs.
+   * Note - iterator returns a reused object, so you need to "use" it
+   * before calling next() again.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplier() {
+    return new SupplierFromVertex<I, V, E, Iterator<I>>() {
+      @Override
+      public Iterator<I> get(final Vertex<I, V, E> vertex) {
+        return new TargetVertexIdIterator<>(vertex);
+      }
+    };
+  }
+
+  /**
+   * Supplier which extracts and returns Iterator over neighbor IDs
+   * for which given predicate returns true.
+   * Note - iterator returns a reused object, so you need to "use" it
+   * before calling next() again.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplier(
+      final Predicate<I> toSupply) {
+    return new SupplierFromVertex<I, V, E, Iterator<I>>() {
+      @Override
+      public Iterator<I> get(final Vertex<I, V, E> vertex) {
+        return Iterators.filter(
+            new TargetVertexIdIterator<>(vertex),
+            new com.google.common.base.Predicate<I>() {
+              @Override
+              public boolean apply(I input) {
+                return toSupply.apply(input);
+              }
+            });
+      }
+    };
+  }
+
+  /**
+   * Supplier which gives Iterator over neighbor IDs for which given predicate
+   * over (index, target) returns true.
+   * Note - iterator returns a reused object, so you need to "use" it
+   * before calling next() again.
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, Iterator<I>> vertexNeighborsSupplierWithIndex(
+      final PairPredicate<IntRef, I> toSupply) {
+    return new SupplierFromVertex<I, V, E, Iterator<I>>() {
+      @Override
+      public Iterator<I> get(final Vertex<I, V, E> vertex) {
+        // Every time we return an iterator, we return with a fresh (0) index.
+        return filterWithIndex(
+            new TargetVertexIdIterator<>(vertex), toSupply);
+      }
+    };
+  }
+
+  /**
+   * Returns an iterator over the elements of {@code unfiltered} that satisfy a
+   * predicate over (index, t); index counts all elements, starting at 0.
+   */
+  private static <T> UnmodifiableIterator<T> filterWithIndex(
+      final Iterator<T> unfiltered, final PairPredicate<IntRef, T> predicate) {
+    checkNotNull(unfiltered);
+    checkNotNull(predicate);
+    return new AbstractIterator<T>() {
+      private final IntRef index = new IntRef(0);
+      @Override protected T computeNext() {
+        while (unfiltered.hasNext()) {
+          T element = unfiltered.next();
+          boolean res = predicate.apply(index, element);
+          index.value++;
+          if (res) {
+            return element;
+          }
+        }
+        return endOfData();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java
new file mode 100644
index 0000000..e8bf569
--- /dev/null
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessagePiece.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.internal; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.striping.StripingUtils; +import org.apache.giraph.function.Function; +import org.apache.giraph.function.Predicate; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; + +/** + * Piece that sends a message provided through messageSupplier to targets + * from targetsSupplier, and passes received messages to messagesConsumer.
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message type + */ +@SuppressWarnings("rawtypes") +public class SendMessagePiece<I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> extends Piece<I, V, E, M, Object> { + private final String name; + private final Class<M> messageClass; + private final SupplierFromVertex<I, V, E, M> messageSupplier; + private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier; + private final ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer; + + public SendMessagePiece(String name, + Class<M> messageClass, + SupplierFromVertex<I, V, E, M> messageSupplier, + SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier, + ConsumerWithVertex<I, V, E, Iterable<M>> messagesConsumer) { + Preconditions.checkNotNull(messageClass); + this.name = name; + this.messageClass = messageClass; + this.messageSupplier = messageSupplier; + this.targetsSupplier = targetsSupplier; + this.messagesConsumer = messagesConsumer; + } + + /** + * Stripe message sending computation across multiple stripes, in + * each stripe only part of the vertices will receive messages.
+ * + * @param stripes Number of stripes + * @param stripeSupplier Stripe supplier function, if IDs are Longs, you can + * use StripingUtils::fastHashStripingPredicate + * @return Resulting block + */ + public Block stripeByReceiver( + int stripes, + Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) { + return StripingUtils.generateStripedBlock( + stripes, + new Function<Predicate<I>, Block>() { + @Override + public Block apply(final Predicate<I> stripePredicate) { + return FilteringPiece.createReceiveFiltering( + new SupplierFromVertex<I, V, E, Boolean>() { + @Override + public Boolean get(Vertex<I, V, E> vertex) { + return stripePredicate.apply(vertex.getId()); + } + }, + new SendMessagePiece<>( + name, + messageClass, + messageSupplier, + new SupplierFromVertex<I, V, E, Iterator<I>>() { + @Override + public Iterator<I> get(Vertex<I, V, E> vertex) { + return Iterators.filter( + targetsSupplier.get(vertex), + new com.google.common.base.Predicate<I>() { + @Override + public boolean apply(I targetId) { + return stripePredicate.apply(targetId); + } + }); + } + }, + messagesConsumer)); + } + }, + stripeSupplier); + } + + + @Override + public VertexSender<I, V, E> getVertexSender( + final BlockWorkerSendApi<I, V, E, M> workerApi, + Object executionStage) { + return new InnerVertexSender() { + @Override + public void vertexSend(Vertex<I, V, E> vertex) { + Iterator<I> targets = targetsSupplier.get(vertex); + M message = messageSupplier.get(vertex); + if (message != null && targets != null && targets.hasNext()) { + workerApi.sendMessageToMultipleEdges(targets, message); + } + } + }; + } + + @Override + public VertexReceiver<I, V, E, M> getVertexReceiver( + BlockWorkerReceiveApi<I> workerApi, + Object executionStage) { + return new InnerVertexReceiver() { + @Override + public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) { + messagesConsumer.apply(vertex, messages); + } + }; + } + + @Override + public Class<M> getMessageClass() { + return
messageClass; + } + + @Override + public String toString() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java new file mode 100644 index 0000000..c44ef8d --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/SendMessageWithCombinerPiece.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.block_app.library.internal; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.striping.StripingUtils; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.function.Function; +import org.apache.giraph.function.Predicate; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; + +/** + * Piece that sends messageSupplier's message to targetsSupplier's targets, + * combines received messages, and passes the result to messagesConsumer. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message type + */ +public class SendMessageWithCombinerPiece<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends Piece<I, V, E, M, Object> { + private final String name; + private final MessageCombiner<?
super I, M> messageCombiner; + private final SupplierFromVertex<I, V, E, M> messageSupplier; + private final SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier; + private final ConsumerWithVertex<I, V, E, M> messagesConsumer; + + public SendMessageWithCombinerPiece(String name, + MessageCombiner<? super I, M> messageCombiner, + SupplierFromVertex<I, V, E, M> messageSupplier, + SupplierFromVertex<I, V, E, Iterator<I>> targetsSupplier, + ConsumerWithVertex<I, V, E, M> messagesConsumer) { + Preconditions.checkNotNull(messageCombiner); + this.name = name; + this.messageCombiner = messageCombiner; + this.messageSupplier = messageSupplier; + this.targetsSupplier = targetsSupplier; + this.messagesConsumer = messagesConsumer; + } + + /** + * Stripe message sending computation across multiple stripes, in + * each stripe only part of the vertices will receive messages. + * + * @param stripes Number of stripes + * @param stripeSupplier Stripe supplier function, if IDs are Longs, you can + * use StripingUtils::fastHashStripingPredicate + * @return Resulting block + */ + public Block stripeByReceiver( + int stripes, + Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) { + return StripingUtils.generateStripedBlock( + stripes, + new Function<Predicate<I>, Block>() { + @Override + public Block apply(final Predicate<I> stripePredicate) { + return FilteringPiece.createReceiveFiltering( + new SupplierFromVertex<I, V, E, Boolean>() { + @Override + public Boolean get(Vertex<I, V, E> vertex) { + return stripePredicate.apply(vertex.getId()); + } + }, + new SendMessageWithCombinerPiece<>( + name, + messageCombiner, + messageSupplier, + new SupplierFromVertex<I, V, E, Iterator<I>>() { + @Override + public Iterator<I> get(Vertex<I, V, E> vertex) { + return Iterators.filter( + targetsSupplier.get(vertex), + new com.google.common.base.Predicate<I>() { + @Override + public boolean apply(I targetId) { + return stripePredicate.apply(targetId); + } + }); + } + }, +
messagesConsumer)); + } + }, + stripeSupplier); + } + + @Override + public VertexSender<I, V, E> getVertexSender( + final BlockWorkerSendApi<I, V, E, M> workerApi, + Object executionStage) { + return new InnerVertexSender() { + @Override + public void vertexSend(Vertex<I, V, E> vertex) { + Iterator<I> targets = targetsSupplier.get(vertex); + M message = messageSupplier.get(vertex); + if (message != null && targets != null && targets.hasNext()) { + workerApi.sendMessageToMultipleEdges(targets, message); + } + } + }; + } + + @Override + public VertexReceiver<I, V, E, M> getVertexReceiver( + BlockWorkerReceiveApi<I> workerApi, + Object executionStage) { + return new InnerVertexReceiver() { + @Override + public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) { + Iterator<M> iter = messages.iterator(); + M combinedMessage = null; + if (iter.hasNext()) { + combinedMessage = iter.next(); + // When message combiner is used, there is never more than one message + Preconditions.checkArgument(!iter.hasNext()); + } + messagesConsumer.apply(vertex, combinedMessage); + } + }; + } + + @Override + public MessageCombiner<?
super I, M> getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return messageCombiner; + } + + @Override + public String toString() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java new file mode 100644 index 0000000..6e8b3eb --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/internal/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Internal implementation of Pieces needed for Pieces utility class.
+ */ +package org.apache.giraph.block_app.library.internal; http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java new file mode 100644 index 0000000..70e65da --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationCounterPiece.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.iteration; + +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Piece that increments execution stage iteration by one.
+ */ +@SuppressWarnings("rawtypes") +public class IterationCounterPiece extends Piece<WritableComparable, + Writable, Writable, Writable, IterationStage> { + @Override + public IterationStage nextExecutionStage(IterationStage executionStage) { + return executionStage.changedIteration(executionStage.getIteration() + 1); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java new file mode 100644 index 0000000..ccac323 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStage.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.iteration; + + +/** + * Execution stage that contains iteration information. + * Iteration can be incremented via IterationCounterPiece.
+ */ +public interface IterationStage { + int getIteration(); + IterationStage changedIteration(int iteration); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java new file mode 100644 index 0000000..2f94b25 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/IterationStageImpl.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.block_app.library.iteration; + +/** + * Immutable implementation of IterationStage; default iteration is 0. + */ +public class IterationStageImpl implements IterationStage { + private final int iteration; + + public IterationStageImpl() { + this.iteration = 0; + } + + public IterationStageImpl(int iteration) { + this.iteration = iteration; + } + + @Override + public int getIteration() { + return iteration; + } + + @Override + public IterationStage changedIteration(int iteration) { + return new IterationStageImpl(iteration); + } + + @Override + public String toString() { + return "IterationStage[" + iteration + "]"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java new file mode 100644 index 0000000..c33a18f --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/iteration/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utilities for having iteration within execution stage object. + */ +package org.apache.giraph.block_app.library.iteration; http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java new file mode 100644 index 0000000..801150c --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Core library of Pieces and Suppliers, providing most common usages. 
+ */ +package org.apache.giraph.block_app.library; http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java new file mode 100644 index 0000000..34e5fec --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/StripingUtils.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.library.striping; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.FilteringBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.function.Function; +import org.apache.giraph.function.Predicate; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.function.primitive.Obj2IntFunction; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; + +/** + * Utility functions for doing superstep striping. + * + * We need to make sure that partitioning (which uses mod for distributing + * data across workers) is independent from striping itself. So we are using + * fastHash function below, taken from https://code.google.com/p/fast-hash/. + */ +public class StripingUtils { + private StripingUtils() { } + + /* The MIT License + + Copyright (C) 2012 Zilong Tan ([email protected]) + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this software and associated documentation + files (the "Software"), to deal in the Software without + restriction, including without limitation the rights to use, copy, + modify, merge, publish, distribute, sublicense, and/or sell copies + of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. + */ + /** + * Returns a non-negative 31-bit hash of a given value. + * + * Fast and generally good hashing function, adapted from C++ implementation: + * https://code.google.com/p/fast-hash/ (but with signed {@code >>} shifts) + */ + public static int fastHash(long h) { + h ^= h >> 23; + h *= 0x2127599bf4325c37L; + h ^= h >> 47; + return ((int) (h - (h >> 32))) & 0x7fffffff; + } + + /** + * Returns number in [0, stripes) for {@code value}, given stripes > 0. + */ + public static int fastStripe(long value, int stripes) { + return fastHash(value) % stripes; + } + + /** + * Fast hash-based striping for LongWritable IDs, returns a function + * that for a given ID returns its stripe index. + */ + public static + Obj2IntFunction<LongWritable> fastHashStriping(final int stripes) { + return new Obj2IntFunction<LongWritable>() { + @Override + public int apply(LongWritable id) { + return fastStripe(id.get(), stripes); + } + }; + } + + /** + * Fast hash-based striping for LongWritable IDs, returns a function + * that for a given stripe index returns a predicate checking whether ID is + * in that stripe. + */ + public static + Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate( + final int stripes) { + return new Int2ObjFunction<Predicate<LongWritable>>() { + @Override + public Predicate<LongWritable> apply(final int stripe) { + return new Predicate<LongWritable>() { + @Override + public boolean apply(LongWritable id) { + return fastStripe(id.get(), stripes) == stripe; + } + }; + } + }; + } + + /** + * Generate striped block, with given number of {@code stripes}, + * using given {@code blockGenerator} to generate block for each stripe.
+ * + * @param stripes Number of stripes, assigned via fastHashStripingPredicate + * @param blockGenerator Function given predicate representing whether + * ID is in current stripe, should return Block + * for current stripe + * @return Resulting block + */ + public static Block generateStripedBlock( + int stripes, + Function<Predicate<LongWritable>, Block> blockGenerator) { + return generateStripedBlockImpl( + stripes, blockGenerator, + StripingUtils.fastHashStripingPredicate(stripes)); + } + + /** + * Generate striped block, with given number of {@code stripes}, + * using given {@code blockGenerator} to generate block for each stripe, + * and using striping based on given {@code stripeSupplier}. + * + * @param stripes Number of stripes + * @param blockGenerator Function given predicate representing whether + * ID is in current stripe, should return Block + * for current stripe + * @param stripeSupplier Function given number of stripes, + * generates a function that given stripe index, + * returns predicate checking whether ID is in that + * stripe. + * @return Resulting block + */ + public static <I extends WritableComparable> + Block generateStripedBlock( + int stripes, + Function<Predicate<I>, Block> blockGenerator, + Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) { + return generateStripedBlockImpl( + stripes, blockGenerator, stripeSupplier.apply(stripes)); + } + + /** + * Stripe given block (LongWritable IDs), calling vertexSend only in its + * corresponding stripe. All other methods are called once per stripe.
+ * + * @param stripes Number of stripes + * @param block Block to stripe + * @return Resulting block + */ + public static Block stripeBlockBySenders( + int stripes, + Block block) { + return generateStripedBlockImpl( + stripes, + StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block), + StripingUtils.fastHashStripingPredicate(stripes)); + } + + /** + * Given a block, creates a function that, given a predicate, filters + * calls to vertexSend function based on that predicate. + * + * Useful to be combined with generateStripedBlock to stripe blocks. + */ + public static <I extends WritableComparable> Function<Predicate<I>, Block> + createSingleStripeBySendersFunction(final Block block) { + return new Function<Predicate<I>, Block>() { + @Override + public Block apply(final Predicate<I> stripePredicate) { + return FilteringBlock.createSendFiltering( + new SupplierFromVertex<I, Writable, Writable, Boolean>() { + @Override + public Boolean get(Vertex<I, Writable, Writable> vertex) { + return stripePredicate.apply(vertex.getId()); + } + }, block); + } + }; + } + + private static <I extends WritableComparable> + Block generateStripedBlockImpl( + int stripes, + Function<Predicate<I>, Block> blockGenerator, + Int2ObjFunction<Predicate<I>> stripeSupplier) { + Preconditions.checkArgument(stripes >= 1); + if (stripes == 1) { + return blockGenerator.apply(new Predicate<I>() { + @Override + public boolean apply(I input) { + return true; + } + }); + } + Block[] blocks = new Block[stripes]; + for (int i = 0; i < stripes; i++) { + blocks[i] = blockGenerator.apply(stripeSupplier.apply(i)); + } + return new SequenceBlock(blocks); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java new file mode 100644 index 0000000..6a313a7 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/striping/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utilities for striping. + */ +package org.apache.giraph.block_app.library.striping; http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java b/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java new file mode 100644 index 0000000..6d85e93 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/PairPredicate.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function; + +import java.io.Serializable; + +/** + * Function: + * (T1, T2) -> boolean + * + * @param <T1> First argument type + * @param <T2> Second argument type + */ +public interface PairPredicate<T1, T2> extends Serializable { + /** + * Returns the result of applying this predicate to + * {@code input1} and {@code input2}. + */ + boolean apply(T1 input1, T2 input2); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java new file mode 100644 index 0000000..c515ca0 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Predicate.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function; + +import java.io.Serializable; + +/** + * Function: + * (T) -> boolean + * <br> + * Mirrors com.google.common.base.Predicate (without extending it), + * but is also Serializable. + * + * @param <T> Argument type + */ +public interface Predicate<T> extends Serializable { + /** + * Returns the result of applying this predicate to {@code input}. + */ + boolean apply(T input); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java new file mode 100644 index 0000000..d082072 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Int2ObjFunction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +import java.io.Serializable; + +/** + * Primitive specialization of Function: + * (int) -> T + * + * @param <T> Result type + */ +public interface Int2ObjFunction<T> extends Serializable { + /** + * Returns the result of applying this function to the given {@code input}. + * + * The returned object may or may not be a new instance, + * depending on the implementation. + */ + T apply(int input); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a1a236fa/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java new file mode 100644 index 0000000..6d3d739 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Obj2IntFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +import java.io.Serializable; + +/** + * Primitive specialization of Function: + * (T) -> int + * + * @param <T> Argument type + */ +public interface Obj2IntFunction<T> extends Serializable { + /** + * Returns the result of applying this function to the given {@code input}. + */ + int apply(T input); +}
