Author: aching
Date: Mon Dec 26 05:05:49 2011
New Revision: 1224678
URL: http://svn.apache.org/viewvc?rev=1224678&view=rev
Log:
GIRAPH-115: Port of the HCC algorithm for identifying all connected
components of a graph. (ssc via aching)
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
Modified: incubator/giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1224678&r1=1224677&r2=1224678&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Dec 26 05:05:49 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-115: Port of the HCC algorithm for identifying all connected
+ components of a graph. (ssc via aching)
+
GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex.
(aching)
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.IntIntNullIntVertex;
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Implementation of the HCC algorithm that identifies connected components
+ * and assigns each vertex its "component identifier" (the smallest vertex id
+ * in the component).
+ *
+ * The idea behind the algorithm is very simple: propagate the smallest vertex
+ * id along the edges to all vertices of a connected component. The number of
+ * supersteps necessary is equal to the maximum diameter over all components
+ * plus one.
+ *
+ * The original Hadoop-based variant of this algorithm was proposed by Kang,
+ * Tsourakakis and Faloutsos in "PEGASUS: Mining Peta-Scale Graphs", 2010
+ *
+ * http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
+ *
+ * NOTE(review): messages only travel along out-edges, so the input is
+ * presumably expected to list every edge in both directions (as the toy test
+ * data does) — confirm for other inputs.
+ */
+public class ConnectedComponentsVertex extends IntIntNullIntVertex {
+
+  /**
+   * Propagates the smallest known vertex id to the neighbors. The vertex
+   * always votes to halt and is only reactivated when a message (a candidate
+   * component id) is sent to it.
+   *
+   * @param messages candidate component ids received from neighbors; ignored
+   *        in superstep 0
+   * @throws IOException as declared by the vertex compute contract
+   */
+  @Override
+  public void compute(Iterator<IntWritable> messages) throws IOException {
+    int ownComponent = getVertexValue().get();
+
+    if (getSuperstep() == 0) {
+      // First superstep: the neighbor ids alone already give a better guess.
+      int smallest = smallestOfNeighborsAnd(ownComponent);
+      // Only announce the value if it differs from the vertex's own value.
+      if (smallest != ownComponent) {
+        setVertexValue(new IntWritable(smallest));
+        sendToLargerNeighbors(smallest);
+      }
+    } else {
+      int smallest = ownComponent;
+      boolean improved = false;
+      while (messages.hasNext()) {
+        int candidate = messages.next().get();
+        if (candidate < smallest) {
+          smallest = candidate;
+          improved = true;
+        }
+      }
+      // Only a strictly smaller id is worth propagating further.
+      if (improved) {
+        setVertexValue(new IntWritable(smallest));
+        sendMsgToAllEdges(getVertexValue());
+      }
+    }
+
+    voteToHalt();
+  }
+
+  /**
+   * Returns the minimum of {@code start} and all neighbor ids.
+   */
+  private int smallestOfNeighborsAnd(int start) {
+    int smallest = start;
+    Iterator<IntWritable> edges = iterator();
+    while (edges.hasNext()) {
+      smallest = Math.min(smallest, edges.next().get());
+    }
+    return smallest;
+  }
+
+  /**
+   * Sends the current vertex value to every neighbor whose id is strictly
+   * larger than {@code component}; other neighbors are skipped.
+   */
+  private void sendToLargerNeighbors(int component) throws IOException {
+    Iterator<IntWritable> edges = iterator();
+    while (edges.hasNext()) {
+      int neighbor = edges.next().get();
+      if (neighbor > component) {
+        sendMsg(new IntWritable(neighbor), getVertexValue());
+      }
+    }
+  }
+
+}
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.lib.TextVertexInputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs with int ids.
+ *
+ * Each line consists of: vertex neighbor1 neighbor2 ...
+ * Tokens are separated by one or more spaces or tabs; leading and trailing
+ * whitespace is ignored. Every vertex starts with its own id as its value.
+ */
+public class IntIntNullIntTextInputFormat extends
+    TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
+    IntWritable> {
+
+  @Override
+  public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    return new IntIntNullIntVertexReader(
+        textInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Reads one vertex per line of text in the "vertex neighbor1 neighbor2 ..."
+   * format.
+   */
+  public static class IntIntNullIntVertexReader extends
+      TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
+      NullWritable, IntWritable> {
+
+    /**
+     * Runs of spaces and tabs separate tokens, so repeated separators do not
+     * produce empty tokens (which would fail in Integer.parseInt).
+     */
+    private static final Pattern SEPARATOR = Pattern.compile("[\t ]+");
+
+    public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
+        lineReader) {
+      super(lineReader);
+    }
+
+    @Override
+    public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+    getCurrentVertex() throws IOException, InterruptedException {
+      BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+          vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
+          IntWritable>createVertex(getContext().getConfiguration());
+
+      // trim() so that leading whitespace does not yield an empty first token
+      String[] tokens = SEPARATOR.split(
+          getRecordReader().getCurrentValue().toString().trim());
+      Map<IntWritable, NullWritable> edges =
+          Maps.newHashMapWithExpectedSize(tokens.length - 1);
+      for (int n = 1; n < tokens.length; n++) {
+        edges.put(new IntWritable(Integer.parseInt(tokens[n])),
+            NullWritable.get());
+      }
+
+      // the initial vertex value is the vertex id itself
+      IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0]));
+      vertex.initialize(vertexId, vertexId, edges,
+          Lists.<IntWritable>newArrayList());
+
+      return vertex;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+
+}
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link VertexCombiner} that keeps only the smallest {@link IntWritable}
+ * message.
+ */
+public class MinimumIntCombiner
+    extends VertexCombiner<IntWritable, IntWritable> {
+
+  /**
+   * Combines the given messages into a single message holding their minimum.
+   *
+   * @param target the message target; not used by this combiner
+   * @param messages the messages to combine
+   * @return a new {@link IntWritable} with the smallest message value, or
+   *         {@link Integer#MAX_VALUE} if {@code messages} is empty
+   * @throws IOException declared by {@link VertexCombiner}; never thrown here
+   */
+  @Override
+  public IntWritable combine(IntWritable target,
+      List<IntWritable> messages) throws IOException {
+    int smallest = Integer.MAX_VALUE;
+    for (IntWritable candidate : messages) {
+      smallest = Math.min(smallest, candidate.get());
+    }
+    return new IntWritable(smallest);
+  }
+}
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.giraph.lib.TextVertexOutputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Text-based {@link org.apache.giraph.graph.VertexOutputFormat} for use with
+ * {@link ConnectedComponentsVertex}.
+ *
+ * Each output line holds a vertex id and its component (represented by the
+ * smallest vertex id in the component), separated by a tab.
+ */
+public class VertexWithComponentTextOutputFormat extends
+    TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> {
+
+  @Override
+  public VertexWriter<IntWritable, IntWritable, NullWritable>
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    RecordWriter<Text, Text> writer = textOutputFormat.getRecordWriter(context);
+    return new VertexWithComponentWriter(writer);
+  }
+
+  /**
+   * Writes each vertex as "id TAB value" in the record key; the record value
+   * is passed as null.
+   */
+  public static class VertexWithComponentWriter extends
+      TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable,
+      NullWritable> {
+
+    public VertexWithComponentWriter(RecordWriter<Text, Text> writer) {
+      super(writer);
+    }
+
+    @Override
+    public void writeVertex(BasicVertex<IntWritable, IntWritable,
+        NullWritable, ?> vertex) throws IOException, InterruptedException {
+      String line = new StringBuilder()
+          .append(vertex.getVertexId().get())
+          .append('\t')
+          .append(vertex.getVertexValue().get())
+          .toString();
+      getRecordWriter().write(new Text(line), null);
+    }
+
+  }
+}
\ No newline at end of file
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Iterables;
+import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple implementation of {@link BasicVertex} that uses an int as vertex id,
+ * vertex value and message. Edges are unweighted and are only set by
+ * {@link #initialize} and {@link #readFields}. All state is held in primitive
+ * ints and int arrays so that the vertex stays as memory efficient as
+ * possible.
+ */
+public abstract class IntIntNullIntVertex extends
+    BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable> {
+
+  /** Id of this vertex. */
+  private int id;
+  /** Current value of this vertex. */
+  private int value;
+  /** Ids of the neighbors (edge targets); edges carry no value. */
+  private int[] neighbors;
+  /** Messages currently stored for this vertex. */
+  private int[] messages;
+
+  @Override
+  public void initialize(IntWritable vertexId, IntWritable vertexValue,
+      Map<IntWritable, NullWritable> edges, List<IntWritable> messages) {
+    this.id = vertexId.get();
+    this.value = vertexValue.get();
+    this.neighbors = toIntArray(edges.keySet(), edges.size());
+    this.messages = toIntArray(messages, messages.size());
+  }
+
+  /** Unboxes the first {@code count} writables of {@code source}. */
+  private static int[] toIntArray(Iterable<IntWritable> source, int count) {
+    int[] result = new int[count];
+    int next = 0;
+    for (IntWritable writable : source) {
+      result[next++] = writable.get();
+    }
+    return result;
+  }
+
+  @Override
+  public IntWritable getVertexId() {
+    return new IntWritable(id);
+  }
+
+  @Override
+  public IntWritable getVertexValue() {
+    return new IntWritable(value);
+  }
+
+  @Override
+  public void setVertexValue(IntWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterator<IntWritable> iterator() {
+    return new UnmodifiableIntArrayIterator(neighbors);
+  }
+
+  @Override
+  public NullWritable getEdgeValue(IntWritable targetVertexId) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public boolean hasEdge(IntWritable targetVertexId) {
+    for (int i = 0; i < neighbors.length; i++) {
+      if (neighbors[i] == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumOutEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMsgToAllEdges(final IntWritable message) {
+    // the same message instance is handed to every neighbor
+    for (int i = 0; i < neighbors.length; i++) {
+      sendMsg(new IntWritable(neighbors[i]), message);
+    }
+  }
+
+  @Override
+  public Iterable<IntWritable> getMessages() {
+    return new Iterable<IntWritable>() {
+      @Override
+      public Iterator<IntWritable> iterator() {
+        // reads the field on each call, so later setMessages() is reflected
+        return new UnmodifiableIntArrayIterator(messages);
+      }
+    };
+  }
+
+  @Override
+  public void setMessages(Iterable<IntWritable> newMessages) {
+    messages = toIntArray(newMessages, Iterables.size(newMessages));
+  }
+
+  @Override
+  void releaseResources() {
+    messages = new int[0];
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeInt(id);
+    out.writeInt(value);
+    writeIntArray(out, neighbors);
+    writeIntArray(out, messages);
+  }
+
+  /** Writes the array length followed by all of its elements. */
+  private static void writeIntArray(DataOutput out, int[] values)
+      throws IOException {
+    out.writeInt(values.length);
+    for (int v : values) {
+      out.writeInt(v);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readInt();
+    value = in.readInt();
+    neighbors = readIntArray(in);
+    messages = readIntArray(in);
+  }
+
+  /** Reads an array in the layout produced by {@link #writeIntArray}. */
+  private static int[] readIntArray(DataInput in) throws IOException {
+    int[] values = new int[in.readInt()];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = in.readInt();
+    }
+    return values;
+  }
+
+}
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1224678&r1=1224677&r2=1224678&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
Mon Dec 26 05:05:49 2011
@@ -40,13 +40,13 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
/**
- * a base class for running internal tests on a vertex
+ * A base class for running internal tests on a vertex
*
* Extending classes only have to invoke the run() method to test their
vertex. All data
* is written to a local tmp directory that is removed afterwards. A local
zookeeper
* instance is started in an extra thread and shutdown at the end.
*
- * heavily inspired from Apache Mahout's MahoutTestCase
+ * Heavily inspired by Apache Mahout's MahoutTestCase
*/
public class InternalVertexRunner {
@@ -69,24 +69,49 @@ public class InternalVertexRunner {
*/
public static Iterable<String> run(Class<?> vertexClass,
Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
- Map<String,String> params, String... data) throws Exception {
+ Map<String, String> params, String... data) throws Exception {
+ // No combiner: delegate to the full overload with a null combiner class.
+ return run(vertexClass, null, vertexInputFormatClass,
+ vertexOutputFormatClass, params, data);
+ }
+
+ /**
+ * Attempts to run the vertex internally in the current JVM, reading from
+ * and writing to a temporary folder on local disk. Starts its own
+ * ZooKeeper instance.
+ *
+ * @param vertexClass the vertex class to instantiate
+ * @param vertexCombinerClass the vertex combiner to use (or null)
+ * @param vertexInputFormatClass the inputformat to use
+ * @param vertexOutputFormatClass the outputformat to use
+ * @param params a map of parameters to add to the hadoop configuration
+ * @param data linewise input data
+ * @return linewise output data
+ * @throws Exception
+ */
+ public static Iterable<String> run(Class<?> vertexClass,
+ Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass,
+ Class<?> vertexOutputFormatClass, Map<String, String> params,
+ String... data) throws Exception {
File tmpDir = null;
try {
- /* prepare input file, output folder and zookeeper folder */
+ // prepare input file, output folder and zookeeper folder
tmpDir = createTestDir(vertexClass);
File inputFile = createTempFile(tmpDir, "graph.txt");
File outputDir = createTempDir(tmpDir, "output");
File zkDir = createTempDir(tmpDir, "zooKeeper");
- /* write input data to disk */
+ // write input data to disk
writeLines(inputFile, data);
- /* create and configure the job to run the vertex */
+ // create and configure the job to run the vertex
GiraphJob job = new GiraphJob(vertexClass.getName());
job.setVertexClass(vertexClass);
job.setVertexInputFormatClass(vertexInputFormatClass);
job.setVertexOutputFormatClass(vertexOutputFormatClass);
+
+ if (vertexCombinerClass != null) {
+ job.setVertexCombinerClass(vertexCombinerClass);
+ }
job.setWorkerConfiguration(1, 1, 100.0f);
Configuration conf = job.getConfiguration();
@@ -102,7 +127,7 @@ public class InternalVertexRunner {
FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
FileOutputFormat.setOutputPath(job, new
Path(outputDir.toString()));
- /* configure a local zookeeper instance */
+ // configure a local zookeeper instance
Properties zkProperties = new Properties();
zkProperties.setProperty("tickTime", "2000");
zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
@@ -118,8 +143,8 @@ public class InternalVertexRunner {
QuorumPeerConfig qpConfig = new QuorumPeerConfig();
qpConfig.parseProperties(zkProperties);
- /* create and run the zookeeper instance */
- final ZooKeeperServerMain zookeeper = new ZooKeeperServerMain();
+ // create and run the zookeeper instance
+ final InternalZooKeeper zookeeper = new InternalZooKeeper();
final ServerConfig zkConfig = new ServerConfig();
zkConfig.readFrom(qpConfig);
@@ -138,6 +163,7 @@ public class InternalVertexRunner {
job.run(true);
} finally {
executorService.shutdown();
+ zookeeper.end();
}
return Files.readLines(new File(outputDir, "part-m-00000"),
@@ -149,7 +175,9 @@ public class InternalVertexRunner {
}
}
- /* create a temporary folder that will be removed after the test */
+ /**
+ * Create a temporary folder that will be removed after the test
+ */
private static final File createTestDir(Class<?> vertexClass)
throws IOException {
String systemTmpDir = System.getProperty("java.io.tmpdir");
@@ -209,4 +237,13 @@ public class InternalVertexRunner {
}
}
+  /**
+   * Extension of {@link ZooKeeperServerMain} that exposes
+   * {@code shutdown()} so the runner can stop the server programmatically.
+   */
+  private static class InternalZooKeeper extends ZooKeeperServerMain {
+    /** Delegates to {@link ZooKeeperServerMain#shutdown()}. */
+    void end() {
+      super.shutdown();
+    }
+  }
+
}
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive int array that boxes each
+ * element into a fresh {@link IntWritable}.
+ */
+public class UnmodifiableIntArrayIterator
+    extends UnmodifiableIterator<IntWritable> {
+
+  /** Backing array; not copied, so later changes to it are visible. */
+  private final int[] arr;
+  /** Index of the next element to return. */
+  private int offset;
+
+  /**
+   * @param arr the array to iterate over
+   */
+  public UnmodifiableIntArrayIterator(int[] arr) {
+    this.arr = arr;
+    offset = 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return offset < arr.length;
+  }
+
+  /**
+   * @return the next element wrapped in a new {@link IntWritable}
+   * @throws java.util.NoSuchElementException if the iteration is exhausted
+   */
+  @Override
+  public IntWritable next() {
+    if (offset >= arr.length) {
+      // Iterator contract: signal exhaustion with NoSuchElementException
+      // instead of leaking an ArrayIndexOutOfBoundsException.
+      throw new java.util.NoSuchElementException();
+    }
+    return new IntWritable(arr[offset++]);
+  }
+}
\ No newline at end of file
Added:
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
(added)
+++
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import junit.framework.TestCase;
+
+import org.apache.giraph.utils.InternalVertexRunner;
+
+import java.util.Set;
+
+/**
+ * Tests for {@link ConnectedComponentsVertex}.
+ */
+public class ConnectedComponentsVertexTest extends TestCase {
+
+  /**
+   * A local integration test on toy data: runs the vertex on a small graph
+   * with three components and checks the component of every vertex.
+   */
+  public void testToyData() throws Exception {
+
+    // a small graph with three components
+    String[] graph = new String[] {
+        "1 2 3",
+        "2 1 4 5",
+        "3 1 4",
+        "4 2 3 5 13",
+        "5 2 4 12 13",
+        "12 5 13",
+        "13 4 5 12",
+
+        "6 7 8",
+        "7 6 10 11",
+        "8 6 10",
+        "10 7 8 11",
+        "11 7 10",
+
+        "9" };
+
+    // run internally
+    Iterable<String> results = InternalVertexRunner.run(
+        ConnectedComponentsVertex.class,
+        MinimumIntCombiner.class,
+        IntIntNullIntTextInputFormat.class,
+        VertexWithComponentTextOutputFormat.class,
+        Maps.<String, String>newHashMap(), graph);
+
+    SetMultimap<Integer, Integer> components = parseResults(results);
+
+    Set<Integer> componentIDs = components.keySet();
+    assertEquals(3, componentIDs.size());
+    assertTrue(componentIDs.contains(1));
+    assertTrue(componentIDs.contains(6));
+    assertTrue(componentIDs.contains(9));
+
+    assertComponent(components, 1, 1, 2, 3, 4, 5, 12, 13);
+    assertComponent(components, 6, 6, 7, 8, 10, 11);
+    assertComponent(components, 9, 9);
+  }
+
+  /**
+   * Asserts that component {@code id} has exactly {@code members.length}
+   * vertices and contains each of {@code members}.
+   */
+  private static void assertComponent(
+      SetMultimap<Integer, Integer> components, int id, int... members) {
+    Set<Integer> component = components.get(id);
+    assertEquals(members.length, component.size());
+    for (int member : members) {
+      assertTrue(component.contains(member));
+    }
+  }
+
+  /**
+   * Parses "vertex TAB component" output lines into a multimap from
+   * component id to the vertices it contains.
+   */
+  private SetMultimap<Integer, Integer> parseResults(
+      Iterable<String> results) {
+    SetMultimap<Integer, Integer> components = HashMultimap.create();
+    for (String line : results) {
+      Iterable<String> fields = Splitter.on('\t').split(line);
+      int vertex = Integer.parseInt(Iterables.get(fields, 0));
+      int component = Integer.parseInt(Iterables.get(fields, 1));
+      components.put(component, vertex);
+    }
+    return components;
+  }
+}
Added:
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java?rev=1224678&view=auto
==============================================================================
---
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
(added)
+++
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
Mon Dec 26 05:05:49 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import junit.framework.TestCase;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.hadoop.io.IntWritable;
+
+import java.util.Arrays;
+
+/**
+ * Tests for {@link MinimumIntCombiner}.
+ */
+public class MinimumIntCombinerTest extends TestCase {
+
+  /**
+   * The combiner must reduce a list of messages to the smallest value.
+   */
+  public void testCombiner() throws Exception {
+    VertexCombiner<IntWritable, IntWritable> combiner =
+        new MinimumIntCombiner();
+
+    IntWritable target = new IntWritable(1);
+    IntWritable result = combiner.combine(target, Arrays.asList(
+        new IntWritable(39947466), new IntWritable(199),
+        new IntWritable(19998888), new IntWritable(42)));
+
+    assertEquals(42, result.get());
+  }
+}
Modified:
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java?rev=1224678&r1=1224677&r2=1224678&view=diff
==============================================================================
---
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
(original)
+++
incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
Mon Dec 26 05:05:49 2011
@@ -33,10 +33,14 @@ import org.mockito.Mockito;
import java.util.Map;
-/** contains a simple unit test for {@link SimpleShortestPathsVertex} */
+/**
+ * Contains a simple unit test for {@link SimpleShortestPathsVertex}
+ */
public class SimpleShortestPathVertexTest extends TestCase {
- /** test the behavior when a shorter path to a vertex has been found */
+ /**
+ * Test the behavior when a shorter path to a vertex has been found
+ */
public void testOnShorterPathFound() throws Exception {
SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
@@ -62,7 +66,9 @@ public class SimpleShortestPathVertexTes
env.verifyMessageSent(new LongWritable(20L), new DoubleWritable(2));
}
- /** test the behavior when a new, but not shorter path to a vertex has
been found */
+ /**
+ * Test the behavior when a new, but not shorter, path to a vertex has
+ * been found
+ */
public void testOnNoShorterPathFound() throws Exception {
SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
@@ -86,21 +92,23 @@ public class SimpleShortestPathVertexTes
env.verifyNoMessageSent();
}
- /** a local integration test on toy data */
+ /**
+ * A local integration test on toy data
+ */
public void testToyData() throws Exception {
- /* a small four vertex graph */
+ // a small four vertex graph
String[] graph = new String[] {
"[1,0,[[2,1],[3,3]]]",
"[2,0,[[3,1],[4,10]]]",
"[3,0,[[4,2]]]",
"[4,0,[]]" };
- /* start from vertex 1 */
- Map<String,String> params = Maps.newHashMap();
+ // start from vertex 1
+ Map<String, String> params = Maps.newHashMap();
params.put(SimpleShortestPathsVertex.SOURCE_ID, "1");
- /* run internally */
+ // run internally
Iterable<String> results = InternalVertexRunner.run(
SimpleShortestPathsVertex.class,
SimpleShortestPathsVertex.
@@ -111,7 +119,7 @@ public class SimpleShortestPathVertexTes
Map<Long, Double> distances = parseDistances(results);
- /* verify results */
+ // verify results
assertNotNull(distances);
assertEquals(4, distances.size());
assertEquals(0.0, distances.get(1L));
@@ -120,8 +128,8 @@ public class SimpleShortestPathVertexTes
assertEquals(4.0, distances.get(4L));
}
- private Map<Long,Double> parseDistances(Iterable<String> results) {
- Map<Long,Double> distances =
+ private Map<Long, Double> parseDistances(Iterable<String> results) {
+ Map<Long, Double> distances =
Maps.newHashMapWithExpectedSize(Iterables.size(results));
for (String line : results) {
try {