Author: edwardyoon
Date: Thu Jan 2 05:56:51 2014
New Revision: 1554740
URL: http://svn.apache.org/r1554740
Log:
Fix bug in BSPJobClient: default numTasks to numSplits unconditionally, instead of only when runtime partitioning is enabled
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Jan
2 05:56:51 2014
@@ -412,14 +412,13 @@ public class BSPJobClient extends Config
Constants.RUNTIME_PARTITIONING_CLASS)));
}
- if ((numTasks > 0 && numTasks != numSplits)
- || (job.getConfiguration().getBoolean(
- Constants.ENABLE_RUNTIME_PARTITIONING, false) && job
- .getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) !=
null)) {
+ if (numTasks == 0) {
+ numTasks = numSplits;
+ }
- if (numTasks == 0) {
- numTasks = numSplits;
- }
+ if (job.getConfiguration().getBoolean(
+ Constants.ENABLE_RUNTIME_PARTITIONING, false)
+ && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS)
!= null) {
HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
@@ -428,10 +427,9 @@ public class BSPJobClient extends Config
conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
.get(Constants.RUNTIME_PARTITIONING_DIR));
}
- if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS)
!= null) {
- conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
- job.get(Constants.RUNTIME_PARTITIONING_CLASS));
- }
+
+ conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
+ job.get(Constants.RUNTIME_PARTITIONING_CLASS));
BSPJob partitioningJob = new BSPJob(conf);
LOG.debug("partitioningJob input: "
+ partitioningJob.get(Constants.JOB_INPUT_DIR));
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
Thu Jan 2 05:56:51 2014
@@ -107,7 +107,8 @@ public class TestBSPMasterGroomServer ex
public static void checkOutput(FileSystem fileSys, Configuration conf,
int tasks) throws Exception {
- FileStatus[] listStatus = fileSys.listStatus(OUTPUT_PATH);
+ FileStatus[] listStatus = fileSys.globStatus(new Path(OUTPUT_PATH +
"/part-*"));
+
assertEquals(listStatus.length, tasks);
for (FileStatus status : listStatus) {
if (!status.isDir()) {
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Thu Jan 2 05:56:51 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
@@ -159,6 +160,7 @@ public class TestKeyValueTextInputFormat
job.setJobName("Test KeyValueTextInputFormat together with
HashPartitioner");
job.setBspClass(KeyValueHashPartitionedBSP.class);
+ job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
job.setPartitioner(HashPartitioner.class);
job.setInputPath(dataPath);
Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Thu Jan
2 05:56:51 2014
@@ -52,7 +52,6 @@ import org.apache.hama.commons.io.PipesK
import org.apache.hama.commons.io.PipesVectorWritable;
import org.apache.hama.commons.math.DenseDoubleVector;
import org.apache.hama.commons.math.DoubleVector;
-import org.junit.Test;
/**
* Test case for {@link PipesBSP}
@@ -65,18 +64,19 @@ public class TestPipes extends HamaClust
public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation";
public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC =
"/examples/matrixmultiplication";
public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/";
- public static final String HAMA_TMP_OUTPUT = "/tmp/hama-test/";
+ public static final String HAMA_TMP_OUTPUT = "/tmp/hama-pipes/";
public static final String HAMA_TMP_DISK_QUEUE_OUTPUT = "/tmp/messageQueue";
public static final int DOUBLE_PRECISION = 6;
- protected HamaConfiguration configuration;
+ private HamaConfiguration configuration;
+ private static FileSystem fs = null;
public TestPipes() {
configuration = new HamaConfiguration();
try {
// Cleanup temp Hama locations
- FileSystem fs = FileSystem.get(configuration);
+ fs = FileSystem.get(configuration);
cleanup(fs, new Path(HAMA_TMP_OUTPUT));
cleanup(fs, new Path(HAMA_TMP_DISK_QUEUE_OUTPUT));
// Remove local temp folder
@@ -109,8 +109,9 @@ public class TestPipes extends HamaClust
super.tearDown();
}
- @Test
public void testPipes() throws Exception {
+ System.setProperty(EXAMPLES_INSTALL_PROPERTY,
+ "/home/edward/workspace/hama-trunk/c++/target/native/");
assertNotNull("System property " + EXAMPLES_INSTALL_PROPERTY
+ " is not defined!", System.getProperty(EXAMPLES_INSTALL_PROPERTY));
@@ -121,35 +122,45 @@ public class TestPipes extends HamaClust
return;
}
- LOG.info(EXAMPLES_INSTALL_PROPERTY + " is defined: '"
- + System.getProperty(EXAMPLES_INSTALL_PROPERTY) + "'");
+ // *** Summation Test ***
+ summation();
+
+ // *** MatrixMultiplication Test ***
+ matrixMult();
+
+ // Remove local temp folder
+ cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+ }
+ private void summation() throws Exception {
// Setup Paths
String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
Path summationExec = new Path(examplesInstallPath +
EXAMPLE_SUMMATION_EXEC);
- Path matrixmultiplicationExec = new Path(examplesInstallPath
- + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
- Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/in");
- Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/out");
+ Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/in");
+ Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/out");
- FileSystem fs = FileSystem.get(configuration);
-
- // *** Summation Test ***
// Generate Summation input
BigDecimal sum = writeSummationInputFile(fs, inputPath);
// Run Summation example
runProgram(getSummationJob(configuration), summationExec, inputPath,
- outputPath, 3, this.numOfGroom);
+ outputPath, 1, this.numOfGroom);
// Verify output
verifySummationOutput(configuration, outputPath, sum);
-
// Clean input and output folder
cleanup(fs, inputPath);
cleanup(fs, outputPath);
+ }
+
+ private void matrixMult() throws Exception {
+ String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
+ Path matrixmultiplicationExec = new Path(examplesInstallPath
+ + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
+
+ Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "matmult/in");
+ Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "matmult/out");
- // *** MatrixMultiplication Test ***
// Generate matrix dimensions
Random rand = new Random();
// (0-19) + 11 -> between 11-30
@@ -168,14 +179,14 @@ public class TestPipes extends HamaClust
// Run MatrixMultiplication example
runProgram(
getMatrixMultiplicationJob(configuration, transposedMatrixBPath),
- matrixmultiplicationExec, matrixAPath, outputPath, 3, this.numOfGroom);
+ matrixmultiplicationExec, matrixAPath, outputPath, 2, this.numOfGroom);
// Verify output
double[][] matrixC = multiplyMatrix(matrixA, matrixB);
verifyMatrixMultiplicationOutput(configuration, outputPath, matrixC);
- // Remove local temp folder
- cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+ cleanup(fs, inputPath);
+ cleanup(fs, outputPath);
}
static BSPJob getSummationJob(HamaConfiguration conf) throws IOException {
@@ -187,6 +198,7 @@ public class TestPipes extends HamaClust
bsp.setOutputKeyClass(Text.class);
bsp.setOutputValueClass(DoubleWritable.class);
bsp.set("bsp.message.class", DoubleWritable.class.getName());
+
return bsp;
}
@@ -199,8 +211,13 @@ public class TestPipes extends HamaClust
bsp.setOutputFormat(SequenceFileOutputFormat.class);
bsp.setOutputKeyClass(IntWritable.class);
bsp.setOutputValueClass(PipesVectorWritable.class);
+
+ bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts");
bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
+
+ bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
bsp.setPartitioner(PipesPartitioner.class);
+
// sort sent messages
bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
"org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
@@ -226,14 +243,12 @@ public class TestPipes extends HamaClust
out.writeBytes(line);
sum = sum.add(new BigDecimal(truncatedValue));
- LOG.info("input[" + i + "]: '" + line + "' sum: " + sum.toString());
}
out.close();
return sum;
}
static double[][] createRandomMatrix(int rows, int columns, Random rand) {
- LOG.info("createRandomMatrix rows: " + rows + " cols: " + columns);
final double[][] matrix = new double[rows][columns];
double rangeMin = 0;
double rangeMax = 100;
@@ -255,7 +270,6 @@ public class TestPipes extends HamaClust
// Write matrix to DFS
SequenceFile.Writer writer = null;
try {
- FileSystem fs = FileSystem.get(conf);
writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
PipesVectorWritable.class);
@@ -314,7 +328,6 @@ public class TestPipes extends HamaClust
static void verifyOutput(HamaConfiguration conf, Path outputPath,
String[] expectedResults) throws IOException {
- FileSystem fs = FileSystem.get(conf);
FileStatus[] listStatus = fs.listStatus(outputPath);
for (FileStatus status : listStatus) {
if (!status.isDir()) {
@@ -342,7 +355,6 @@ public class TestPipes extends HamaClust
static void verifySummationOutput(HamaConfiguration conf, Path outputPath,
BigDecimal sum) throws IOException {
- FileSystem fs = FileSystem.get(conf);
FileStatus[] listStatus = fs.listStatus(outputPath);
for (FileStatus status : listStatus) {
if (!status.isDir()) {
@@ -365,31 +377,22 @@ public class TestPipes extends HamaClust
static void verifyMatrixMultiplicationOutput(HamaConfiguration conf,
Path outputPath, double[][] matrix) throws IOException {
- FileSystem fs = FileSystem.get(conf);
FileStatus[] listStatus = fs.listStatus(outputPath);
for (FileStatus status : listStatus) {
if (!status.isDir()) {
- LOG.info("Output File: " + status.getPath());
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
status.getPath(), conf);
IntWritable key = new IntWritable();
PipesVectorWritable value = new PipesVectorWritable();
int rowIdx = 0;
while (reader.next(key, value)) {
-
assertEquals("Expected rowIdx: '" + rowIdx + "' != '" + key.get()
+ "'", rowIdx, key.get());
DoubleVector rowVector = value.getVector();
- LOG.info("key: " + key.get() + " value: " + rowVector.toString());
for (int colIdx = 0; colIdx < rowVector.getLength(); colIdx++) {
-
double colValue = rowVector.get(colIdx);
-
- LOG.info("value[" + rowIdx + "," + colIdx + "]: " + colValue
- + " expected: " + matrix[rowIdx][colIdx]);
-
assertEquals("Expected colValue: '" + matrix[rowIdx][colIdx]
+ "' != '" + colValue + "' in row: " + rowIdx + " values: "
+ rowVector.toString(), matrix[rowIdx][colIdx], colValue,
@@ -408,8 +411,8 @@ public class TestPipes extends HamaClust
}
static void runProgram(BSPJob bsp, Path program, Path inputPath,
- Path outputPath, int numBspTasks, int numOfGroom) throws IOException {
-
+ Path outputPath, int numBspTasks, int numOfGroom) throws IOException,
+ ClassNotFoundException, InterruptedException {
HamaConfiguration conf = (HamaConfiguration) bsp.getConfiguration();
bsp.setJobName("Test Hama Pipes " + program.getName());
bsp.setBspClass(PipesBSP.class);
@@ -421,7 +424,6 @@ public class TestPipes extends HamaClust
Submitter.setIsJavaRecordWriter(conf, true);
BSPJobClient jobClient = new BSPJobClient(conf);
- conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
// Set bspTaskNum
ClusterStatus cluster = jobClient.getClusterStatus(false);
@@ -430,7 +432,6 @@ public class TestPipes extends HamaClust
// Copy binary to DFS
Path testExec = new Path(EXAMPLE_TMP_OUTPUT + "testing/bin/application");
- FileSystem fs = FileSystem.get(conf);
fs.delete(testExec.getParent(), true);
fs.copyFromLocalFile(program, testExec);