Author: edwardyoon
Date: Thu Nov 24 12:30:37 2011
New Revision: 1205805
URL: http://svn.apache.org/viewvc?rev=1205805&view=rev
Log:
Adding my few minor changes to trunk.
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1205805&r1=1205804&r2=1205805&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
Thu Nov 24 12:30:37 2011
@@ -312,15 +312,6 @@ public class BSPJobClient extends Config
*/
UnixUserGroupInformation ugi = getUGI(job.getConf());
- ClusterStatus clusterStatus = getClusterStatus(true);
-
- // check the number of BSP tasks
- int tasks = job.getNumBspTask();
- int maxTasks = clusterStatus.getMaxTasks();
- if (tasks <= 0 || tasks > maxTasks) {
- job.setNumBspTask(maxTasks);
- }
-
FileSystem fs = getFs();
// Create a number of filenames in the BSPMaster's fs namespace
fs.delete(submitJobDir, true);
@@ -335,7 +326,9 @@ public class BSPJobClient extends Config
if (job.get("bsp.input.dir") != null) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- job = partition(job);
+ if (job.getConf().get("bsp.input.partitioner.class") != null) {
+ job = partition(job);
+ }
job.setNumBspTask(writeSplits(job, submitSplitFile));
job.set("bsp.job.split.file", submitSplitFile.toString());
}
@@ -390,84 +383,81 @@ public class BSPJobClient extends Config
@SuppressWarnings( { "rawtypes", "unchecked" })
private BSPJob partition(BSPJob job) throws IOException {
- if (job.getConf().get("bsp.input.partitioner.class") != null) {
- InputSplit[] splits = job.getInputFormat().getSplits(job,
- job.getNumBspTask());
- int numOfTasks = job.getNumBspTask();
- String input = job.getConf().get("bsp.input.dir");
-
- if (input != null) {
- InputFormat<?, ?> inputFormat = job.getInputFormat();
-
- Path partitionedPath = new Path(input, "hama-partitions");
- Path inputPath = new Path(input);
- if (fs.isFile(inputPath)) {
- partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
- }
+ InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
+ int numOfTasks = splits.length; // job.getNumBspTask();
+ String input = job.getConf().get("bsp.input.dir");
- String alternatePart = job.get("bsp.partitioning.dir");
- if (alternatePart != null) {
- partitionedPath = new Path(alternatePart, job.getJobID().toString());
- }
+ if (input != null) {
+ InputFormat<?, ?> inputFormat = job.getInputFormat();
- if (fs.exists(partitionedPath)) {
- fs.delete(partitionedPath, true);
- } else {
- fs.mkdirs(partitionedPath);
- }
- // FIXME this is soo unsafe
- RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
+ Path partitionedPath = new Path(input, "hama-partitions");
+ Path inputPath = new Path(input);
+ if (fs.isFile(inputPath)) {
+ partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
+ }
- List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
- numOfTasks);
+ String alternatePart = job.get("bsp.partitioning.dir");
+ if (alternatePart != null) {
+ partitionedPath = new Path(alternatePart, job.getJobID().toString());
+ }
- CompressionType compressionType = getOutputCompressionType(job);
- Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
- job, null);
- CompressionCodec codec = null;
- if (outputCompressorClass != null) {
- codec = ReflectionUtils.newInstance(outputCompressorClass, job
- .getConf());
- }
+ if (fs.exists(partitionedPath)) {
+ fs.delete(partitionedPath, true);
+ } else {
+ fs.mkdirs(partitionedPath);
+ }
+ // FIXME this is soo unsafe
+ RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
- try {
- for (int i = 0; i < numOfTasks; i++) {
- Path p = new Path(partitionedPath, "part-" + i);
- if (codec == null) {
- writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
- sampleReader.createKey().getClass(), sampleReader
- .createValue().getClass(), CompressionType.NONE));
- } else {
- writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
- sampleReader.createKey().getClass(), sampleReader
- .createValue().getClass(), compressionType, codec));
- }
- }
+ List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
+ numOfTasks);
- Partitioner partitioner = job.getPartitioner();
- for (int i = 0; i < splits.length; i++) {
- InputSplit split = splits[i];
- RecordReader recordReader = inputFormat.getRecordReader(split, job);
- Object key = recordReader.createKey();
- Object value = recordReader.createValue();
- while (recordReader.next(key, value)) {
- int index = Math.abs(partitioner.getPartition(key, value,
- numOfTasks));
- writers.get(index).append(key, value);
- }
- LOG.debug("Done with split " + i);
- }
- } finally {
- for (SequenceFile.Writer wr : writers) {
- wr.close();
+ CompressionType compressionType = getOutputCompressionType(job);
+ Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
+ job, null);
+ CompressionCodec codec = null;
+ if (outputCompressorClass != null) {
+ codec = ReflectionUtils.newInstance(outputCompressorClass, job
+ .getConf());
+ }
+
+ try {
+ for (int i = 0; i < numOfTasks; i++) {
+ Path p = new Path(partitionedPath, "part-" + i);
+ if (codec == null) {
+ writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+ sampleReader.createKey().getClass(), sampleReader.createValue()
+ .getClass(), CompressionType.NONE));
+ } else {
+ writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+ sampleReader.createKey().getClass(), sampleReader.createValue()
+ .getClass(), compressionType, codec));
}
}
- job.setInputFormat(SequenceFileInputFormat.class);
- job.setInputPath(partitionedPath);
+ Partitioner partitioner = job.getPartitioner();
+ for (int i = 0; i < splits.length; i++) {
+ InputSplit split = splits[i];
+ RecordReader recordReader = inputFormat.getRecordReader(split, job);
+ Object key = recordReader.createKey();
+ Object value = recordReader.createValue();
+ while (recordReader.next(key, value)) {
+ int index = Math.abs(partitioner.getPartition(key, value,
+ numOfTasks));
+ writers.get(index).append(key, value);
+ }
+ LOG.debug("Done with split " + i);
+ }
+ } finally {
+ for (SequenceFile.Writer wr : writers) {
+ wr.close();
+ }
}
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputPath(partitionedPath);
}
+
return job;
}
@@ -513,8 +503,7 @@ public class BSPJobClient extends Config
}
private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
- InputSplit[] splits = job.getInputFormat().getSplits(job,
- job.getNumBspTask());
+ InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
submitSplitFile, splits.length);
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1205805&r1=1205804&r2=1205805&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
Thu Nov 24 12:30:37 2011
@@ -209,6 +209,8 @@ public abstract class FileInputFormat<K,
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+ LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
+ + minSize + ", " + blockSize + ")");
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
@@ -264,8 +266,8 @@ public abstract class FileInputFormat<K,
* inputs for the map-reduce job.
*/
public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
- setInputPaths(conf,
- StringUtils.stringToPath(getPathStrings(commaSeparatedPaths)));
+ setInputPaths(conf, StringUtils
+ .stringToPath(getPathStrings(commaSeparatedPaths)));
}
/**