Author: edwardyoon
Date: Wed Dec 12 05:32:53 2012
New Revision: 1420530
URL: http://svn.apache.org/viewvc?rev=1420530&view=rev
Log:
Reimplementation of partitioner
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Dec 12 05:32:53 2012
@@ -10,6 +10,8 @@ Release 0.7 (unreleased changes)
IMPROVEMENTS
+ HAMA-531: Reimplementation of partitioner (edwardyoon)
+
Release 0.6 - November 28, 2012
NEW FEATURES
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Dec 12 05:32:53 2012
@@ -22,6 +22,7 @@ import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -219,8 +220,41 @@ public class BSPJob extends BSPJobContex
state = JobState.RUNNING;
}
+ boolean isPartitioned = false;
+
public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
+ if (this.getConfiguration().get("bsp.input.partitioner.class") != null
+ && !isPartitioned) {
+ FileSystem fs = FileSystem.get(conf);
+ Path inputDir = new Path(conf.get("bsp.input.dir"));
+ if (fs.isFile(inputDir)) {
+ inputDir = inputDir.getParent();
+ }
+ Path partitionDir = new Path(inputDir + "/partitions");
+
+ if (fs.exists(partitionDir)) {
+ fs.delete(partitionDir, true);
+ }
+
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.setInt("desired.num.of.tasks",
+ Integer.parseInt(this.getConfiguration().get("bsp.peers.num")));
+ BSPJob partitioningJob = new BSPJob(conf);
+ partitioningJob.setInputPath(new Path(this.getConfiguration().get(
+ "bsp.input.dir")));
+ partitioningJob.setInputFormat(this.getInputFormat().getClass());
+ partitioningJob.setInputKeyClass(this.getInputKeyClass());
+ partitioningJob.setInputValueClass(getInputValueClass());
+ partitioningJob.setOutputFormat(NullOutputFormat.class);
+ partitioningJob.setBspClass(PartitioningRunner.class);
+
+ isPartitioned = partitioningJob.waitForCompletion(true);
+ if (isPartitioned) {
+ this.setInputPath(new Path(inputDir + "/partitions"));
+ }
+ }
+
if (state == JobState.DEFINE) {
submit();
}
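For context, a minimal sketch of how a client job opts into the new pre-partitioning phase (assuming setPartitioner() stores the "bsp.input.partitioner.class" key, as the example changes below suggest; MyBSP and the input path are hypothetical):

    HamaConfiguration conf = new HamaConfiguration();
    conf.set("bsp.peers.num", "4"); // parsed into desired.num.of.tasks of the pre-job
    BSPJob job = new BSPJob(conf);
    job.setBspClass(MyBSP.class); // hypothetical user BSP class
    job.setInputPath(new Path("/tmp/input"));
    job.setPartitioner(HashPartitioner.class);
    // waitForCompletion() now first runs a PartitioningRunner job over the
    // input, writes <input dir>/partitions, re-points the job input there,
    // and only then submits the actual job.
    job.waitForCompletion(true);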
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Dec 12 05:32:53 2012
@@ -30,9 +30,7 @@ import java.io.OutputStreamWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
@@ -55,7 +53,6 @@ import org.apache.hadoop.io.WritableUtil
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -301,9 +298,10 @@ public class BSPJobClient extends Config
throws IOException {
BSPJob job = pJob;
job.setJobID(jobId);
- int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+ int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+ 0);
int maxTasks = checkTaskLimits(job, limitTasks);
-
+
Path submitJobDir = new Path(getSystemDir(), "submit_"
+ Integer.toString(Math.abs(r.nextInt()), 36));
Path submitSplitFile = new Path(submitJobDir, "job.split");
@@ -325,12 +323,6 @@ public class BSPJobClient extends Config
if (job.get("bsp.input.dir") != null) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- if (job.getConfiguration().get("bsp.input.partitioner.class") != null
- && !job.getConfiguration()
- .getBoolean("hama.graph.runtime.partitioning", false)) {
- job = partition(job, maxTasks);
- maxTasks = job.getInt("hama.partition.count", maxTasks);
- }
job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
job.set("bsp.job.split.file", submitSplitFile.toString());
}
@@ -375,15 +367,16 @@ public class BSPJobClient extends Config
protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
int maxTasks;
ClusterStatus clusterStatus = getClusterStatus(true);
-
- if(limitTasks > 0) {
+
+ if (limitTasks > 0) {
maxTasks = limitTasks;
} else {
maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
}
-
+
if (maxTasks < job.getNumBspTask()) {
- throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
+ throw new IOException(
+ "Job failed! The number of tasks has exceeded the maximum allowed.");
}
return maxTasks;
}
@@ -402,97 +395,10 @@ public class BSPJobClient extends Config
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
- InputSplit[] splits = job.getInputFormat().getSplits(
- job,
- (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
- : maxTasks);
-
- String input = job.getConfiguration().get("bsp.input.dir");
-
- if (input != null) {
- InputFormat<?, ?> inputFormat = job.getInputFormat();
-
- Path partitionedPath = new Path(input, "hama-partitions");
- Path inputPath = new Path(input);
- if (fs.isFile(inputPath)) {
- partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
- }
-
- String alternatePart = job.get("bsp.partitioning.dir");
- if (alternatePart != null) {
- partitionedPath = new Path(alternatePart, job.getJobID().toString());
- }
-
- if (fs.exists(partitionedPath)) {
- fs.delete(partitionedPath, true);
- } else {
- fs.mkdirs(partitionedPath);
- }
- // FIXME this is soo unsafe
- RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
-
- List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
- splits.length);
-
- CompressionType compressionType = getOutputCompressionType(job);
- Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
- job, null);
- CompressionCodec codec = null;
- if (outputCompressorClass != null) {
- codec = ReflectionUtils.newInstance(outputCompressorClass,
- job.getConfiguration());
- }
-
- try {
- for (int i = 0; i < splits.length; i++) {
- Path p = new Path(partitionedPath, getPartitionName(i));
- if (codec == null) {
- writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
- sampleReader.createKey().getClass(), sampleReader.createValue()
- .getClass(), CompressionType.NONE));
- } else {
- writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
- sampleReader.createKey().getClass(), sampleReader.createValue()
- .getClass(), compressionType, codec));
- }
- }
-
- Partitioner partitioner = job.getPartitioner();
- for (int i = 0; i < splits.length; i++) {
- InputSplit split = splits[i];
- RecordReader recordReader = inputFormat.getRecordReader(split, job);
- Object key = recordReader.createKey();
- Object value = recordReader.createValue();
- while (recordReader.next(key, value)) {
- int index = Math.abs(partitioner.getPartition(key, value,
- splits.length));
- writers.get(index).append(key, value);
- }
- LOG.debug("Done with split " + i);
- }
- } finally {
- for (SequenceFile.Writer wr : writers) {
- wr.close();
- }
- }
- job.set("hama.partition.count", writers.size() + "");
- job.setInputFormat(SequenceFileInputFormat.class);
- job.setInputPath(partitionedPath);
- }
-
- return job;
- }
-
private static boolean isProperSize(int numBspTask, int maxTasks) {
return (numBspTask > 1 && numBspTask < maxTasks);
}
- private static String getPartitionName(int i) {
- return "part-" + String.valueOf(100000 + i).substring(1, 6);
- }
-
/**
* Get the {@link CompressionType} for the output {@link SequenceFile}.
*
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1420530&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Wed Dec 12 05:32:53 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+public class PartitioningRunner extends
+ BSP<Writable, Writable, Writable, Writable, NullWritable> {
+ private Configuration conf;
+ private int desiredNum;
+ private FileSystem fs = null;
+ private Path partitionDir;
+ private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();
+
+ @Override
+ public final void setup(
+ BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ this.conf = peer.getConfiguration();
+ this.desiredNum = conf.getInt("desired.num.of.tasks", 1);
+ this.fs = FileSystem.get(conf);
+
+ Path inputDir = new Path(conf.get("bsp.input.dir"));
+ if (fs.isFile(inputDir)) {
+ inputDir = inputDir.getParent();
+ }
+
+ this.partitionDir = new Path(inputDir + "/partitions");
+ }
+
+ @Override
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void bsp(
+ BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+ throws IOException, SyncException, InterruptedException {
+ Partitioner partitioner = getPartitioner();
+ KeyValuePair<Writable, Writable> pair = null;
+
+ Class keyClass = null;
+ Class valueClass = null;
+ while ((pair = peer.readNext()) != null) {
+ if (keyClass == null && valueClass == null) {
+ keyClass = pair.getKey().getClass();
+ valueClass = pair.getValue().getClass();
+ }
+
+ int index = Math.abs(partitioner.getPartition(pair.getKey(),
+ pair.getValue(), desiredNum));
+
+ if (!values.containsKey(index)) {
+ values.put(index, new HashMap<Writable, Writable>());
+ }
+ values.get(index).put(pair.getKey(), pair.getValue());
+ }
+
+ for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
+ Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+ + peer.getPeerIndex());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ destFile, keyClass, valueClass, CompressionType.NONE);
+ for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
+ writer.append(v.getKey(), v.getValue());
+ }
+ writer.close();
+ }
+
+ peer.sync();
+
+ // merge files into one.
+ FileStatus[] status = fs.listStatus(partitionDir);
+ for (int j = 0; j < status.length; j++) {
+ int idx = Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
+ int assignedID = idx / (desiredNum / peer.getNumPeers());
+ if (assignedID == peer.getNumPeers())
+ assignedID = assignedID - 1;
+
+ if (assignedID == peer.getPeerIndex()) {
+ FileStatus[] files = fs.listStatus(status[j].getPath());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ new Path(partitionDir + "/" + getPartitionName(j)), keyClass,
+ valueClass, CompressionType.NONE);
+
+ for (int i = 0; i < files.length; i++) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ files[i].getPath(), conf);
+
+ Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+ while (reader.next(key, value)) {
+ writer.append(key, value);
+ }
+ reader.close();
+ }
+
+ writer.close();
+ fs.delete(status[j].getPath(), true);
+ }
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Partitioner getPartitioner() {
+ return ReflectionUtils.newInstance(conf
+ .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+ Partitioner.class), conf);
+ }
+
+ private static String getPartitionName(int i) {
+ return "part-" + String.valueOf(100000 + i).substring(1, 6);
+ }
+
+}
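As a usage sketch, PartitioningRunner instantiates whatever is configured under "bsp.input.partitioner.class" via getPartitioner() above, falling back to HashPartitioner. A hypothetical modulo partitioner for IntWritable keys (assuming Partitioner is the single-method type its three-argument getPartition() call implies) could look like:

    public class ModuloPartitioner implements Partitioner<IntWritable, Writable> {
      @Override
      public int getPartition(IntWritable key, Writable value, int numTasks) {
        // the caller wraps this in Math.abs(), so a plain modulo suffices
        return key.get() % numTasks;
      }
    }

    // wire it in:
    conf.set("bsp.input.partitioner.class", ModuloPartitioner.class.getName());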
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Wed Dec 12 05:32:53 2012
@@ -22,17 +22,15 @@ import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
-import org.apache.hama.bsp.TextInputFormat;
-import org.apache.hama.graph.Edge;
+import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
public class InlinkCount extends Vertex<Text, NullWritable, IntWritable> {
@@ -51,34 +49,6 @@ public class InlinkCount extends Vertex<
}
}
- public static class InlinkCountTextReader extends
- VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
-
- /**
- * The text file essentially should look like: <br/>
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
- * E.G:<br/>
- * 1\t2\t3\t4<br/>
- * 2\t3\t1<br/>
- * etc.
- */
- @Override
- public boolean parseVertex(LongWritable key, Text value,
- Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
- String[] split = value.toString().split("\t");
- for (int i = 0; i < split.length; i++) {
- if (i == 0) {
- vertex.setVertexID(new Text(split[i]));
- } else {
- vertex
- .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
- }
- }
- return true;
- }
-
- }
-
private static void printUsage() {
System.out.println("Usage: <input> <output> [tasks]");
System.exit(-1);
@@ -104,14 +74,14 @@ public class InlinkCount extends Vertex<
}
inlinkJob.setVertexClass(InlinkCount.class);
- inlinkJob.setInputFormat(TextInputFormat.class);
- inlinkJob.setInputKeyClass(LongWritable.class);
- inlinkJob.setInputValueClass(Text.class);
+
+ inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+ inlinkJob.setInputKeyClass(Text.class);
+ inlinkJob.setInputValueClass(TextArrayWritable.class);
inlinkJob.setVertexIDClass(Text.class);
inlinkJob.setVertexValueClass(IntWritable.class);
inlinkJob.setEdgeValueClass(NullWritable.class);
- inlinkJob.setVertexInputReaderClass(InlinkCountTextReader.class);
inlinkJob.setPartitioner(HashPartitioner.class);
inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);
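Since InlinkCount now reads SequenceFileInputFormat with Text keys and TextArrayWritable values instead of parsing text lines through a VertexInputReader, the input must be prepared as a sequence file up front. A minimal sketch, mirroring the test changes further down (given a FileSystem fs and a Configuration conf; the path is hypothetical):

    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
        new Path("/tmp/inlink/input.seq"), Text.class, TextArrayWritable.class);
    TextArrayWritable outlinks = new TextArrayWritable();
    outlinks.set(new Writable[] { new Text("2"), new Text("3") });
    writer.append(new Text("1"), outlinks); // vertex 1 with edges to 2 and 3
    writer.close();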
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Wed Dec 12 05:32:53 2012
@@ -22,18 +22,17 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
/**
* Finding the mindist vertex in a connected component.
@@ -97,34 +96,6 @@ public class MindistSearch {
}
- public static class MindistSearchCountReader extends
- VertexInputReader<LongWritable, Text, Text, NullWritable, Text> {
-
- /**
- * The text file essentially should look like: <br/>
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
- * E.G:<br/>
- * 1\t2\t3\t4<br/>
- * 2\t3\t1<br/>
- * etc.
- */
- @Override
- public boolean parseVertex(LongWritable key, Text value,
- Vertex<Text, NullWritable, Text> vertex) throws Exception {
- String[] split = value.toString().split("\t");
- for (int i = 0; i < split.length; i++) {
- if (i == 0) {
- vertex.setVertexID(new Text(split[i]));
- } else {
- vertex
- .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
- }
- }
- return true;
- }
-
- }
-
private static void printUsage() {
System.out
.println("Usage: <input> <output> [maximum iterations (default 30)]
[tasks]");
@@ -157,10 +128,10 @@ public class MindistSearch {
job.setVertexValueClass(Text.class);
job.setEdgeValueClass(NullWritable.class);
- job.setInputKeyClass(LongWritable.class);
- job.setInputValueClass(Text.class);
- job.setInputFormat(TextInputFormat.class);
- job.setVertexInputReaderClass(MindistSearchCountReader.class);
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(Text.class);
+ job.setInputValueClass(TextArrayWritable.class);
+
job.setPartitioner(HashPartitioner.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Wed Dec 12 05:32:53 2012
@@ -23,19 +23,17 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.graph.AbstractAggregator;
import org.apache.hama.graph.AverageAggregator;
-import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
/**
* Real pagerank with dangling node contribution.
@@ -99,37 +97,8 @@ public class PageRank {
}
}
- public static class PagerankTextReader extends
- VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
-
- /**
- * The text file essentially should look like: <br/>
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)<br/>
- * E.G:<br/>
- * 1\t2\t3\t4<br/>
- * 2\t3\t1<br/>
- * etc.
- */
- @Override
- public boolean parseVertex(LongWritable key, Text value,
- Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
- String[] split = value.toString().split("\t");
- for (int i = 0; i < split.length; i++) {
- if (i == 0) {
- vertex.setVertexID(new Text(split[i]));
- } else {
- vertex
- .addEdge(new Edge<Text, NullWritable>(new Text(split[i]), null));
- }
- }
- return true;
- }
-
- }
-
private static void printUsage() {
- System.out
- .println("Usage: <input> <output> [damping factor (default 0.85)]
[Epsilon (convergence error, default 0.001)] [Max iterations (default 30)]
[tasks]");
+ System.out.println("Usage: <input> <output> [tasks]");
System.exit(-1);
}
@@ -161,15 +130,11 @@ public class PageRank {
// set the defaults
pageJob.setMaxIteration(30);
pageJob.set("hama.pagerank.alpha", "0.85");
+ pageJob.set("hama.graph.max.convergence.error", "0.001");
- if (args.length == 6)
- pageJob.setNumBspTask(Integer.parseInt(args[5]));
- if (args.length >= 5)
- pageJob.setMaxIteration(Integer.parseInt(args[4]));
- if (args.length >= 4)
- pageJob.set("hama.graph.max.convergence.error", args[3]);
- if (args.length >= 3)
- pageJob.set("hama.pagerank.alpha", args[2]);
+ if (args.length == 3) {
+ pageJob.setNumBspTask(Integer.parseInt(args[2]));
+ }
// error, dangling node probability sum
pageJob.setAggregatorClass(AverageAggregator.class,
@@ -179,10 +144,10 @@ public class PageRank {
pageJob.setVertexValueClass(DoubleWritable.class);
pageJob.setEdgeValueClass(NullWritable.class);
- pageJob.setInputKeyClass(LongWritable.class);
- pageJob.setInputValueClass(Text.class);
- pageJob.setInputFormat(TextInputFormat.class);
- pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+ pageJob.setInputFormat(SequenceFileInputFormat.class);
+ pageJob.setInputKeyClass(Text.class);
+ pageJob.setInputValueClass(TextArrayWritable.class);
+
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(TextOutputFormat.class);
pageJob.setOutputKeyClass(Text.class);
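With the damping factor, convergence error, and iteration count now fixed as defaults, a PageRank run takes only input, output, and an optional task count. A sketch of the new invocation, matching the updated test below (paths hypothetical):

    HamaConfiguration conf = new HamaConfiguration();
    GraphJob pageJob = PageRank.createJob(
        new String[] { "/tmp/pagerank/input.seq", "/tmp/pagerank/out", "7" }, conf);
    pageJob.waitForCompletion(true);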
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Wed Dec 12 05:32:53 2012
@@ -18,8 +18,6 @@
package org.apache.hama.examples;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
@@ -31,8 +29,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.examples.MindistSearch.MinTextCombiner;
public class MindistSearchTest extends TestCase {
@@ -95,22 +96,25 @@ public class MindistSearchTest extends T
}
private void generateTestData() {
- BufferedWriter bw = null;
try {
- bw = new BufferedWriter(new FileWriter(INPUT));
- for (String s : input) {
- bw.write(s + "\n");
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (bw != null) {
- try {
- bw.close();
- } catch (IOException e) {
- e.printStackTrace();
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ new Path(INPUT), Text.class, TextArrayWritable.class);
+
+ for (int i = 0; i < input.length; i++) {
+ String[] x = input[i].split("\t");
+ Text key = new Text(x[0]);
+ Writable[] values = new Writable[x.length - 1];
+ for (int j = 1; j < x.length; j++) {
+ values[j - 1] = new Text(x[j]);
}
+ TextArrayWritable value = new TextArrayWritable();
+ value.set(values);
+ writer.append(key, value);
}
+
+ writer.close();
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Wed Dec 12 05:32:53 2012
@@ -18,8 +18,6 @@
package org.apache.hama.examples;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -30,33 +28,15 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.SymmetricMatrixGen;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner;
public class PageRankTest extends TestCase {
- /**
- * The graph looks like this (adjacency list, [] contains outlinks):<br/>
- * stackoverflow.com [yahoo.com] <br/>
- * google.com []<br/>
- * facebook.com [twitter.com, google.com, nasa.gov]<br/>
- * yahoo.com [nasa.gov, stackoverflow.com]<br/>
- * twitter.com [google.com, facebook.com]<br/>
- * nasa.gov [yahoo.com, stackoverflow.com]<br/>
- * youtube.com [google.com, yahoo.com]<br/>
- * Note that google is removed in this part mainly to test the repair
- * functionality.
- */
- String[] input = new String[] { "stackoverflow.com\tyahoo.com",
- "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
- "yahoo.com\tnasa.gov\tstackoverflow.com",
- "twitter.com\tgoogle.com\tfacebook.com",
- "nasa.gov\tyahoo.com\tstackoverflow.com",
- "youtube.com\tgoogle.com\tyahoo.com" };
-
- private static String INPUT = "/tmp/pagerank-tmp.seq";
- private static String TEXT_INPUT = "/tmp/pagerank.txt";
+ private static String INPUT = "/tmp/pagerank/pagerank-tmp.seq";
+ private static String TEXT_INPUT = "/tmp/pagerank/pagerank.txt";
private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
- private static String OUTPUT = "/tmp/pagerank-out";
+ private static String OUTPUT = "/tmp/pagerank/pagerank-out";
private Configuration conf = new HamaConfiguration();
private FileSystem fs;
@@ -87,9 +67,10 @@ public class PageRankTest extends TestCa
generateTestData();
try {
HamaConfiguration conf = new HamaConfiguration(new Configuration());
- conf.set("bsp.local.tasks.maximum", "1");
+ conf.set("bsp.local.tasks.maximum", "10");
+ conf.set("bsp.peers.num", "7");
conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
- GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT },
+ GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT, "7" },
conf);
if (!pageJob.waitForCompletion(true)) {
@@ -101,24 +82,9 @@ public class PageRankTest extends TestCa
}
}
- private void generateTestData() {
- BufferedWriter bw = null;
- try {
- bw = new BufferedWriter(new FileWriter(INPUT));
- for (String s : input) {
- bw.write(s + "\n");
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (bw != null) {
- try {
- bw.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
+ private void generateTestData() throws ClassNotFoundException,
+ InterruptedException, IOException {
+ SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "3" });
}
private void deleteTempDirs() {
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Wed Dec 12 05:32:53 2012
@@ -165,9 +165,6 @@ public class GraphJob extends BSPJob {
.checkArgument(this.getConfiguration()
.get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
"Please provide an edge value class, if you don't need one, use
NullWritable!");
- Preconditions.checkArgument(
- this.getConfiguration().get(VERTEX_GRAPH_INPUT_READER) != null,
- "Please provide a vertex input reader!");
super.submit();
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Dec 12 05:32:53 2012
@@ -28,8 +28,8 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -273,53 +273,24 @@ public final class GraphJobRunner<V exte
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
-
- /*
- * Several partitioning constants begin
- */
-
- final VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
- .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
- VertexInputReader.class), conf);
-
final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
- final boolean runtimePartitioning = conf.getBoolean(
- GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
-
- final long splitSize = peer.getSplitSize();
- final int partitioningSteps = partitionMultiSteps(peer, splitSize);
- final long interval = splitSize / partitioningSteps;
final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
- /*
- * Several partitioning constants end
- */
-
LOG.debug("vertex class: " + vertexClass);
Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
vertex.runner = this;
- long startPos = peer.getPos();
- if (startPos == 0)
- startPos = 1L;
-
KeyValuePair<Writable, Writable> next = null;
- int steps = 1;
while ((next = peer.readNext()) != null) {
- boolean vertexFinished = false;
- try {
- vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
- vertex);
- } catch (Exception e) {
- // LOG.error("exception occured during parsing vertex!" + e.toString());
- throw new IOException("exception occured during parsing vertex!"
- + e.toString());
- }
-
- if (!vertexFinished) {
- continue;
+ V key = (V) next.getKey();
+ Writable[] edges = ((ArrayWritable) next.getValue()).get();
+ vertex.setVertexID(key);
+ List<Edge<V, E>> edgeList = new ArrayList<Edge<V, E>>();
+ for (Writable edge : edges) {
+ edgeList.add(new Edge<V, E>((V) edge, null));
}
+ vertex.setEdges(edgeList);
if (vertex.getEdges() == null) {
if (selfReference) {
@@ -334,44 +305,12 @@ public final class GraphJobRunner<V exte
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
- if (runtimePartitioning) {
- int partition = partitioner.getPartition(vertex.getVertexID(),
- vertex.getValue(), peer.getNumPeers());
- peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
- } else {
- vertex.setup(conf);
- vertices.add(vertex);
- }
+ vertex.setup(conf);
+ vertices.add(vertex);
vertex = newVertexInstance(vertexClass, conf);
vertex.runner = this;
-
- if (runtimePartitioning) {
- if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
- peer.sync();
- steps++;
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
- messagedVertex.runner = this;
- messagedVertex.setup(conf);
- vertices.add(messagedVertex);
- }
- startPos = peer.getPos();
- }
- }
}
- if (runtimePartitioning) {
- peer.sync();
-
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
- messagedVertex.runner = this;
- messagedVertex.setup(conf);
- vertices.add(messagedVertex);
- }
- }
LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
/*
@@ -383,7 +322,7 @@ public final class GraphJobRunner<V exte
*/
if (repairNeeded) {
LOG.debug("Starting repair of this graph!");
- repair(peer, partitioningSteps, selfReference);
+ repair(peer, selfReference);
}
LOG.debug("Starting Vertex processing!");
@@ -392,83 +331,16 @@ public final class GraphJobRunner<V exte
@SuppressWarnings("unchecked")
private void repair(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
- int partitioningSteps, boolean selfReference) throws IOException,
+ boolean selfReference) throws IOException,
SyncException, InterruptedException {
- int multiSteps = 0;
- MapWritable ssize = new MapWritable();
- ssize.put(new IntWritable(peer.getPeerIndex()),
- new IntWritable(vertices.size()));
- peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
- ssize = null;
- peer.sync();
-
- if (isMasterTask(peer)) {
- int minVerticesSize = Integer.MAX_VALUE;
- GraphJobMessage received = null;
- while ((received = peer.getCurrentMessage()) != null) {
- MapWritable x = received.getMap();
- for (Entry<Writable, Writable> e : x.entrySet()) {
- int curr = ((IntWritable) e.getValue()).get();
- if (minVerticesSize > curr) {
- minVerticesSize = curr;
- }
- }
- }
-
- if (minVerticesSize < (partitioningSteps * 2)) {
- multiSteps = minVerticesSize;
- } else {
- multiSteps = (partitioningSteps * 2);
- }
-
- for (String peerName : peer.getAllPeerNames()) {
- MapWritable temp = new MapWritable();
- temp.put(new Text("steps"), new IntWritable(multiSteps));
- peer.send(peerName, new GraphJobMessage(temp));
- }
- }
- peer.sync();
-
- GraphJobMessage received = peer.getCurrentMessage();
- MapWritable x = received.getMap();
- for (Entry<Writable, Writable> e : x.entrySet()) {
- multiSteps = ((IntWritable) e.getValue()).get();
- }
-
Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
- int i = 0;
- int syncs = 0;
-
for (Vertex<V, E, M> v : vertices) {
for (Edge<V, E> e : v.getEdges()) {
peer.send(v.getDestinationPeerName(e),
new GraphJobMessage(e.getDestinationVertexID()));
}
-
- if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
- peer.sync();
- syncs++;
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- V vertexName = (V) msg.getVertexId();
-
- Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
- newVertex.setVertexID(vertexName);
- newVertex.runner = this;
- if (selfReference) {
- newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
- newVertex.getVertexID(), null)));
- } else {
- newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
- }
- newVertex.setup(conf);
- tmp.put(vertexName, newVertex);
-
- }
- }
- i++;
}
peer.sync();
@@ -488,7 +360,6 @@ public final class GraphJobRunner<V exte
newVertex.setup(conf);
tmp.put(vertexName, newVertex);
newVertex = null;
-
}
for (Vertex<V, E, M> e : vertices) {
@@ -502,59 +373,6 @@ public final class GraphJobRunner<V exte
}
/**
- * Partitions our vertices through multiple supersteps to save memory.
- */
- private int partitionMultiSteps(
- BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
- long splitSize) throws IOException, SyncException, InterruptedException {
- int multiSteps = 1;
-
- MapWritable ssize = new MapWritable();
- ssize
- .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
- peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
- ssize = null;
- peer.sync();
-
- if (isMasterTask(peer)) {
- long maxSplitSize = 0L;
- GraphJobMessage received = null;
- while ((received = peer.getCurrentMessage()) != null) {
- MapWritable x = received.getMap();
- for (Entry<Writable, Writable> e : x.entrySet()) {
- long curr = ((LongWritable) e.getValue()).get();
- if (maxSplitSize < curr) {
- maxSplitSize = curr;
- }
- }
- }
-
- int steps = (int) (maxSplitSize / conf.getLong( // 20 mb
- "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
-
- for (String peerName : peer.getAllPeerNames()) {
- MapWritable temp = new MapWritable();
- temp.put(new Text("max"), new IntWritable(steps));
- peer.send(peerName, new GraphJobMessage(temp));
- }
- }
- peer.sync();
-
- GraphJobMessage received = peer.getCurrentMessage();
- MapWritable x = received.getMap();
- for (Entry<Writable, Writable> e : x.entrySet()) {
- multiSteps = ((IntWritable) e.getValue()).get();
- }
-
- if (isMasterTask(peer)) {
- peer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment(
- multiSteps);
- }
-
- return multiSteps;
- }
-
- /**
* Counts vertices globally by sending the count of vertices in the map to the
* other peers.
*/
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1420530&r1=1420529&r2=1420530&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Wed Dec 12 05:32:53 2012
@@ -17,38 +17,36 @@
*/
package org.apache.hama.graph;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.graph.example.PageRank;
public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
String[] input = new String[] { "stackoverflow.com\tyahoo.com",
- "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
- "yahoo.com\tnasa.gov\tstackoverflow.com]",
- "twitter.com\tgoogle.com\tfacebook.com]",
- "nasa.gov\tyahoo.com\tstackoverflow.com]",
- "youtube.com\tgoogle.com\tyahoo.com]" };
+ "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+ "yahoo.com\tnasa.gov\tstackoverflow.com",
+ "twitter.com\tgoogle.com\tfacebook.com",
+ "nasa.gov\tyahoo.com\tstackoverflow.com",
+ "youtube.com\tgoogle.com\tyahoo.com" };
- private static String INPUT = "/tmp/pagerank-real-tmp.seq";
- private static String OUTPUT = "/tmp/pagerank-real-out";
+ private static String INPUT = "/tmp/pagerank/real-tmp.seq";
+ private static String OUTPUT = "/tmp/pagerank/real-out";
@SuppressWarnings("unchecked")
@Override
@@ -60,7 +58,7 @@ public class TestSubmitGraphJob extends
configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
GraphJob bsp = new GraphJob(configuration, PageRank.class);
- bsp.setInputPath(new Path(INPUT));
+ bsp.setInputPath(new Path("/tmp/pagerank"));
bsp.setOutputPath(new Path(OUTPUT));
BSPJobClient jobClient = new BSPJobClient(configuration);
configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
@@ -77,14 +75,15 @@ public class TestSubmitGraphJob extends
bsp.setAggregatorClass(AverageAggregator.class,
PageRank.DanglingNodeAggregator.class);
+ bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
+ bsp.setInputFormat(SequenceFileInputFormat.class);
+ bsp.setInputKeyClass(Text.class);
+ bsp.setInputValueClass(TextArrayWritable.class);
+
bsp.setVertexIDClass(Text.class);
bsp.setVertexValueClass(DoubleWritable.class);
bsp.setEdgeValueClass(NullWritable.class);
- bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
- bsp.setInputFormat(TextInputFormat.class);
- bsp.setInputKeyClass(LongWritable.class);
- bsp.setInputValueClass(Text.class);
bsp.setPartitioner(HashPartitioner.class);
bsp.setOutputFormat(SequenceFileOutputFormat.class);
bsp.setOutputKeyClass(Text.class);
@@ -123,26 +122,25 @@ public class TestSubmitGraphJob extends
}
private void generateTestData() {
- BufferedWriter bw = null;
try {
- bw = new BufferedWriter(new FileWriter(INPUT));
- for (String s : input) {
- bw.write(s + "\n");
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- if (bw != null) {
- try {
- bw.close();
-
- File file = new File(INPUT);
- LOG.info("Temp file length: " + file.length());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
+ new Path(INPUT), Text.class, TextArrayWritable.class);
- } catch (IOException e) {
- e.printStackTrace();
+ for (int i = 0; i < input.length; i++) {
+ String[] x = input[i].split("\t");
+ Text key = new Text(x[0]);
+ Writable[] values = new Writable[x.length - 1];
+ for (int j = 1; j < x.length; j++) {
+ values[j - 1] = new Text(x[j]);
}
+ TextArrayWritable value = new TextArrayWritable();
+ value.set(values);
+ writer.append(key, value);
}
+
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
}
}