Author: edwardyoon
Date: Wed Apr 15 01:43:42 2015
New Revision: 1673611
URL: http://svn.apache.org/r1673611
Log:
HAMA-949: File splits based on number of input files
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1673611&r1=1673610&r2=1673611&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	Wed Apr 15 01:43:42 2015
@@ -349,8 +349,14 @@ public class BSPJobClient extends Config
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- InputSplit[] splits = job.getInputFormat().getSplits(job, maxTasks);
+ InputSplit[] splits = job.getInputFormat().getSplits(job, configured);
+ if (maxTasks < splits.length) {
+ throw new IOException(
+ "Job failed! The number of splits has exceeded the number of max
tasks. The number of splits: "
+ + splits.length + ", The number of max tasks: " + maxTasks);
+ }
+
/*
job = partition(job, splits, maxTasks);
maxTasks = job.getInt("hama.partition.count", maxTasks);
@@ -358,12 +364,6 @@ public class BSPJobClient extends Config
if (job.getBoolean("input.has.partitioned", false)) {
splits = job.getInputFormat().getSplits(job, maxTasks);
}
-
- if (maxTasks < splits.length) {
- throw new IOException(
- "Job failed! The number of splits has exceeded the number of max
tasks. The number of splits: "
- + splits.length + ", The number of max tasks: " + maxTasks);
- }
*/
job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
@@ -631,10 +631,7 @@ public class BSPJobClient extends Config
for (int i = 0; i < len; ++i) {
RawSplit split = new RawSplit();
split.readFields(in);
- if (split.getPartitionID() != Integer.MIN_VALUE)
- result[split.getPartitionID()] = split;
- else
- result[i] = split;
+ result[i] = split;
}
return result;
}
@@ -1080,7 +1077,6 @@ public class BSPJobClient extends Config
private String splitClass;
private BytesWritable bytes = new BytesWritable();
private String[] locations;
- private int partitionID = Integer.MIN_VALUE;
long dataLength;
public void setBytes(byte[] data, int offset, int length) {
@@ -1091,14 +1087,6 @@ public class BSPJobClient extends Config
splitClass = className;
}
- public void setPartitionID(int id) {
- this.partitionID = id;
- }
-
- public int getPartitionID() {
- return partitionID;
- }
-
public String getClassName() {
return splitClass;
}
@@ -1123,7 +1111,6 @@ public class BSPJobClient extends Config
public void readFields(DataInput in) throws IOException {
splitClass = Text.readString(in);
dataLength = in.readLong();
- partitionID = in.readInt();
bytes.readFields(in);
int len = WritableUtils.readVInt(in);
locations = new String[len];
@@ -1136,7 +1123,6 @@ public class BSPJobClient extends Config
public void write(DataOutput out) throws IOException {
Text.writeString(out, splitClass);
out.writeLong(dataLength);
- out.writeInt(partitionID);
bytes.write(out);
WritableUtils.writeVInt(out, locations.length);
for (String location : locations) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1673611&r1=1673610&r2=1673611&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java	(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java	Wed Apr 15 01:43:42 2015
@@ -178,6 +178,18 @@ public abstract class FileInputFormat<K,
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
FileStatus[] files = listStatus(job);
+
+ // take the short circuit path if we have already partitioned
+ if (numSplits == files.length) {
+ for (FileStatus file : files) {
+ if (file != null) {
+ splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
+ new String[0]));
+ }
+ }
+ return splits.toArray(new FileSplit[splits.size()]);
+ }
+
for (FileStatus file : files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1673611&r1=1673610&r2=1673611&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java	(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java	Wed Apr 15 01:43:42 2015
@@ -242,7 +242,6 @@ public class LocalBSPRunner implements J
String splitname = null;
BytesWritable realBytes = null;
if (splits != null) {
- LOG.debug(id + ", " + splits[id].getPartitionID());
splitname = splits[id].getClassName();
realBytes = splits[id].getBytes();
}