Author: ddas
Date: Thu May 28 11:33:56 2009
New Revision: 779571
URL: http://svn.apache.org/viewvc?rev=779571&view=rev
Log:
HADOOP-5539. Fixes intermediate output compression not being preserved for
merged data. Contributed by Jothi Padmanabhan and Billy Pearson.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 28 11:33:56 2009
@@ -796,6 +796,10 @@
momentary spurts in memory usage due to java's fork() model.
(yhemanth)
+ HADOOP-5539. Fixes intermediate output compression not being preserved
+ for merged data.
+ (Jothi Padmanabhan and Billy Pearson via ddas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu May 28 11:33:56 2009
@@ -1434,7 +1434,7 @@
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
- keyClass, valClass,
+ keyClass, valClass, codec,
segmentList, mergeFactor,
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter, sortSegments,
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Thu May 28 11:33:56 2009
@@ -97,6 +97,25 @@
mergePhase);
}
+ /**
+ * Merges {@code segments} using a {@link MergeQueue} that is also handed
+ * {@code codec}, so the merge can carry the intermediate output compression
+ * setting through to merged data (HADOOP-5539).
+ *
+ * @param conf configuration passed to the MergeQueue constructor
+ * @param fs file system passed to the MergeQueue constructor
+ * @param keyClass key class passed to {@code MergeQueue.merge}
+ * @param valueClass value class passed to {@code MergeQueue.merge}
+ * @param codec codec stored on the MergeQueue; how a {@code null} codec is
+ *        treated is decided inside MergeQueue -- NOTE(review): confirm
+ * @param segments segments to merge, passed to the MergeQueue constructor
+ * @param mergeFactor merge factor passed to {@code MergeQueue.merge}
+ * @param tmpDir temporary directory passed to {@code MergeQueue.merge}
+ * @param comparator key comparator passed to the MergeQueue constructor
+ * @param reporter progress reporter passed to the MergeQueue constructor
+ * @param sortSegments flag passed to the MergeQueue constructor
+ * @param readsCounter counter passed to {@code MergeQueue.merge}
+ * @param writesCounter counter passed to {@code MergeQueue.merge}
+ * @param mergePhase progress object passed to {@code MergeQueue.merge}
+ * @return the iterator returned by {@code MergeQueue.merge}
+ * @throws IOException if {@code MergeQueue.merge} throws it
+ */
+ public static <K extends Object, V extends Object>
+ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class<K> keyClass, Class<V> valueClass,
+ CompressionCodec codec,
+ List<Segment<K, V>> segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator<K> comparator, Progressable reporter,
+ boolean sortSegments,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+ sortSegments, codec).merge(keyClass, valueClass,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+ }
+
static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
Class<K> keyClass, Class<V> valueClass,
@@ -116,6 +135,27 @@
mergePhase);
}
+
+ /**
+ * Merges {@code segments} with a {@link MergeQueue} that is also handed
+ * {@code codec} (HADOOP-5539). This variant additionally forwards
+ * {@code inMemSegments} to {@code MergeQueue.merge}.
+ *
+ * @param conf configuration passed to the MergeQueue constructor
+ * @param fs file system passed to the MergeQueue constructor
+ * @param keyClass key class passed to {@code MergeQueue.merge}
+ * @param valueClass value class passed to {@code MergeQueue.merge}
+ * @param codec codec stored on the MergeQueue; how a {@code null} codec is
+ *        treated is decided inside MergeQueue -- NOTE(review): confirm
+ * @param segments segments to merge, passed to the MergeQueue constructor
+ * @param mergeFactor merge factor passed to {@code MergeQueue.merge}
+ * @param inMemSegments count passed to {@code MergeQueue.merge}; presumably
+ *        the number of leading segments held in memory -- TODO confirm
+ * @param tmpDir temporary directory passed to {@code MergeQueue.merge}
+ * @param comparator key comparator passed to the MergeQueue constructor
+ * @param reporter progress reporter passed to the MergeQueue constructor
+ * @param sortSegments flag passed to the MergeQueue constructor
+ * @param readsCounter counter passed to {@code MergeQueue.merge}
+ * @param writesCounter counter passed to {@code MergeQueue.merge}; callers
+ *        may pass {@code null}
+ * @param mergePhase progress object passed to {@code MergeQueue.merge};
+ *        callers may pass {@code null}
+ * @return the iterator returned by {@code MergeQueue.merge}
+ * @throws IOException if {@code MergeQueue.merge} throws it
+ */
+ static <K extends Object, V extends Object>
+ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class<K> keyClass, Class<V> valueClass,
+ CompressionCodec codec,
+ List<Segment<K, V>> segments,
+ int mergeFactor, int inMemSegments, Path tmpDir,
+ RawComparator<K> comparator, Progressable reporter,
+ boolean sortSegments,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+ sortSegments, codec).merge(keyClass, valueClass,
+ mergeFactor, inMemSegments,
+ tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+}
+
public static <K extends Object, V extends Object>
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
Progressable progressable, Configuration conf)
@@ -326,6 +366,13 @@
}
}
+ /**
+ * Creates a merge queue through the six-argument constructor, then records
+ * {@code codec} in the {@code codec} field (HADOOP-5539).
+ *
+ * @param conf configuration forwarded to the delegated constructor
+ * @param fs file system forwarded to the delegated constructor
+ * @param segments segments forwarded to the delegated constructor
+ * @param comparator key comparator forwarded to the delegated constructor
+ * @param reporter progress reporter forwarded to the delegated constructor
+ * @param sortSegments flag forwarded to the delegated constructor
+ * @param codec codec stored in {@code this.codec}; NOTE(review): presumably
+ *        used when the queue writes intermediate merge output -- confirm
+ */
+ public MergeQueue(Configuration conf, FileSystem fs,
+ List<Segment<K, V>> segments, RawComparator<K> comparator,
+ Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+ // Reuse the existing constructor's setup; only the codec is new here.
+ this(conf, fs, segments, comparator, reporter, sortSegments);
+ this.codec = codec;
+ }
+
public void close() throws IOException {
Segment<K, V> segment;
while((segment = pop()) != null) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=779571&r1=779570&r2=779571&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu May 28 11:33:56 2009
@@ -2323,7 +2323,7 @@
memDiskSegments.clear();
Progress mergePhase = (sortPhaseFinished) ? null : sortPhase;
RawKeyValueIterator diskMerge = Merger.merge(
- job, fs, keyClass, valueClass, diskSegments,
+ job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
mergePhase);