Author: cdouglas
Date: Tue Dec 16 02:17:02 2008
New Revision: 727006
URL: http://svn.apache.org/viewvc?rev=727006&view=rev
Log:
HADOOP-4845. Modify the reduce input byte counter to record only the
compressed size and add a human-readable label. Contributed by Yongqiang He
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Dec 16 02:17:02 2008
@@ -428,6 +428,9 @@
HADOOP-3921. Fixed clover (code coverage) target to work with JDK 6.
(tomwhite via nigel)
+ HADOOP-4845. Modify the reduce input byte counter to record only the
+ compressed size and add a human-readable label. (Yongqiang He via cdouglas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Dec 16 02:17:02 2008
@@ -111,8 +111,8 @@
private Progress copyPhase;
private Progress sortPhase;
private Progress reducePhase;
- private Counters.Counter reduceInputBytes =
- getCounters().findCounter(Counter.REDUCE_INPUT_BYTES);
+ private Counters.Counter reduceShuffleBytes =
+ getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES);
private Counters.Counter reduceInputKeyCounter =
getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
private Counters.Counter reduceInputValueCounter =
@@ -380,7 +380,6 @@
throw new IOException("Task: " + getTaskID() +
" - The reduce copier failed", reduceCopier.mergeThrowable);
}
- reduceInputBytes.increment(reduceCopier.reducerInputBytes);
}
copyPhase.complete(); // copy is already complete
setPhase(TaskStatus.Phase.SORT);
@@ -919,7 +918,7 @@
byte[] data;
final boolean inMemory;
- long size;
+ long compressedSize;
public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
Configuration conf, Path file, long size) {
@@ -928,14 +927,14 @@
this.conf = conf;
this.file = file;
- this.size = size;
+ this.compressedSize = size;
this.data = null;
this.inMemory = false;
}
- public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data) {
+ public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data, int compressedLength) {
this.mapId = mapId;
this.mapAttemptId = mapAttemptId;
@@ -943,7 +942,7 @@
this.conf = null;
this.data = data;
- this.size = data.length;
+ this.compressedSize = compressedLength;
this.inMemory = true;
}
@@ -1262,7 +1261,7 @@
}
// The size of the map-output
- long bytes = mapOutput.size;
+ long bytes = mapOutput.compressedSize;
// lock the ReduceTask while we do the rename
synchronized (ReduceTask.this) {
@@ -1467,7 +1466,7 @@
byte[] shuffleData = new byte[mapOutputLength];
MapOutput mapOutput =
new MapOutput(mapOutputLoc.getTaskId(),
- mapOutputLoc.getTaskAttemptId(), shuffleData);
+ mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
int bytesRead = 0;
try {
@@ -1741,12 +1740,10 @@
return numInFlight > maxInFlight;
}
- long reducerInputBytes = 0;
public boolean fetchOutputs() throws IOException {
int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
- long bytesTransferred = 0;
DecimalFormat mbpsFormat = new DecimalFormat("0.00");
final Progress copyPhase =
reduceTask.getProgress().phase();
@@ -1938,12 +1935,11 @@
if (cr.getSuccess()) { // a successful copy
numCopied++;
lastProgressTime = System.currentTimeMillis();
- bytesTransferred += cr.getSize();
- reducerInputBytes += cr.getSize();
+ reduceShuffleBytes.increment(cr.getSize());
long secsSinceStart =
(System.currentTimeMillis()-startTime)/1000+1;
- float mbs = ((float)bytesTransferred)/(1024*1024);
+ float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
float transferRate = mbs/secsSinceStart;
copyPhase.startNextPhase();
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Tue Dec 16 02:17:02 2008
@@ -63,7 +63,7 @@
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
- REDUCE_INPUT_BYTES,
+ REDUCE_SHUFFLE_BYTES,
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
REDUCE_SKIPPED_GROUPS,
Modified:
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=727006&r1=727005&r2=727006&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Dec 16 02:17:02 2008
@@ -10,6 +10,7 @@
COMBINE_INPUT_RECORDS.name= Combine input records
COMBINE_OUTPUT_RECORDS.name= Combine output records
REDUCE_INPUT_GROUPS.name= Reduce input groups
+REDUCE_SHUFFLE_BYTES.name= Reduce shuffle bytes
REDUCE_INPUT_RECORDS.name= Reduce input records
REDUCE_OUTPUT_RECORDS.name= Reduce output records
REDUCE_SKIPPED_RECORDS.name= Reduce skipped records