Author: ddas
Date: Fri Aug 29 01:19:36 2008
New Revision: 690142
URL: http://svn.apache.org/viewvc?rev=690142&view=rev
Log:
HADOOP-3828. Provides a way to write skipped records to DFS. Contributed by
Sharad Agarwal.
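
As quick orientation before the diffs, a minimal sketch of how a job could
drive this feature from the client side; the output path is a hypothetical
example, and the configuration key is the same one the streaming tests below
pass via -jobconf:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SkipBadRecords;

  public class SkipToDfsSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      // enable skipping mode and start skipping on the first failed attempt
      job.setBoolean("mapred.skip.mode.enabled", true);
      SkipBadRecords.setAttemptsToStartSkipping(job, 0);
      // direct skipped records to an explicit DFS directory instead of the
      // default <output>/_logs/skip (the path here is purely illustrative)
      SkipBadRecords.setSkipOutputPath(job, new Path("/user/example/skipped"));
    }
  }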
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Aug 29 01:19:36 2008
@@ -108,6 +108,9 @@
HADOOP-3754. Add a thrift interface to access HDFS. (dhruba via omalley)
+ HADOOP-3828. Provides a way to write skipped records to DFS.
+ (Sharad Agarwal via ddas)
+
IMPROVEMENTS
HADOOP-3908. Fuse-dfs: better error message if libhdfs.so doesn't exist.
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingBadRecords.java Fri Aug 29 01:19:36 2008
@@ -121,13 +121,12 @@
public void testDisableSkip() throws Exception {
JobConf clusterConf = createJobConf();
createInput();
-
+ int attSkip =0;
+ SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
//the no of attempts to successfully complete the task depends
//on the no of bad records.
- int mapperAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
- +1+MAPPER_BAD_RECORDS.size();
- int reducerAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
- +1+REDUCER_BAD_RECORDS.size();
+ int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
+ int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
String[] args = new String[] {
"-input", (new Path(getInputDir(), "text.txt")).toString(),
"-output", getOutputDir().toString(),
@@ -135,6 +134,7 @@
"-reducer", badReducer,
"-verbose",
"-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+ "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
"-jobconf", "mapred.map.max.attempts="+mapperAttempts,
"-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
"-jobconf", "mapred.skip.mode.enabled=false",
@@ -157,13 +157,12 @@
public void testSkip() throws Exception {
JobConf clusterConf = createJobConf();
createInput();
-
+ int attSkip =0;
+ SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
//the no of attempts to successfully complete the task depends
//on the no of bad records.
- int mapperAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
- +1+MAPPER_BAD_RECORDS.size();
- int reducerAttempts = SkipBadRecords.getAttemptsToStartSkipping(clusterConf)
- +1+REDUCER_BAD_RECORDS.size();
+ int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
+ int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
String[] args = new String[] {
"-input", (new Path(getInputDir(), "text.txt")).toString(),
@@ -172,6 +171,7 @@
"-reducer", badReducer,
"-verbose",
"-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
+ "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
"-jobconf", "mapred.map.max.attempts="+mapperAttempts,
"-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
"-jobconf", "mapred.map.tasks=1",
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Aug 29 01:19:36 2008
@@ -45,7 +45,9 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -53,6 +55,7 @@
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.IFile.Reader;
import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
@@ -143,19 +146,14 @@
private RecordReader<K,V> rawIn;
private Counters.Counter inputByteCounter;
private Counters.Counter inputRecordCounter;
- private Iterator<Long> skipFailedRecIndexIterator;
- private TaskUmbilicalProtocol umbilical;
- private long recIndex = -1;
private long beforePos = -1;
private long afterPos = -1;
- TrackedRecordReader(RecordReader<K,V> raw, Counters counters,
- TaskUmbilicalProtocol umbilical) {
+ TrackedRecordReader(RecordReader<K,V> raw, Counters counters)
+ throws IOException{
rawIn = raw;
- this.umbilical = umbilical;
inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
- skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
}
public K createKey() {
@@ -169,28 +167,22 @@
public synchronized boolean next(K key, V value)
throws IOException {
boolean ret = moveToNext(key, value);
- if(isSkipping() && ret) {
- long nextRecIndex = skipFailedRecIndexIterator.next();
- long skip = nextRecIndex - recIndex;
- for(int i=0;i<skip && ret;i++) {
- ret = moveToNext(key, value);
- }
- getCounters().incrCounter(Counter.MAP_SKIPPED_RECORDS, skip);
- reportNextRecordRange(umbilical, nextRecIndex);
- }
if (ret) {
- inputRecordCounter.increment(1);
- inputByteCounter.increment(afterPos - beforePos);
+ incrCounters();
}
return ret;
}
+
+ protected void incrCounters() {
+ inputRecordCounter.increment(1);
+ inputByteCounter.increment(afterPos - beforePos);
+ }
- private synchronized boolean moveToNext(K key, V value)
+ protected synchronized boolean moveToNext(K key, V value)
throws IOException {
setProgress(getProgress());
beforePos = getPos();
boolean ret = rawIn.next(key, value);
- recIndex++;
afterPos = getPos();
return ret;
}
@@ -202,6 +194,68 @@
}
}
+ /**
+ * This class skips the records based on the failed ranges from previous
+ * attempts.
+ */
+ class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
+ private SkipRangeIterator skipIt;
+ private SequenceFile.Writer skipWriter;
+ private TaskUmbilicalProtocol umbilical;
+ private Counters.Counter skipRecCounter;
+ private long recIndex = -1;
+
+ SkippingRecordReader(RecordReader<K,V> raw, Counters counters,
+ TaskUmbilicalProtocol umbilical) throws IOException{
+ super(raw,counters);
+ this.umbilical = umbilical;
+ this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
+ skipIt = getFailedRanges().skipRangeIterator();
+ }
+
+ public synchronized boolean next(K key, V value)
+ throws IOException {
+ boolean ret = moveToNext(key, value);
+ long nextRecIndex = skipIt.next();
+ long skip = nextRecIndex - recIndex;
+ for(int i=0;i<skip && ret;i++) {
+ writeSkippedRec(key, value);
+ ret = moveToNext(key, value);
+ }
+ //close the skip writer once all the ranges are skipped
+ if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
+ skipWriter.close();
+ }
+ skipRecCounter.increment(skip);
+ reportNextRecordRange(umbilical, nextRecIndex);
+ if (ret) {
+ incrCounters();
+ }
+ return ret;
+ }
+
+ protected synchronized boolean moveToNext(K key, V value)
+ throws IOException {
+ recIndex++;
+ return super.moveToNext(key, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void writeSkippedRec(K key, V value) throws IOException{
+ if(skipWriter==null) {
+ Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+ Path skipFile = new Path(skipDir, getTaskID().toString());
+ skipWriter =
+ SequenceFile.createWriter(
+ skipFile.getFileSystem(conf), conf, skipFile,
+ (Class<K>) createKey().getClass(),
+ (Class<V>) createValue().getClass(),
+ CompressionType.BLOCK, getReporter(umbilical));
+ }
+ skipWriter.append(key, value);
+ }
+ }
+
@Override
@SuppressWarnings("unchecked")
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
@@ -244,7 +298,9 @@
RecordReader rawIn = // open input
job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
- RecordReader in = new TrackedRecordReader(rawIn, getCounters(), umbilical);
+ RecordReader in = isSkipping() ?
+ new SkippingRecordReader(rawIn, getCounters(), umbilical) :
+ new TrackedRecordReader(rawIn, getCounters());
job.setBoolean("mapred.skip.on", isSkipping());
MapRunnable runner =
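
The per-task skip file written by writeSkippedRec above is an ordinary
SequenceFile named after the task ID under the skip output directory. A
minimal sketch of reading one back, mirroring what TestBadRecords does at the
end of this change; the file path is a hypothetical example:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.util.ReflectionUtils;

  public class ReadSkipFile {
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      // hypothetical <skip dir>/<task id> file, as laid out by writeSkippedRec
      Path skipFile = new Path("out/_logs/skip/attempt_200808290119_0001_m_000000_0");
      SequenceFile.Reader reader = new SequenceFile.Reader(
          skipFile.getFileSystem(conf), skipFile, conf);
      Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Object value = ReflectionUtils.newInstance(reader.getValueClass(), conf);
      key = reader.next(key);
      while (key != null) {                     // null key signals end of file
        value = reader.getCurrentValue(value);
        System.out.println(key + "\t" + value);
        key = reader.next(key);
      }
      reader.close();
    }
  }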
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Aug 29 01:19:36 2008
@@ -56,16 +56,20 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.IFile.*;
import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapred.Task.Counter;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -232,6 +236,10 @@
@Override
public VALUE next() {
reduceInputValueCounter.increment(1);
+ return moveToNext();
+ }
+
+ protected VALUE moveToNext() {
return super.next();
}
@@ -243,9 +251,14 @@
private class SkippingReduceValuesIterator<KEY,VALUE>
extends ReduceValuesIterator<KEY,VALUE> {
- private Iterator<Long> skipFailedRecIndexIterator;
+ private SkipRangeIterator skipIt;
private TaskUmbilicalProtocol umbilical;
+ private Counters.Counter skipGroupCounter;
+ private Counters.Counter skipRecCounter;
private long recIndex = -1;
+ private Class<KEY> keyClass;
+ private Class<VALUE> valClass;
+ private SequenceFile.Writer skipWriter;
public SkippingReduceValuesIterator(RawKeyValueIterator in,
RawComparator<KEY> comparator, Class<KEY> keyClass,
@@ -253,7 +266,13 @@
TaskUmbilicalProtocol umbilical) throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
this.umbilical = umbilical;
- skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
+ this.skipGroupCounter =
+ getCounters().findCounter(Counter.REDUCE_SKIPPED_GROUPS);
+ this.skipRecCounter =
+ getCounters().findCounter(Counter.REDUCE_SKIPPED_RECORDS);
+ this.keyClass = keyClass;
+ this.valClass = valClass;
+ skipIt = getFailedRanges().skipRangeIterator();
mayBeSkip();
}
@@ -264,15 +283,38 @@
private void mayBeSkip() throws IOException {
recIndex++;
- long nextRecIndex = skipFailedRecIndexIterator.next();
+ long nextRecIndex = skipIt.next();
long skip = nextRecIndex - recIndex;
+ long skipRec = 0;
for(int i=0;i<skip && super.more();i++) {
+ while (hasNext()) {
+ writeSkippedRec(getKey(), moveToNext());
+ skipRec++;
+ }
super.nextKey();
recIndex++;
}
- getCounters().incrCounter(Counter.REDUCE_SKIPPED_RECORDS, skip);
+ //close the skip writer once all the ranges are skipped
+ if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
+ skipWriter.close();
+ }
+ skipGroupCounter.increment(skip);
+ skipRecCounter.increment(skipRec);
reportNextRecordRange(umbilical, nextRecIndex);
}
+
+ @SuppressWarnings("unchecked")
+ private void writeSkippedRec(KEY key, VALUE value) throws IOException{
+ if(skipWriter==null) {
+ Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+ Path skipFile = new Path(skipDir, getTaskID().toString());
+ skipWriter = SequenceFile.createWriter(
+ skipFile.getFileSystem(conf), conf, skipFile,
+ keyClass, valClass,
+ CompressionType.BLOCK, getReporter(umbilical));
+ }
+ skipWriter.append(key, value);
+ }
}
@Override
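
One clarification on the two counters bumped in mayBeSkip() above:
skipGroupCounter (REDUCE_SKIPPED_GROUPS) counts skipped keys, while
skipRecCounter (REDUCE_SKIPPED_RECORDS) counts the individual values drained
by the inner hasNext() loop. Skipping one key that carries, say, three values
therefore increments the group counter by 1 and the record counter by 3.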
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SkipBadRecords.java Fri Aug 29 01:19:36 2008
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
/**
* Utility class for skip bad records functionality. It contains various
@@ -33,6 +34,7 @@
"mapred.skip.map.auto.incr.proc.count";
private static final String AUTO_INCR_REDUCE_PROC_COUNT =
"mapred.skip.reduce.auto.incr.proc.count";
+ private static final String OUT_PATH = "mapred.skip.out.dir";
/**
* Is skipping of bad records enabled. If it is enabled
@@ -158,4 +160,31 @@
conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
}
+ /**
+ * Get the directory to which skipped records are written. By default it is
+ * the sub directory of the output _logs directory.
+ * @param conf the configuration.
+ * @return path skip output directory. Null is returned if this is not set
+ * and output directory is also not set.
+ */
+ public static Path getSkipOutputPath(Configuration conf) {
+ String name = conf.get(OUT_PATH);
+ if(name!=null) {
+ return new Path(name);
+ }
+ Path outPath = FileOutputFormat.getOutputPath(new JobConf(conf));
+ return outPath==null ? null : new Path(outPath,
+ "_logs"+Path.SEPARATOR+"skip");
+ }
+
+ /**
+ * Set the directory to which skipped records are written. By default it is
+ * the sub directory of the output _logs directory.
+ * @param conf the configuration.
+ * @param path skip output directory path
+ */
+ public static void setSkipOutputPath(JobConf conf, Path path) {
+ conf.set(OUT_PATH, path.toString());
+ }
+
}
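
A short sketch of how getSkipOutputPath resolves, assuming a hypothetical job
output directory and no explicit mapred.skip.out.dir:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SkipBadRecords;

  public class SkipPathSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      FileOutputFormat.setOutputPath(job, new Path("/user/example/out"));
      // mapred.skip.out.dir is unset, so the default applies:
      System.out.println(SkipBadRecords.getSkipOutputPath(job));
      // prints /user/example/out/_logs/skip
      // an explicitly configured directory takes precedence:
      SkipBadRecords.setSkipOutputPath(job, new Path("/user/example/skipped"));
      System.out.println(SkipBadRecords.getSkipOutputPath(job));
      // prints /user/example/skipped
    }
  }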
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SortedRanges.java Fri Aug 29 01:19:36 2008
@@ -35,28 +35,28 @@
* Provides the SkipRangeIterator, which skips the Ranges
* stored in this object.
*/
-public class SortedRanges implements Writable{
+class SortedRanges implements Writable{
private static final Log LOG =
LogFactory.getLog(SortedRanges.class);
- private SortedSet<Range> ranges = new TreeSet<Range>();
- private int indicesCount;
+ private TreeSet<Range> ranges = new TreeSet<Range>();
+ private long indicesCount;
/**
* Get Iterator which skips the stored ranges.
* The Iterator.next() call returns the index starting from 0.
- * @return Iterator<Long>
+ * @return SkipRangeIterator
*/
- public Iterator<Long> skipRangeIterator(){
- return new SkipRangeIterator();
+ synchronized SkipRangeIterator skipRangeIterator(){
+ return new SkipRangeIterator(ranges.iterator());
}
/**
* Get the no of indices stored in the ranges.
* @return indices count
*/
- public synchronized int getIndicesCount() {
+ synchronized long getIndicesCount() {
return indicesCount;
}
@@ -69,7 +69,7 @@
* If the range is of 0 length, doesn't do anything.
* @param range Range to be added.
*/
- public synchronized void add(Range range){
+ synchronized void add(Range range){
if(range.isEmpty()) {
return;
}
@@ -123,7 +123,7 @@
* If range is of 0 length, doesn't do anything.
* @param range Range to be removed.
*/
- public synchronized void remove(Range range) {
+ synchronized void remove(Range range) {
if(range.isEmpty()) {
return;
}
@@ -177,7 +177,8 @@
}
}
- public void readFields(DataInput in) throws IOException {
+ public synchronized void readFields(DataInput in) throws IOException {
+ indicesCount = in.readLong();
ranges = new TreeSet<Range>();
int size = in.readInt();
for(int i=0;i<size;i++) {
@@ -187,7 +188,8 @@
}
}
- public void write(DataOutput out) throws IOException {
+ public synchronized void write(DataOutput out) throws IOException {
+ out.writeLong(indicesCount);
out.writeInt(ranges.size());
Iterator<Range> it = ranges.iterator();
while(it.hasNext()) {
@@ -215,7 +217,7 @@
private long startIndex;
private long length;
- public Range(long startIndex, long length) {
+ Range(long startIndex, long length) {
if(length<0) {
throw new RuntimeException("length can't be negative");
}
@@ -223,7 +225,7 @@
this.length = length;
}
- public Range() {
+ Range() {
this(0,0);
}
@@ -231,7 +233,7 @@
* Get the start index. Start index is inclusive.
* @return startIndex.
*/
- public long getStartIndex() {
+ long getStartIndex() {
return startIndex;
}
@@ -239,7 +241,7 @@
* Get the end index. End index is exclusive.
* @return endIndex.
*/
- public long getEndIndex() {
+ long getEndIndex() {
return startIndex + length;
}
@@ -247,7 +249,7 @@
* Get Length.
* @return length
*/
- public long getLength() {
+ long getLength() {
return length;
}
@@ -256,7 +258,7 @@
* @return <code>true</code> if empty
* <code>false</code> otherwise.
*/
- public boolean isEmpty() {
+ boolean isEmpty() {
return length==0;
}
@@ -299,17 +301,25 @@
/**
* Index Iterator which skips the stored ranges.
*/
- private class SkipRangeIterator implements Iterator<Long> {
- Iterator<Range> rangeIterator = ranges.iterator();
+ static class SkipRangeIterator implements Iterator<Long> {
+ Iterator<Range> rangeIterator;
Range range = new Range();
long currentIndex = -1;
/**
+ * Constructor
+ * @param rangeIterator the iterator which gives the ranges.
+ */
+ SkipRangeIterator(Iterator<Range> rangeIterator) {
+ this.rangeIterator = rangeIterator;
+ }
+
+ /**
* Returns true till the index reaches Long.MAX_VALUE.
* @return <code>true</code> next index exists.
* <code>false</code> otherwise.
*/
- public boolean hasNext() {
+ public synchronized boolean hasNext() {
return currentIndex<Long.MAX_VALUE;
}
@@ -339,6 +349,15 @@
}
/**
+ * Get whether all the ranges have been skipped.
+ * @return <code>true</code> if all ranges have been skipped.
+ * <code>false</code> otherwise.
+ */
+ synchronized boolean skippedAllRanges() {
+ return !rangeIterator.hasNext() && currentIndex>=range.getEndIndex();
+ }
+
+ /**
* Remove is not supported. Doesn't apply.
*/
public void remove() {
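
To illustrate the iterator semantics, a sketch only: since SortedRanges is
now package-private it would have to live in org.apache.hadoop.mapred, and
the expected output follows from the javadoc above (next() yields successive
indices starting from 0, jumping over the stored ranges):

  package org.apache.hadoop.mapred;

  public class SkipRangesSketch {
    public static void main(String[] args) {
      SortedRanges ranges = new SortedRanges();
      ranges.add(new SortedRanges.Range(2, 2));   // mark indices 2 and 3 as bad
      SortedRanges.SkipRangeIterator it = ranges.skipRangeIterator();
      for (int i = 0; i < 4; i++) {
        System.out.print(it.next() + " ");        // expected: 0 1 4 5
      }
      System.out.println(it.skippedAllRanges());  // expected: true
    }
  }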
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Aug 29 01:19:36 2008
@@ -73,6 +73,7 @@
REDUCE_INPUT_GROUPS,
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS,
+ REDUCE_SKIPPED_GROUPS,
REDUCE_SKIPPED_RECORDS
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Aug 29 01:19:36 2008
@@ -179,6 +179,7 @@
void init(JobID jobId) {
this.startTime = System.currentTimeMillis();
this.id = new TaskID(jobId, isMapTask(), partition);
+ this.skipping = startSkipping();
}
////////////////////////////////////
@@ -489,10 +490,7 @@
machinesWhereFailed.add(trackerHostName);
LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
failedRanges.add(status.getNextRecordRange());
- if(SkipBadRecords.getEnabled(conf) &&
- numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
- skipping = true;
- }
+ skipping = startSkipping();
} else {
numKilledTasks++;
}
@@ -502,6 +500,17 @@
kill();
}
}
+
+ /**
+ * Get whether to start skipping mode.
+ */
+ private boolean startSkipping() {
+ if(SkipBadRecords.getEnabled(conf) &&
+ numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
+ return true;
+ }
+ return false;
+ }
/**
* Finalize the <b>completed</b> task; note that this might not be the first
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Fri Aug 29 01:19:36 2008
@@ -13,5 +13,6 @@
REDUCE_INPUT_RECORDS.name= Reduce input records
REDUCE_OUTPUT_RECORDS.name= Reduce output records
REDUCE_SKIPPED_RECORDS.name= Reduce skipped records
+REDUCE_SKIPPED_GROUPS.name= Reduce skipped groups
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java?rev=690142&r1=690141&r2=690142&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestBadRecords.java Fri Aug 29 01:19:36 2008
@@ -35,7 +35,9 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
public class TestBadRecords extends ClusterMapReduceTestCase {
@@ -71,6 +73,7 @@
conf.setNumReduceTasks(1);
conf.setInt("mapred.task.timeout", 30*1000);
+ SkipBadRecords.setAttemptsToStartSkipping(conf,0);
//the no of attempts to successfully complete the task depends
//on the no of bad records.
conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+1+
@@ -105,6 +108,59 @@
throws Exception{
LOG.info(runningJob.getCounters().toString());
assertTrue(runningJob.isSuccessful());
+
+ //validate counters
+ Counters counters = runningJob.getCounters();
+ assertEquals(counters.findCounter(Task.Counter.MAP_SKIPPED_RECORDS).
+ getCounter(),mapperBadRecords.size());
+
+ int mapRecs = input.size() - mapperBadRecords.size();
+ assertEquals(counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).
+ getCounter(),mapRecs);
+ assertEquals(counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).
+ getCounter(),mapRecs);
+
+ int redRecs = mapRecs - redBadRecords.size();
+ assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_RECORDS).
+ getCounter(),redBadRecords.size());
+ assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_GROUPS).
+ getCounter(),redBadRecords.size());
+ assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_GROUPS).
+ getCounter(),redRecs);
+ assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_RECORDS).
+ getCounter(),redRecs);
+ assertEquals(counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).
+ getCounter(),redRecs);
+
+ //validate skipped records
+ Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+ Path[] skips = FileUtil.stat2Paths(getFileSystem().listStatus(skipDir));
+ List<String> mapSkipped = new ArrayList<String>();
+ List<String> redSkipped = new ArrayList<String>();
+ for(Path skipPath : skips) {
+ LOG.info("skipPath: " + skipPath);
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(
+ getFileSystem(), skipPath, conf);
+ Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Object value = ReflectionUtils.newInstance(reader.getValueClass(),
+ conf);
+ key = reader.next(key);
+ while(key!=null) {
+ value = reader.getCurrentValue(value);
+ LOG.debug("key:"+key+" value:"+value.toString());
+ if(skipPath.getName().contains("_r_")) {
+ redSkipped.add(value.toString());
+ } else {
+ mapSkipped.add(value.toString());
+ }
+ key = reader.next(key);
+ }
+ reader.close();
+ }
+ assertTrue(mapSkipped.containsAll(mapperBadRecords));
+ assertTrue(redSkipped.containsAll(redBadRecords));
+
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
new OutputLogFilter()));