Author: edwardyoon
Date: Tue Sep 25 03:56:22 2012
New Revision: 1389701
URL: http://svn.apache.org/viewvc?rev=1389701&view=rev
Log:
Make the input splitter more robust
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1389701&r1=1389700&r2=1389701&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Sep 25 03:56:22 2012
@@ -8,6 +8,7 @@ Release 0.6 (unreleased changes)
BUG FIXES
+ HAMA-647: Make the input splitter more robust (Yuesheng Hu via edwardyoon)
HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu
via tjungblut)
HAMA-633: Fix CI Failures (tjungblut)
HAMA-631: Add "commons-httpclient-3.1.jar" (Paul Gyuho Song via edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1389701&r1=1389700&r2=1389701&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue Sep 25 03:56:22 2012
@@ -97,7 +97,7 @@ public abstract class FileInputFormat<K,
BSPJob job) throws IOException;
/**
- * Set a PathFilter to be applied to the input paths for the map-reduce job.
+ * Set a PathFilter to be applied to the input paths for the BSP job.
*
* @param filter the PathFilter class use for filtering the input paths.
*/
@@ -205,6 +205,7 @@ public abstract class FileInputFormat<K,
}
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+ long goalSize = 0;
// take the short circuit path if we have already partitioned
if (numSplits == files.length) {
for (FileStatus file : files) {
@@ -214,9 +215,13 @@ public abstract class FileInputFormat<K,
}
}
return splits.toArray(new FileSplit[splits.size()]);
+ } else if (files.length == 1) {
+ goalSize = totalSize / (numSplits == 0 ? 1 : numSplits - 1);
+ } else {
+ goalSize = totalSize
+ / (numSplits == 0 ? 1 : numSplits - files.length / 2 + 1);
}
-
- long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
+ LOG.debug("numSplits: " + numSplits);
long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
minSplitSize);
@@ -232,7 +237,7 @@ public abstract class FileInputFormat<K,
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
- LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
+ LOG.info("computeSplitSize: " + splitSize + " (" + goalSize + ", "
+ minSize + ", " + blockSize + ")");
long bytesRemaining = length;
@@ -264,7 +269,11 @@ public abstract class FileInputFormat<K,
}
protected long computeSplitSize(long goalSize, long minSize, long blockSize)
{
- return Math.max(minSize, Math.min(goalSize, blockSize));
+ if (goalSize > blockSize) {
+ return Math.max(minSize, Math.max(goalSize, blockSize));
+ } else {
+ return Math.max(minSize, Math.min(goalSize, blockSize));
+ }
}
protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
@@ -283,12 +292,11 @@ public abstract class FileInputFormat<K,
}
/**
- * Sets the given comma separated paths as the list of inputs for the
- * map-reduce job.
+ * Sets the given comma separated paths as the list of inputs for the BSP job.
*
* @param conf Configuration of the job
* @param commaSeparatedPaths Comma separated paths to be set as the list of
- * inputs for the map-reduce job.
+ * inputs for the BSP job.
*/
public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
setInputPaths(conf,
@@ -296,12 +304,11 @@ public abstract class FileInputFormat<K,
}
/**
- * Add the given comma separated paths to the list of inputs for the
- * map-reduce job.
+ * Add the given comma separated paths to the list of inputs for the BSP job.
*
* @param conf The configuration of the job
* @param commaSeparatedPaths Comma separated paths to be added to the list of
- * inputs for the map-reduce job.
+ * inputs for the BSP job.
*/
public static void addInputPaths(BSPJob conf, String commaSeparatedPaths) {
for (String str : getPathStrings(commaSeparatedPaths)) {
@@ -310,12 +317,11 @@ public abstract class FileInputFormat<K,
}
/**
- * Set the array of {@link Path}s as the list of inputs for the map-reduce
- * job.
+ * Set the array of {@link Path}s as the list of inputs for the BSP job.
*
* @param conf Configuration of the job.
* @param inputPaths the {@link Path}s of the input directories/files for the
- * map-reduce job.
+ * BSP job.
*/
public static void setInputPaths(BSPJob conf, Path... inputPaths) {
Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
@@ -330,11 +336,10 @@ public abstract class FileInputFormat<K,
}
/**
- * Add a {@link Path} to the list of inputs for the map-reduce job.
+ * Add a {@link Path} to the list of inputs for the BSP job.
*
* @param conf The configuration of the job
- * @param path {@link Path} to be added to the list of inputs for the
- * map-reduce job.
+ * @param path {@link Path} to be added to the list of inputs for the BSP job.
*/
public static void addInputPath(BSPJob conf, Path p) {
Path path = new Path(conf.getWorkingDirectory(), p);
@@ -384,10 +389,10 @@ public abstract class FileInputFormat<K,
}
/**
- * Get the list of input {@link Path}s for the map-reduce job.
+ * Get the list of input {@link Path}s for the BSP job.
*
* @param conf The configuration of the job
- * @return the list of input {@link Path}s for the map-reduce job.
+ * @return the list of input {@link Path}s for the BSP job.
*/
public static Path[] getInputPaths(BSPJob conf) {
String dirs = conf.getConf().get("bsp.input.dir", "");