Author: edwardyoon
Date: Mon Jan 14 05:37:25 2013
New Revision: 1432807
URL: http://svn.apache.org/viewvc?rev=1432807&view=rev
Log:
HAMA-712: PartitioningRunner should work for multiple input files
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jan 14 05:37:25 2013
@@ -11,6 +11,8 @@ Release 0.7 (unreleased changes)
BUG FIXES
+ HAMA-712: PartitioningRunner should works for multiple input files
(surajsmenon via edwardyoon)
+
IMPROVEMENTS
HAMA-531: Reimplementation of partitioner (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Jan
14 05:37:25 2013
@@ -412,10 +412,13 @@ public class BSPJobClient extends Config
fs.delete(partitionDir, true);
}
+ if (numTasks == 0) {
+ numTasks = numSplits;
+ }
+
HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
- conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT,
- Integer.parseInt(job.getConfiguration().get("bsp.peers.num")));
+ conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, numTasks);
if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) !=
null) {
conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
.get(Constants.RUNTIME_PARTITIONING_DIR));
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
Mon Jan 14 05:37:25 2013
@@ -71,7 +71,7 @@ public class PageRankTest extends TestCa
try {
HamaConfiguration conf = new HamaConfiguration(new Configuration());
conf.set("bsp.local.tasks.maximum", "10");
- conf.set("bsp.peers.num", "7");
+ conf.setInt("bsp.peers.num", 7);
conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
GraphJob pageJob = PageRank.createJob(
new String[] { INPUT, OUTPUT, "7" }, conf);
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1432807&r1=1432806&r2=1432807&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Mon Jan 14 05:37:25 2013
@@ -65,7 +65,6 @@ public class TestSubmitGraphJob extends
configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
ClusterStatus cluster = jobClient.getClusterStatus(false);
assertEquals(this.numOfGroom, cluster.getGroomServers());
- bsp.setNumBspTask(2);
LOG.info("Client finishes execution job.");
bsp.setJobName("Pagerank");
bsp.setVertexClass(PageRank.PageRankVertex.class);
@@ -119,12 +118,13 @@ public class TestSubmitGraphJob extends
assertTrue(sum > 0.9d && sum <= 1.1d);
}
+
private void generateTestData() {
try {
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
- new Path(INPUT), PageRankVertex.class, NullWritable.class);
+ SequenceFile.Writer writer1 = SequenceFile.createWriter(fs, getConf(),
+ new Path(INPUT+"/part0"), PageRankVertex.class, NullWritable.class);
- for (int i = 0; i < input.length; i++) {
+ for (int i = 0; i < input.length/2; i++) {
String[] x = input[i].split("\t");
PageRankVertex vertex = new PageRankVertex();
@@ -133,15 +133,33 @@ public class TestSubmitGraphJob extends
vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
NullWritable.get()));
}
- writer.append(vertex, NullWritable.get());
+ writer1.append(vertex, NullWritable.get());
}
- writer.close();
+ writer1.close();
+
+ SequenceFile.Writer writer2 = SequenceFile.createWriter(fs, getConf(),
+ new Path(INPUT+"/part1"), PageRankVertex.class, NullWritable.class);
+
+ for (int i = 0; i < input.length/2 + 1; i++) {
+ String[] x = input[i].split("\t");
+
+ PageRankVertex vertex = new PageRankVertex();
+ vertex.setVertexID(new Text(x[0]));
+ for (int j = 1; j < x.length; j++) {
+ vertex.addEdge(new Edge<Text, NullWritable>(new Text(x[j]),
+ NullWritable.get()));
+ }
+ writer2.append(vertex, NullWritable.get());
+ }
+
+ writer2.close();
+
} catch (IOException e) {
e.printStackTrace();
}
}
-
+
private void deleteTempDirs() {
try {
if (fs.exists(new Path(INPUT)))