Hi,

My Hadoop version is 0.20.2-cdh3u3, and I am trying to use the
WholeFileInputFormat from the Hadoop book with streaming, but I get this
error:

"class org.apache.hadoop.streaming.WholeFileInputFormat not
org.apache.hadoop.mapred.InputFormat"

Hadoop is at 0.20, but does streaming still depend on the old mapred API?
(I have sketched an old-API port at the bottom; is that the right fix?)

The details:
*************************************************************************
javac -classpath /usr/lib/hadoop/hadoop-core-0.20.2-cdh3u3.jar:/usr/lib/hadoop/lib/* -d class7 ./*.java
cd class7
jar uf /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar org/apache/hadoop/streaming/*.class
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar \
    -inputformat WholeFileInputFormat \
    -mapper xmlmappertest.py \
    -file xmlmappertest.py \
    -input /user/hdfs/tarcatalog \
    -output /user/hive/external/catalog \
    -jobconf mapred.map.tasks=108
13/03/15 16:27:51 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
Exception in thread "main" java.lang.RuntimeException: class org.apache.hadoop.streaming.WholeFileInputFormat not org.apache.hadoop.mapred.InputFormat
        at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1070)
        at org.apache.hadoop.mapred.JobConf.setInputFormat(JobConf.java:609)
        at org.apache.hadoop.streaming.StreamJob.setJobConf(StreamJob.java:707)
        at org.apache.hadoop.streaming.StreamJob.run(StreamJob.java:122)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.streaming.HadoopStreaming.main(HadoopStreaming.java:50)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
************************ the code from the Hadoop book ************************
WholeFileInputFormat.java

// cc WholeFileInputFormat An InputFormat for reading a whole file as a record
package org.apache.hadoop.streaming; // matches the org/apache/hadoop/streaming path the classes are jarred under

import java.io.IOException;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.*;

//vv WholeFileInputFormat
public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
  }
}
//^^ WholeFileInputFormat
WholeFileRecordReader.java

// cc WholeFileRecordReader The RecordReader used by WholeFileInputFormat for reading a whole file as a record
package org.apache.hadoop.streaming; // matches the org/apache/hadoop/streaming path the classes are jarred under

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

//vv WholeFileRecordReader
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException,
      InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // do nothing
  }
}
//^^ WholeFileRecordReader
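
From the stack trace, JobConf.setInputFormat apparently requires a class
assignable to the old org.apache.hadoop.mapred.InputFormat, while the book's
code extends the new org.apache.hadoop.mapreduce API. Below is my untested
attempt at an old-API port (the OldApiWholeFileInputFormat and
OldApiWholeFileRecordReader names are my own); is this the right direction?

OldApiWholeFileInputFormat.java

// Old-API (org.apache.hadoop.mapred) port of the book's WholeFileInputFormat.
package org.apache.hadoop.streaming; // same jar-path trick as above

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class OldApiWholeFileInputFormat
    extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    return false; // the whole file is one record, so never split it
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new OldApiWholeFileRecordReader((FileSplit) split, job);
  }
}

// Old-API counterpart of WholeFileRecordReader: emits a single
// (NullWritable, BytesWritable) pair holding the entire file's bytes.
class OldApiWholeFileRecordReader
    implements RecordReader<NullWritable, BytesWritable> {

  private final FileSplit fileSplit;
  private final JobConf conf;
  private boolean processed = false;

  OldApiWholeFileRecordReader(FileSplit fileSplit, JobConf conf) {
    this.fileSplit = fileSplit;
    this.conf = conf;
  }

  @Override
  public boolean next(NullWritable key, BytesWritable value) throws IOException {
    if (processed) {
      return false; // the single record was already delivered
    }
    byte[] contents = new byte[(int) fileSplit.getLength()];
    Path file = fileSplit.getPath();
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream in = null;
    try {
      in = fs.open(file);
      IOUtils.readFully(in, contents, 0, contents.length);
      value.set(contents, 0, contents.length);
    } finally {
      IOUtils.closeStream(in);
    }
    processed = true;
    return true;
  }

  @Override
  public NullWritable createKey() { return NullWritable.get(); }

  @Override
  public BytesWritable createValue() { return new BytesWritable(); }

  @Override
  public long getPos() throws IOException {
    return processed ? fileSplit.getLength() : 0;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // nothing held open between calls to next()
  }
}

If that is the right approach I would then pass
-inputformat OldApiWholeFileInputFormat on the streaming command line, though
I am not sure how streaming will serialize the BytesWritable value onto the
Python mapper's stdin.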