HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles to one MapFile. Contributed by Vinayakumar B.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/645ebb96 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/645ebb96 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/645ebb96 Branch: refs/heads/HDFS-7285 Commit: 645ebb965b88cb3018fb1588268cfaf8db837431 Parents: cc02446 Author: Tsuyoshi Ozawa <[email protected]> Authored: Fri Feb 27 17:46:07 2015 +0900 Committer: Zhe Zhang <[email protected]> Committed: Mon Mar 2 09:13:53 2015 -0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../main/java/org/apache/hadoop/io/MapFile.java | 143 +++++++++++++++++++ .../java/org/apache/hadoop/io/TestMapFile.java | 56 ++++++++ 3 files changed, 202 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 1d9a6d4..6d4da77 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -445,6 +445,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11510. Expose truncate API via FileContext. (yliu) + HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles + to one MapFile. (Vinayakumar B via ozawa) + IMPROVEMENTS HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka) http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java index 84c9dcc..ee76458 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java @@ -25,6 +25,7 @@ import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -824,6 +825,148 @@ public class MapFile { return cnt; } + /** + * Class to merge multiple MapFiles of same Key and Value types to one MapFile + */ + public static class Merger { + private Configuration conf; + private WritableComparator comparator = null; + private Reader[] inReaders; + private Writer outWriter; + private Class<Writable> valueClass = null; + private Class<WritableComparable> keyClass = null; + + public Merger(Configuration conf) throws IOException { + this.conf = conf; + } + + /** + * Merge multiple MapFiles to one Mapfile + * + * @param inMapFiles + * @param outMapFile + * @throws IOException + */ + public void merge(Path[] inMapFiles, boolean deleteInputs, + Path outMapFile) throws IOException { + try { + open(inMapFiles, outMapFile); + mergePass(); + } finally { + close(); + } + if (deleteInputs) { + for (int i = 0; i < inMapFiles.length; i++) { + Path path = inMapFiles[i]; + delete(path.getFileSystem(conf), path.toString()); + } + } + } + + /* + * Open all input files for reading and verify the key and value types. And + * open Output file for writing + */ + @SuppressWarnings("unchecked") + private void open(Path[] inMapFiles, Path outMapFile) throws IOException { + inReaders = new Reader[inMapFiles.length]; + for (int i = 0; i < inMapFiles.length; i++) { + Reader reader = new Reader(inMapFiles[i], conf); + if (keyClass == null || valueClass == null) { + keyClass = (Class<WritableComparable>) reader.getKeyClass(); + valueClass = (Class<Writable>) reader.getValueClass(); + } else if (keyClass != reader.getKeyClass() + || valueClass != reader.getValueClass()) { + throw new HadoopIllegalArgumentException( + "Input files cannot be merged as they" + + " have different Key and Value classes"); + } + inReaders[i] = reader; + } + + if (comparator == null) { + Class<? extends WritableComparable> cls; + cls = keyClass.asSubclass(WritableComparable.class); + this.comparator = WritableComparator.get(cls, conf); + } else if (comparator.getKeyClass() != keyClass) { + throw new HadoopIllegalArgumentException( + "Input files cannot be merged as they" + + " have different Key class compared to" + + " specified comparator"); + } + + outWriter = new MapFile.Writer(conf, outMapFile, + MapFile.Writer.keyClass(keyClass), + MapFile.Writer.valueClass(valueClass)); + } + + /** + * Merge all input files to output map file.<br> + * 1. Read first key/value from all input files to keys/values array. <br> + * 2. Select the least key and corresponding value. <br> + * 3. Write the selected key and value to output file. <br> + * 4. Replace the already written key/value in keys/values arrays with the + * next key/value from the selected input <br> + * 5. Repeat step 2-4 till all keys are read. <br> + */ + private void mergePass() throws IOException { + // re-usable array + WritableComparable[] keys = new WritableComparable[inReaders.length]; + Writable[] values = new Writable[inReaders.length]; + // Read first key/value from all inputs + for (int i = 0; i < inReaders.length; i++) { + keys[i] = ReflectionUtils.newInstance(keyClass, null); + values[i] = ReflectionUtils.newInstance(valueClass, null); + if (!inReaders[i].next(keys[i], values[i])) { + // Handle empty files + keys[i] = null; + values[i] = null; + } + } + + do { + int currentEntry = -1; + WritableComparable currentKey = null; + Writable currentValue = null; + for (int i = 0; i < keys.length; i++) { + if (keys[i] == null) { + // Skip Readers reached EOF + continue; + } + if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) { + currentEntry = i; + currentKey = keys[i]; + currentValue = values[i]; + } + } + if (currentKey == null) { + // Merge Complete + break; + } + // Write the selected key/value to merge stream + outWriter.append(currentKey, currentValue); + // Replace the already written key/value in keys/values arrays with the + // next key/value from the selected input + if (!inReaders[currentEntry].next(keys[currentEntry], + values[currentEntry])) { + // EOF for this file + keys[currentEntry] = null; + values[currentEntry] = null; + } + } while (true); + } + + private void close() throws IOException { + for (int i = 0; i < inReaders.length; i++) { + IOUtils.closeStream(inReaders[i]); + inReaders[i] = null; + } + if (outWriter != null) { + outWriter.close(); + outWriter = null; + } + } + } public static void main(String[] args) throws Exception { String usage = "Usage: MapFile inFile outFile"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/645ebb96/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java index ced74fb..3f14de0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java @@ -21,6 +21,10 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -730,4 +734,56 @@ public class TestMapFile { reader.close(); } } + + @Test + public void testMerge() throws Exception { + final String TEST_METHOD_KEY = "testMerge.mapfile"; + int SIZE = 10; + int ITERATIONS = 5; + Path[] in = new Path[5]; + List<Integer> expected = new ArrayList<Integer>(); + for (int j = 0; j < 5; j++) { + try (MapFile.Writer writer = createWriter(TEST_METHOD_KEY + "." + j, + IntWritable.class, Text.class)) { + in[j] = new Path(TEST_DIR, TEST_METHOD_KEY + "." + j); + for (int i = 0; i < SIZE; i++) { + expected.add(i + j); + writer.append(new IntWritable(i + j), new Text("Value:" + (i + j))); + } + } + } + // Sort expected values + Collections.sort(expected); + // Merge all 5 files + MapFile.Merger merger = new MapFile.Merger(conf); + merger.merge(in, true, new Path(TEST_DIR, TEST_METHOD_KEY)); + + try (MapFile.Reader reader = createReader(TEST_METHOD_KEY, + IntWritable.class)) { + int start = 0; + // test iteration + Text startValue = new Text("Value:" + start); + int i = 0; + while (i++ < ITERATIONS) { + Iterator<Integer> expectedIterator = expected.iterator(); + IntWritable key = new IntWritable(start); + Text value = startValue; + IntWritable prev = new IntWritable(start); + while (reader.next(key, value)) { + assertTrue("Next key should be always equal or more", + prev.get() <= key.get()); + assertEquals(expectedIterator.next().intValue(), key.get()); + prev.set(key.get()); + } + reader.reset(); + } + } + + // inputs should be deleted + for (int j = 0; j < in.length; j++) { + Path path = in[j]; + assertFalse("inputs should be deleted", + path.getFileSystem(conf).exists(path)); + } + } }
