Author: edwardyoon
Date: Fri Sep 28 01:15:23 2012
New Revision: 1391277
URL: http://svn.apache.org/viewvc?rev=1391277&view=rev
Log:
Revert HAMA-647 patch.
Added:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1391277&r1=1391276&r2=1391277&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Fri Sep 28 01:15:23 2012
@@ -175,37 +175,17 @@ public abstract class FileInputFormat<K,
}
/**
- * Splits files returned by {@link #listStatus(JobConf)} when they're too big.
+ * Splits files returned by {@link #listStatus(BSPJob)} when they're too big.
*/
@Override
public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
FileStatus[] files = listStatus(job);
- long totalSize = 0; // compute total size
- for (int i = 0; i < files.length; i++) { // check we have valid files
- FileStatus file = files[i];
- if (file.isDir()) {
- final Path path = file.getPath();
- if (path.getName().equals("hama-partitions")
- || (job.get("bsp.partitioning.dir") != null && path.getName()
- .equals(job.get("bsp.partitioning.dir")))) {
- // if we find the partitioning dir, just remove it.
- LOG.warn("Removing already existing partitioning directory " + path);
- FileSystem fileSystem = path.getFileSystem(job.getConf());
- if (!fileSystem.delete(path, true)) {
- LOG.error("Remove failed.");
- }
- // remove this file from our initial list
- files[i] = null;
- } else {
- throw new IOException("Not a file (dir): " + path);
- }
- }
- totalSize += file.getLen();
- }
+ long totalSize = computeTotalSize(job, files);
+ long goalSize = computeGoalSize(numSplits, totalSize);
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
- long goalSize = 0;
+
// take the short circuit path if we have already partitioned
if (numSplits == files.length) {
for (FileStatus file : files) {
@@ -215,13 +195,9 @@ public abstract class FileInputFormat<K,
}
}
return splits.toArray(new FileSplit[splits.size()]);
- } else if (files.length == 1) {
- goalSize = totalSize / (numSplits == 0 ? 1 : numSplits - 1);
- } else {
- goalSize = totalSize
- / (numSplits == 0 ? 1 : numSplits - files.length / 2 + 1);
}
- LOG.debug("numSplits: " + numSplits);
+
+ LOG.info("numSplits: " + numSplits);
long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
minSplitSize);
@@ -268,6 +244,37 @@ public abstract class FileInputFormat<K,
return splits.toArray(new FileSplit[splits.size()]);
}
+ protected long computeTotalSize(BSPJob job, FileStatus[] files)
+ throws IOException {
+ long totalSize = 0L;
+ for (int i = 0; i < files.length; i++) { // check we have valid files
+ FileStatus file = files[i];
+ if (file.isDir()) {
+ final Path path = file.getPath();
+ if (path.getName().equals("hama-partitions")
+ || (job.get("bsp.partitioning.dir") != null && path.getName()
+ .equals(job.get("bsp.partitioning.dir")))) {
+ // if we find the partitioning dir, just remove it.
+ LOG.warn("Removing already existing partitioning directory " + path);
+ FileSystem fileSystem = path.getFileSystem(job.getConf());
+ if (!fileSystem.delete(path, true)) {
+ LOG.error("Remove failed.");
+ }
+ // remove this file from our initial list
+ files[i] = null;
+ } else {
+ throw new IOException("Not a file (dir): " + path);
+ }
+ }
+ totalSize += file.getLen();
+ }
+ return totalSize;
+ }
+
+ protected long computeGoalSize(int numSplits, long totalSize) {
+ return totalSize / (numSplits == 0 ? 1 : numSplits);
+ }
+
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
if (goalSize > blockSize) {
return Math.max(minSize, Math.max(goalSize, blockSize));
Added:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java?rev=1391277&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java Fri Sep 28 01:15:23 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import junit.framework.TestCase;
+
+public class TestFileInputFormat extends TestCase {
+
+ public void testComputeGoalSize() throws Exception {
+
+ TextInputFormat input = new TextInputFormat();
+ assertEquals(1000, input.computeGoalSize(10, 10000));
+
+ }
+}