Author: ddas
Date: Thu May 28 11:37:05 2009
New Revision: 779572
URL: http://svn.apache.org/viewvc?rev=779572&view=rev
Log:
HADOOP-5539. Fixes intermediate output compression not being preserved for
merged data. Contributed by Jothi Padmanabhan and Billy Pearson.
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu May 28 11:37:05 2009
@@ -93,6 +93,10 @@
momentary spurts in memory usage due to java's fork() model.
(yhemanth)
+ HADOOP-5539. Fixes a problem to do with not preserving intermediate
+ output compression for merged data.
+ (Jothi Padmanabhan and Billy Pearson via ddas)
+
Release 0.20.0 - 2009-04-15
INCOMPATIBLE CHANGES
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu May 28 11:37:05 2009
@@ -1415,7 +1415,7 @@
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
- keyClass, valClass,
+ keyClass, valClass, codec,
segmentList, job.getInt("io.sort.factor", 100),
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter,
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Merger.java Thu May 28 11:37:05 2009
@@ -65,6 +65,23 @@
}
public static <K extends Object, V extends Object>
+ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class<K> keyClass, Class<V> valueClass,
+ CompressionCodec codec,
+ List<Segment<K, V>> segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator<K> comparator, Progressable
reporter,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
+ throws IOException {
+ return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+ false, codec).merge(keyClass, valueClass,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter);
+
+ }
+
+ public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
Class<K> keyClass, Class<V> valueClass,
List<Segment<K, V>> segments,
@@ -110,6 +127,25 @@
readsCounter, writesCounter);
}
+
+ static <K extends Object, V extends Object>
+ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class<K> keyClass, Class<V> valueClass,
+ CompressionCodec codec,
+ List<Segment<K, V>> segments,
+ int mergeFactor, int inMemSegments, Path tmpDir,
+ RawComparator<K> comparator, Progressable reporter,
+ boolean sortSegments,
+ Counters.Counter readsCounter,
+ Counters.Counter writesCounter)
+ throws IOException {
+ return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+ sortSegments, codec).merge(keyClass, valueClass,
+ mergeFactor, inMemSegments,
+ tmpDir,
+ readsCounter, writesCounter);
+}
+
public static <K extends Object, V extends Object>
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
Progressable progressable, Configuration conf)
@@ -267,6 +303,13 @@
}
}
+ public MergeQueue(Configuration conf, FileSystem fs,
+ List<Segment<K, V>> segments, RawComparator<K> comparator,
+ Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+ this(conf, fs, segments, comparator, reporter, sortSegments);
+ this.codec = codec;
+ }
+
public void close() throws IOException {
Segment<K, V> segment;
while((segment = pop()) != null) {
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=779572&r1=779571&r2=779572&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu May 28 11:37:05 2009
@@ -2234,7 +2234,7 @@
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
RawKeyValueIterator diskMerge = Merger.merge(
- job, fs, keyClass, valueClass, diskSegments,
+ job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null);
diskSegments.clear();