Author: edwardyoon
Date: Tue Dec 18 03:57:02 2012
New Revision: 1423269
URL: http://svn.apache.org/viewvc?rev=1423269&view=rev
Log:
HAMA-531: Minor changes. Make partition dir configurable
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1423269&r1=1423268&r2=1423269&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Dec 18
03:57:02 2012
@@ -240,6 +240,7 @@ public class BSPJob extends BSPJobContex
HamaConfiguration conf = new HamaConfiguration();
conf.setInt("desired.num.of.tasks",
Integer.parseInt(this.getConfiguration().get("bsp.peers.num")));
+ conf.set("bsp.partitioning.dir",
this.getConfiguration().get("bsp.partitioning.dir"));
BSPJob partitioningJob = new BSPJob(conf);
partitioningJob.setInputPath(new Path(this.getConfiguration().get(
"bsp.input.dir")));
@@ -251,7 +252,11 @@ public class BSPJob extends BSPJobContex
isPartitioned = partitioningJob.waitForCompletion(true);
if (isPartitioned) {
- this.setInputPath(new Path(inputDir + "/partitions"));
+ if(conf.get("bsp.partitioning.dir") != null) {
+ this.setInputPath(new Path(conf.get("bsp.partitioning.dir")));
+ } else {
+ this.setInputPath(new Path(inputDir + "/partitions"));
+ }
}
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1423269&r1=1423268&r2=1423269&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Tue Dec 18 03:57:02 2012
@@ -54,7 +54,11 @@ public class PartitioningRunner extends
inputDir = inputDir.getParent();
}
- this.partitionDir = new Path(inputDir + "/partitions");
+ if(conf.get("bsp.partitioning.dir") != null) {
+ this.partitionDir = new Path(conf.get("bsp.partitioning.dir"));
+ } else {
+ this.partitionDir = new Path(inputDir + "/partitions");
+ }
}
@Override
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1423269&r1=1423268&r2=1423269&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Tue
Dec 18 03:57:02 2012
@@ -24,6 +24,7 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
@@ -52,6 +53,9 @@ public class TestPartitioning extends Te
bsp.setInputPath(new Path("../CHANGES.txt"));
bsp.setPartitioner(HashPartitioner.class);
assertTrue(bsp.waitForCompletion(true));
+
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(new Path("/tmp/hama-test/partitioning/localtest"), true);
}
public static class PartionedBSP extends
@@ -69,7 +73,6 @@ public class TestPartitioning extends Te
assertTrue(numOfPairs > 2);
}
-
}
}