[GIRAPH-1013] Adding more libraries, algos and examples Summary: Adding more libraries, algos and examples
Only changes from our internal state: New classes: PairReduce MaxMessageCombiner PartitioningStats TestMessageChain Change to: Pieces SendMessageChain Test Plan: mvn clean install -Phadoop_facebook Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov Reviewed By: sergey.edunov Differential Revision: https://reviews.facebook.net/D40935 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/3b7c68e5 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/3b7c68e5 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/3b7c68e5 Branch: refs/heads/trunk Commit: 3b7c68e54a76d02ae721161b1b81c15c2e22e44a Parents: d7e4bde Author: Igor Kabiljo <[email protected]> Authored: Mon Jun 29 23:20:58 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Thu Jul 2 16:32:41 2015 -0700 ---------------------------------------------------------------------- .../library/algo/BreadthFirstSearch.java | 202 +++++++++++ .../library/algo/DistributedIndependentSet.java | 337 ++++++++++++++++++ .../block_app/library/algo/package-info.java | 21 ++ .../library/coarsening/CoarseningUtils.java | 347 +++++++++++++++++++ .../library/coarsening/package-info.java | 21 ++ .../library/stats/DirectedGraphStats.java | 187 ++++++++++ .../library/stats/PartitioningStats.java | 125 +++++++ .../block_app/library/stats/package-info.java | 21 ++ .../AbstractPageRankExampleBlockFactory.java | 54 +++ .../pagerank/PageRankExampleBlockFactory.java | 47 +++ ...eRankWithConvergenceExampleBlockFactory.java | 70 ++++ ...PiecesAndConvergenceExampleBlockFactory.java | 146 ++++++++ .../PageRankWithPiecesExampleBlockFactory.java | 83 +++++ ...ansferAndConvergenceExampleBlockFactory.java | 74 ++++ .../examples/pagerank/TestPageRankExample.java | 127 +++++++ .../block_app/library/TestMessageChain.java | 224 ++++++++++++ .../algo/BreadthFirstSearchBlockFactory.java | 79 +++++ .../algo/BreadthFirstSearchVertexValue.java | 61 ++++ 
.../DistributedIndependentSetBlockFactory.java | 60 ++++ .../DistributedIndependentSetVertexValue.java | 56 +++ .../library/algo/TestBreadthFirstSearch.java | 165 +++++++++ .../algo/TestDistributedIndependentSet.java | 220 ++++++++++++ .../library/coarsening/TestCoarseningUtils.java | 133 +++++++ .../kryo/KryoWritableWrapperJava8Test.java | 174 ++++++++++ .../framework/api/local/LocalBlockRunner.java | 2 + .../apache/giraph/block_app/library/Pieces.java | 33 +- .../block_app/library/SendMessageChain.java | 72 +++- .../block_app/library/gc/WorkerGCPiece.java | 42 +++ .../block_app/library/gc/package-info.java | 21 ++ .../function/primitive/Double2ObjFunction.java | 36 ++ .../function/primitive/DoubleConsumer.java | 32 ++ .../giraph/combiner/MaxMessageCombiner.java | 74 ++++ .../apache/giraph/reducers/impl/PairReduce.java | 104 ++++++ 33 files changed, 3433 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java new file mode 100644 index 0000000..fe290fb --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearch.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.block_app.library.VertexSuppliers; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.NoMessage; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Iterators; + +/** + * Class for running 
breadth-first search on the graph. + * + * Graph is expected to be symmetric before calling any of the methods here. + */ +public class BreadthFirstSearch { + private static final Logger LOG = Logger.getLogger(BreadthFirstSearch.class); + private static final IntWritable NOT_REACHABLE_VERTEX_VALUE = + new IntWritable(-1); + + private BreadthFirstSearch() { + } + + /** + * Default block, which calculates breadth-first search distances from the + * seed set using the vertex's default edges. + */ + public static <I extends WritableComparable, V extends Writable> + Block bfs( + SupplierFromVertex<I, V, Writable, Boolean> isVertexInSeedSet, + SupplierFromVertex<I, V, Writable, IntWritable> getDistance, + ConsumerWithVertex<I, V, Writable, IntWritable> setDistance + ) { + ObjectTransfer<Boolean> converged = new ObjectTransfer<>(); + ObjectTransfer<Boolean> vertexUpdatedDistance = new ObjectTransfer<>(); + + return new SequenceBlock( + createInitializePiece( + vertexUpdatedDistance, + isVertexInSeedSet, + getDistance, + setDistance, + VertexSuppliers.vertexEdgesSupplier() + ), + RepeatUntilBlock.unlimited( + createPropagateConnectedComponentsPiece( + vertexUpdatedDistance, + vertexUpdatedDistance, + converged, + getDistance, + setDistance, + VertexSuppliers.vertexEdgesSupplier() + ), + converged + ) + ); + } + + /** + * Initialize vertex values for breadth-first search calculation + */ + private static <I extends WritableComparable, V extends Writable> + Piece<I, V, Writable, NoMessage, Object> createInitializePiece( + Consumer<Boolean> vertexUpdatedDistance, + SupplierFromVertex<I, V, Writable, Boolean> isVertexInSeedSet, + SupplierFromVertex<I, V, Writable, IntWritable> getDistance, + ConsumerWithVertex<I, V, Writable, IntWritable> setDistance, + SupplierFromVertex<I, V, Writable, ? extends Iterable<? 
extends Edge<I, ?>>> + edgeSupplier + ) { + IntWritable zero = new IntWritable(0); + return Pieces.forAllVerticesOnReceive( + "InitializeBFS", + (vertex) -> { + if (isVertexInSeedSet.get(vertex)) { + setDistance.apply(vertex, zero); + vertexUpdatedDistance.apply(true); + } else { + setDistance.apply(vertex, NOT_REACHABLE_VERTEX_VALUE); + vertexUpdatedDistance.apply(false); + } + } + ); + } + + /** + * Propagate connected components to neighbor pieces + */ + private static <I extends WritableComparable, V extends Writable> + Block createPropagateConnectedComponentsPiece( + Supplier<Boolean> vertexToPropagate, + Consumer<Boolean> vertexUpdatedDistance, + Consumer<Boolean> converged, + SupplierFromVertex<I, V, Writable, IntWritable> getDistance, + ConsumerWithVertex<I, V, Writable, IntWritable> setDistance, + SupplierFromVertex<I, V, Writable, ? extends Iterable<? + extends Edge<I, ?>>> edgeSupplier) { + return new Piece<I, V, Writable, IntWritable, Object>() { + private ReducerHandle<IntWritable, IntWritable> propagatedAggregator; + + @Override + public void registerReducers( + CreateReducersApi reduceApi, Object executionStage) { + propagatedAggregator = reduceApi.createLocalReducer(SumReduce.INT); + } + + @Override + public VertexSender<I, V, Writable> getVertexSender( + BlockWorkerSendApi<I, V, Writable, IntWritable> workerApi, + Object executionStage + ) { + return (vertex) -> { + if (vertexToPropagate.get()) { + workerApi.sendMessageToMultipleEdges( + Iterators.transform( + edgeSupplier.get(vertex).iterator(), + Edge::getTargetVertexId + ), + getDistance.get(vertex) + ); + reduceInt(propagatedAggregator, 1); + } + }; + } + + @Override + public void masterCompute(BlockMasterApi master, Object executionStage) { + converged.apply( + propagatedAggregator.getReducedValue(master).get() == 0); + LOG.info("BFS: " + propagatedAggregator.getReducedValue(master).get() + + " many vertices sent in this iteration"); + } + + @Override + public VertexReceiver<I, V, Writable, 
IntWritable> getVertexReceiver( + BlockWorkerReceiveApi<I> workerApi, + Object executionStage + ) { + IntWritable next = new IntWritable(); + return (vertex, messages) -> { + vertexUpdatedDistance.apply(false); + for (IntWritable receivedValue : messages) { + IntWritable currentValue = getDistance.get(vertex); + next.set(receivedValue.get() + 1); + if (currentValue.compareTo(NOT_REACHABLE_VERTEX_VALUE) == 0 || + currentValue.compareTo(next) > 0) { + setDistance.apply(vertex, next); + vertexUpdatedDistance.apply(true); + } + } + }; + } + + @Override + public Class<IntWritable> getMessageClass() { + return IntWritable.class; + } + + @Override + public String toString() { + return "PropagateConnectedComponentsPiece"; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java new file mode 100644 index 0000000..8beef59 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSet.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.block_app.library.VertexSuppliers; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.NoMessage; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Iterables; + 
+/** + * Class for computing maximal independent sets of a graph. + * + * Graph is expected to be symmetric before calling methods here. + */ +public class DistributedIndependentSet { + private static final Logger LOG = + Logger.getLogger(DistributedIndependentSet.class); + private static final IntWritable UNKNOWN = new IntWritable(-1); + private static final IntWritable NOT_IN_SET = new IntWritable(-2); + private static final IntWritable IN_SET = new IntWritable(-3); + + private DistributedIndependentSet() { + } + + /** + * Default block which decomposes the input graph into independent sets of + * vertices. An independent set of vertices is defined as a set of vertices + * that do not have edges to each other, i.e. the sub-graph induced by an + * independent set of vertices is an empty graph. + * + * The algorithm finds independent sets as follows: + * 1) Find the maximal independent set of vertices amongst unassigned + * vertices. + * 2) Assign found vertices to a new independent set. + * 3) If some vertices of the graph are still unassigned, go to step 1. + * Otherwise terminate. 
+ * + * @param numberClass Independent set id type + * @param chooseNumber Process that returns a deterministic id of type + * <code>numberClass</code> for each vertex + * @param getIndependentSet Getter for independent set id of each vertex + * @param setIndependentSet Setter for independent set id of each vertex + */ + public static + <I extends WritableComparable, V extends Writable, + N extends WritableComparable> + Block independentSets( + Class<N> numberClass, + SupplierFromVertex<I, V, Writable, N> chooseNumber, + SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet, + ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet + ) { + ObjectTransfer<Boolean> done = new ObjectTransfer<>(); + IntRef iteration = new IntRef(-1); + + return new SequenceBlock( + Pieces.<I, V, Writable>forAllVerticesOnReceive( + "InitializeIndependentSet", + (vertex) -> setIndependentSet.apply(vertex, UNKNOWN)), + RepeatUntilBlock.unlimited( + // find maximal independent sets amongst remaining un-assigned + // vertices, one after another, until all vertices of the graph + // are assigned to independent sets. + findMaximalIndependentSet( + numberClass, + chooseNumber, + getIndependentSet, + setIndependentSet, + iteration, done + ), + done + ) + ); + } + + /** + * Independent set calculation with vertex ids as messages to choose the + * vertices in a set. + */ + public static <I extends WritableComparable, V extends Writable> + Block independentSets( + Class<I> vertexIdClass, + SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet, + ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet + ) { + return independentSets( + vertexIdClass, + VertexSuppliers.vertexIdSupplier(), + getIndependentSet, + setIndependentSet + ); + } + + /** + * Finds a maximal independent set, amongst un-assigned vertices of the + * graph. The algorithm is as follows: + * 1) Normalize the state of all un-assigned vertices of the graph to + * UNKNOWN. 
+ * 2) Each UNKNOWN vertex sends a number (output of <code>chooseNumber + * </code>) to all its neighbors. + * 3) Each UNKNOWN vertex finds the maximum value of all incoming + * messages. If the max value is less than the chosen number for the + * vertex (output of <code>chooseNumber</code> in step 2), the vertex + * assigns itself to the independent set (changes its state to IN_SET), + * and sends 'ack' messages to all its neighbors. + * 4) Each UNKNOWN vertex that receives an 'ack' message, changes its + * state to NOT_IN_SET. + * 5) If there are any UNKNOWN vertex, go to step 2. Otherwise, we found + * the maximal independent set, and terminate. + */ + private static + <I extends WritableComparable, V extends Writable, + N extends WritableComparable> + Block findMaximalIndependentSet( + Class<N> numberClass, + SupplierFromVertex<I, V, Writable, N> chooseNumber, + SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet, + ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet, + IntRef iteration, ObjectTransfer<Boolean> done + ) { + ObjectTransfer<Boolean> foundMIS = new ObjectTransfer<>(); + return new SequenceBlock( + createInitializePiece(getIndependentSet, setIndependentSet, iteration), + RepeatUntilBlock.unlimited( + new SequenceBlock( + // Announce the number to all the neighbors. + createAnnounceBlock( + numberClass, + chooseNumber, + getIndependentSet, + setIndependentSet + ), + // Select the vertices in the independent set, and refine the + // state of neighboring vertices so not to consider them for + // this independent set. + createSelectAndRefinePiece( + getIndependentSet, + setIndependentSet, + iteration, + foundMIS, + done + ) + ), + foundMIS + ) + ); + } + + /** + * Piece to initialize vertex values so they are either assigned to a + * previously found independent set, or UNKNOWN to be considered for the + * discovery of the current independent set. 
+ */ + private static <I extends WritableComparable, V extends Writable> + Block createInitializePiece( + SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet, + ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet, + IntRef iteration) { + return new Piece<I, V, Writable, NoMessage, Object>() { + @Override + public void masterCompute(BlockMasterApi master, Object executionStage) { + iteration.value++; + LOG.info("Start finding independent set with ID " + iteration.value); + } + + @Override + public VertexReceiver<I, V, Writable, NoMessage> getVertexReceiver( + final BlockWorkerReceiveApi<I> workerApi, Object executionStage) { + return (vertex, messages) -> { + if (getIndependentSet.get(vertex).equals(NOT_IN_SET)) { + setIndependentSet.apply(vertex, UNKNOWN); + } + }; + } + + @Override + public String toString() { + return "InitializePiece"; + } + }; + } + + /** + * A Block that assigns some of the UNKNOWN vertices to the new independent + * set based on numbers each vertex broadcast to all its neighbors. 
+ */ + private static + <I extends WritableComparable, V extends Writable, + N extends WritableComparable> + Block createAnnounceBlock( + Class<N> numberClass, + SupplierFromVertex<I, V, Writable, N> chooseNumber, + SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet, + ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet) { + return Pieces.<I, V, Writable, N>sendMessageToNeighbors( + "AnnounceNumber", + numberClass, + (vertex) -> { + if (getIndependentSet.get(vertex).equals(UNKNOWN)) { + return chooseNumber.get(vertex); + } else { + return null; + } + }, + (vertex, messages) -> { + if (getIndependentSet.get(vertex).equals(UNKNOWN)) { + N curVal = chooseNumber.get(vertex); + boolean myValIsMax = true; + for (N receivedValue : messages) { + if (receivedValue.compareTo(curVal) > 0) { + myValIsMax = false; + break; + } + } + if (myValIsMax) { + setIndependentSet.apply(vertex, IN_SET); + } + } + } + ); + } + + /** + * Piece to confirm selection of some vertices for the independent set. Also, + * changes the state of neighboring vertices of newly assigned vertices to + * NOT_IN_SET, so not to consider them for the discovery of the current + * independent set. + * + * @param foundMIS Specifies the end of discovery for current independent set. + * @param done Specifies the end of whole computation of decomposing to + * independent sets. 
+ */ + private static <I extends WritableComparable, V extends Writable> + Block createSelectAndRefinePiece( + SupplierFromVertex<I, V, Writable, IntWritable> getIndependentSet, + ConsumerWithVertex<I, V, Writable, IntWritable> setIndependentSet, + IntRef iteration, + Consumer<Boolean> foundMIS, + Consumer<Boolean> done) { + return new Piece<I, V, Writable, BooleanWritable, Object>() { + private ReducerHandle<IntWritable, IntWritable> numVerticesUnknown; + private ReducerHandle<IntWritable, IntWritable> numVerticesNotAssigned; + + @Override + public void registerReducers( + CreateReducersApi reduceApi, Object executionStage) { + numVerticesUnknown = reduceApi.createLocalReducer(SumReduce.INT); + numVerticesNotAssigned = reduceApi.createLocalReducer(SumReduce.INT); + } + + @Override + public VertexSender<I, V, Writable> getVertexSender( + final BlockWorkerSendApi<I, V, Writable, BooleanWritable> workerApi, + Object executionStage) { + BooleanWritable ack = new BooleanWritable(true); + IntWritable one = new IntWritable(1); + return (vertex) -> { + IntWritable vertexState = getIndependentSet.get(vertex); + if (vertexState.equals(IN_SET)) { + setIndependentSet.apply(vertex, new IntWritable(iteration.value)); + workerApi.sendMessageToAllEdges(vertex, ack); + } else if (vertexState.equals(UNKNOWN)) { + numVerticesUnknown.reduce(one); + numVerticesNotAssigned.reduce(one); + } else if (vertexState.equals(NOT_IN_SET)) { + numVerticesNotAssigned.reduce(one); + } + }; + } + + @Override + public void masterCompute(BlockMasterApi master, Object executionStage) { + done.apply(numVerticesNotAssigned.getReducedValue(master).get() == 0); + foundMIS.apply(numVerticesUnknown.getReducedValue(master).get() == 0); + } + + @Override + public VertexReceiver<I, V, Writable, BooleanWritable> getVertexReceiver( + final BlockWorkerReceiveApi<I> workerApi, Object executionStage) { + return (vertex, messages) -> { + if (getIndependentSet.get(vertex).equals(UNKNOWN) && + Iterables.size(messages) 
> 0) { + setIndependentSet.apply(vertex, NOT_IN_SET); + } + }; + } + + @Override + public Class<BooleanWritable> getMessageClass() { + return BooleanWritable.class; + } + + @Override + public String toString() { + return "SelectAndRefinePiece"; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java new file mode 100644 index 0000000..03a550a --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/algo/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Utility algorithms + */ +package org.apache.giraph.block_app.library.algo; http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java new file mode 100644 index 0000000..c74a056 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/CoarseningUtils.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.library.coarsening; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.VertexSuppliers; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.function.Function; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.TypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.WritableWriter; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.writable.tuple.PairWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Utilities class that creates a new coarsened graph, while keeping the + * original graph, allowing us to execute things on either representation + * of the graph. 
+ */ +public class CoarseningUtils { + + private CoarseningUtils() { } + + //CHECKSTYLE: stop ParameterNumberCheck + + /** + * Create block that creates coarsened graph from given set of vertices + * (original vertices, that return true for originalNodesChecker). + * + * Old graph is not deleted; if needed, you can trivially do that by calling + * Pieces.removeVertices(originalNodesChecker) after this block. + * + * + * @param idTypeOps Type ops for vertex ids + * @param edgeTypeOps Type ops for edge values + * @param coarsenedVertexIdSupplier Supplier called on original nodes, + * that should return its coarsened vertex + * id + * @param vertexInfoClass Part of the vertex value that is needed to create + * coarsened vertices + * @param vertexInfoSupplier Supplier providing part of the vertex value that + * is needed to create coarsened vertices + * @param vertexInfoCombiner Combiner that aggregates individual vertex info + * values into coarsened value + * @param coarsenedVertexValueInitializer Function that initializes coarsened + * vertex value, from an aggregated + * vertex info of original vertex + * values that are coarsened into this + * vertex + * @param edgeCoarseningCombiner Combiner that aggregates individual edge + * values into coarsened edge value + * @param originalNodesChecker Vertices with id that this gives true for will + * be coarsened + * @param clusterNodesChecker Should return true only for coarsened ids + * (ones returned by coarsenedVertexIdSupplier) + * @return Block that does coarsening + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable, + VI extends Writable> + Block createCustomCoarseningBlock( + PrimitiveIdTypeOps<I> idTypeOps, + TypeOps<E> edgeTypeOps, + SupplierFromVertex<I, V, E, I> coarsenedVertexIdSupplier, + Class<VI> vertexInfoClass, + SupplierFromVertex<I, V, E, VI> vertexInfoSupplier, + MessageCombiner<? 
super I, VI> vertexInfoCombiner, + ConsumerWithVertex<I, V, E, VI> coarsenedVertexValueInitializer, + MessageCombiner<? super I, E> edgeCoarseningCombiner, + Function<I, Boolean> originalNodesChecker, + Function<I, Boolean> clusterNodesChecker) { + + //CHECKSTYLE: resume ParameterNumberCheck + + ObjectTransfer<Iterable<PairWritable<I, E>>> edgesHolder = + new ObjectTransfer<>(); + + MessageValueFactory<PairWritable<I, E>> pairMessageFactory = + () -> new PairWritable<I, E>( + idTypeOps.create(), edgeTypeOps.create()); + Piece<I, V, E, PairWritable<I, E>, Object> calcWeightsOnEdgesPiece = + new Piece<I, V, E, PairWritable<I, E>, Object>() { + @Override + public VertexSender<I, V, E> getVertexSender( + BlockWorkerSendApi<I, V, E, PairWritable<I, E>> workerApi, + Object executionStage) { + PairWritable<I, E> message = pairMessageFactory.newInstance(); + return (vertex) -> { + idTypeOps.set( + message.getLeft(), coarsenedVertexIdSupplier.get(vertex)); + for (Edge<I, E> edge : vertex.getEdges()) { + edgeTypeOps.set(message.getRight(), edge.getValue()); + workerApi.sendMessage(edge.getTargetVertexId(), message); + } + }; + } + + @Override + public VertexReceiver<I, V, E, PairWritable<I, E>> getVertexReceiver( + BlockWorkerReceiveApi<I> workerApi, Object executionStage) { + return (vertex, messages) -> { + edgesHolder.apply(messages); + }; + } + + @Override + protected MessageValueFactory<PairWritable<I, E>> getMessageFactory( + ImmutableClassesGiraphConfiguration conf) { + return pairMessageFactory; + } + + @Override + public String toString() { + return "CoarseningCalcWeightsOnEdges"; + } + }; + + WritableWriter<E> edgeValueWriter = new WritableWriter<E>() { + @Override + public void write(DataOutput out, E value) throws IOException { + value.write(out); + } + + @Override + public E readFields(DataInput in) throws IOException { + E edge = edgeTypeOps.create(); + edge.readFields(in); + return edge; + } + }; + + Piece<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>, 
Object> + createNewVerticesPiece = + new Piece<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>, Object>() { + @Override + public VertexSender<I, V, E> getVertexSender( + BlockWorkerSendApi<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>> + workerApi, + Object executionStage) { + return (vertex) -> { + Basic2ObjectMap<I, E> map = + idTypeOps.create2ObjectOpenHashMap(edgeValueWriter); + for (PairWritable<I, E> message : edgesHolder.get()) { + + E value = map.get(message.getLeft()); + if (value == null) { + value = edgeCoarseningCombiner.createInitialMessage(); + map.put(message.getLeft(), value); + } + + edgeCoarseningCombiner.combine( + message.getLeft(), value, message.getRight()); + } + + workerApi.sendMessage( + coarsenedVertexIdSupplier.get(vertex), + new PairWritable<>(vertexInfoSupplier.get(vertex), map)); + }; + } + + @Override + public + VertexReceiver<I, V, E, PairWritable<VI, Basic2ObjectMap<I, E>>> + getVertexReceiver( + BlockWorkerReceiveApi<I> workerApi, Object executionStage) { + return (vertex, messages) -> { + VI vertexInfo = vertexInfoCombiner.createInitialMessage(); + Basic2ObjectMap<I, E> map = idTypeOps.create2ObjectOpenHashMap(null); + for (PairWritable<VI, Basic2ObjectMap<I, E>> message : messages) { + vertexInfoCombiner.combine( + vertex.getId(), vertexInfo, message.getLeft()); + + + for (Iterator<I> iter = message.getRight().fastKeyIterator(); + iter.hasNext();) { + I key = iter.next(); + E value = map.get(key); + if (value == null) { + value = edgeCoarseningCombiner.createInitialMessage(); + map.put(key, value); + } + edgeCoarseningCombiner.combine( + vertex.getId(), value, message.getRight().get(key)); + } + } + + for (Iterator<I> iter = map.fastKeyIterator(); iter.hasNext();) { + I key = iter.next(); + Edge<I, E> edge = EdgeFactory.create( + idTypeOps.createCopy(key), map.get(key)); + vertex.addEdge(edge); + } + + coarsenedVertexValueInitializer.apply(vertex, vertexInfo); + }; + } + + @Override + protected 
MessageValueFactory<PairWritable<VI, Basic2ObjectMap<I, E>>> + getMessageFactory( + ImmutableClassesGiraphConfiguration conf) { + return () -> new PairWritable<>( + ReflectionUtils.newInstance(vertexInfoClass), + idTypeOps.create2ObjectOpenHashMap(edgeValueWriter)); + } + + @Override + public String toString() { + return "CoarseningCreateNewVertices"; + } + }; + + return new SequenceBlock( + new FilteringPiece<>( + (vertex) -> originalNodesChecker.apply(vertex.getId()), + calcWeightsOnEdgesPiece), + new FilteringPiece<>( + (vertex) -> originalNodesChecker.apply(vertex.getId()), + (vertex) -> clusterNodesChecker.apply(vertex.getId()), + createNewVerticesPiece)); + } + + /** + * Create block that creates coarsened graph from given set of vertices + * (original vertices, that return true for originalNodesChecker), when + * vertex and edge values are primitives (or TypeOps exist for them). + * Coarsening vertex values and edge values are computed as sum of their + * individual values. + * + * Old graph is not deleted, if needed, you can trivially do that by calling + * Pieces.removeVertices(originalNodesChecker) after this block. 
+ * + * @param idTypeOps Vertex id TypeOps + * @param valueTypeOps Vertex value TypeOps + * @param edgeTypeOps Edge value TypeOps + * @param coarsenedVertexIdSupplier Supplier called on original nodes, + * that should return its coarsened vertex + * id + * @param originalNodesChecker Vertices with id that this gives true for will + * be coarsened + * @param clusterNodesChecker Should return true only for coarsened ids + * (ones returned by coarsenedVertexIdSupplier) + * @return Block that does coarsening + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + Block createCoarseningBlock( + PrimitiveIdTypeOps<I> idTypeOps, + NumericTypeOps<V> valueTypeOps, + NumericTypeOps<E> edgeTypeOps, + SupplierFromVertex<I, V, E, I> coarsenedVertexIdSupplier, + Function<I, Boolean> originalNodesChecker, + Function<I, Boolean> clusterNodesChecker) { + return CoarseningUtils.<I, V, E, V>createCustomCoarseningBlock( + idTypeOps, + edgeTypeOps, + coarsenedVertexIdSupplier, + valueTypeOps.getTypeClass(), + VertexSuppliers.vertexValueSupplier(), + new MessageCombiner<I, V>() { + @Override + public void combine( + I vertexIndex, V originalMessage, V messageToCombine) { + valueTypeOps.plusInto(originalMessage, messageToCombine); + } + + @Override + public V createInitialMessage() { + return valueTypeOps.createZero(); + } + }, + (vertex, value) -> valueTypeOps.set(vertex.getValue(), value), + new MessageCombiner<I, E>() { + @Override + public void combine( + I vertexIndex, E originalMessage, E messageToCombine) { + edgeTypeOps.plusInto(originalMessage, messageToCombine); + } + + @Override + public E createInitialMessage() { + return edgeTypeOps.createZero(); + } + }, + originalNodesChecker, + clusterNodesChecker); + } + + /** + * Uses configuration to figure out vertex id, value and edge value types, + * and calls above createCoarseningBlock with it, look at its + * documentation for more details.
+ */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + Block createCoarseningBlock( + ImmutableClassesGiraphConfiguration<I, V, E> conf, + SupplierFromVertex<I, V, E, I> coarsenedVertexIdSupplier, + Function<I, Boolean> originalNodesChecker, + Function<I, Boolean> clusterNodesChecker) { + return createCoarseningBlock( + TypeOpsUtils.getPrimitiveIdTypeOps(conf.getVertexIdClass()), + (NumericTypeOps<V>) + TypeOpsUtils.getTypeOps(conf.getVertexValueClass()), + (NumericTypeOps<E>) TypeOpsUtils.getTypeOps(conf.getEdgeValueClass()), + coarsenedVertexIdSupplier, originalNodesChecker, clusterNodesChecker); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java new file mode 100644 index 0000000..16693fa --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/coarsening/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Coarsening utils + */ +package org.apache.giraph.block_app.library.coarsening; http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java new file mode 100644 index 0000000..1739718 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/DirectedGraphStats.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.library.stats; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.block_app.library.VertexSuppliers; +import org.apache.giraph.block_app.reducers.map.BasicMapReduce; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.reducers.impl.MaxReduce; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.ops.IntTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Iterables; + +/** Utility class for calculating stats of a directed graph */ +public class DirectedGraphStats { + private static final Logger LOG = Logger.getLogger(DirectedGraphStats.class); + private static final double LOG_2 = Math.log(2); + private static final int MAX_LOG_DEGREE = 20; + + private DirectedGraphStats() { } + + /** + * Calculate and print on master statistics about in and out degrees + * of all vertices. 
+ */ + public static <I extends WritableComparable> + Block createInAndOutDegreeStatsBlock(Class<I> idClass) { + ObjectTransfer<Iterable<I>> inEdges = new ObjectTransfer<>(); + + Block announceToNeighbors = Pieces.sendMessageToNeighbors( + "AnnounceToNeighbors", + idClass, + VertexSuppliers.<I, Writable, Writable>vertexIdSupplier(), + inEdges.<I, Writable, Writable>castToConsumer()); + + return new SequenceBlock( + announceToNeighbors, + new AggregateInAndOutDegreeStatsPiece<>( + inEdges.<I, Writable, Writable>castToSupplier())); + } + + /** Aggregating in and out degree statistics */ + private static class AggregateInAndOutDegreeStatsPiece + <I extends WritableComparable> + extends Piece<I, Writable, Writable, Writable, Object> { + private final + SupplierFromVertex<I, Writable, Writable, Iterable<I>> inEdges; + + private ReducerHandle<IntWritable, IntWritable> maxDegreeAgg; + + private ReducerMapHandle<IntWritable, LongWritable, LongWritable> + inHistograms; + private ReducerMapHandle<IntWritable, LongWritable, LongWritable> + outHistograms; + + private ReducerMapHandle<IntWritable, LongWritable, LongWritable> + inVsOutHistograms; + + + public AggregateInAndOutDegreeStatsPiece( + SupplierFromVertex<I, Writable, Writable, Iterable<I>> inEdges) { + this.inEdges = inEdges; + } + + @Override + public void registerReducers( + CreateReducersApi reduceApi, Object executionStage) { + inHistograms = BasicMapReduce.createLocalMapHandles( + IntTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG, reduceApi); + outHistograms = BasicMapReduce.createLocalMapHandles( + IntTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG, reduceApi); + + inVsOutHistograms = BasicMapReduce.createLocalMapHandles( + IntTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG, reduceApi); + + maxDegreeAgg = + reduceApi.createLocalReducer(new MaxReduce<>(IntTypeOps.INSTANCE)); + } + + @Override + public VertexSender<I, Writable, Writable> getVertexSender( + BlockWorkerSendApi<I, Writable, 
Writable, Writable> workerApi, + Object executionStage) { + final IntWritable indexWrap = new IntWritable(); + + return new InnerVertexSender() { + @Override + public void vertexSend(Vertex<I, Writable, Writable> vertex) { + Iterable<I> in = inEdges.get(vertex); + + int inCount = Iterables.size(in); + int outCount = vertex.getNumEdges(); + + reduceInt(maxDegreeAgg, Math.max(inCount, outCount)); + increment(inHistograms, inCount); + increment(outHistograms, outCount); + increment(inVsOutHistograms, + log2(inCount + 1) * MAX_LOG_DEGREE + log2(outCount + 1)); + + // TODO add count for common edges. + } + + private int log2(int value) { + return (int) (Math.log(value) / LOG_2); + } + + /** Adds one to bucket {@code index} of the histogram passed in as {@code reduceHandle}. */ + private void increment( + ReducerMapHandle<IntWritable, LongWritable, LongWritable> + reduceHandle, + int index) { + indexWrap.set(index); + reduceLong(reduceHandle.get(indexWrap), 1); + } + }; + } + + @Override + public void masterCompute(BlockMasterApi master, Object executionStage) { + + int maxDegree = maxDegreeAgg.getReducedValue(master).get(); + LOG.info("Max degree : " + maxDegree); + + StringBuilder sb = new StringBuilder("In and out degree histogram:\n"); + sb.append("degree\tnumIn\tnumOut\n"); + + final IntWritable index = new IntWritable(); + + for (int i = 0; i <= maxDegree; i++) { + index.set(i); + long numIn = inHistograms.get(index).getReducedValue(master).get(); + long numOut = outHistograms.get(index).getReducedValue(master).get(); + if (numIn > 0 || numOut > 0) { + sb.append(i + "\t" + numIn + "\t" + numOut + "\n"); + } + } + LOG.info(sb); + + sb = new StringBuilder("In vs out degree log/log histogram:\n"); + sb.append("<inDeg\t<outDeg\tnum\n"); + + for (int in = 0; in < MAX_LOG_DEGREE; in++) { + for (int out = 0; out < MAX_LOG_DEGREE; out++) { + index.set(in * MAX_LOG_DEGREE + out); + long num = inVsOutHistograms.get(index).getReducedValue(master).get(); + if (num > 0) { + sb.append(Math.pow(2, in) + "\t" + Math.pow(2, out) + + "\t" + num + "\n"); + } + } + } +
LOG.info(sb); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java new file mode 100644 index 0000000..950c144 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/PartitioningStats.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.library.stats; + +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.SendMessageChain; +import org.apache.giraph.function.primitive.DoubleConsumer; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.reducers.impl.PairReduce; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * Utility blocks for calculating stats for a given partitioning - an + * assignment of vertices to buckets. + */ +public class PartitioningStats { + private static final Logger LOG = Logger.getLogger(PartitioningStats.class); + + private PartitioningStats() { } + + /** + * Calculate edge locality - ratio of edges that are within a same bucket. 
+ */ + public static <V extends Writable> Block calculateEdgeLocality( + SupplierFromVertex<WritableComparable, V, Writable, LongWritable> + bucketSupplier, + DoubleConsumer edgeLocalityConsumer) { + final Pair<LongWritable, LongWritable> pair = + Pair.of(new LongWritable(), new LongWritable()); + return SendMessageChain.<WritableComparable, V, Writable, LongWritable> + startSendToNeighbors( + "CalcLocalEdgesPiece", + LongWritable.class, + bucketSupplier + ).endReduceWithMaster( + "AggregateEdgeLocalityPiece", + new PairReduce<>(SumReduce.LONG, SumReduce.LONG), + (vertex, messages) -> { + long bucket = bucketSupplier.get(vertex).get(); + int local = 0; + int total = 0; + for (LongWritable otherCluster : messages) { + total++; + if (bucket == otherCluster.get()) { + local++; + } + } + pair.getLeft().set(local); + pair.getRight().set(total); + return pair; + }, + (reducedPair, master) -> { + long localEdges = reducedPair.getLeft().get(); + long totalEdges = reducedPair.getRight().get(); + double edgeLocality = (double) localEdges / totalEdges; + LOG.info("locality ratio = " + edgeLocality); + master.getCounter( + "Edge locality stats", "edge locality (in percent * 1000)") + .setValue((long) (edgeLocality * 100000)); + edgeLocalityConsumer.apply(edgeLocality); + } + ); + } + + /** + * Calculates average fanout - average number of distinct buckets that vertex + * has neighbors in. 
+ */ + public static <V extends Writable> Block calculateFanout( + SupplierFromVertex<WritableComparable, V, Writable, LongWritable> + bucketSupplier, + DoubleConsumer averageFanoutConsumer) { + final Pair<LongWritable, LongWritable> pair = + Pair.of(new LongWritable(), new LongWritable(1)); + return SendMessageChain.<WritableComparable, V, Writable, LongWritable> + startSendToNeighbors( + "CalcFanoutPiece", + LongWritable.class, + bucketSupplier + ).endReduceWithMaster( + "AggregateFanoutPiece", + new PairReduce<>(SumReduce.LONG, SumReduce.LONG), + (vertex, messages) -> { + LongSet setOfNeighborBuckets = new LongOpenHashSet(); + for (LongWritable neighborBucket : messages) { + setOfNeighborBuckets.add(neighborBucket.get()); + } + pair.getLeft().set(setOfNeighborBuckets.size()); + return pair; + }, + (reducedPair, master) -> { + long fanout = reducedPair.getLeft().get(); + long numVertices = reducedPair.getRight().get(); + double avgFanout = (double) fanout / numVertices; + LOG.info("fanout ratio = " + avgFanout); + master.getCounter("Fanout stats", "fanout (in percent * 1000)") + .setValue((long) (avgFanout * 100000)); + averageFanoutConsumer.apply(avgFanout); + } + ); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java new file mode 100644 index 0000000..4240dd7 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/stats/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utilities for calculating graph statistics. + */ +package org.apache.giraph.block_app.library.stats; http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java new file mode 100644 index 0000000..13dfbdb --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/AbstractPageRankExampleBlockFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.examples.pagerank; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +/** + * Parent class for PageRank example of using BlockFactory. Contains same typing information needed + * for all examples. + */ +public abstract class AbstractPageRankExampleBlockFactory extends AbstractBlockFactory<Object> { + public static final IntConfOption NUM_ITERATIONS = new IntConfOption( + "page_rank_example.num_iterations", 10, "num iterations"); + + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<DoubleWritable> getVertexValueClass(GiraphConfiguration conf) { + return DoubleWritable.class; + } + + @Override + protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) { + return NullWritable.class; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java ---------------------------------------------------------------------- diff --git 
a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java new file mode 100644 index 0000000..dd70c0e --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankExampleBlockFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.examples.pagerank; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * PageRank example of using BlockFactory - in its simplest form, using functional primitives.
+ */ +public class PageRankExampleBlockFactory extends AbstractPageRankExampleBlockFactory { + @Override + @SuppressWarnings("rawtypes") + public Block createBlock(GiraphConfiguration conf) { + Block iter = Pieces.<WritableComparable, DoubleWritable, Writable, DoubleWritable> + sendMessageToNeighbors( + "IterationPiece", + SumMessageCombiner.DOUBLE, + (vertex) -> new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges()), + (vertex, value) -> { + double sum = value != null ? value.get() : 0; + vertex.getValue().set(0.15f + 0.85f * sum); + }); + return new RepeatBlock(NUM_ITERATIONS.get(conf), iter); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java new file mode 100644 index 0000000..1585557 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithConvergenceExampleBlockFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.examples.pagerank; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.library.SendMessageChain; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * PageRank example with convergence check, using functional primitives and SendMessageChain + * for send/receive/reply logic. 
+ */ +public class PageRankWithConvergenceExampleBlockFactory + extends AbstractPageRankExampleBlockFactory { + private static final double EPS = 1e-3; + private static final LongWritable ONE = new LongWritable(1); + private static final LongWritable ZERO = new LongWritable(0); + + @Override + @SuppressWarnings("rawtypes") + public Block createBlock(GiraphConfiguration conf) { + ObjectTransfer<Boolean> converged = new ObjectTransfer<>(); + + Block iter = SendMessageChain.<WritableComparable, DoubleWritable, Writable, DoubleWritable> + startSendToNeighbors( + "PageRankUpdate", + SumMessageCombiner.DOUBLE, + (vertex) -> new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges()) + ).endReduce( + "PageRankCheckConvergence", + SumReduce.LONG, + (vertex, value) -> { + double sum = value != null ? value.get() : 0; + double newValue = 0.15f + 0.85f * sum; + double change = Math.abs(newValue - vertex.getValue().get()); + vertex.getValue().set(newValue); + return (change > EPS) ? ONE : ZERO; + }, + (changingCount) -> converged.apply(changingCount.get() == 0) + ); + + return new RepeatUntilBlock( + NUM_ITERATIONS.get(conf), + iter, + converged); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java new file mode 100644 index 0000000..878bf1f --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesAndConvergenceExampleBlockFactory.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.examples.pagerank; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.Supplier; +import 
org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.NoMessage; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * PageRank example with convergence check. + */ +@SuppressWarnings("rawtypes") +public class PageRankWithPiecesAndConvergenceExampleBlockFactory extends AbstractPageRankExampleBlockFactory { + private static final double EPS = 1e-3; + + @Override + public Block createBlock(GiraphConfiguration conf) { + ObjectTransfer<Boolean> converged = new ObjectTransfer<>(); + ObjectTransfer<Double> vertexValueChange = new ObjectTransfer<>(); + return new RepeatUntilBlock( + NUM_ITERATIONS.get(conf), + new SequenceBlock( + new PageRankUpdatePiece(vertexValueChange), + new PageRankConvergencePiece(vertexValueChange, converged) + ), + converged); + } + + /** One PageRank iteration */ + public static class PageRankUpdatePiece + extends Piece<WritableComparable, DoubleWritable, Writable, DoubleWritable, Object> { + private final Consumer<Double> changeConsumer; + + public PageRankUpdatePiece(Consumer<Double> changeConsumer) { + this.changeConsumer = changeConsumer; + } + + @Override + public VertexSender<WritableComparable, DoubleWritable, Writable> getVertexSender( + final BlockWorkerSendApi<WritableComparable, DoubleWritable, Writable, + DoubleWritable> workerApi, + Object executionStage) { + DoubleWritable message = new DoubleWritable(); + return (vertex) -> { + message.set(vertex.getValue().get() / vertex.getNumEdges()); + workerApi.sendMessageToAllEdges(vertex, message); + }; + } + + @Override + public VertexReceiver<WritableComparable, DoubleWritable, Writable, DoubleWritable> + getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi, Object executionStage) { + return (vertex, messages) -> { + double sum = 0; + for (DoubleWritable value : messages) { + sum += value.get(); + } + double newValue = 
0.15f + 0.85f * sum; + changeConsumer.apply(Math.abs(newValue - vertex.getValue().get())); + vertex.getValue().set(newValue); + }; + } + + @Override + public MessageCombiner<? super WritableComparable, DoubleWritable> getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return SumMessageCombiner.DOUBLE; + } + } + + /** PageRank convergence check */ + public static class PageRankConvergencePiece + extends Piece<WritableComparable, DoubleWritable, Writable, NoMessage, Object> { + private ReducerHandle<LongWritable, LongWritable> countModified; + + private final Supplier<Double> changeSupplier; + private final Consumer<Boolean> converged; + + public PageRankConvergencePiece( + Supplier<Double> changeSupplier, + Consumer<Boolean> converged) { + this.changeSupplier = changeSupplier; + this.converged = converged; + } + + @Override + public void registerReducers(CreateReducersApi reduceApi, Object executionStage) { + countModified = reduceApi.createLocalReducer(SumReduce.LONG); + } + + @Override + public VertexSender<WritableComparable, DoubleWritable, Writable> + getVertexSender( + BlockWorkerSendApi<WritableComparable, DoubleWritable, Writable, NoMessage> workerApi, + Object executionStage) { + return (vertex) -> { + double change = changeSupplier.get(); + if (change > EPS) { + reduceLong(countModified, 1); + } + }; + } + + @Override + public void masterCompute(BlockMasterApi master, Object executionStage) { + LongWritable count = countModified.getReducedValue(master); + converged.apply(count.get() == 0); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java new file mode 100644 index 0000000..8881752 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithPiecesExampleBlockFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.examples.pagerank; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * PageRank example of using BlockFactory - in its simplest form. + * This single class represents everything needed for the application. + * To use it - set BlockUtils.BLOCK_FACTORY_CLASS property to this class. + * + * Note, as a general practice, for an iteration Piece to be reusable, it should not + * assume a fixed vertex value class, but specify the interface it needs, and use that instead. 
+ */ +public class PageRankWithPiecesExampleBlockFactory extends AbstractPageRankExampleBlockFactory { + @Override + public Block createBlock(GiraphConfiguration conf) { + return new RepeatBlock(NUM_ITERATIONS.get(conf), new PageRankUpdatePiece()); + } + + /** One PageRank iteration */ + @SuppressWarnings("rawtypes") + public static class PageRankUpdatePiece + extends Piece<WritableComparable, DoubleWritable, Writable, DoubleWritable, Object> { + @Override + public VertexSender<WritableComparable, DoubleWritable, Writable> + getVertexSender(BlockWorkerSendApi<WritableComparable, DoubleWritable, Writable, + DoubleWritable> workerApi, + Object executionStage) { + DoubleWritable message = new DoubleWritable(); + return (vertex) -> { + message.set(vertex.getValue().get() / vertex.getNumEdges()); + workerApi.sendMessageToAllEdges(vertex, message); + }; + } + + @Override + public VertexReceiver<WritableComparable, DoubleWritable, Writable, DoubleWritable> + getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi, Object executionStage) { + return (vertex, messages) -> { + double sum = 0; + for (DoubleWritable value : messages) { + sum += value.get(); + } + vertex.getValue().set(0.15f + 0.85f * sum); + }; + } + + @Override + public MessageCombiner<? 
super WritableComparable, DoubleWritable> getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return SumMessageCombiner.DOUBLE; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java new file mode 100644 index 0000000..c939701 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/PageRankWithTransferAndConvergenceExampleBlockFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.examples.pagerank; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.RepeatUntilBlock; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * PageRank example with convergence check, using functional primitives + */ +public class PageRankWithTransferAndConvergenceExampleBlockFactory + extends AbstractPageRankExampleBlockFactory { + private static final double EPS = 1e-3; + private static final LongWritable ONE = new LongWritable(1); + private static final LongWritable ZERO = new LongWritable(0); + + @Override + @SuppressWarnings("rawtypes") + public Block createBlock(GiraphConfiguration conf) { + ObjectTransfer<Boolean> converged = new ObjectTransfer<>(); + ObjectTransfer<Double> vertexValueChange = new ObjectTransfer<>(); + + Block iter = Pieces.<WritableComparable, DoubleWritable, Writable, DoubleWritable> + sendMessageToNeighbors( + "IterationPiece", + SumMessageCombiner.DOUBLE, + (vertex) -> new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges()), + (vertex, value) -> { + double sum = value != null ? value.get() : 0; + double newValue = 0.15f + 0.85f * sum; + vertexValueChange.apply(Math.abs(newValue - vertex.getValue().get())); + vertex.getValue().set(newValue); + }); + + Block checkConverged = Pieces.reduce( + "CheckConvergedPiece", + SumReduce.LONG, + (vertex) -> { + double change = vertexValueChange.get(); + return (change > EPS) ? 
ONE : ZERO; + }, + (changingCount) -> converged.apply(changingCount.get() == 0)); + + return new RepeatUntilBlock( + NUM_ITERATIONS.get(conf), + new SequenceBlock(iter, checkConverged), + converged); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java new file mode 100644 index 0000000..b6b65ba --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/examples/pagerank/TestPageRankExample.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.examples.pagerank; + +import org.apache.giraph.block_app.framework.BlockFactory; +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit; +import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.WritableComparable; +import org.junit.Assert; +import org.junit.Test; + +public class TestPageRankExample { + private <I extends WritableComparable> + void testTenIterations(final Class<I> type, + final Class<? extends BlockFactory<?>> factory) throws Exception { + TestGraphUtils.runTest( + TestGraphUtils.chainModifiers( + new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(), + new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))), + (graph) -> { + float outside = 0.8759f; + float inside = 1.2481f; + float isolated = 0.15f; + + for (int i : new int[] {0, 1, 4, 5}) { + Assert.assertEquals(outside, graph.getValue(i).get(), 0.001); + } + for (int i : new int[] {2, 3}) { + Assert.assertEquals(inside, graph.getValue(i).get(), 0.001); + } + for (int i : new int[] {6}) { + Assert.assertEquals(isolated, graph.getValue(i).get(), 0.001); + } + }, + (GiraphConfiguration conf) -> { + GiraphConstants.VERTEX_ID_CLASS.set(conf, type); + BlockUtils.setBlockFactoryClass(conf, factory); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 10); + }); + } + + private void testTenIterationsOnAllTypes(Class<? 
extends BlockFactory<?>> factory) + throws Exception { + testTenIterations( + IntWritable.class, + factory); + testTenIterations( + LongWritable.class, + factory); + } + + private <I extends WritableComparable> + void testConverging( + final Class<I> type, Class<? extends BlockFactory<?>> factory) throws Exception { + TestGraphUtils.runTest( + new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(), + (graph) -> { + }, + (GiraphConfiguration conf) -> { + GiraphConstants.VERTEX_ID_CLASS.set(conf, type); + BlockUtils.setBlockFactoryClass(conf, factory); + AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, Integer.MAX_VALUE); + }); + // Test is that this doesn't loop forever, even though num_iterations=Integer.MAX_VALUE + } + + private <I extends WritableComparable> + void testConvergingOnAllTypes(Class<? extends BlockFactory<?>> factory) throws Exception { + testConverging( + IntWritable.class, + factory); + testConverging( + LongWritable.class, + factory); + } + + @Test + public void testExample() throws Exception { + testTenIterationsOnAllTypes(PageRankWithPiecesExampleBlockFactory.class); + } + + @Test + public void testConvergenceExample() throws Exception { + testTenIterationsOnAllTypes(PageRankWithPiecesAndConvergenceExampleBlockFactory.class); + testConvergingOnAllTypes(PageRankWithPiecesAndConvergenceExampleBlockFactory.class); + } + + @Test + public void testFunctionalExample() throws Exception { + testTenIterationsOnAllTypes(PageRankExampleBlockFactory.class); + } + + @Test + public void testFunctionalConvergenceExample() throws Exception { + testTenIterationsOnAllTypes(PageRankWithTransferAndConvergenceExampleBlockFactory.class); + testConvergingOnAllTypes(PageRankWithTransferAndConvergenceExampleBlockFactory.class); + } + + @Test + public void testFunctionalChainConvergenceExample() throws Exception { + testTenIterationsOnAllTypes(PageRankWithConvergenceExampleBlockFactory.class); + 
testConvergingOnAllTypes(PageRankWithConvergenceExampleBlockFactory.class); + } +}
