Hi, my Hadoop version is Hadoop 0.20.2-cdh3u3 and I want to define a new InputFormat from the Hadoop book, but I get the error "class org.apache.hadoop.streaming.WholeFileInputFormat not org.apache.hadoop.mapred.InputFormat"
Hadoop version is 0.20, but the streaming still depend on 0.10 mapred api? the detail: ************************************************************************************************************************************************************* javac -classpath /usr/lib/hadoop/hadoop-core-0.20.2-cdh3u3.jar:/usr/lib/hadoop/lib/* -d class7 ./*.java cd class7 jar uf /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar org/apache/hadoop/streaming/*.class hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar -inputformat WholeFileInputFormat -mapper xmlmappertest.py -file xmlmappertest.py -input /user/hdfs/tarcatalog -output /user/hive/external/catalog -jobconf mapred.map.tasks=108 13/03/15 16:27:51 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead. Exception in thread "main" java.lang.RuntimeException: class org.apache.hadoop.streaming.WholeFileInputFormat not org.apache.hadoop.mapred.InputFormat at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1070) at org.apache.hadoop.mapred.JobConf.setInputFormat(JobConf.java:609) at org.apache.hadoop.streaming.StreamJob.setJobConf(StreamJob.java:707) at org.apache.hadoop.streaming.StreamJob.run(StreamJob.java:122) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) at org.apache.hadoop.streaming.HadoopStreaming.main(HadoopStreaming.java:50) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.hadoop.util.RunJar.main(RunJar.java:197) *****************************************************************the code from hadoop book******************************************************************* WholeFileInputFormat.java // cc 
WholeFileInputFormat An InputFormat for reading a whole file as a record importjava.io.IOException; importorg.apache.hadoop.fs.*; importorg.apache.hadoop.io.*; importorg.apache.hadoop.mapreduce.InputSplit; importorg.apache.hadoop.mapreduce.JobContext; importorg.apache.hadoop.mapreduce.RecordReader; importorg.apache.hadoop.mapreduce.TaskAttemptContext; importorg.apache.hadoop.mapreduce.lib.input.*; //vv WholeFileInputFormat publicclassWholeFileInputFormat extendsFileInputFormat<NullWritable,BytesWritable>{ @Override protectedbooleanisSplitable(JobContextcontext,Pathfile){ returnfalse; } @Override publicRecordReader<NullWritable,BytesWritable>createRecordReader( InputSplitsplit,TaskAttemptContextcontext)throwsIOException, InterruptedException{ WholeFileRecordReaderreader=newWholeFileRecordReader(); reader.initialize(split,context); returnreader; } } //^^ WholeFileInputFormat WholeFileRecordReader.java // cc WholeFileRecordReader The RecordReader used by WholeFileInputFormat for reading a whole file as a record importjava.io.IOException; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.FSDataInputStream; importorg.apache.hadoop.fs.FileSystem; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.BytesWritable; importorg.apache.hadoop.io.IOUtils; importorg.apache.hadoop.io.NullWritable; importorg.apache.hadoop.mapreduce.InputSplit; importorg.apache.hadoop.mapreduce.RecordReader; importorg.apache.hadoop.mapreduce.TaskAttemptContext; importorg.apache.hadoop.mapreduce.lib.input.FileSplit; //vv WholeFileRecordReader classWholeFileRecordReaderextendsRecordReader<NullWritable,BytesWritable>{ privateFileSplitfileSplit; privateConfigurationconf; privateBytesWritablevalue=newBytesWritable(); privatebooleanprocessed=false; @Override publicvoidinitialize(InputSplitsplit,TaskAttemptContextcontext) throwsIOException,InterruptedException{ this.fileSplit=(FileSplit)split; this.conf=context.getConfiguration(); } @Override 
publicbooleannextKeyValue()throwsIOException,InterruptedException{ if(!processed){ byte[]contents=newbyte[(int)fileSplit.getLength()]; Pathfile=fileSplit.getPath(); FileSystemfs=file.getFileSystem(conf); FSDataInputStreamin=null; try{ in=fs.open(file); IOUtils.readFully(in,contents,0,contents.length); value.set(contents,0,contents.length); }finally{ IOUtils.closeStream(in); } processed=true; returntrue; } returnfalse; } @Override publicNullWritablegetCurrentKey()throwsIOException,InterruptedException{ returnNullWritable.get(); } @Override publicBytesWritablegetCurrentValue()throwsIOException, InterruptedException{ returnvalue; } @Override publicfloatgetProgress()throwsIOException{ returnprocessed?1.0f:0.0f; } @Override publicvoidclose()throwsIOException{ // do nothing } } //^^ WholeFileRecordReader