Hello Madhu,

           I really appreciate your efforts. I am sorry I did not
respond sooner. Actually, I was struggling with it, so I had nothing to
report. Many thanks.

Regards,
    Mohammad Tariq


On Thu, Jun 21, 2012 at 12:37 PM, madhu phatak <phatak....@gmail.com> wrote:
> Hi,
>  Jira for the new API code
> https://issues.apache.org/jira/browse/HADOOP-8521
>
>
> On Tue, Jun 19, 2012 at 6:11 PM, madhu phatak <phatak....@gmail.com> wrote:
>>
>> Hi,
>>  Yes, you have the class, but it's for the old API.
>>
>>  Please find below the classes ported to the new API. I have not
>> tested the code; try these classes and let me know if they work for
>> you.
>>
>>
>> StreamInputFormat (new API)
>>
>> package org.apache.hadoop.streaming;
>> import java.io.IOException;
>> import java.lang.reflect.Constructor;
>>
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FSDataInputStream;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapred.Reporter;
>> import org.apache.hadoop.mapreduce.InputSplit;
>> import org.apache.hadoop.mapreduce.RecordReader;
>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>> import org.apache.hadoop.mapreduce.lib.input.FileSplit;
>> import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
>> import org.apache.hadoop.streaming.StreamUtil;
>>
>> /**
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>>
>>
>>
>> /** An input format that selects a RecordReader based on a JobConf property.
>>  *  This should be used only for non-standard record reader such as
>>  *  StreamXmlRecordReader. For all other standard
>>  *  record readers, the appropriate input format classes should be used.
>>  */
>> public class StreamInputFormat extends KeyValueTextInputFormat {
>>
>>   @Override
>>   public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
>>       TaskAttemptContext context) throws IOException {
>>
>>     Configuration conf = context.getConfiguration();
>>     String c = conf.get("stream.recordreader.class");
>>     if (c == null || c.indexOf("LineRecordReader") >= 0) {
>>       return super.createRecordReader(genericSplit, context);
>>     }
>>
>>     // handling non-standard record reader (likely StreamXmlRecordReader)
>>     FileSplit split = (FileSplit) genericSplit;
>>     //LOG.info("getRecordReader start.....split=" + split);
>>     context.setStatus(split.toString());
>>     context.progress();
>>
>>     // Open the file and seek to the start of the split
>>     FileSystem fs = split.getPath().getFileSystem(conf);
>>     FSDataInputStream in = fs.open(split.getPath());
>>
>>     // Factory dispatch based on available params..
>>     Class readerClass = StreamUtil.goodClassOrNull(conf, c, null);
>>     if (readerClass == null) {
>>       throw new RuntimeException("Class not found: " + c);
>>     }
>>
>>     Constructor ctor;
>>     try {
>>       ctor = readerClass.getConstructor(new Class[] {
>>           FSDataInputStream.class, FileSplit.class,
>>           TaskAttemptContext.class, Configuration.class, FileSystem.class });
>>     } catch (NoSuchMethodException nsm) {
>>       throw new RuntimeException(nsm);
>>     }
>>
>>     RecordReader<Text, Text> reader;
>>     try {
>>       reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] {
>>           in, split, context, conf, fs });
>>     } catch (Exception nsm) {
>>       throw new RuntimeException(nsm);
>>     }
>>     return reader;
>>   }
>>
>> }
>>
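The reflective lookup in createRecordReader above can be tried in isolation. Below is a minimal sketch using only the JDK; DemoReader, XmlDemoReader and create are hypothetical stand-ins, not Hadoop classes. Note that getConstructor matches parameter types exactly, so the FileSplit class passed to the lookup has to be the very same class that the reader's constructor declares.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-ins for the Hadoop types; only the lookup pattern matters.
public class ReaderFactoryDemo {

    /** Stand-in for a record reader base type. */
    public interface DemoReader {
        String describe();
    }

    /** Stand-in for a concrete reader such as StreamXmlRecordReader. */
    public static class XmlDemoReader implements DemoReader {
        private final String path;
        private final Long start;

        public XmlDemoReader(String path, Long start) {
            this.path = path;
            this.start = start;
        }

        @Override
        public String describe() {
            return "xml:" + path + "@" + start;
        }
    }

    /** Loads className and invokes its (String, Long) constructor reflectively. */
    public static DemoReader create(String className, String path, Long start) {
        try {
            Class<?> cls = Class.forName(className);
            // The parameter types must match the declared constructor exactly;
            // a same-named class from another package does not match.
            Constructor<?> ctor = cls.getConstructor(String.class, Long.class);
            return (DemoReader) ctor.newInstance(path, start);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        DemoReader r = create(XmlDemoReader.class.getName(), "/mapin/demo.xml", 0L);
        System.out.println(r.describe());
    }
}
```

A class name that cannot be loaded, or a constructor whose signature differs, surfaces here as a RuntimeException, which is the same failure mode the quoted code would show.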
>> StreamXmlRecordReader
>>
>> /**
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.hadoop.streaming;
>>
>> import java.io.*;
>> import java.util.regex.*;
>>
>> import org.apache.hadoop.io.DataOutputBuffer;
>> import org.apache.hadoop.io.Writable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.io.WritableComparable;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.FSDataInputStream;
>> import org.apache.hadoop.mapred.Reporter;
>> import org.apache.hadoop.mapred.FileSplit;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>
>> /** A way to interpret XML fragments as Mapper input records.
>>  *  Values are XML subtrees delimited by configurable tags.
>>  *  Keys could be the value of a certain attribute in the XML subtree,
>>  *  but this is left to the stream processor application.
>>  *
>>  *  The name-value properties that StreamXmlRecordReader understands are:
>>  *    String begin (chars marking beginning of record)
>>  *    String end   (chars marking end of record)
>>  *    int maxrec   (maximum record size)
>>  *    int lookahead(maximum lookahead to sync CDATA)
>>  *    boolean slowmatch
>>  */
>> public class StreamXmlRecordReader extends StreamBaseRecordReader {
>>
>>   public StreamXmlRecordReader(FSDataInputStream in, FileSplit split,
>>                                TaskAttemptContext context,
>>                                Configuration conf, FileSystem fs) throws IOException {
>>     super(in, split, context, conf, fs);
>>
>>     beginMark_ = checkJobGet(CONF_NS + "begin");
>>     endMark_ = checkJobGet(CONF_NS + "end");
>>
>>     maxRecSize_ = conf_.getInt(CONF_NS + "maxrec", 50 * 1000);
>>     lookAhead_ = conf_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
>>     synched_ = false;
>>
>>     slowMatch_ = conf_.getBoolean(CONF_NS + "slowmatch", false);
>>     if (slowMatch_) {
>>       beginPat_ = makePatternCDataOrMark(beginMark_);
>>       endPat_ = makePatternCDataOrMark(endMark_);
>>     }
>>     init();
>>   }
>>
>>   public void init() throws IOException {
>>     LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_
>>              + " end_=" + end_ + " length_=" + length_
>>              + " start_ > in_.getPos() =" + (start_ > in_.getPos())
>>              + " " + start_ + " > " + in_.getPos());
>>     if (start_ > in_.getPos()) {
>>       in_.seek(start_);
>>     }
>>     pos_ = start_;
>>     bin_ = new BufferedInputStream(in_);
>>     seekNextRecordBoundary();
>>   }
>>
>>   int numNext = 0;
>>
>>   public synchronized boolean next(Text key, Text value) throws IOException {
>>     numNext++;
>>     if (pos_ >= end_) {
>>       return false;
>>     }
>>
>>     DataOutputBuffer buf = new DataOutputBuffer();
>>     if (!readUntilMatchBegin()) {
>>       return false;
>>     }
>>     if (pos_ >= end_ || !readUntilMatchEnd(buf)) {
>>       return false;
>>     }
>>
>>     // There is only one elem..key/value splitting is not done here.
>>     byte[] record = new byte[buf.getLength()];
>>     System.arraycopy(buf.getData(), 0, record, 0, record.length);
>>
>>     numRecStats(record, 0, record.length);
>>
>>     key.set(record);
>>     value.set("");
>>
>>     return true;
>>   }
>>
>>   public void seekNextRecordBoundary() throws IOException {
>>     readUntilMatchBegin();
>>   }
>>
>>   boolean readUntilMatchBegin() throws IOException {
>>     if (slowMatch_) {
>>       return slowReadUntilMatch(beginPat_, false, null);
>>     } else {
>>       return fastReadUntilMatch(beginMark_, false, null);
>>     }
>>   }
>>
>>   private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException {
>>     if (slowMatch_) {
>>       return slowReadUntilMatch(endPat_, true, buf);
>>     } else {
>>       return fastReadUntilMatch(endMark_, true, buf);
>>     }
>>   }
>>
>>   private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
>>                                      DataOutputBuffer outBufOrNull) throws IOException {
>>     byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
>>     int read = 0;
>>     // mark to invalidate if we read more
>>     bin_.mark(Math.max(lookAhead_, maxRecSize_) + 2);
>>     read = bin_.read(buf);
>>     if (read == -1) return false;
>>
>>     String sbuf = new String(buf, 0, read, "UTF-8");
>>     Matcher match = markPattern.matcher(sbuf);
>>
>>     firstMatchStart_ = NA;
>>     firstMatchEnd_ = NA;
>>     int bufPos = 0;
>>     int state = synched_ ? CDATA_OUT : CDATA_UNK;
>>     int s = 0;
>>
>>     while (match.find(bufPos)) {
>>       int input;
>>       if (match.group(1) != null) {
>>         input = CDATA_BEGIN;
>>       } else if (match.group(2) != null) {
>>         input = CDATA_END;
>>         firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
>>       } else {
>>         input = RECORD_MAYBE;
>>       }
>>       if (input == RECORD_MAYBE) {
>>         if (firstMatchStart_ == NA) {
>>           firstMatchStart_ = match.start();
>>           firstMatchEnd_ = match.end();
>>         }
>>       }
>>       state = nextState(state, input, match.start());
>>       if (state == RECORD_ACCEPT) {
>>         break;
>>       }
>>       bufPos = match.end();
>>       s++;
>>     }
>>     if (state != CDATA_UNK) {
>>       synched_ = true;
>>     }
>>     boolean matched = (firstMatchStart_ != NA)
>>         && (state == RECORD_ACCEPT || state == CDATA_UNK);
>>     if (matched) {
>>       int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
>>       bin_.reset();
>>
>>       for (long skiplen = endPos; skiplen > 0; ) {
>>         // Skip succeeds as we have read this buffer
>>         skiplen -= bin_.skip(skiplen);
>>       }
>>
>>       pos_ += endPos;
>>       if (outBufOrNull != null) {
>>         outBufOrNull.writeBytes(sbuf.substring(0,endPos));
>>       }
>>     }
>>     return matched;
>>   }
>>
>>   // states
>>   final static int CDATA_IN = 10;
>>   final static int CDATA_OUT = 11;
>>   final static int CDATA_UNK = 12;
>>   final static int RECORD_ACCEPT = 13;
>>   // inputs
>>   final static int CDATA_BEGIN = 20;
>>   final static int CDATA_END = 21;
>>   final static int RECORD_MAYBE = 22;
>>
>>   /* also updates firstMatchStart_;*/
>>   int nextState(int state, int input, int bufPos) {
>>     switch (state) {
>>     case CDATA_UNK:
>>     case CDATA_OUT:
>>       switch (input) {
>>       case CDATA_BEGIN:
>>         return CDATA_IN;
>>       case CDATA_END:
>>         if (state == CDATA_OUT) {
>>           //System.out.println("buggy XML " + bufPos);
>>         }
>>         return CDATA_OUT;
>>       case RECORD_MAYBE:
>>         return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
>>       }
>>       break;
>>     case CDATA_IN:
>>       return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
>>     }
>>     throw new IllegalStateException(state + " " + input + " " + bufPos + " "
>>         + splitName_);
>>   }
>>
>>   Pattern makePatternCDataOrMark(String escapedMark) {
>>     StringBuffer pat = new StringBuffer();
>>     addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
>>     addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
>>     addGroup(pat, escapedMark); // RECORD_MAYBE
>>     return Pattern.compile(pat.toString());
>>   }
>>
>>   void addGroup(StringBuffer pat, String escapedGroup) {
>>     if (pat.length() > 0) {
>>       pat.append("|");
>>     }
>>     pat.append("(");
>>     pat.append(escapedGroup);
>>     pat.append(")");
>>   }
>>
>>   boolean fastReadUntilMatch(String textPat, boolean includePat,
>>                              DataOutputBuffer outBufOrNull) throws IOException {
>>     byte[] cpat = textPat.getBytes("UTF-8");
>>     int m = 0;
>>     boolean match = false;
>>     int msup = cpat.length;
>>     int LL = 120000 * 10;
>>
>>     bin_.mark(LL); // large number to invalidate mark
>>     while (true) {
>>       int b = bin_.read();
>>       if (b == -1) break;
>>
>>       byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
>>       if (c == cpat[m]) {
>>         m++;
>>         if (m == msup) {
>>           match = true;
>>           break;
>>         }
>>       } else {
>>         bin_.mark(LL); // reset mark so we could jump back if we found a match
>>         if (outBufOrNull != null) {
>>           outBufOrNull.write(cpat, 0, m);
>>           outBufOrNull.write(c);
>>         }
>>         pos_ += m + 1; // skip m chars, +1 for 'c'
>>         m = 0;
>>       }
>>     }
>>     if (!includePat && match) {
>>       bin_.reset();
>>     } else if (outBufOrNull != null) {
>>       outBufOrNull.write(cpat);
>>       pos_ += msup;
>>     }
>>     return match;
>>   }
>>
>>   String checkJobGet(String prop) throws IOException {
>>     String val = conf_.get(prop);
>>     if (val == null) {
>>       throw new IOException("JobConf: missing required property: " + prop);
>>     }
>>     return val;
>>   }
>>
>>   String beginMark_;
>>   String endMark_;
>>
>>   Pattern beginPat_;
>>   Pattern endPat_;
>>
>>   boolean slowMatch_;
>>   // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
>>   int lookAhead_;
>>   int maxRecSize_;
>>
>>   // Wrap FSDataInputStream for efficient backward seeks
>>   BufferedInputStream bin_;
>>   // Keep track of position with respect to the encapsulated FSDataInputStream
>>   long pos_;
>>
>>   final static int NA = -1;
>>   int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
>>   int firstMatchEnd_ = 0;
>>
>>   boolean synched_;
>> }
>>
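To see what the reader above hands to the mapper, here is a simplified in-memory sketch of the begin/end marker scan. It is not the streaming, byte-by-byte matcher from fastReadUntilMatch; it uses String.indexOf on a whole string, and the class and method names are hypothetical. Like next(), it returns each record including both markers.

```java
import java.util.ArrayList;
import java.util.List;

public class MarkerSplitDemo {

    /** Returns every substring that starts with begin and ends with the next end marker. */
    public static List<String> extractRecords(String input, String begin, String end) {
        List<String> records = new ArrayList<>();
        int pos = 0;
        while (true) {
            int start = input.indexOf(begin, pos);
            if (start < 0) {
                break;                       // no further record
            }
            int stop = input.indexOf(end, start + begin.length());
            if (stop < 0) {
                break;                       // unterminated record: drop it
            }
            records.add(input.substring(start, stop + end.length()));
            pos = stop + end.length();       // continue after the end marker
        }
        return records;
    }

    public static void main(String[] args) {
        String xml = "<root><info>a</info>junk<info>b</info></root>";
        for (String rec : extractRecords(xml, "<info>", "</info>")) {
            System.out.println(rec);
        }
    }
}
```

With the markers "<info>" and "</info>" from the driver quoted further down, the text between records ("junk" here) is skipped and only the two <info> elements are returned.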
>> StreamBaseRecordReader
>>
>> /**
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.hadoop.streaming;
>>
>> import java.io.*;
>>
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.io.Writable;
>> import org.apache.hadoop.io.WritableComparable;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.FSDataInputStream;
>> import org.apache.hadoop.mapred.Reporter;
>> import org.apache.hadoop.mapred.RecordReader;
>> import org.apache.hadoop.mapred.FileSplit;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>> import org.apache.commons.logging.*;
>>
>> /**
>>  * Shared functionality for hadoopStreaming formats.
>>  * A custom reader can be defined to be a RecordReader with the constructor below
>>  * and is selected with the option bin/hadoopStreaming -inputreader ...
>>  * @see StreamXmlRecordReader
>>  */
>> public abstract class StreamBaseRecordReader implements RecordReader<Text, Text> {
>>
>>   protected static final Log LOG =
>>       LogFactory.getLog(StreamBaseRecordReader.class.getName());
>>
>>   // custom JobConf properties for this class are prefixed with this namespace
>>   final static String CONF_NS = "stream.recordreader.";
>>
>>   public StreamBaseRecordReader(FSDataInputStream in, FileSplit split,
>>                                 TaskAttemptContext context,
>>                                 Configuration conf, FileSystem fs) throws IOException {
>>     in_ = in;
>>     split_ = split;
>>     start_ = split_.getStart();
>>     length_ = split_.getLength();
>>     end_ = start_ + length_;
>>     splitName_ = split_.getPath().getName();
>>     this.context_ = context;
>>     conf_ = conf;
>>     fs_ = fs;
>>
>>     statusMaxRecordChars_ = conf.getInt(CONF_NS + "statuschars", 200);
>>   }
>>
>>   /// RecordReader API
>>
>>   /** Read a record. Implementation should call numRecStats at the end
>>    */
>>   public abstract boolean next(Text key, Text value) throws IOException;
>>
>>   /** Returns the current position in the input. */
>>   public synchronized long getPos() throws IOException {
>>     return in_.getPos();
>>   }
>>
>>   /** Close this to future operations.*/
>>   public synchronized void close() throws IOException {
>>     in_.close();
>>   }
>>
>>   public float getProgress() throws IOException {
>>     if (end_ == start_) {
>>       return 1.0f;
>>     } else {
>>       return ((float)(in_.getPos() - start_)) / ((float)(end_ - start_));
>>     }
>>   }
>>
>>   public Text createKey() {
>>     return new Text();
>>   }
>>
>>   public Text createValue() {
>>     return new Text();
>>   }
>>
>>   /// StreamBaseRecordReader API
>>
>>   /** Implementation should seek forward in_ to the first byte of the next record.
>>    *  The initial byte offset in the stream is arbitrary.
>>    */
>>   public abstract void seekNextRecordBoundary() throws IOException;
>>
>>   void numRecStats(byte[] record, int start, int len) throws IOException {
>>     numRec_++;
>>     if (numRec_ == nextStatusRec_) {
>>       String recordStr = new String(record, start,
>>           Math.min(len, statusMaxRecordChars_), "UTF-8");
>>       nextStatusRec_ += 100;//*= 10;
>>       String status = getStatus(recordStr);
>>       LOG.info(status);
>>       context_.setStatus(status);
>>     }
>>   }
>>
>>   long lastMem = 0;
>>
>>   String getStatus(CharSequence record) {
>>     long pos = -1;
>>     try {
>>       pos = getPos();
>>     } catch (IOException io) {
>>     }
>>     String recStr;
>>     if (record.length() > statusMaxRecordChars_) {
>>       recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
>>     } else {
>>       recStr = record.toString();
>>     }
>>     String unqualSplit = split_.getPath().getName() + ":" +
>>                          split_.getStart() + "+" + split_.getLength();
>>     String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos
>>       + " " + unqualSplit + " Processing record=" + recStr;
>>     status += " " + splitName_;
>>     return status;
>>   }
>>
>>   FSDataInputStream in_;
>>   FileSplit split_;
>>   long start_;
>>   long end_;
>>   long length_;
>>   String splitName_;
>>   TaskAttemptContext context_;
>>   Configuration conf_;
>>   FileSystem fs_;
>>   int numRec_ = 0;
>>   int nextStatusRec_ = 1;
>>   int statusMaxRecordChars_;
>>
>> }
>>
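As posted, StreamBaseRecordReader implements org.apache.hadoop.mapred.RecordReader, whose contract is next(key, value), while createRecordReader in the new API returns org.apache.hadoop.mapreduce.RecordReader, which exposes nextKeyValue(), getCurrentKey() and getCurrentValue(). One possible way to bridge the two shapes is a thin adapter. The sketch below shows the idea with JDK-only stand-ins; OldStyleReader, NewStyleAdapter and fromRecords are hypothetical names, and a real port would extend the Hadoop class and use Text instead of StringBuilder.

```java
import java.util.Arrays;
import java.util.Iterator;

public class ReaderAdapterDemo {

    /** Stand-in for the old-API shape: the caller supplies reusable key/value holders. */
    public interface OldStyleReader {
        boolean next(StringBuilder key, StringBuilder value);
    }

    /** Stand-in for the new-API shape: the reader owns the current key/value. */
    public static class NewStyleAdapter {
        private final OldStyleReader delegate;
        private final StringBuilder key = new StringBuilder();
        private final StringBuilder value = new StringBuilder();

        public NewStyleAdapter(OldStyleReader delegate) {
            this.delegate = delegate;
        }

        /** Advances to the next record; returns false at end of input. */
        public boolean nextKeyValue() {
            key.setLength(0);
            value.setLength(0);
            return delegate.next(key, value);
        }

        public String getCurrentKey() {
            return key.toString();
        }

        public String getCurrentValue() {
            return value.toString();
        }
    }

    /** Old-style reader over a fixed list; the record goes in the key, the value stays empty. */
    public static OldStyleReader fromRecords(String... records) {
        Iterator<String> it = Arrays.asList(records).iterator();
        return (key, value) -> {
            if (!it.hasNext()) {
                return false;
            }
            key.append(it.next());
            return true;
        };
    }

    public static void main(String[] args) {
        NewStyleAdapter reader =
            new NewStyleAdapter(fromRecords("<info>a</info>", "<info>b</info>"));
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey());
        }
    }
}
```

The adapter keeps one key holder and one value holder and clears them before each call, which mirrors how the old API reuses the objects returned by createKey() and createValue().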
>>
>>
>> On Tue, Jun 19, 2012 at 5:58 PM, Mohammad Tariq <donta...@gmail.com>
>> wrote:
>>>
>>> But I have downloaded "hadoop-streaming-0.20.205.0.jar" and it
>>> contains StreamXmlRecordReader.class file. This means it should
>>> support StreamInputFormat.
>>>
>>> Regards,
>>>     Mohammad Tariq
>>>
>>>
>>> On Tue, Jun 19, 2012 at 5:54 PM, Mohammad Tariq <donta...@gmail.com>
>>> wrote:
>>> > Thanks Madhu. I'll do that.
>>> >
>>> > Regards,
>>> >     Mohammad Tariq
>>> >
>>> >
>>> > On Tue, Jun 19, 2012 at 5:43 PM, madhu phatak <phatak....@gmail.com>
>>> > wrote:
>>> >> It seems StreamInputFormat has not yet been ported to the new API. That's
>>> >> why you are not able to set it as the InputFormatClass. You can file a
>>> >> JIRA for this issue.
>>> >>
>>> >>
>>> >> On Tue, Jun 19, 2012 at 4:49 PM, Mohammad Tariq <donta...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> My driver function looks like this -
>>> >>>
>>> >>> public static void main(String[] args) throws IOException,
>>> >>> InterruptedException, ClassNotFoundException {
>>> >>>                // TODO Auto-generated method stub
>>> >>>
>>> >>>                Configuration conf = new Configuration();
>>> >>>                Job job = new Job();
>>> >>>                conf.set("stream.recordreader.class",
>>> >>> "org.apache.hadoop.streaming.StreamXmlRecordReader");
>>> >>>                conf.set("stream.recordreader.begin", "<info>");
>>> >>>                conf.set("stream.recordreader.end", "</info>");
>>> >>>                job.setInputFormatClass(StreamInputFormat.class);
>>> >>>                job.setOutputKeyClass(Text.class);
>>> >>>                job.setOutputValueClass(IntWritable.class);
>>> >>>                FileInputFormat.addInputPath(job, new
>>> >>> Path("/mapin/demo.xml"));
>>> >>>                FileOutputFormat.setOutputPath(job, new
>>> >>> Path("/mapout/demo"));
>>> >>>                job.waitForCompletion(true);
>>> >>>        }
>>> >>>
>>> >>> Could you please point out my mistake?
>>> >>>
>>> >>> Regards,
>>> >>>     Mohammad Tariq
>>> >>>
>>> >>>
>>> >>> On Tue, Jun 19, 2012 at 4:35 PM, Mohammad Tariq <donta...@gmail.com>
>>> >>> wrote:
>>> >>> > Hello Madhu,
>>> >>> >
>>> >>> >             Thanks for the response. Actually, I was trying to use
>>> >>> > the new API (Job). Have you tried that? I was not able to set the
>>> >>> > InputFormat using the Job API.
>>> >>> >
>>> >>> > Regards,
>>> >>> >     Mohammad Tariq
>>> >>> >
>>> >>> >
>>> >>> > On Tue, Jun 19, 2012 at 4:28 PM, madhu phatak
>>> >>> > <phatak....@gmail.com>
>>> >>> > wrote:
>>> >>> >> Hi,
>>> >>> >>  Set the following properties in the driver class:
>>> >>> >>
>>> >>> >>   jobConf.set("stream.recordreader.class",
>>> >>> >>       "org.apache.hadoop.streaming.StreamXmlRecordReader");
>>> >>> >>   jobConf.set("stream.recordreader.begin", "start-tag");
>>> >>> >>   jobConf.set("stream.recordreader.end", "end-tag");
>>> >>> >>
>>> >>> >>   jobConf.setInputFormat(StreamInputFormat.class);
>>> >>> >>
>>> >>> >>  In the Mapper, each XML record arrives as the key, of type Text, so
>>> >>> >> your mapper will look like
>>> >>> >>
>>> >>> >>   public class MyMapper<K,V>  implements Mapper<Text,Text,K,V>
>>> >>> >>
>>> >>> >>
>>> >>> >> On Tue, Jun 19, 2012 at 2:49 AM, Mohammad Tariq
>>> >>> >> <donta...@gmail.com>
>>> >>> >> wrote:
>>> >>> >>>
>>> >>> >>> Hello list,
>>> >>> >>>
>>> >>> >>>        Could anyone who has written MapReduce jobs to process XML
>>> >>> >>> documents stored in their cluster using "StreamXmlRecordReader"
>>> >>> >>> share his/her experience? Or, if you can, provide me some pointers
>>> >>> >>> on that. Many thanks.
>>> >>> >>>
>>> >>> >>> Regards,
>>> >>> >>>     Mohammad Tariq
>>> >>> >>
>>> >>> >>
>>> >>> >>
>>> >>> >>
>>> >>> >> --
>>> >>> >> https://github.com/zinnia-phatak-dev/Nectar
>>> >>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> https://github.com/zinnia-phatak-dev/Nectar
>>> >>
>>
>>
>>
>>
>> --
>> https://github.com/zinnia-phatak-dev/Nectar
>>
>
>
>
> --
> https://github.com/zinnia-phatak-dev/Nectar
>
