Author: tjungblut
Date: Tue Jan 31 09:19:53 2012
New Revision: 1238308
URL: http://svn.apache.org/viewvc?rev=1238308&view=rev
Log:
[HAMA-493] Provide text to seq-file utils for graph examples
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
(with props)
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java
(with props)
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java
(with props)
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
(with props)
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
(with props)
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java
Modified: incubator/hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Jan 31 09:19:53 2012
@@ -3,7 +3,8 @@ Hama Change Log
Release 0.4 - Unreleased
NEW FEATURES
-
+
+ HAMA-493: Provide text to seq-file utils for graph examples (tjungblut)
HAMA-491: Add dist module to generate release tarball (edwardyoon)
HAMA-451: Show Supersteps in waitForCompletion in YARN (tjungblut)
HAMA-479: Add Counters to Hama Jobs (edwardyoon)
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue Jan 31 09:19:53 2012
@@ -178,9 +178,24 @@ public abstract class FileInputFormat<K,
FileStatus[] files = listStatus(job);
long totalSize = 0; // compute total size
- for (FileStatus file : files) { // check we have valid files
+ for (int i = 0; i < files.length; i++) { // check we have valid files
+ FileStatus file = files[i];
if (file.isDir()) {
- throw new IOException("Not a file: " + file.getPath());
+ final Path path = file.getPath();
+ if (path.getName().equals("hama-partitions")
+ || (job.get("bsp.partitioning.dir") != null && path.getName()
+ .equals(job.get("bsp.partitioning.dir")))) {
+ // if we find the partitioning dir, just remove it.
+ LOG.warn("Removing already existing partitioning directory " + path);
+ FileSystem fileSystem = path.getFileSystem(job.getConf());
+ if (!fileSystem.delete(path, true)) {
+ LOG.error("Remove failed.");
+ }
+ // remove this file from our initial list
+ files[i] = null;
+ } else {
+ throw new IOException("Not a file (dir): " + path);
+ }
}
totalSize += file.getLen();
}
@@ -189,8 +204,10 @@ public abstract class FileInputFormat<K,
// take the short circuit path if we have already partitioned
if (numSplits == files.length) {
for (FileStatus file : files) {
- splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
- new String[0]));
+ if (file != null) {
+ splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
+ new String[0]));
+ }
}
return splits.toArray(new FileSplit[splits.size()]);
}
@@ -202,36 +219,40 @@ public abstract class FileInputFormat<K,
// generate splits
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file : files) {
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConf());
- long length = file.getLen();
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(fs, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
- LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
- + minSize + ", " + blockSize + ")");
-
- long bytesRemaining = length;
- while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
- String[] splitHosts = getSplitHosts(blkLocations, length
- - bytesRemaining, splitSize, clusterMap);
- splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
- splitHosts));
- bytesRemaining -= splitSize;
- }
+ if (file != null) {
+ Path path = file.getPath();
+ FileSystem fs = path.getFileSystem(job.getConf());
+ long length = file.getLen();
+ BlockLocation[] blkLocations = fs
+ .getFileBlockLocations(file, 0, length);
+ if ((length != 0) && isSplitable(fs, path)) {
+ long blockSize = file.getBlockSize();
+ long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+ LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
+ + minSize + ", " + blockSize + ")");
+
+ long bytesRemaining = length;
+ while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+ String[] splitHosts = getSplitHosts(blkLocations, length
+ - bytesRemaining, splitSize, clusterMap);
+ splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+ splitHosts));
+ bytesRemaining -= splitSize;
+ }
- if (bytesRemaining != 0) {
- splits
- .add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length - 1].getHosts()));
+ if (bytesRemaining != 0) {
+ splits.add(new FileSplit(path, length - bytesRemaining,
+ bytesRemaining, blkLocations[blkLocations.length - 1]
+ .getHosts()));
+ }
+ } else if (length != 0) {
+ String[] splitHosts = getSplitHosts(blkLocations, 0, length,
+ clusterMap);
+ splits.add(new FileSplit(path, 0, length, splitHosts));
+ } else {
+ // Create empty hosts array for zero length files
+ splits.add(new FileSplit(path, 0, length, new String[0]));
}
- } else if (length != 0) {
- String[] splitHosts = getSplitHosts(blkLocations, 0, length,
clusterMap);
- splits.add(new FileSplit(path, 0, length, splitHosts));
- } else {
- // Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.info("Total # of splits: " + splits.size());
@@ -266,8 +287,8 @@ public abstract class FileInputFormat<K,
* inputs for the map-reduce job.
*/
public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
- setInputPaths(conf, StringUtils
- .stringToPath(getPathStrings(commaSeparatedPaths)));
+ setInputPaths(conf,
+ StringUtils.stringToPath(getPathStrings(commaSeparatedPaths)));
}
/**
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Jan 31 09:19:53 2012
@@ -62,8 +62,7 @@ public class LocalBSPRunner implements J
private static final String IDENTIFIER = "localrunner";
private static String WORKING_DIR = "/tmp/hama-bsp/";
- protected static volatile ThreadPoolExecutor threadPool =
(ThreadPoolExecutor) Executors
- .newCachedThreadPool();
+ private volatile ThreadPoolExecutor threadPool;
@SuppressWarnings("rawtypes")
protected static final LinkedList<Future<BSP>> futureList = new
LinkedList<Future<BSP>>();
@@ -143,6 +142,8 @@ public class LocalBSPRunner implements J
splitFile.close();
}
}
+
+ threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(numBspTask);
peerNames = new String[numBspTask];
for (int i = 0; i < numBspTask; i++) {
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Tue Jan 31 09:19:53 2012
@@ -20,6 +20,8 @@
package org.apache.hama.examples;
import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hama.examples.util.PagerankTextToSeq;
+import org.apache.hama.examples.util.SSSPTextToSeq;
public class ExampleDriver {
@@ -27,9 +29,11 @@ public class ExampleDriver {
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
+ pgd.addClass("sssp-text2seq", SSSPTextToSeq.class, "Generates SSSP input
from textfile");
pgd.addClass("sssp", ShortestPaths.class, "Single Shortest Path");
pgd.addClass("cmb", CombineExample.class, "Combine");
pgd.addClass("bench", RandBench.class, "Random Benchmark");
+ pgd.addClass("pagerank-text2seq", PagerankTextToSeq.class, "Generates
Pagerank input from textfile");
pgd.addClass("pagerank", PageRank.class, "PageRank");
pgd.driver(args);
} catch (Throwable e) {
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * This utility turns a Pagerank-formatted text file into a sequence file that
+ * can be inputted to the Pagerank example. <br/>
+ *
+ * <pre>
+ * Usage: <input path> <output path> <optional: text separator. Default is \"\t\">
+ *
+ * </pre>
+ *
+ * So you may start this with: <br/>
+ *
+ * <pre>
+ * bin/hama -jar examples.jar pagerank-text2seq /tmp/in /tmp/out ";"
+ * </pre>
+ */
+public class PagerankTextToSeq extends TextToSequenceFile {
+
+ public PagerankTextToSeq(Path inPath, Path outPath, String delimiter)
+ throws IOException {
+ super(inPath, outPath, delimiter);
+ }
+
+ @Override
+ protected KeyValuePair<VertexWritable, VertexArrayWritable> processLine(
+ String line) throws IOException {
+ String[] split = line.split(delimiter);
+ VertexWritable key = new VertexWritable(split[0]);
+ VertexWritable[] v = new VertexWritable[split.length - 1];
+ for (int i = 1; i < split.length; i++) {
+ v[i - 1] = new VertexWritable(split[i]);
+ }
+ VertexArrayWritable value = new VertexArrayWritable();
+ value.set(v);
+ return new KeyValuePair<VertexWritable, VertexArrayWritable>(key, value);
+ }
+
+ @Override
+ protected Writer getWriter(Path outPath) throws IOException {
+ return new Writer(destFs, conf, outPath, VertexWritable.class,
+ VertexArrayWritable.class);
+ }
+
+ private static void printUsage() {
+ LOG.info("<input path> <output path> <optional: text separator. Default is
\"\t\">");
+ }
+
+ public static void main(String[] args) throws IOException {
+ if (args.length != 2 && args.length != 3) {
+ printUsage();
+ System.exit(-1);
+ }
+
+ String inPath = args[0];
+ String outPath = args[1];
+ String delimiter = "\t";
+ if (args.length > 2) {
+ delimiter = args[2];
+ }
+ PagerankTextToSeq o = new PagerankTextToSeq(new Path(inPath), new Path(
+ outPath), delimiter);
+ o.run();
+ }
+}
Propchange:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hama.examples.ShortestPathVertex;
+import org.apache.hama.examples.ShortestPathVertexArrayWritable;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * This utility turns a SSSP (Single Source Shortest Paths)-formatted text file
+ * into a sequence file that can be inputted to the SSSP example. <br/>
+ *
+ * <pre>
+ * Usage: <input path> <output path> <optional: text separator. Default is \"\t\">
+ * <optional: edge weight separator. Default is \":\">
+ *
+ * </pre>
+ *
+ * So you may start this with: <br/>
+ *
+ * <pre>
+ * bin/hama -jar examples.jar sssp-text2seq /tmp/in /tmp/out ";" ":"
+ * </pre>
+ */
+public class SSSPTextToSeq extends TextToSequenceFile {
+
+ private final String edgeDelimiter;
+
+ public SSSPTextToSeq(Path inPath, Path outPath, String delimiter,
+ String edgeDelimiter) throws IOException {
+ super(inPath, outPath, delimiter);
+ this.edgeDelimiter = edgeDelimiter;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected KeyValuePair<VertexWritable, VertexArrayWritable> processLine(
+ String line) throws IOException {
+ String[] split = line.split(delimiter);
+ ShortestPathVertex key = new ShortestPathVertex(0, split[0]);
+ ShortestPathVertex[] v = new ShortestPathVertex[split.length - 1];
+ for (int i = 1; i < split.length; i++) {
+ String[] weightSplit = split[i].split(edgeDelimiter);
+ if (weightSplit.length != 2) {
+ LOG.error("Adjacent vertices must contain a \"" + edgeDelimiter
+ + "\" between the vertex name and the edge weight! Line was: "
+ + line);
+ }
+ v[i - 1] = new ShortestPathVertex(Integer.parseInt(weightSplit[1]),
+ weightSplit[0]);
+ }
+ ShortestPathVertexArrayWritable value = new
ShortestPathVertexArrayWritable();
+ value.set(v);
+ return new KeyValuePair(key, value);
+ }
+
+ @Override
+ protected Writer getWriter(Path outPath) throws IOException {
+ return new Writer(destFs, conf, outPath, ShortestPathVertex.class,
+ ShortestPathVertexArrayWritable.class);
+ }
+
+ private static void printUsage() {
+ LOG.info("<input path> <output path> <optional: text separator. Default is
\"\t\"> <optional: edge weight separator. Default is \":\">");
+ }
+
+ public static void main(String[] args) throws IOException {
+ if (args.length != 2 && args.length != 3 && args.length != 4) {
+ printUsage();
+ System.exit(-1);
+ }
+
+ String inPath = args[0];
+ String outPath = args[1];
+ String delimiter = "\t";
+ if (args.length > 2) {
+ delimiter = args[2];
+ }
+ String edgeDelimiter = ":";
+ if (args.length > 3) {
+ edgeDelimiter = args[3];
+ }
+ SSSPTextToSeq o = new SSSPTextToSeq(new Path(inPath), new Path(outPath),
+ delimiter, edgeDelimiter);
+ o.run();
+ }
+}
Propchange:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/SSSPTextToSeq.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java?rev=1238308&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java Tue Jan 31 09:19:53 2012
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+import org.apache.hama.util.KeyValuePair;
+
+/**
+ * Abstract base class for turning a text graph into a sequence file. It offers
+ * help for multiple inputs in a directory.
+ */
+public abstract class TextToSequenceFile {
+
+ protected static final Log LOG = LogFactory.getLog(TextToSequenceFile.class);
+
+ protected final Path inPath;
+ protected final Path outPath;
+ protected final String delimiter;
+ protected final Configuration conf;
+ protected final FileSystem sourceFs;
+ protected final FileSystem destFs;
+
+ public TextToSequenceFile(Path inPath, Path outPath, String delimiter)
+ throws IOException {
+ super();
+ this.inPath = inPath;
+ this.outPath = outPath;
+ this.delimiter = delimiter;
+
+ this.conf = new Configuration();
+ this.sourceFs = inPath.getFileSystem(conf);
+ this.destFs = outPath.getFileSystem(conf);
+ }
+
+ public final void run() throws IOException {
+ final FileStatus[] stati = sourceFs.globStatus(inPath);
+ if (stati == null || stati.length == 0) {
+ throw new FileNotFoundException("Cannot access " + inPath
+ + ": No such file or directory.");
+ }
+
+ for (int i = 0; i < stati.length; i++) {
+ final Path p = stati[i].getPath();
+ if (!sourceFs.getFileStatus(p).isDir()) {
+ Writer writer = null;
+ try {
+ LOG.info("Processing file : " + p);
+ Path out = new Path(outPath, p.getName() + ".seq");
+ writer = getWriter(out);
+ processFile(p, writer);
+ LOG.info("Written " + writer.getLength() + " bytes to " + out);
+ } finally {
+ if (writer != null)
+ writer.close();
+ }
+ } else {
+ LOG.warn("Skipping dir : " + p);
+ }
+ }
+
+ }
+
+ private final void processFile(Path p, SequenceFile.Writer writer)
+ throws IOException {
+ final FileStatus fileStatus = sourceFs.getFileStatus(p);
+ if (sourceFs.exists(p) && !fileStatus.isDir()) {
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new InputStreamReader(sourceFs.open(p)));
+ String line;
+ while ((line = br.readLine()) != null) {
+ final KeyValuePair<VertexWritable, VertexArrayWritable> kv =
processLine(line);
+ writer.append(kv.getKey(), kv.getValue());
+ }
+ } finally {
+ if (br != null)
+ br.close();
+ }
+ } else {
+ LOG.error(p + " is a directory or does not exist!");
+ }
+ }
+
+ protected abstract SequenceFile.Writer getWriter(Path outPath)
+ throws IOException;
+
+ protected abstract KeyValuePair<VertexWritable, VertexArrayWritable>
processLine(
+ String line) throws IOException;
+
+}
Propchange:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Tue Jan 31 09:19:53 2012
@@ -1,5 +1,7 @@
package org.apache.hama.examples;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -13,22 +15,64 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.PagerankTextToSeq;
import org.apache.hama.graph.VertexArrayWritable;
import org.apache.hama.graph.VertexWritable;
public class PageRankTest extends TestCase {
+ /**
+ * The graph looks like this (adjacency list, [] contains outlinks):<br/>
+ * stackoverflow.com [yahoo.com] <br/>
+ * google.com []<br/>
+ * facebook.com [twitter.com, google.com, nasa.gov]<br/>
+ * yahoo.com [nasa.gov, stackoverflow.com]<br/>
+ * twitter.com [google.com, facebook.com]<br/>
+ * nasa.gov [yahoo.com, stackoverflow.com]<br/>
+ * youtube.com [google.com, yahoo.com]<br/>
+ */
+ private static final Map<VertexWritable, VertexArrayWritable> tmp = new
HashMap<VertexWritable, VertexArrayWritable>();
+ static {
+ // our first entry is null, because our indices in hama 3.0 pre calculated
+ // example starts at 1.
+ // FIXME This is really ugly.
+ String[] pages = new String[] { null, "twitter.com", "google.com",
+ "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
+ "youtube.com" };
+ String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
+ "5;4;6", "6;4", "7;2;4" };
+
+ for (int i = 0; i < lineArray.length; i++) {
+ String[] adjacencyStringArray = lineArray[i].split(";");
+ int vertexId = Integer.parseInt(adjacencyStringArray[0]);
+ String name = pages[vertexId];
+ VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length -
1];
+ for (int j = 1; j < adjacencyStringArray.length; j++) {
+ arr[j - 1] = new VertexWritable(
+ pages[Integer.parseInt(adjacencyStringArray[j])]);
+ }
+ VertexArrayWritable wr = new VertexArrayWritable();
+ wr.set(arr);
+ tmp.put(new VertexWritable(name), wr);
+ }
+ }
private static String INPUT = "/tmp/pagerank-tmp.seq";
+ private static String TEXT_INPUT = "/tmp/pagerank.txt";
+ private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
private static String OUTPUT = "/tmp/pagerank-out";
- private Configuration conf;
+ private Configuration conf = new HamaConfiguration();
private FileSystem fs;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ }
+
public void testPageRank() throws IOException, InterruptedException,
ClassNotFoundException, InstantiationException, IllegalAccessException {
- conf = new HamaConfiguration();
- fs = FileSystem.get(conf);
- generateTestData();
+ generateSeqTestData();
try {
PageRank.main(new String[] { INPUT, OUTPUT, "0.85", "0.000001" });
verifyResult();
@@ -37,6 +81,20 @@ public class PageRankTest extends TestCa
}
}
+ public void testPageRankUtil() throws IOException, InterruptedException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
+ generateTestTextData();
+ // <input path> <output path>
+ PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
+ try {
+ PageRank.main(new String[] { TEXT_OUTPUT, OUTPUT, "0.85", "0.000001" });
+
+ verifyResult();
+ } finally {
+ deleteTempDirs();
+ }
+ }
+
private void verifyResult() throws IOException {
Map<String, Double> rs = new HashMap<String, Double>();
// our desired results
@@ -62,43 +120,7 @@ public class PageRankTest extends TestCa
assertEquals(sum, 1.0d);
}
- /**
- * The graph looks like this (adjacency list, [] contains outlinks):<br/>
- * stackoverflow.com [yahoo.com] <br/>
- * google.com []<br/>
- * facebook.com [twitter.com, google.com, nasa.gov]<br/>
- * yahoo.com [nasa.gov, stackoverflow.com]<br/>
- * twitter.com [google.com, facebook.com]<br/>
- * nasa.gov [yahoo.com, stackoverflow.com]<br/>
- * youtube.com [google.com, yahoo.com]<br/>
- */
- private void generateTestData() throws IOException {
- Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable,
VertexArrayWritable>();
-
- // our first entry is null, because our indices in hama 3.0 pre calculated
- // example starts at 1.
- // FIXME This is really ugly.
- String[] pages = new String[] { null, "twitter.com", "google.com",
- "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
- "youtube.com" };
- String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
- "5;4;6", "6;4", "7;2;4" };
-
- for (int i = 0; i < lineArray.length; i++) {
-
- String[] adjacencyStringArray = lineArray[i].split(";");
- int vertexId = Integer.parseInt(adjacencyStringArray[0]);
- String name = pages[vertexId];
- VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length -
1];
- for (int j = 1; j < adjacencyStringArray.length; j++) {
- arr[j - 1] = new VertexWritable(
- pages[Integer.parseInt(adjacencyStringArray[j])]);
- }
- VertexArrayWritable wr = new VertexArrayWritable();
- wr.set(arr);
- tmp.put(new VertexWritable(name), wr);
- }
-
+ private void generateSeqTestData() throws IOException {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
INPUT), VertexWritable.class, VertexArrayWritable.class);
for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
@@ -107,12 +129,29 @@ public class PageRankTest extends TestCa
writer.close();
}
+ private void generateTestTextData() throws IOException {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
+ for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+ writer.write(e.getKey() + "\t");
+ for (int i = 0; i < e.getValue().get().length; i++) {
+ VertexWritable writable = (VertexWritable) e.getValue().get()[i];
+ writer.write(writable.getName() + "\t");
+ }
+ writer.write("\n");
+ }
+ writer.close();
+ }
+
private void deleteTempDirs() {
try {
if (fs.exists(new Path(INPUT)))
fs.delete(new Path(INPUT), true);
if (fs.exists(new Path(OUTPUT)))
fs.delete(new Path(OUTPUT), true);
+ if (fs.exists(new Path(TEXT_INPUT)))
+ fs.delete(new Path(TEXT_INPUT), true);
+ if (fs.exists(new Path(TEXT_OUTPUT)))
+ fs.delete(new Path(TEXT_OUTPUT), true);
} catch (IOException e) {
e.printStackTrace();
}
Modified:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java?rev=1238308&r1=1238307&r2=1238308&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java Tue Jan 31 09:19:53 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hama.examples;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -30,56 +32,17 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.SSSPTextToSeq;
/**
* Testcase for {@link ShortestPaths}
*/
public class ShortestPathsTest extends TestCase {
- private static String INPUT = "/tmp/sssp-tmp.seq";
- private static String OUTPUT = "/tmp/sssp-out";
- private Configuration conf;
- private FileSystem fs;
-
- public void testShortestPaths() throws IOException, InterruptedException,
- ClassNotFoundException, InstantiationException, IllegalAccessException {
- conf = new HamaConfiguration();
- fs = FileSystem.get(conf);
-
- generateTestData();
- try {
- ShortestPaths.main(new String[] { "Frankfurt", INPUT, OUTPUT });
-
- verifyResult();
- } finally {
- deleteTempDirs();
- }
- }
-
- private void verifyResult() throws IOException {
- Map<String, Integer> rs = new HashMap<String, Integer>();
- rs.put("Erfurt", 403);
- rs.put("Mannheim", 85);
- rs.put("Stuttgart", 503);
- rs.put("Kassel", 173);
- rs.put("Nuernberg", 320);
- rs.put("Augsburg", 415);
- rs.put("Frankfurt", 0);
- rs.put("Muenchen", 487);
- rs.put("Wuerzburg", 217);
- rs.put("Karlsruhe", 165);
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
- + "/part-00000"), conf);
- Text key = new Text();
- IntWritable value = new IntWritable();
- while (reader.next(key, value)) {
- assertEquals(value.get(), (int) rs.get(key.toString()));
- }
- }
+ private static final Map<ShortestPathVertex,
ShortestPathVertexArrayWritable> testData = new HashMap<ShortestPathVertex,
ShortestPathVertexArrayWritable>();
- private void generateTestData() throws IOException {
- Map<ShortestPathVertex, ShortestPathVertexArrayWritable> tmp = new
HashMap<ShortestPathVertex, ShortestPathVertexArrayWritable>();
+ static {
String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
"Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
"Muenchen" };
@@ -92,26 +55,26 @@ public class ShortestPathsTest extends T
textArr[2] = new ShortestPathVertex(217, "Wuerzburg");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Stuttgart")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[1];
textArr[0] = new ShortestPathVertex(183, "Nuernberg");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Kassel")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[2];
textArr[0] = new ShortestPathVertex(502, "Muenchen");
textArr[1] = new ShortestPathVertex(173, "Frankfurt");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Erfurt")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[1];
textArr[0] = new ShortestPathVertex(186, "Wuerzburg");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Wuerzburg")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[3];
textArr[0] = new ShortestPathVertex(217, "Frankfurt");
@@ -119,28 +82,28 @@ public class ShortestPathsTest extends T
textArr[2] = new ShortestPathVertex(103, "Nuernberg");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Mannheim")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[2];
textArr[0] = new ShortestPathVertex(80, "Karlsruhe");
textArr[1] = new ShortestPathVertex(85, "Frankfurt");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Karlsruhe")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[2];
textArr[0] = new ShortestPathVertex(250, "Augsburg");
textArr[1] = new ShortestPathVertex(80, "Mannheim");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Augsburg")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[2];
textArr[0] = new ShortestPathVertex(250, "Karlsruhe");
textArr[1] = new ShortestPathVertex(84, "Muenchen");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Nuernberg")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[3];
textArr[0] = new ShortestPathVertex(183, "Stuttgart");
@@ -148,7 +111,7 @@ public class ShortestPathsTest extends T
textArr[2] = new ShortestPathVertex(103, "Wuerzburg");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
} else if (city.equals("Muenchen")) {
ShortestPathVertex[] textArr = new ShortestPathVertex[3];
textArr[0] = new ShortestPathVertex(167, "Nuernberg");
@@ -156,26 +119,109 @@ public class ShortestPathsTest extends T
textArr[2] = new ShortestPathVertex(84, "Augsburg");
ShortestPathVertexArrayWritable arr = new ShortestPathVertexArrayWritable();
arr.set(textArr);
- tmp.put(new ShortestPathVertex(0, city), arr);
+ testData.put(new ShortestPathVertex(0, city), arr);
}
}
+ }
+
+ private static String INPUT = "/tmp/sssp-tmp.seq";
+ private static String TEXT_INPUT = "/tmp/sssp.txt";
+ private static String TEXT_OUTPUT = INPUT + "sssp.txt.seq";
+ private static String OUTPUT = "/tmp/sssp-out";
+ private Configuration conf = new HamaConfiguration();
+ private FileSystem fs;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ }
+
+ public void testShortestPaths() throws IOException, InterruptedException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
+ generateTestSequenceFileData();
+ try {
+ ShortestPaths.main(new String[] { "Frankfurt", INPUT, OUTPUT });
+
+ verifyResult();
+ } finally {
+ deleteTempDirs();
+ }
+ }
+
+ public void testShortestPathsUtil() throws IOException, InterruptedException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
+ generateTestTextData();
+ // <input path> <output path>
+ SSSPTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
+ try {
+ ShortestPaths.main(new String[] { "Frankfurt", TEXT_OUTPUT, OUTPUT });
+
+ verifyResult();
+ } finally {
+ deleteTempDirs();
+ }
+ }
+
+ private void verifyResult() throws IOException {
+ Map<String, Integer> rs = new HashMap<String, Integer>();
+ rs.put("Erfurt", 403);
+ rs.put("Mannheim", 85);
+ rs.put("Stuttgart", 503);
+ rs.put("Kassel", 173);
+ rs.put("Nuernberg", 320);
+ rs.put("Augsburg", 415);
+ rs.put("Frankfurt", 0);
+ rs.put("Muenchen", 487);
+ rs.put("Wuerzburg", 217);
+ rs.put("Karlsruhe", 165);
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
+ + "/part-00000"), conf);
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ while (reader.next(key, value)) {
+ assertEquals(value.get(), (int) rs.get(key.toString()));
+ }
+ }
+
+ private void generateTestSequenceFileData() throws IOException {
SequenceFile.Writer writer = SequenceFile
.createWriter(fs, conf, new Path(INPUT), ShortestPathVertex.class,
ShortestPathVertexArrayWritable.class);
- for (Map.Entry<ShortestPathVertex, ShortestPathVertexArrayWritable> e : tmp
+ for (Map.Entry<ShortestPathVertex, ShortestPathVertexArrayWritable> e : testData
.entrySet()) {
writer.append(e.getKey(), e.getValue());
}
writer.close();
}
-
+
+ private void generateTestTextData() throws IOException {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
+ for (Map.Entry<ShortestPathVertex, ShortestPathVertexArrayWritable> e : testData
+ .entrySet()) {
+ writer.write(e.getKey().getName() + "\t");
+ for (int i = 0; i < e.getValue().get().length; i++) {
+ writer.write(((ShortestPathVertex) e.getValue().get()[i]).getName()
+ + ":" + ((ShortestPathVertex) e.getValue().get()[i]).getWeight()
+ + "\t");
+ }
+ writer.write("\n");
+ }
+ writer.close();
+ }
+
private void deleteTempDirs() {
try {
if (fs.exists(new Path(INPUT)))
fs.delete(new Path(INPUT), true);
if (fs.exists(new Path(OUTPUT)))
fs.delete(new Path(OUTPUT), true);
+ if (fs.exists(new Path(TEXT_INPUT)))
+ fs.delete(new Path(TEXT_INPUT), true);
+ if (fs.exists(new Path(TEXT_OUTPUT)))
+ fs.delete(new Path(TEXT_OUTPUT), true);
} catch (IOException e) {
e.printStackTrace();
}
Added:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java?rev=1238308&view=auto
==============================================================================
---
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
(added)
+++
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
Tue Jan 31 09:19:53 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+
+public class PagerankTextToSeqTest extends TestCase {
+
+ private static final String DELIMITER = ";";
+ private static final String TXT_INPUT_DIR = "/tmp/pageranktext/";
+ private static final String TXT_INPUT = TXT_INPUT_DIR + "in.txt";
+ private static final String SEQ_OUTPUT = "/tmp/pageranktext/";
+ private static final String SEQ_INPUT = SEQ_OUTPUT + "in.txt.seq";
+
+ private Configuration conf = new HamaConfiguration();
+ private FileSystem fs;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ deleteTempDirs();
+ File dir = new File(TXT_INPUT_DIR);
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+ }
+
+ private void writeTextFile() throws IOException {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(TXT_INPUT));
+ for (int lines = 0; lines < 10; lines++) {
+ for (int cols = 0; cols < 5; cols++) {
+ writer.append(cols + DELIMITER);
+ }
+ writer.append("\n");
+ }
+ writer.close();
+ }
+
+ private void verifyOutput() throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ new Path(SEQ_INPUT), conf);
+ VertexWritable vertex = new VertexWritable();
+ VertexArrayWritable vertexArray = new VertexArrayWritable();
+
+
+ while (reader.next(vertex, vertexArray)) {
+ int count = 0;
+ assertEquals(vertex.getName(), count + "");
+ Writable[] writables = vertexArray.get();
+ assertEquals(writables.length, 4);
+ for (int i = 0; i < 4; i++) {
+ count++;
+ assertEquals(((VertexWritable) writables[i]).getName(), count + "");
+ }
+ }
+ reader.close();
+ }
+
+ public void testArgs() throws Exception {
+ writeTextFile();
+ PagerankTextToSeq.main(new String[] { TXT_INPUT, SEQ_OUTPUT, DELIMITER });
+ verifyOutput();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ deleteTempDirs();
+ }
+
+ private void deleteTempDirs() {
+ try {
+ if (fs.exists(new Path(TXT_INPUT_DIR)))
+ fs.delete(new Path(TXT_INPUT_DIR), true);
+ if (fs.exists(new Path(TXT_INPUT)))
+ fs.delete(new Path(TXT_INPUT), true);
+ if (fs.exists(new Path(SEQ_OUTPUT)))
+ fs.delete(new Path(SEQ_OUTPUT), true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
Propchange:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java?rev=1238308&view=auto
==============================================================================
---
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
(added)
+++
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
Tue Jan 31 09:19:53 2012
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.ShortestPathVertex;
+import org.apache.hama.examples.ShortestPathVertexArrayWritable;
+
+public class SSSPTextToSeqTest extends TestCase {
+
+ private static final String DELIMITER = ";";
+ private static final String EDGE_DELIMITER = ":";
+ private static final String TXT_INPUT_DIR = "/tmp/sssptest/";
+ private static final String TXT_INPUT = TXT_INPUT_DIR + "in.txt";
+ private static final String SEQ_OUTPUT = "/tmp/sssptest/";
+ private static final String SEQ_INPUT = SEQ_OUTPUT + "in.txt.seq";
+
+ private Configuration conf = new HamaConfiguration();
+ private FileSystem fs;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ deleteTempDirs();
+ File dir = new File(TXT_INPUT_DIR);
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+ }
+
+ private void writeTextFile() throws IOException {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(TXT_INPUT));
+ for (int lines = 0; lines < 10; lines++) {
+ writer.append(lines + DELIMITER);
+ for (int cols = 0; cols < 5; cols++) {
+ writer.append(cols + EDGE_DELIMITER + lines + "" + DELIMITER);
+ }
+ writer.append("\n");
+ }
+ writer.close();
+ }
+
+ private void verifyOutput() throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ new Path(SEQ_INPUT), conf);
+ ShortestPathVertex vertex = new ShortestPathVertex();
+ ShortestPathVertexArrayWritable vertexArray = new ShortestPathVertexArrayWritable();
+
+ int lines = 0;
+ while (reader.next(vertex, vertexArray)) {
+ int count = 0;
+ assertEquals(vertex.getName(), lines + "");
+ assertEquals(vertex.getWeight(), 0);
+ Writable[] writables = vertexArray.get();
+ assertEquals(writables.length, 5);
+ for (int i = 0; i < 5; i++) {
+ assertEquals(((ShortestPathVertex) writables[i]).getName(), count + "");
+ assertEquals(((ShortestPathVertex) writables[i]).getWeight(), lines);
+ count++;
+ }
+ lines++;
+ }
+ reader.close();
+ }
+
+ public void testArgs() throws Exception {
+ writeTextFile();
+ SSSPTextToSeq.main(new String[] { TXT_INPUT, SEQ_OUTPUT, DELIMITER, EDGE_DELIMITER });
+ verifyOutput();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ deleteTempDirs();
+ }
+
+ private void deleteTempDirs() {
+ try {
+ if (fs.exists(new Path(TXT_INPUT_DIR)))
+ fs.delete(new Path(TXT_INPUT_DIR), true);
+ if (fs.exists(new Path(TXT_INPUT)))
+ fs.delete(new Path(TXT_INPUT), true);
+ if (fs.exists(new Path(SEQ_OUTPUT)))
+ fs.delete(new Path(SEQ_OUTPUT), true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
Propchange:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
------------------------------------------------------------------------------
svn:eol-style = native