Author: edwardyoon
Date: Thu Nov 24 12:30:37 2011
New Revision: 1205805

URL: http://svn.apache.org/viewvc?rev=1205805&view=rev
Log:
Adding my few minor changes to trunk.

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1205805&r1=1205804&r2=1205805&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Nov 24 12:30:37 2011
@@ -312,15 +312,6 @@ public class BSPJobClient extends Config
      */
     UnixUserGroupInformation ugi = getUGI(job.getConf());
 
-    ClusterStatus clusterStatus = getClusterStatus(true);
-
-    // check the number of BSP tasks
-    int tasks = job.getNumBspTask();
-    int maxTasks = clusterStatus.getMaxTasks();
-    if (tasks <= 0 || tasks > maxTasks) {
-      job.setNumBspTask(maxTasks);
-    }
-
     FileSystem fs = getFs();
     // Create a number of filenames in the BSPMaster's fs namespace
     fs.delete(submitJobDir, true);
@@ -335,7 +326,9 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      job = partition(job);
+      if (job.getConf().get("bsp.input.partitioner.class") != null) {
+        job = partition(job);
+      }
       job.setNumBspTask(writeSplits(job, submitSplitFile));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -390,84 +383,81 @@ public class BSPJobClient extends Config
 
   @SuppressWarnings( { "rawtypes", "unchecked" })
   private BSPJob partition(BSPJob job) throws IOException {
-    if (job.getConf().get("bsp.input.partitioner.class") != null) {
-      InputSplit[] splits = job.getInputFormat().getSplits(job,
-          job.getNumBspTask());
-      int numOfTasks = job.getNumBspTask();
-      String input = job.getConf().get("bsp.input.dir");
-
-      if (input != null) {
-        InputFormat<?, ?> inputFormat = job.getInputFormat();
-
-        Path partitionedPath = new Path(input, "hama-partitions");
-        Path inputPath = new Path(input);
-        if (fs.isFile(inputPath)) {
-          partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
-        }
+    InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
+    int numOfTasks = splits.length; // job.getNumBspTask();
+    String input = job.getConf().get("bsp.input.dir");
 
-        String alternatePart = job.get("bsp.partitioning.dir");
-        if (alternatePart != null) {
-          partitionedPath = new Path(alternatePart, job.getJobID().toString());
-        }
+    if (input != null) {
+      InputFormat<?, ?> inputFormat = job.getInputFormat();
 
-        if (fs.exists(partitionedPath)) {
-          fs.delete(partitionedPath, true);
-        } else {
-          fs.mkdirs(partitionedPath);
-        }
-        // FIXME this is soo unsafe
-        RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
+      Path partitionedPath = new Path(input, "hama-partitions");
+      Path inputPath = new Path(input);
+      if (fs.isFile(inputPath)) {
+        partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
+      }
 
-        List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-            numOfTasks);
+      String alternatePart = job.get("bsp.partitioning.dir");
+      if (alternatePart != null) {
+        partitionedPath = new Path(alternatePart, job.getJobID().toString());
+      }
 
-        CompressionType compressionType = getOutputCompressionType(job);
-        Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
-            job, null);
-        CompressionCodec codec = null;
-        if (outputCompressorClass != null) {
-          codec = ReflectionUtils.newInstance(outputCompressorClass, job
-              .getConf());
-        }
+      if (fs.exists(partitionedPath)) {
+        fs.delete(partitionedPath, true);
+      } else {
+        fs.mkdirs(partitionedPath);
+      }
+      // FIXME this is soo unsafe
+      RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
 
-        try {
-          for (int i = 0; i < numOfTasks; i++) {
-            Path p = new Path(partitionedPath, "part-" + i);
-            if (codec == null) {
-              writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
-                  sampleReader.createKey().getClass(), sampleReader
-                      .createValue().getClass(), CompressionType.NONE));
-            } else {
-              writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
-                  sampleReader.createKey().getClass(), sampleReader
-                      .createValue().getClass(), compressionType, codec));
-            }
-          }
+      List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
+          numOfTasks);
 
-          Partitioner partitioner = job.getPartitioner();
-          for (int i = 0; i < splits.length; i++) {
-            InputSplit split = splits[i];
-            RecordReader recordReader = inputFormat.getRecordReader(split, job);
-            Object key = recordReader.createKey();
-            Object value = recordReader.createValue();
-            while (recordReader.next(key, value)) {
-              int index = Math.abs(partitioner.getPartition(key, value,
-                  numOfTasks));
-              writers.get(index).append(key, value);
-            }
-            LOG.debug("Done with split " + i);
-          }
-        } finally {
-          for (SequenceFile.Writer wr : writers) {
-            wr.close();
+      CompressionType compressionType = getOutputCompressionType(job);
+      Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
+          job, null);
+      CompressionCodec codec = null;
+      if (outputCompressorClass != null) {
+        codec = ReflectionUtils.newInstance(outputCompressorClass, job
+            .getConf());
+      }
+
+      try {
+        for (int i = 0; i < numOfTasks; i++) {
+          Path p = new Path(partitionedPath, "part-" + i);
+          if (codec == null) {
+            writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+                sampleReader.createKey().getClass(), sampleReader.createValue()
+                    .getClass(), CompressionType.NONE));
+          } else {
+            writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+                sampleReader.createKey().getClass(), sampleReader.createValue()
+                    .getClass(), compressionType, codec));
           }
         }
 
-        job.setInputFormat(SequenceFileInputFormat.class);
-        job.setInputPath(partitionedPath);
+        Partitioner partitioner = job.getPartitioner();
+        for (int i = 0; i < splits.length; i++) {
+          InputSplit split = splits[i];
+          RecordReader recordReader = inputFormat.getRecordReader(split, job);
+          Object key = recordReader.createKey();
+          Object value = recordReader.createValue();
+          while (recordReader.next(key, value)) {
+            int index = Math.abs(partitioner.getPartition(key, value,
+                numOfTasks));
+            writers.get(index).append(key, value);
+          }
+          LOG.debug("Done with split " + i);
+        }
+      } finally {
+        for (SequenceFile.Writer wr : writers) {
+          wr.close();
+        }
       }
 
+      job.setInputFormat(SequenceFileInputFormat.class);
+      job.setInputPath(partitionedPath);
     }
+
     return job;
   }
 
@@ -513,8 +503,7 @@ public class BSPJobClient extends Config
   }
 
   private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(job,
-        job.getNumBspTask());
+    InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
 
     final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
         submitSplitFile, splits.length);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1205805&r1=1205804&r2=1205805&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Thu Nov 24 12:30:37 2011
@@ -209,6 +209,8 @@ public abstract class FileInputFormat<K,
       if ((length != 0) && isSplitable(fs, path)) {
         long blockSize = file.getBlockSize();
         long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+        LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
+            + minSize + ", " + blockSize + ")");
 
         long bytesRemaining = length;
         while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
@@ -264,8 +266,8 @@ public abstract class FileInputFormat<K,
    *          inputs for the map-reduce job.
    */
   public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
-    setInputPaths(conf,
-        StringUtils.stringToPath(getPathStrings(commaSeparatedPaths)));
+    setInputPaths(conf, StringUtils
+        .stringToPath(getPathStrings(commaSeparatedPaths)));
   }
 
   /**


Reply via email to