Updated Branches: refs/heads/trunk d644af866 -> 4b8fae259
GIRAPH-778: Testing with TestGraph is broken (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4b8fae25 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4b8fae25 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4b8fae25 Branch: refs/heads/trunk Commit: 4b8fae2593e3d9b7d46542d886046444aa97a092 Parents: d644af8 Author: Maja Kabiljo <[email protected]> Authored: Thu Oct 10 17:30:35 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu Oct 10 17:30:35 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/bsp/ImmutableOutputCommitter.java | 2 +- .../io/formats/InMemoryVertexOutputFormat.java | 105 +++++++++++++++++++ .../giraph/utils/InternalVertexRunner.java | 33 +++++- .../java/org/apache/giraph/utils/TestGraph.java | 11 ++ .../apache/giraph/master/TestSwitchClasses.java | 2 +- ...nectedComponentsComputationTestInMemory.java | 2 +- 7 files changed, 149 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/4b8fae25/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 18d956c..2d746b7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-778: Testing with TestGraph is broken (majakabiljo) + GIRAPH-777: Fix bug from GIRAPH-775 (majakabiljo) GIRAPH-776: Update Giraph to use HiveIO 0.18 (gmalewicz via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/4b8fae25/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java index e5480cc..feb4041 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/ImmutableOutputCommitter.java @@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; * where output isn't desired, or as a base for not using * FileOutputCommitter. */ -class ImmutableOutputCommitter extends OutputCommitter { +public class ImmutableOutputCommitter extends OutputCommitter { @Override public void abortTask(TaskAttemptContext context) throws IOException { } http://git-wip-us.apache.org/repos/asf/giraph/blob/4b8fae25/giraph-core/src/main/java/org/apache/giraph/io/formats/InMemoryVertexOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/InMemoryVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/InMemoryVertexOutputFormat.java new file mode 100644 index 0000000..5876b37 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/InMemoryVertexOutputFormat.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.formats; + +import org.apache.giraph.bsp.ImmutableOutputCommitter; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.VertexOutputFormat; +import org.apache.giraph.io.VertexWriter; +import org.apache.giraph.utils.TestGraph; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * VertexOutputFormat which stores all vertices in memory + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + */ +public class InMemoryVertexOutputFormat<I extends WritableComparable, + V extends Writable, E extends Writable> extends + VertexOutputFormat<I, V, E> { + /** Graph where we store all vertices */ + private static TestGraph OUTPUT_GRAPH; + + /** + * Initialize this output format - needs to be called before running the + * application. Creates new instance of TestGraph + * + * @param conf Configuration + */ + public static void initializeOutputGraph(GiraphConfiguration conf) { + OUTPUT_GRAPH = new TestGraph(conf); + } + + /** + * Get graph containing all the vertices + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @return Output graph + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable> TestGraph<I, V, E> getOutputGraph() { + return OUTPUT_GRAPH; + } + + @Override + public VertexWriter<I, V, E> createVertexWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new VertexWriter<I, V, E>() { + @Override + public void initialize( + TaskAttemptContext context) throws IOException, InterruptedException { + } + + @Override + public void close( + TaskAttemptContext context) throws IOException, InterruptedException { + } + + @Override + public void writeVertex( + Vertex<I, V, E> vertex) throws IOException, InterruptedException { + synchronized (OUTPUT_GRAPH) { + OUTPUT_GRAPH.addVertex(vertex); + } + } + }; + } + + @Override + public void checkOutputSpecs( + JobContext context) throws IOException, InterruptedException { + } + + @Override + public OutputCommitter getOutputCommitter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new ImmutableOutputCommitter(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4b8fae25/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java index 72fab83..16e2f6a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java @@ -21,6 +21,7 @@ package org.apache.giraph.utils; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.io.formats.GiraphFileInputFormat; +import org.apache.giraph.io.formats.InMemoryVertexOutputFormat; import org.apache.giraph.job.GiraphJob; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -194,8 +195,8 @@ public class InternalVertexRunner { } /** - * Attempts to run the vertex internally in the current JVM, reading and - * writing to an in-memory graph. Will start its own zookeeper + * Attempts to run the vertex internally in the current JVM, + * reading from an in-memory graph. Will start its own zookeeper * instance. * * @param <I> Vertex ID @@ -203,12 +204,11 @@ public class InternalVertexRunner { * @param <E> Edge Value * @param conf GiraphClasses specifying which types to use * @param graph input graph - * @return iterable output data * @throws Exception if anything goes wrong */ public static <I extends WritableComparable, V extends Writable, - E extends Writable> TestGraph<I, V, E> run( + E extends Writable> void run( GiraphConfiguration conf, TestGraph<I, V, E> graph) throws Exception { File tmpDir = null; @@ -266,13 +266,36 @@ public class InternalVertexRunner { executorService.shutdown(); zookeeper.end(); } - return graph; } finally { FileUtils.delete(tmpDir); } } /** + * Attempts to run the vertex internally in the current JVM, reading and + * writing to an in-memory graph. Will start its own zookeeper + * instance. + * + * @param <I> Vertex ID + * @param <V> Vertex Value + * @param <E> Edge Value + * @param conf GiraphClasses specifying which types to use + * @param graph input graph + * @return Output graph + * @throws Exception if anything goes wrong + */ + public static <I extends WritableComparable, + V extends Writable, + E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput( + GiraphConfiguration conf, + TestGraph<I, V, E> graph) throws Exception { + conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class); + InMemoryVertexOutputFormat.initializeOutputGraph(conf); + InternalVertexRunner.run(conf, graph); + return InMemoryVertexOutputFormat.getOutputGraph(); + } + + /** * Configuration options for running local ZK. * * @param zkDir directory for ZK to hold files in. http://git-wip-us.apache.org/repos/asf/giraph/blob/4b8fae25/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java index 312a287..183e90b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java @@ -73,6 +73,17 @@ public class TestGraph<I extends WritableComparable, } /** + * Add vertex + * + * @param vertex Vertex + * @return this + */ + public TestGraph<I, V, E> addVertex(Vertex<I, V, E> vertex) { + vertices.put(vertex.getId(), vertex); + return this; + } + + /** * Add vertex with given ID * * @param id the index http://git-wip-us.apache.org/repos/asf/giraph/blob/4b8fae25/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java index e96fd12..29335af 100644 --- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java @@ -54,7 +54,7 @@ public class TestSwitchClasses { graph.addVertex(id1, new StatusValue()); IntWritable id2 = new IntWritable(2); graph.addVertex(id2, new StatusValue()); - graph = InternalVertexRunner.run(conf, graph); + graph = InternalVertexRunner.runWithInMemoryOutput(conf, graph); Assert.assertEquals(2, graph.getVertices().size()); } http://git-wip-us.apache.org/repos/asf/giraph/blob/4b8fae25/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java index 1bb8e94..6154adb 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java @@ -82,7 +82,7 @@ public class ConnectedComponentsComputationTestInMemory { // run internally TestGraph<IntWritable, IntWritable, NullWritable> results = - InternalVertexRunner.run(conf, graph); + InternalVertexRunner.runWithInMemoryOutput(conf, graph); SetMultimap<Integer, Integer> components = parseResults(results);
