Author: cutting
Date: Tue Oct 2 14:45:45 2007
New Revision: 581398

URL: http://svn.apache.org/viewvc?rev=581398&view=rev
Log:
HADOOP-1851. Permit specification of map output compression type and codec,
independent of the final output's compression parameters. Contributed by Arun.
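For orientation before the per-file diff: a minimal sketch (not part of the commit) of how a job driver could use the decoupled settings once this patch is applied. The JobConf, OutputFormatBase and SequenceFileOutputFormat calls and the mapred.* property names are the ones introduced or touched below; the class name and the particular type/codec pairings (BLOCK + DefaultCodec for map outputs, RECORD + GzipCodec for job outputs) are illustrative assumptions only.

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    public class CompressionConfigSketch {
      public static JobConf configure() {
        JobConf conf = new JobConf(CompressionConfigSketch.class);

        // Intermediate (map) outputs. Note that with this patch
        // setMapOutputCompressorClass() no longer implicitly enables
        // compression, so it is turned on explicitly here.
        conf.setCompressMapOutput(true);
        conf.setMapOutputCompressionType(CompressionType.BLOCK); // mapred.map.output.compression.type
        conf.setMapOutputCompressorClass(DefaultCodec.class);    // mapred.map.output.compression.codec

        // Final (job) outputs, configured through the OutputFormat helpers
        // on separate keys, so the two choices no longer interfere.
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);                          // mapred.output.compress
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.RECORD); // mapred.output.compression.type
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);        // mapred.output.compression.codec

        return conf;
      }
    }

Before this change the map-output setter wrote to the same mapred.output.compression.codec key as the job-output setter, so the last writer won; the renamed mapred.map.output.* keys below remove that coupling.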
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 2 14:45:45 2007
@@ -70,6 +70,11 @@
     namenodes and rebalancing processes to communicate with a primary
     namenode. (Hairong Kuang via dhruba)
 
+    HADOOP-1851. Permit specification of map output compression type
+    and codec, independent of the final output's compression
+    parameters. (Arun C Murthy via cutting)
+
+
 OPTIMIZATIONS
 
   HADOOP-1910. Reduce the number of RPCs that DistributedFileSystem.create()

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Oct 2 14:45:45 2007
@@ -722,15 +722,22 @@
 <property>
   <name>mapred.output.compress</name>
   <value>false</value>
-  <description>Should the outputs of the reduces be compressed?
+  <description>Should the job outputs be compressed?
+  </description>
+</property>
+
+<property>
+  <name>mapred.output.compression.type</name>
+  <value>RECORD</value>
+  <description>If the job outputs are to compressed as SequenceFiles, how should
+               they be compressed? Should be one of NONE, RECORD or BLOCK.
   </description>
 </property>
 
 <property>
   <name>mapred.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.DefaultCodec</value>
-  <description>If the reduce outputs are compressed, how should they be
-               compressed?
+  <description>If the job outputs are compressed, how should they be compressed?
   </description>
 </property>
 
@@ -739,6 +746,22 @@
   <value>false</value>
   <description>Should the outputs of the maps be compressed before being
                sent across the network. Uses SequenceFile compression.
+  </description>
+</property>
+
+<property>
+  <name>mapred.map.output.compression.type</name>
+  <value>RECORD</value>
+  <description>If the map outputs are to compressed, how should they
+               be compressed? Should be one of NONE, RECORD or BLOCK.
+  </description>
+</property>
+
+<property>
+  <name>mapred.map.output.compression.codec</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
+  <description>If the map outputs are compressed, how should they be
+               compressed?
   </description>
 </property>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java Tue Oct 2 14:45:45 2007
@@ -24,6 +24,8 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 
 /** A file-based map from keys to values.
  *
@@ -87,6 +89,16 @@
 
     /** Create the named map for keys of the named class. */
     public Writer(Configuration conf, FileSystem fs, String dirName,
+                  Class keyClass, Class valClass,
+                  CompressionType compress, CompressionCodec codec,
+                  Progressable progress)
+      throws IOException {
+      this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
+           compress, codec, progress);
+    }
+
+    /** Create the named map for keys of the named class. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
                   Class keyClass, Class valClass, CompressionType compress)
       throws IOException {
       this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
@@ -112,6 +124,15 @@
                   SequenceFile.CompressionType compress, Progressable progress)
       throws IOException {
+      this(conf, fs, dirName, comparator, valClass,
+           compress, new DefaultCodec(), progress);
+    }
+    /** Create the named map using the named key comparator. */
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+                  WritableComparator comparator, Class valClass,
+                  SequenceFile.CompressionType compress, CompressionCodec codec,
+                  Progressable progress)
+      throws IOException {
 
       this.comparator = comparator;
       this.lastKey = comparator.newKey();
@@ -126,7 +147,7 @@
       Class keyClass = comparator.getKeyClass();
 
       this.data = SequenceFile.createWriter
-        (fs, conf, dataFile, keyClass, valClass, compress, progress);
+        (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
 
       this.index = SequenceFile.createWriter
         (fs, conf, indexFile, keyClass, LongWritable.class,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Oct 2 14:45:45 2007
@@ -34,6 +34,7 @@
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
 import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -296,6 +297,7 @@
   /**
    * Should the map outputs be compressed before transfer?
    * Uses the SequenceFile compression.
+   * @param compress should the map outputs be compressed?
    */
   public void setCompressMapOutput(boolean compress) {
     setBoolean("mapred.compress.map.output", compress);
@@ -303,60 +305,64 @@
 
   /**
    * Are the outputs of the maps be compressed?
-   * @return are they compressed?
+   * @return <code>true</code> if the outputs of the maps are to be compressed,
+   *         <code>false</code> otherwise
    */
   public boolean getCompressMapOutput() {
     return getBoolean("mapred.compress.map.output", false);
   }
 
   /**
-   * Set the compression type for the map outputs.
-   * @param style NONE, RECORD, or BLOCK to control how the map outputs are
-   *              compressed
+   * Set the {@link CompressionType} for the map outputs.
+   * @param style the {@link CompressionType} to control how the map outputs
+   *              are compressed
    */
-  public void setMapOutputCompressionType(SequenceFile.CompressionType style) {
-    set("map.output.compression.type", style.toString());
+  public void setMapOutputCompressionType(CompressionType style) {
+    set("mapred.map.output.compression.type", style.toString());
   }
 
   /**
-   * Get the compression type for the map outputs.
-   * @return the compression type, defaulting to job output compression type
+   * Get the {@link CompressionType} for the map outputs.
+   * @return the {@link CompressionType} for map outputs, defaulting to
+   *         {@link CompressionType#RECORD}
    */
-  public SequenceFile.CompressionType getMapOutputCompressionType() {
-    String val = get("map.output.compression.type", "RECORD");
-    return SequenceFile.CompressionType.valueOf(val);
+  public CompressionType getMapOutputCompressionType() {
+    String val = get("mapred.map.output.compression.type",
+                     CompressionType.RECORD.toString());
+    return CompressionType.valueOf(val);
   }
-  
+
   /**
-   * Set the given class as the compression codec for the map outputs.
-   * @param codecClass the CompressionCodec class that will compress the
-   *                   map outputs
+   * Set the given class as the {@link CompressionCodec} for the map outputs.
+   * @param codecClass the {@link CompressionCodec} class that will compress
+   *                   the map outputs
    */
-  public void setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
-    setCompressMapOutput(true);
-    setClass("mapred.output.compression.codec", codecClass,
+  public void
+  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
+    setClass("mapred.map.output.compression.codec", codecClass,
              CompressionCodec.class);
   }
 
   /**
-   * Get the codec for compressing the map outputs
-   * @param defaultValue the value to return if it is not set
-   * @return the CompressionCodec class that should be used to compress the
-   *         map outputs
+   * Get the {@link CompressionCodec} for compressing the map outputs.
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} class that should be used to compress the
+   *         map outputs
    * @throws IllegalArgumentException if the class was specified, but not found
    */
-  public Class<? extends CompressionCodec> getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
-    String name = get("mapred.output.compression.codec");
-    if (name == null) {
-      return defaultValue;
-    } else {
+  public Class<? extends CompressionCodec>
+  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    String name = get("mapred.map.output.compression.codec");
+    if (name != null) {
       try {
-        return getClassByName(name).asSubclass(CompressionCodec.class);
+        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name +
                                            " was not found.", e);
       }
     }
+    return codecClass;
   }
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Tue Oct 2 14:45:45 2007
@@ -28,8 +28,12 @@
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link OutputFormat} that writes {@link MapFile}s. */
 public class MapFileOutputFormat extends OutputFormatBase {
@@ -39,13 +43,25 @@
                                       throws IOException {
 
     Path file = new Path(job.getOutputPath(), name);
+
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(job)) {
+      // find the kind of compression to do
+      compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);
+
+      // find the right codec
+      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+      codec = (CompressionCodec)
+        ReflectionUtils.newInstance(codecClass, job);
+    }
+
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
       new MapFile.Writer(job, file.getFileSystem(job), file.toString(),
                          job.getOutputKeyClass(), job.getOutputValueClass(),
-                         SequenceFile.getCompressionType(job),
+                         compressionType, codec,
                          progress);
 
     return new RecordWriter() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Tue Oct 2 14:45:45 2007
@@ -33,54 +33,62 @@
   implements OutputFormat<K, V> {
 
   /**
-   * Set whether the output of the reduce is compressed
-   * @param val the new setting
+   * Set whether the output of the job is compressed.
+   * @param conf the {@link JobConf} to modify
+   * @param compress should the output of the job be compressed?
    */
-  public static void setCompressOutput(JobConf conf, boolean val) {
-    conf.setBoolean("mapred.output.compress", val);
+  public static void setCompressOutput(JobConf conf, boolean compress) {
+    conf.setBoolean("mapred.output.compress", compress);
   }
 
   /**
-   * Is the reduce output compressed?
-   * @return true, if the output should be compressed
+   * Is the job output compressed?
+   * @param conf the {@link JobConf} to look in
+   * @return <code>true</code> if the job output should be compressed,
+   *         <code>false</code> otherwise
    */
   public static boolean getCompressOutput(JobConf conf) {
     return conf.getBoolean("mapred.output.compress", false);
   }
 
   /**
-   * Set the given class as the output compression codec.
-   * @param conf the JobConf to modify
-   * @param codecClass the CompressionCodec class that will compress the
-   *                   reduce outputs
+   * Set the {@link CompressionCodec} to be used to compress job outputs.
+   * @param conf the {@link JobConf} to modify
+   * @param codecClass the {@link CompressionCodec} to be used to
+   *                   compress the job outputs
    */
-  public static void setOutputCompressorClass(JobConf conf, Class codecClass) {
+  public static void
+  setOutputCompressorClass(JobConf conf,
+                           Class<? extends CompressionCodec> codecClass) {
     setCompressOutput(conf, true);
     conf.setClass("mapred.output.compression.codec", codecClass,
                   CompressionCodec.class);
   }
 
   /**
-   * Get the codec for compressing the reduce outputs
-   * @param conf the Configuration to look in
-   * @param defaultValue the value to return if it is not set
-   * @return the CompressionCodec class that should be used to compress the
-   *         reduce outputs
+   * Get the {@link CompressionCodec} for compressing the job outputs.
+   * @param conf the {@link JobConf} to look in
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} to be used to compress the
+   *         job outputs
    * @throws IllegalArgumentException if the class was specified, but not found
    */
-  public static Class getOutputCompressorClass(JobConf conf,
-                                               Class defaultValue) {
+  public static Class<? extends CompressionCodec>
+  getOutputCompressorClass(JobConf conf,
+                           Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+
     String name = conf.get("mapred.output.compression.codec");
-    if (name == null) {
-      return defaultValue;
-    } else {
+    if (name != null) {
      try {
-        return conf.getClassByName(name);
+        codecClass =
+          conf.getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
+    return codecClass;
  }
 
  public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Tue Oct 2 14:45:45 2007
@@ -46,7 +46,7 @@
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {
       // find the kind of compression to do
-      compressionType = SequenceFile.getCompressionType(job);
+      compressionType = getOutputCompressionType(job);
 
       // find the right codec
       Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
@@ -88,5 +88,29 @@
     }
     return parts;
   }
+
+  /**
+   * Get the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link JobConf}
+   * @return the {@link CompressionType} for the output {@link SequenceFile},
+   *         defaulting to {@link CompressionType#RECORD}
+   */
+  public static CompressionType getOutputCompressionType(JobConf conf) {
+    String val = conf.get("mapred.output.compression.type",
+                          CompressionType.RECORD.toString());
+    return CompressionType.valueOf(val);
+  }
+
+  /**
+   * Set the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link JobConf} to modify
+   * @param style the {@link CompressionType} for the output
+   *              {@link SequenceFile}
+   */
+  public static void setOutputCompressionType(JobConf conf,
+                                              CompressionType style) {
+    conf.set("mapred.output.compression.type", style.toString());
+  }
+
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=581398&r1=581397&r2=581398&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Tue Oct 2 14:45:45 2007
@@ -259,8 +259,7 @@
 
     public void configure(JobConf conf) {
       this.conf = conf;
-      compressInput = conf.getBoolean("mapred.compress.map.output",
-                                      false);
+      compressInput = conf.getCompressMapOutput();
       taskId = conf.get("mapred.task.id");
     }
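A small companion sketch of the read side, again not part of the commit: with nothing set by the job, getMapOutputCompressionType() falls back to RECORD, and getMapOutputCompressorClass() returns either the class named by the new mapred.map.output.compression.codec key (hadoop-default.xml above seeds it with DefaultCodec) or the caller-supplied fallback. The class name below is hypothetical.

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapred.JobConf;

    public class MapOutputCompressionProbe {
      public static void main(String[] args) {
        JobConf conf = new JobConf();

        // Resolves mapred.map.output.compression.type, defaulting to RECORD.
        CompressionType type = conf.getMapOutputCompressionType();

        // Resolves mapred.map.output.compression.codec; the argument is the
        // fallback used only when the key is absent from the configuration.
        Class<? extends CompressionCodec> codecClass =
          conf.getMapOutputCompressorClass(DefaultCodec.class);

        System.out.println("map output compression enabled: " + conf.getCompressMapOutput());
        System.out.println("map output compression type:    " + type);
        System.out.println("map output compression codec:   " + codecClass.getName());
      }
    }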