Author: cutting Date: Mon Jun 25 15:59:54 2007 New Revision: 550635 URL: http://svn.apache.org/viewvc?view=rev&rev=550635 Log: HADOOP-1515. Add MultiFileInputFormat, which packs multiple files per split. Contributed by Enis Soztutar.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java Modified: lucene/hadoop/trunk/CHANGES.txt Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=550635&r1=550634&r2=550635 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 25 15:59:54 2007 @@ -251,6 +251,9 @@ to file properties to be through a new FileStatus interface. (Dhruba Borthakur via cutting) + 77. HADOOP-1515. Add MultiFileInputFormat, which can pack multiple, + typically small, input files into each split. (Enis Soztutar via cutting) + Release 0.13.0 - 2007-06-08 Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java?view=auto&rev=550635 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java Mon Jun 25 15:59:54 2007 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * An abstract [EMAIL PROTECTED] InputFormat} that returns [EMAIL PROTECTED] MultiFileSplit}'s + * in [EMAIL PROTECTED] #getSplits(JobConf, int)} method. Splits are constructed from + * the files under the input paths. Each split returned contains <i>nearly</i> + * equal content length. <br> + * Subclasses implement [EMAIL PROTECTED] #getRecordReader(InputSplit, JobConf, Reporter)} + * to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s. + * @see MultiFileSplit + */ +public abstract class MultiFileInputFormat extends FileInputFormat { + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) + throws IOException { + + MultiFileSplit[] splits = new MultiFileSplit[numSplits]; + Path[] paths = listPaths(job); + long[] lengths = new long[paths.length]; + long totLength = 0; + for(int i=0; i<paths.length; i++) { + FileSystem fs = paths[i].getFileSystem(job); + lengths[i] = fs.getContentLength(paths[i]); + totLength += lengths[i]; + } + float avgLengthPerSplit = ((float)totLength) / numSplits; + long cumulativeLength = 0; + + int startIndex = 0; + + for(int i=0; i<numSplits; i++) { + int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength + , startIndex, lengths); + Path[] splitPaths = new Path[splitSize]; + long[] splitLengths = new long[splitSize]; + System.arraycopy(paths, startIndex, splitPaths , 0, splitSize); + System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize); + splits[i] = new MultiFileSplit(job, splitPaths, splitLengths); + startIndex += splitSize; + for(long l: splitLengths) { + cumulativeLength += l; + } + } + return splits; + + } + + private int findSize(int splitIndex, float avgLengthPerSplit + , long cumulativeLength , int startIndex, long[] lengths) { + + if(splitIndex == lengths.length - 1) + return lengths.length - startIndex; + + long goalLength = (long)((splitIndex + 1) * avgLengthPerSplit); + int partialLength = 0; + // accumulate till just above the goal length; + for(int i = startIndex; i < lengths.length; i++) { + partialLength += lengths[i]; + if(partialLength + cumulativeLength >= goalLength) { + return i - startIndex + 1; + } + } + return lengths.length - startIndex; + } + + public abstract RecordReader getRecordReader(InputSplit split, + JobConf job, Reporter reporter) + throws IOException; +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java?view=auto&rev=550635 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java Mon Jun 25 15:59:54 2007 @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +/** + * A sub-collection of input files. Unlike [EMAIL PROTECTED] FileSplit}, MultiFileSplit + * class does not represent a split of a file, but a split of input files + * into smaller sets. The atomic unit of split is a file. <br> + * MultiFileSplit can be used to implement [EMAIL PROTECTED] RecordReader}'s, with + * reading one record per file. + * @see FileSplit + * @see MultiFileInputFormat + */ +public class MultiFileSplit implements InputSplit { + + private Path[] paths; + private long[] lengths; + private long totLength; + private JobConf job; + + MultiFileSplit() {} + + public MultiFileSplit(JobConf job, Path[] files, long[] lengths) { + this.job = job; + this.lengths = lengths; + this.paths = files; + this.totLength = 0; + for(long length : lengths) { + totLength += length; + } + } + + public long getLength() { + return totLength; + } + + /** Returns an array containing the lengths of the files in + * the split*/ + public long[] getLengths() { + return lengths; + } + + /** Returns the length of the i<sup>th</sup> Path */ + public long getLength(int i) { + return lengths[i]; + } + + /** Returns the number of Paths in the split */ + public int getNumPaths() { + return paths.length; + } + + /** Returns the i<sup>th</sup> Path */ + public Path getPath(int i) { + return paths[i]; + } + + /** Returns all the Paths in the split */ + public Path[] getPaths() { + return paths; + } + + public String[] getLocations() throws IOException { + HashSet<String> hostSet = new HashSet<String>(); + for (Path file : paths) { + String[][] hints = FileSystem.get(job) + .getFileCacheHints(file, 0, FileSystem.get(job).getLength(file)); + if (hints != null && hints.length > 0) { + addToSet(hostSet, hints[0]); + } + } + return hostSet.toArray(new String[hostSet.size()]); + } + + private void addToSet(Set<String> set, String[] array) { + for(String s:array) + set.add(s); + } + + public void readFields(DataInput in) throws IOException { + int arrLength = in.readInt(); + lengths = new long[arrLength]; + for(int i=0; i<arrLength;i++) { + lengths[i] = in.readLong(); + } + int filesLength = in.readInt(); + paths = new Path[filesLength]; + for(int i=0; i<filesLength;i++) { + paths[i] = new Path(Text.readString(in)); + } + } + + public void write(DataOutput out) throws IOException { + out.writeInt(lengths.length); + for(long length : lengths) + out.writeLong(length); + out.writeInt(paths.length); + for(Path p : paths) { + Text.writeString(out, p.toString()); + } + } +} + Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java?view=auto&rev=550635 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java Mon Jun 25 15:59:54 2007 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.BitSet; +import java.util.HashMap; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import junit.framework.TestCase; + +public class TestMultiFileInputFormat extends TestCase{ + + private static JobConf job = new JobConf(); + private static final Log LOG = LogFactory.getLog(TestMultiFileInputFormat.class); + + private static final int MAX_SPLIT_COUNT = 10000; + private static final int SPLIT_COUNT_INCR = 6000; + private static final int MAX_BYTES = 1024; + private static final int MAX_NUM_FILES = 10000; + private static final int NUM_FILES_INCR = 8000; + + private Random rand = new Random(System.currentTimeMillis()); + private HashMap<String, Long> lengths = new HashMap<String, Long>(); + + /** Dummy class to extend MultiFileInputFormat*/ + private class DummyMultiFileInputFormat extends MultiFileInputFormat { + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job + , Reporter reporter) throws IOException { + return null; + } + } + + private Path initFiles(FileSystem fs, int numFiles) throws IOException{ + Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred"); + Path multiFileDir = new Path(dir, "test.multifile"); + fs.delete(multiFileDir); + fs.mkdirs(multiFileDir); + LOG.info("Creating " + numFiles + " file(s) in " + multiFileDir); + for(int i=0; i<numFiles ;i++) { + Path path = new Path(multiFileDir, "file_" + i); + FSDataOutputStream out = fs.create(path); + int numBytes = rand.nextInt(MAX_BYTES); + for(int j=0; j< numBytes; j++) { + out.write(rand.nextInt()); + } + out.close(); + if(LOG.isDebugEnabled()) { + LOG.debug("Created file " + path + " with length " + numBytes); + } + lengths.put(path.getName(), new Long(numBytes)); + } + job.setInputPath(multiFileDir); + return multiFileDir; + } + + public void testFormat() throws IOException { + if(LOG.isInfoEnabled()) { + LOG.info("Test started"); + LOG.info("Max split count = " + MAX_SPLIT_COUNT); + LOG.info("Split count increment = " + SPLIT_COUNT_INCR); + LOG.info("Max bytes per file = " + MAX_BYTES); + LOG.info("Max number of files = " + MAX_NUM_FILES); + LOG.info("Number of files increment = " + NUM_FILES_INCR); + } + + MultiFileInputFormat format = new DummyMultiFileInputFormat(); + FileSystem fs = FileSystem.getLocal(job); + + for(int numFiles = 1; numFiles< MAX_NUM_FILES ; + numFiles+= (NUM_FILES_INCR / 2) + rand.nextInt(NUM_FILES_INCR / 2)) { + + Path dir = initFiles(fs, numFiles); + BitSet bits = new BitSet(numFiles); + for(int i=1;i< MAX_SPLIT_COUNT ;i+= rand.nextInt(SPLIT_COUNT_INCR) + 1) { + LOG.info("Running for Num Files=" + numFiles + ", split count=" + i); + + MultiFileSplit[] splits = (MultiFileSplit[])format.getSplits(job, i); + bits.clear(); + + for(MultiFileSplit split : splits) { + long splitLength = 0; + for(Path p : split.getPaths()) { + long length = fs.getContentLength(p); + assertEquals(length, lengths.get(p.getName()).longValue()); + splitLength += length; + String name = p.getName(); + int index = Integer.parseInt( + name.substring(name.lastIndexOf("file_") + 5)); + assertFalse(bits.get(index)); + bits.set(index); + } + assertEquals(splitLength, split.getLength()); + } + } + assertEquals(bits.cardinality(), numFiles); + fs.delete(dir); + } + LOG.info("Test Finished"); + } + + public static void main(String[] args) throws Exception{ + TestMultiFileInputFormat test = new TestMultiFileInputFormat(); + test.testFormat(); + } +}