Repository: giraph Updated Branches: refs/heads/trunk 77f8a075c -> acd532373
[GIRAPH-1013] Adding TestGraphUtils and NumericTestGraph Summary: Adding a simplified framework for running application tests. Code for testing is going to be much shorter, especially when Java 8 is used. The only difference compared to our codebase is the addition of SendingMessagesTest to showcase these capabilities. Test Plan: mvn clean install Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D40533 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/acd53237 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/acd53237 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/acd53237 Branch: refs/heads/trunk Commit: acd532373072b69d0d802c4456943aa35388bdca Parents: 77f8a07 Author: Igor Kabiljo <ikabi...@fb.com> Authored: Mon Jun 22 21:44:49 2015 -0700 Committer: Igor Kabiljo <ikabi...@fb.com> Committed: Thu Jun 25 17:40:18 2015 -0700 ---------------------------------------------------------------------- .../block_app/test_setup/NumericTestGraph.java | 382 +++++++++++++++++++ .../block_app/test_setup/TestGraphChecker.java | 34 ++ .../block_app/test_setup/TestGraphModifier.java | 34 ++ .../block_app/test_setup/TestGraphUtils.java | 208 ++++++++++ .../test_setup/graphs/EachVertexInit.java | 49 +++ .../test_setup/graphs/Small1GraphInit.java | 62 +++ .../test_setup/graphs/Small2GraphInit.java | 63 +++ .../test_setup/graphs/SyntheticGraphInit.java | 94 +++++ .../test_setup/graphs/package-info.java | 21 + .../block_app/test_setup/package-info.java | 21 + .../framework/SendingMessagesTest.java | 138 +++++++ .../framework/TestLongNullNullBlockFactory.java | 46 +++ .../block_app/framework/TestWorkerMessages.java | 111 ++++++ .../reducers/array/ObjectStripingTest.java | 17 + 14 files changed, 1280 insertions(+) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java new file mode 100644 index 0000000..4886c80 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.test_setup; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.function.Function; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.TestGraph; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import com.google.common.base.Preconditions; + +/** + * Wraps TestGraph to allow using numbers to create and inspect the graph, + * instead of needing to have actual Writable values, which don't have + * auto-boxing. 
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public class NumericTestGraph<I extends WritableComparable, + V extends Writable, + E extends Writable> { + + private static final Logger LOG = Logger.getLogger(NumericTestGraph.class); + + private final TestGraph<I, V, E> testGraph; + private final Function<Number, I> numberToVertexId; + private final Function<Number, V> numberToVertexValue; + private final Function<Number, E> numberToEdgeValue; + + public NumericTestGraph(TestGraph<I, V, E> testGraph) { + this.testGraph = testGraph; + numberToVertexId = + numericConvForType(testGraph.getConf().getVertexIdClass()); + numberToVertexValue = + numericConvForType(testGraph.getConf().getVertexValueClass()); + numberToEdgeValue = + numericConvForType(testGraph.getConf().getEdgeValueClass()); + Preconditions.checkState(this.numberToVertexId != null); + } + + public NumericTestGraph(GiraphConfiguration conf) { + this(new TestGraph<I, V, E>(conf)); + } + + public ImmutableClassesGiraphConfiguration<I, V, E> getConf() { + return testGraph.getConf(); + } + + public TestGraph<I, V, E> getTestGraph() { + return testGraph; + } + + + /** + * Get Vertex for a given id. + */ + public Vertex<I, V, E> getVertex(Number vertexId) { + return testGraph.getVertex(numberToVertexId(vertexId)); + } + + /** + * Get Vertex Value for a given id. + */ + public V getValue(Number vertexId) { + return testGraph.getVertex(numberToVertexId(vertexId)).getValue(); + } + + /** + * Get number of vertices in the graph + */ + public int getVertexCount() { + return testGraph.getVertices().size(); + } + + /** + * Add Vertex with a given id to the graph, initializing it to + * default vertex value and no edges. + */ + public void addVertex(Number vertexId) { + addVertex(vertexId, (Number) null); + } + + /** + * Add Vertex with a given id and a given Vertex Value to the graph, + * initializing it to have no edges. 
+ */ + public void addVertex(Number vertexId, Number vertexValue) { + addVertex(vertexId, vertexValue, null); + } + + /** + * Add Vertex with a given id and a given Vertex Value to the graph, + * with listed outgoing edges, all initialized to same provided + * {@code edgeValue}. + */ + public void addVertex(Number vertexId, Number vertexValue, + Number edgeValue, Number... outEdges) { + Vertex<I, V, E> vertex = makeVertex( + vertexId, vertexValue, edgeValue, outEdges); + testGraph.addVertex(vertex); + } + + /** + * Add Vertex with a given id and a given Vertex Value to the graph, + * initializing it to have no edges. + */ + public void addVertex(Number vertexId, V vertexValue) { + addVertex(vertexId, vertexValue, null); + } + + /** + * Add Vertex with a given id and a given Vertex Value to the graph, + * with listed outgoing edges, all initialized to same provided + * {@code edgeSupplier}. + */ + public void addVertex(Number vertexId, V vertexValue, + Supplier<E> edgeSupplier, Number... outEdges) { + Vertex<I, V, E> vertex = makeVertex( + vertexId, vertexValue, edgeSupplier, outEdges); + testGraph.addVertex(vertex); + } + + /** + * Add Edge to the graph with default Edge Value, by adding it to + * outEdges of {@code fromVertex}, potentially creating {@code fromVertex} + * if it doesn't exist. + */ + public void addEdge(Number fromVertex, Number toVertex) { + addEdge(fromVertex, toVertex, (Number) null); + } + + /** + * Add Edge to the graph with provided Edge Value, by adding it to + * outEdges of {@code fromVertex}, potentially creating {@code fromVertex} + * if it doesn't exist. + */ + public void addEdge(Number fromVertex, Number toVertex, Number edgeValue) { + testGraph.addEdge( + numberToVertexId(fromVertex), + numberToVertexId(toVertex), + numberToEdgeValue(edgeValue)); + } + + /** + * Add Edge to the graph with provided Edge Value, by adding it to + * outEdges of {@code fromVertex}, potentially creating {@code fromVertex} + * if it doesn't exist. 
+ */ + public void addEdge(Number fromVertex, Number toVertex, E edgeValue) { + testGraph.addEdge( + numberToVertexId(fromVertex), numberToVertexId(toVertex), edgeValue); + } + + /** + * Add symmetric Edge to the graph with default Edge Value, by adding it to + * outEdges of vertices on both ends, potentially creating them both, + * if they don't exist. + */ + public void addSymmetricEdge(Number fromVertex, Number toVertex) { + addEdge(fromVertex, toVertex); + addEdge(toVertex, fromVertex); + } + + /** + * Add symmetric Edge to the graph with provided Edge Value, by adding it to + * outEdges of vertices on both ends, potentially creating them both, + * if they don't exist. + */ + public void addSymmetricEdge( + Number fromVertex, Number toVertex, Number edgeValue) { + addEdge(fromVertex, toVertex, edgeValue); + addEdge(toVertex, fromVertex, edgeValue); + } + + /** + * Add symmetric Edge to the graph with provided Edge Value, by adding it to + * outEdges of vertices on both ends, potentially creating them both, + * if they don't exist. + */ + public void addSymmetricEdge(Number vertexId, Number toVertex, E edgeValue) { + addEdge(vertexId, toVertex, edgeValue); + addEdge(toVertex, vertexId, edgeValue); + } + + /** + * Creates a new Vertex object, without adding it into the graph. + * + * This function is safe to call from multiple threads at the same time, + * and then synchronize only on actual addition of Vertex to the graph + * itself. + */ + public Vertex<I, V, E> makeVertex( + Number vertexId, V vertexValue, + Entry<? extends Number, ? extends Number>... edges) { + Vertex<I, V, E> vertex = getConf().createVertex(); + List<Edge<I, E>> edgesList = new ArrayList<>(); + + int i = 0; + for (Entry<? extends Number, ? extends Number> edge: edges) { + edgesList.add(EdgeFactory.create( + numberToVertexId(edge.getKey()), + numberToEdgeValue(edge.getValue()))); + i++; + } + vertex.initialize( + numberToVertexId(vertexId), + vertexValue != null ? 
+ vertexValue : getConf().createVertexValue(), + edgesList); + return vertex; + } + + /** + * Creates a new Vertex object, without adding it into the graph. + * + * This function is safe to call from multiple threads at the same time, + * and then synchronize only on actual addition of Vertex to the graph + * itself. + */ + public Vertex<I, V, E> makeVertex( + Number vertexId, V vertexValue, + Supplier<E> edgeSupplier, Number... edges) { + Vertex<I, V, E> vertex = getConf().createVertex(); + + List<Edge<I, E>> edgesList = new ArrayList<>(); + for (Number edge: edges) { + edgesList.add( + EdgeFactory.create(numberToVertexId.apply(edge), + edgeSupplier != null ? + edgeSupplier.get() : getConf().createEdgeValue())); + } + + vertex.initialize( + numberToVertexId.apply(vertexId), + vertexValue != null ? + vertexValue : getConf().createVertexValue(), + edgesList); + return vertex; + } + + /** + * Creates a new Vertex object, without adding it into the graph. + * + * This function is safe to call from multiple threads at the same time, + * and then synchronize only on actual addition of Vertex to the graph + * itself. + */ + public Vertex<I, V, E> makeVertex( + Number vertexId, Number value, + Number edgeValue, Number... edges) { + Vertex<I, V, E> vertex = getConf().createVertex(); + + List<Edge<I, E>> edgesList = new ArrayList<>(); + for (Number edge: edges) { + edgesList.add( + EdgeFactory.create(numberToVertexId.apply(edge), + numberToEdgeValue(edgeValue))); + } + + vertex.initialize( + numberToVertexId.apply(vertexId), + numberToVertexValue(value), + edgesList); + return vertex; + } + + public I numberToVertexId(Number value) { + return numberToVertexId.apply(value); + } + + public V numberToVertexValue(Number value) { + return value != null ? + numberToVertexValue.apply(value) : getConf().createVertexValue(); + } + + public E numberToEdgeValue(Number edgeValue) { + return edgeValue != null ? 
+ numberToEdgeValue.apply(edgeValue) : getConf().createEdgeValue(); + } + + public Vertex<I, V, E> createVertex() { + return getConf().createVertex(); + } + + public void initializeVertex( + Vertex<I, V, E> v, I id, Supplier<V> valueSupplier, + List<Edge<I, E>> edgesList) { + v.initialize( + id, + valueSupplier != null ? + valueSupplier.get() : getConf().createVertexValue(), + edgesList != null ? edgesList : new ArrayList<Edge<I, E>>()); + } + + @Override + public String toString() { + return testGraph.toString(); + } + + + private static Function<Number, IntWritable> numberToInt() { + return new Function<Number, IntWritable>() { + @Override + public IntWritable apply(Number input) { + return new IntWritable(input.intValue()); + } + }; + } + + private static Function<Number, LongWritable> numberToLong() { + return new Function<Number, LongWritable>() { + @Override + public LongWritable apply(Number input) { + return new LongWritable(input.longValue()); + } + }; + } + + private static Function<Number, DoubleWritable> numberToDouble() { + return new Function<Number, DoubleWritable>() { + @Override + public DoubleWritable apply(Number input) { + return new DoubleWritable(input.doubleValue()); + } + }; + } + + private static Function<Number, FloatWritable> numberToFloat() { + return new Function<Number, FloatWritable>() { + @Override + public FloatWritable apply(Number input) { + return new FloatWritable(input.floatValue()); + } + }; + } + + private static <T> Function<Number, T> numericConvForType(Class<T> type) { + if (type.equals(LongWritable.class)) { + return (Function) numberToLong(); + } else if (type.equals(IntWritable.class)) { + return (Function) numberToInt(); + } else if (type.equals(DoubleWritable.class)) { + return (Function) numberToDouble(); + } else if (type.equals(FloatWritable.class)) { + return (Function) numberToFloat(); + } else { + return null; + } + } +} 
http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphChecker.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphChecker.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphChecker.java new file mode 100644 index 0000000..c7b2bc5 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphChecker.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.test_setup; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Test graph checker, function that checks whether output + * of a test is correct. 
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public interface TestGraphChecker<I extends WritableComparable, + V extends Writable, E extends Writable> { + void checkOutput(NumericTestGraph<I, V, E> graph); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphModifier.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphModifier.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphModifier.java new file mode 100644 index 0000000..1017590 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphModifier.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.test_setup; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Test graph modifier, function that initializes graph + * (i.e. creates edges and vertices) for a given test. 
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public interface TestGraphModifier<I extends WritableComparable, + V extends Writable, E extends Writable> { + void modifyGraph(NumericTestGraph<I, V, E> graph); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphUtils.java new file mode 100644 index 0000000..15bf434 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/TestGraphUtils.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.test_setup; + +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.BulkConfigurator; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.conf.BooleanConfOption; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.function.Supplier; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Utility functions for running TestGraph unit tests. + */ +public class TestGraphUtils { + /** modify locally for running full Digraph tests from IDE */ + public static final + BooleanConfOption USE_FULL_GIRAPH_ENV_IN_TESTS = new BooleanConfOption( + "giraph.blocks.test_setup.use_full_giraph_env_in_tests", false, + "Whether to use full giraph environemnt for tests, " + + "or only local implementation"); + + // if you want to check stability of the test and make sure it passes always + // test it with larger number, like ~10. + private static int TEST_REPEAT_TIMES = 1; + + private TestGraphUtils() { } + + /** + * Creates configuration using configurator, initializes the graph using + * graphInitializer, and checks it via graphChecker. + * + * Supports using TEST_REPEAT_TIMES for running the same test multiple times. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + void runTest( + final TestGraphModifier<? super I, ? super V, ? super E> graphInitializer, + final TestGraphChecker<? super I, ? super V, ? 
super E> graphChecker, + final BulkConfigurator configurator) throws Exception { + repeat( + repeatTimes(), + new OneTest() { + @Override + public void test() throws Exception { + GiraphConfiguration conf = new GiraphConfiguration(); + configurator.configure(conf); + BlockUtils.initAndCheckConfig(conf); + runTest(graphInitializer, graphChecker, conf); + } + }); + } + + /** + * Uses provided configuration, initializes the graph using + * graphInitializer, and checks it via graphChecker. + */ + public static + <I extends WritableComparable, E extends Writable, V extends Writable> + void runTest( + TestGraphModifier<? super I, ? super V, ? super E> graphInitializer, + TestGraphChecker<? super I, ? super V, ? super E> graphChecker, + GiraphConfiguration conf) throws Exception { + NumericTestGraph<I, V, E> graph = new NumericTestGraph<>(conf); + graphInitializer.modifyGraph((NumericTestGraph) graph); + runTest(graph, graphChecker); + } + + /** + * Base of runTest. Takes a created graph, a graph-checker and conf and runs + * the test. + */ + public static + <I extends WritableComparable, E extends Writable, V extends Writable> + void runTest( + NumericTestGraph<I, V, E> graph, + TestGraphChecker<? super I, ? super V, ? super E> graphChecker + ) throws Exception { + graph = new NumericTestGraph<I, V, E>( + LocalBlockRunner.runApp( + graph.getTestGraph(), useFullDigraphTests(graph.getConf()))); + if (graphChecker != null) { + graphChecker.checkOutput((NumericTestGraph) graph); + } + } + + /** + * Chain execution of multiple TestGraphModifier into one. + */ + @SafeVarargs + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + TestGraphModifier<I, V, E> chainModifiers( + final TestGraphModifier<I, V, E>... 
graphModifiers) { + return new TestGraphModifier<I, V, E>() { + @Override + public void modifyGraph( + NumericTestGraph<I, V, E> graph) { + for (TestGraphModifier<I, V, E> graphModifier : graphModifiers) { + graphModifier.modifyGraph(graph); + } + } + }; + } + + /** + * Chain execution of multiple BulkConfigurators into one. + * + * Order might matter, if they are setting the same fields. + * (later one will override what previous one already set). + */ + public static BulkConfigurator chainConfigurators( + final BulkConfigurator... configurators) { + return new BulkConfigurator() { + @Override + public void configure(GiraphConfiguration conf) { + for (BulkConfigurator configurator : configurators) { + configurator.configure(conf); + } + } + }; + } + + + public static Supplier<DoubleWritable> doubleSupplier(final double value) { + return new Supplier<DoubleWritable>() { + @Override + public DoubleWritable get() { + return new DoubleWritable(value); + } + }; + } + + public static Supplier<NullWritable> nullSupplier() { + return new Supplier<NullWritable>() { + @Override + public NullWritable get() { + return NullWritable.get(); + } + }; + } + + /** Interface for running a single test that can throw an exception */ + interface OneTest { + void test() throws Exception; + } + + private static void repeat(int times, OneTest test) throws Exception { + if (times == 1) { + test.test(); + } else { + int failures = 0; + StringBuilder failureMsgs = new StringBuilder(); + AssertionError firstError = null; + for (int i = 0; i < times; i++) { + try { + test.test(); + } catch (AssertionError error) { + failures++; + failureMsgs.append("\n").append(error.getMessage()); + if (firstError == null) { + firstError = error; + } + } + } + + if (failures > 0) { + throw new AssertionError( + "Failed " + failures + " times out of " + times + + " runs, messages: " + failureMsgs, + firstError); + } + } + } + + private static boolean useFullDigraphTests(GiraphConfiguration conf) { + return 
USE_FULL_GIRAPH_ENV_IN_TESTS.get(conf) || + System.getProperty("test_setup.UseFullGiraphEnvInTests") != null; + } + + private static int repeatTimes() { + String value = System.getProperty("test_setup.TestRepeatTimes"); + return value != null ? Integer.parseInt(value) : TEST_REPEAT_TIMES; + } + + public static void setTestRepeatTimes(int testRepeatTimes) { + TestGraphUtils.TEST_REPEAT_TIMES = testRepeatTimes; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/EachVertexInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/EachVertexInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/EachVertexInit.java new file mode 100644 index 0000000..d51d819 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/EachVertexInit.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.test_setup.graphs; + +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Traverse each Vertex in the graph, and initialize it with a given + * consumer function. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public class EachVertexInit<I extends WritableComparable, V extends Writable, + E extends Writable> implements TestGraphModifier<I, V, E> { + private final Consumer<Vertex<I, V, E>> vertexConsumer; + + public EachVertexInit(Consumer<Vertex<I, V, E>> vertexConsumer) { + this.vertexConsumer = vertexConsumer; + } + + @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + for (Vertex<I, V, E> vertex : graph.getTestGraph()) { + vertexConsumer.apply(vertex); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java new file mode 100644 index 0000000..ecec024 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.test_setup.graphs; + +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.function.Supplier; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Create a network that looks like: + * 1 5 + * / \ / \ 6 + * 0---2--3---4 + * + * where 6 is disconnected from the rest of the network. 
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public class Small1GraphInit<I extends WritableComparable, + V extends Writable, E extends Writable> + implements TestGraphModifier<I, V, E> { + + private final Supplier<V> valueSupplier; + private final Supplier<E> edgeSupplier; + + public Small1GraphInit( + Supplier<V> valueSupplier, Supplier<E> edgeSupplier) { + this.valueSupplier = valueSupplier; + this.edgeSupplier = edgeSupplier; + } + + @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2); + graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2); + graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1, 3); + graph.addVertex(3, valueSupplier.get(), edgeSupplier, 2, 4, 5); + graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5); + graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4); + graph.addVertex(6, valueSupplier.get(), edgeSupplier); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java new file mode 100644 index 0000000..eb38c45 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.test_setup.graphs; + +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.function.Supplier; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Create a network that looks like: + * 1 5 + * / \ / \ 6 + * 0---2 3---4 + * + * where 6 is disconnected from the rest of the network. 
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public class Small2GraphInit<I extends WritableComparable, + V extends Writable, E extends Writable> + implements TestGraphModifier<I, V, E> { + + /** Supplies the value handed to addVertex; get() is called once for every vertex added by modifyGraph. */ private final Supplier<V> valueSupplier; + /** Handed unchanged to every addVertex call; presumably used there to create edge values (confirm against NumericTestGraph.addVertex). */ private final Supplier<E> edgeSupplier; + + /** Stores both suppliers for later use by modifyGraph; no validation is performed here. */ public Small2GraphInit( + Supplier<V> valueSupplier, Supplier<E> edgeSupplier) { + this.valueSupplier = valueSupplier; + this.edgeSupplier = edgeSupplier; + } + + /** Adds vertices 0 through 6 to the given graph, in that order. The id lists only connect 0, 1, 2 among themselves and 3, 4, 5 among themselves, and vertex 6 is added with no ids; the trailing numeric arguments are presumably neighbor ids (confirm against NumericTestGraph.addVertex). */ @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2); + graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2); + graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1); + graph.addVertex(3, valueSupplier.get(), edgeSupplier, 4, 5); + graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5); + graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4); + graph.addVertex(6, valueSupplier.get(), edgeSupplier); + } +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java new file mode 100644 index 0000000..3de158a --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.test_setup.graphs; + +import java.util.Random; + +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.conf.FloatConfOption; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.function.Supplier; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Creates synthetic graphs, that can have community structure. 
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public class SyntheticGraphInit<I extends WritableComparable, + V extends Writable, E extends Writable> + implements TestGraphModifier<I, V, E> { + public static final IntConfOption NUM_COMMUNITIES = new IntConfOption( + "test.SyntheticGraphCreator.NUM_COMMUNITIES", -1, ""); + public static final IntConfOption NUM_VERTICES = new IntConfOption( + "test.SyntheticGraphCreator.NUM_VERTICES", -1, ""); + public static final IntConfOption NUM_EDGES_PER_VERTEX = new IntConfOption( + "test.SyntheticGraphCreator.NUM_EDGES_PER_VERTEX", -1, ""); + public static final FloatConfOption ACTUAL_LOCALITY_RATIO = + new FloatConfOption( + "test.SyntheticGraphCreator.ACTUAL_LOCALITY_RATIO", -1, ""); + + protected final Supplier<E> edgeSupplier; + + public SyntheticGraphInit(Supplier<E> edgeSupplier) { + this.edgeSupplier = edgeSupplier; + } + + public SyntheticGraphInit() { + this.edgeSupplier = null; + } + + + @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + GiraphConfiguration conf = graph.getConf(); + int numPartitions = NUM_COMMUNITIES.get(conf); + int numVertices = NUM_VERTICES.get(conf); + int numEdgesPerVertex = NUM_EDGES_PER_VERTEX.get(conf); + int communitySize = numVertices / numPartitions; + float actualLocalityRatio = ACTUAL_LOCALITY_RATIO.get(conf); + Random random = new Random(42); + for (int i = 0; i < numVertices; ++i) { + for (int e = 0; e < numEdgesPerVertex / 2; ++e) { + boolean localEdge = random.nextFloat() < actualLocalityRatio; + int community = i / communitySize; + int j; + do { + if (localEdge) { + j = community * communitySize + random.nextInt(communitySize); + } else { + j = random.nextInt(numVertices); + } + } while (j == i); + graph.addSymmetricEdge( + i, j, edgeSupplier != null ? 
edgeSupplier.get() : null); + } + } + +// if (vertexModifier != null) { +// for (int i = 0; i < numVertices; i++) { +// vertexModifier.modifyVertexValue(i, graph.getVertex(i).getValue()); +// } +// } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/package-info.java new file mode 100644 index 0000000..e16923b --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Common Graphs for unit tests. 
+ */ +package org.apache.giraph.block_app.test_setup.graphs; http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/package-info.java new file mode 100644 index 0000000..763f79e --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utilities for unit tests. 
+ */ +package org.apache.giraph.block_app.test_setup; http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/SendingMessagesTest.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/SendingMessagesTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/SendingMessagesTest.java new file mode 100644 index 0000000..d4a7c2f --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/SendingMessagesTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework; + + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.block_app.library.VertexSuppliers; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphChecker; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.junit.Assert; +import org.junit.Test; + +/** Runs SendingMessagesToNeighborsBlockFactory on tiny graphs through TestGraphUtils.runTest and checks the resulting vertex values. Each test passes a graph modifier, an output checker and a configurator, in that order. */ public class SendingMessagesTest { + /** Graph built with addEdge(1, 2) and no extra configuration; expects vertex 2 to end with value 1 and vertex 1 with value 0. Presumably vertex 2 only comes into existence when it receives the message - confirm against the default of RESOLVER_CREATE_VERTEX_ON_MSGS. */ @Test + public void createVertexOnMsgsTest() throws Exception { + TestGraphUtils.runTest( + new TestGraphModifier<LongWritable, LongWritable, Writable>() { + @Override + public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) { + graph.addEdge(1, 2); + } + }, + new TestGraphChecker<LongWritable, LongWritable, Writable>() { + @Override + public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) { + Assert.assertEquals(1, graph.getValue(2).get()); + Assert.assertEquals(0, graph.getValue(1).get()); + } + }, + new BulkConfigurator() { + @Override + public void configure(GiraphConfiguration conf) { + BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class); + } + }); + } + + /** Same graph as createVertexOnMsgsTest, but with RESOLVER_CREATE_VERTEX_ON_MSGS set to false; expects getVertex(2) to be null and vertex 1 to end with value 0. */ @Test + public void doNotCreateVertexOnMsgsTest() throws Exception { + TestGraphUtils.runTest( + new TestGraphModifier<LongWritable, LongWritable, Writable>() { + @Override + public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) { + graph.addEdge(1, 2); + } + }, + new TestGraphChecker<LongWritable, LongWritable, Writable>() { + @Override + 
public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) { + Assert.assertNull(graph.getVertex(2)); + Assert.assertEquals(0, graph.getValue(1).get()); + } + }, + new BulkConfigurator() { + @Override + public void configure(GiraphConfiguration conf) { + BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class); + /* the only difference from createVertexOnMsgsTest */ GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.set(conf, false); + } + }); + } + + /** Graph built with addSymmetricEdge(1, 2) and addSymmetricEdge(3, 2); expects vertex 2 to end with value 3 and vertices 1 and 3 with value 2, which matches each vertex keeping the largest id it received. */ @Test + public void createMultiMsgs() throws Exception { + TestGraphUtils.runTest( + new TestGraphModifier<LongWritable, LongWritable, Writable>() { + @Override + public void modifyGraph(NumericTestGraph<LongWritable, LongWritable, Writable> graph) { + graph.addSymmetricEdge(1, 2); + graph.addSymmetricEdge(3, 2); + } + }, + new TestGraphChecker<LongWritable, LongWritable, Writable>() { + @Override + public void checkOutput(NumericTestGraph<LongWritable, LongWritable, Writable> graph) { + Assert.assertEquals(3, graph.getValue(2).get()); + Assert.assertEquals(2, graph.getValue(1).get()); + Assert.assertEquals(2, graph.getValue(3).get()); + } + }, + new BulkConfigurator() { + @Override + public void configure(GiraphConfiguration conf) { + BlockUtils.setBlockFactoryClass(conf, SendingMessagesToNeighborsBlockFactory.class); + } + }); + } + + /** Block factory under test: LongWritable vertex values and a single sendMessageToNeighbors piece. */ public static class SendingMessagesToNeighborsBlockFactory extends TestLongNullNullBlockFactory { + /** Replaces the NullWritable vertex value of the parent factory with LongWritable, which the consumer in createBlock writes to. */ @Override + protected Class<? 
extends Writable> getVertexValueClass(GiraphConfiguration conf) { + return LongWritable.class; + } + + /** Builds the piece named "SendToNeighbors": messages are LongWritable values produced by vertexIdSupplier (presumably the sender's vertex id - confirm against VertexSuppliers), and the consumer stores the maximum received value in the vertex value. */ @Override + public Block createBlock(GiraphConfiguration conf) { + return Pieces.sendMessageToNeighbors( + "SendToNeighbors", + LongWritable.class, + VertexSuppliers.<LongWritable, LongWritable, Writable>vertexIdSupplier(), + new ConsumerWithVertex<LongWritable, LongWritable, Writable, Iterable<LongWritable>>() { + @Override + public void apply(Vertex<LongWritable, LongWritable, Writable> vertex, + Iterable<LongWritable> messages) { + /* starts at 0, so the stored value is 0 when messages is empty or holds no positive value */ long max = 0; + for (LongWritable v : messages) { + max = Math.max(max, v.get()); + } + vertex.getValue().set(max); + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestLongNullNullBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestLongNullNullBlockFactory.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestLongNullNullBlockFactory.java new file mode 100644 index 0000000..927e715 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestLongNullNullBlockFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** Base block factory for tests: LongWritable vertex ids, NullWritable vertex and edge values, and a plain Object as execution stage. Subclasses still have to provide the block itself (createBlock is not defined here) and may override the type getters. */ public abstract class TestLongNullNullBlockFactory extends AbstractBlockFactory<Object> { + /** Vertex ids are LongWritable, regardless of the configuration. */ @Override + protected Class<? extends WritableComparable> getVertexIDClass(GiraphConfiguration conf) { + return LongWritable.class; + } + + /** Vertex values are NullWritable, regardless of the configuration. */ @Override + protected Class<? extends Writable> getVertexValueClass(GiraphConfiguration conf) { + return NullWritable.class; + } + + /** Edge values are NullWritable, regardless of the configuration. */ @Override + protected Class<? 
extends Writable> getEdgeValueClass(GiraphConfiguration conf) { + return NullWritable.class; + } + + /** Returns a new, stateless Object on every call; the configuration is ignored. */ @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java new file mode 100644 index 0000000..a68fca2 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/TestWorkerMessages.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework; + +import java.util.HashSet; +import java.util.List; + +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.types.NoMessage; +import org.apache.giraph.utils.TestGraph; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests sending worker-to-worker messages: every piece sends a fixed set of values to its own worker and asserts that exactly those values arrive. The same block factory is run twice, once through LocalBlockRunner directly and once through TestGraphUtils. + */ +public class TestWorkerMessages { + /** Builds a TestGraph by hand (one edge from 1 to 2 with a NullWritable value) and runs it with LocalBlockRunner.runApp; the assertions live in TestWorkerMessagesPiece. */ @Test + public void testWorkerMessages() throws Exception { + GiraphConfiguration conf = new GiraphConfiguration(); + BlockUtils.setAndInitBlockFactoryClass(conf, TestWorkerMessagesBlockFactory.class); + TestGraph testGraph = new TestGraph(conf); + testGraph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get()); + LocalBlockRunner.runApp(testGraph); + } + + /** Same scenario through TestGraphUtils.runTest: the graph gets the edge added by addEdge(1, 2), and null is passed in the checker position, so the only assertions are those in TestWorkerMessagesPiece. */ @Test + public void testWithTestSetup() throws Exception { + TestGraphUtils.runTest( + new TestGraphModifier<WritableComparable, Writable, Writable>() { + @Override + public void modifyGraph(NumericTestGraph<WritableComparable, Writable, Writable> graph) { + graph.addEdge(1, 2); + } + }, + null, + new BulkConfigurator() { + @Override + public void configure(GiraphConfiguration conf) { + BlockUtils.setBlockFactoryClass(conf, 
TestWorkerMessagesBlockFactory.class); + } + }); + } + + /** Block factory that chains two TestWorkerMessagesPiece instances carrying different value sets. */ public static class TestWorkerMessagesBlockFactory extends TestLongNullNullBlockFactory { + /** Returns a SequenceBlock of a piece with values 2, 4, 11 followed by a piece with values 3, 5, 2, 100. */ @Override + public Block createBlock(GiraphConfiguration conf) { + return new SequenceBlock( + new TestWorkerMessagesPiece(2, 4, 11), + new TestWorkerMessagesPiece(3, 5, 2, 100)); + } + } + + /** Piece that sends each of its values as a LongWritable worker message to its own worker index and, on receive, asserts that the received messages are exactly those values. */ public static class TestWorkerMessagesPiece extends PieceWithWorkerContext<LongWritable, + Writable, Writable, NoMessage, Object, LongWritable, Object> { + /** Values to send; stored in a set, so duplicate constructor arguments collapse into one. Emptied by workerContextReceive. */ private final HashSet<Long> values; + + /** Copies the given values into the set of values to send. */ public TestWorkerMessagesPiece(long... values) { + this.values = new HashSet<>(); + for (long value : values) { + this.values.add(value); + } + } + + /** Sends one LongWritable message per stored value to the index returned by getMyWorkerIndex, i.e. to the sending worker itself. */ @Override + public void workerContextSend(BlockWorkerContextSendApi<LongWritable> workerContextApi, + Object executionStage, Object workerValue) { + for (long value : values) { + workerContextApi.sendMessageToWorker(new LongWritable(value), + workerContextApi.getMyWorkerIndex()); + } + } + + /** Asserts that the number of received messages equals the number of stored values and that every message removes a distinct stored value. NOTE(review): this drains the values set, so the assertions can only pass once per piece instance - confirm that a piece is never asked to receive twice. */ @Override + public void workerContextReceive(BlockWorkerContextReceiveApi workerContextApi, + Object executionStage, Object workerValue, List<LongWritable> workerMessages) { + Assert.assertEquals(values.size(), workerMessages.size()); + for (LongWritable workerMessage : workerMessages) { + /* remove returns false for an unknown or repeated value, which fails the test */ Assert.assertTrue(values.remove(workerMessage.get())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/acd53237/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java index 5e4eb11..fec5b39 100644 --- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java +++ 
b/giraph-block-app/src/test/java/org/apache/giraph/block_app/reducers/array/ObjectStripingTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.giraph.block_app.reducers.array; import static org.junit.Assert.assertEquals;