Hello Madhu, I really appreciate your efforts. I am sorry I did not respond back. Actually, I was still struggling with it, so I did not have anything to let you know. Many thanks.
Regards,
Mohammad Tariq


On Thu, Jun 21, 2012 at 12:37 PM, madhu phatak <phatak....@gmail.com> wrote:
> Hi,
>  Jira for the new API code:
> https://issues.apache.org/jira/browse/HADOOP-8521
>
>
> On Tue, Jun 19, 2012 at 6:11 PM, madhu phatak <phatak....@gmail.com> wrote:
>>
>> Hi,
>>  Yes, you have the class, but it's for the old API.
>>
>>  Please find below the code for the classes ported to the new API. I have
>> not tested the code; try to use these classes and let me know if it's
>> working for you.
>>
>>
>> StreamInputFormat (new API)
>>
>> package org.apache.hadoop.streaming;
>>
>> import java.io.IOException;
>> import java.lang.reflect.Constructor;
>>
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FSDataInputStream;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapred.Reporter;
>> import org.apache.hadoop.mapreduce.InputSplit;
>> import org.apache.hadoop.mapreduce.RecordReader;
>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>> import org.apache.hadoop.mapreduce.lib.input.FileSplit;
>> import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
>> import org.apache.hadoop.streaming.StreamUtil;
>>
>> /**
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> /** An input format that selects a RecordReader based on a JobConf property.
>>  *  This should be used only for non-standard record readers such as
>>  *  StreamXmlRecordReader. For all other standard
>>  *  record readers, the appropriate input format classes should be used.
>>  */
>> public class StreamInputFormat extends KeyValueTextInputFormat {
>>
>>   @Override
>>   public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
>>       TaskAttemptContext context) throws IOException {
>>
>>     Configuration conf = context.getConfiguration();
>>     String c = conf.get("stream.recordreader.class");
>>     if (c == null || c.indexOf("LineRecordReader") >= 0) {
>>       return super.createRecordReader(genericSplit, context);
>>     }
>>
>>     // handling non-standard record reader (likely StreamXmlRecordReader)
>>     FileSplit split = (FileSplit) genericSplit;
>>     //LOG.info("getRecordReader start.....split=" + split);
>>     context.setStatus(split.toString());
>>     context.progress();
>>
>>     // Open the file and seek to the start of the split
>>     FileSystem fs = split.getPath().getFileSystem(conf);
>>     FSDataInputStream in = fs.open(split.getPath());
>>
>>     // Factory dispatch based on available params..
>>     Class readerClass;
>>
>>     {
>>       readerClass = StreamUtil.goodClassOrNull(conf, c, null);
>>       if (readerClass == null) {
>>         throw new RuntimeException("Class not found: " + c);
>>       }
>>     }
>>
>>     Constructor ctor;
>>     try {
>>       ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class,
>>           FileSplit.class, TaskAttemptContext.class, Configuration.class,
>>           FileSystem.class });
>>     } catch (NoSuchMethodException nsm) {
>>       throw new RuntimeException(nsm);
>>     }
>>
>>     RecordReader<Text, Text> reader;
>>     try {
>>       reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] { in,
>>           split, context, conf, fs });
>>     } catch (Exception nsm) {
>>       throw new RuntimeException(nsm);
>>     }
>>
>>     return reader;
>>   }
>> }
>>
>>
>> StreamXmlRecordReader
>>
>> /**
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.hadoop.streaming;
>>
>> import java.io.*;
>> import java.util.regex.*;
>>
>> import org.apache.hadoop.io.DataOutputBuffer;
>> import org.apache.hadoop.io.Writable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.io.WritableComparable;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.FSDataInputStream;
>> import org.apache.hadoop.mapred.Reporter;
>> import org.apache.hadoop.mapred.FileSplit;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>
>> /** A way to interpret XML fragments as Mapper input records.
>>  *  Values are XML subtrees delimited by configurable tags.
>>  *  Keys could be the value of a certain attribute in the XML subtree,
>>  *  but this is left to the stream processor application.
>>  *
>>  *  The name-value properties that StreamXmlRecordReader understands are:
>>  *    String begin    (chars marking beginning of record)
>>  *    String end      (chars marking end of record)
>>  *    int maxrec      (maximum record size)
>>  *    int lookahead   (maximum lookahead to sync CDATA)
>>  *    boolean slowmatch
>>  */
>> public class StreamXmlRecordReader extends StreamBaseRecordReader {
>>
>>   public StreamXmlRecordReader(FSDataInputStream in, FileSplit split,
>>       TaskAttemptContext context, Configuration conf, FileSystem fs)
>>       throws IOException {
>>     super(in, split, context, conf, fs);
>>
>>     beginMark_ = checkJobGet(CONF_NS + "begin");
>>     endMark_ = checkJobGet(CONF_NS + "end");
>>
>>     maxRecSize_ = conf_.getInt(CONF_NS + "maxrec", 50 * 1000);
>>     lookAhead_ = conf_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
>>     synched_ = false;
>>
>>     slowMatch_ = conf_.getBoolean(CONF_NS + "slowmatch", false);
>>     if (slowMatch_) {
>>       beginPat_ = makePatternCDataOrMark(beginMark_);
>>       endPat_ = makePatternCDataOrMark(endMark_);
>>     }
>>     init();
>>   }
>>
>>   public void init() throws IOException {
>>     LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_
>>         + " end_=" + end_ + " length_=" + length_
>>         + " start_ > in_.getPos() =" + (start_ > in_.getPos())
>>         + " " + start_ + " > " + in_.getPos());
>>     if (start_ > in_.getPos()) {
>>       in_.seek(start_);
>>     }
>>     pos_ = start_;
>>     bin_ = new BufferedInputStream(in_);
>>     seekNextRecordBoundary();
>>   }
>>
>>   int numNext = 0;
>>
>>   public synchronized boolean next(Text key, Text value) throws IOException {
>>     numNext++;
>>     if (pos_ >= end_) {
>>       return false;
>>     }
>>
>>     DataOutputBuffer buf = new DataOutputBuffer();
>>     if (!readUntilMatchBegin()) {
>>       return false;
>>     }
>>     if (pos_ >= end_ || !readUntilMatchEnd(buf)) {
>>       return false;
>>     }
>>
>>     // There is only one elem..key/value splitting is not done here.
>>     byte[] record = new byte[buf.getLength()];
>>     System.arraycopy(buf.getData(), 0, record, 0, record.length);
>>
>>     numRecStats(record, 0, record.length);
>>
>>     key.set(record);
>>     value.set("");
>>
>>     return true;
>>   }
>>
>>   public void seekNextRecordBoundary() throws IOException {
>>     readUntilMatchBegin();
>>   }
>>
>>   boolean readUntilMatchBegin() throws IOException {
>>     if (slowMatch_) {
>>       return slowReadUntilMatch(beginPat_, false, null);
>>     } else {
>>       return fastReadUntilMatch(beginMark_, false, null);
>>     }
>>   }
>>
>>   private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException {
>>     if (slowMatch_) {
>>       return slowReadUntilMatch(endPat_, true, buf);
>>     } else {
>>       return fastReadUntilMatch(endMark_, true, buf);
>>     }
>>   }
>>
>>   private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
>>       DataOutputBuffer outBufOrNull) throws IOException {
>>     byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
>>     int read = 0;
>>     bin_.mark(Math.max(lookAhead_, maxRecSize_) + 2); // mark to invalidate if we read more
>>     read = bin_.read(buf);
>>     if (read == -1) return false;
>>
>>     String sbuf = new String(buf, 0, read, "UTF-8");
>>     Matcher match = markPattern.matcher(sbuf);
>>
>>     firstMatchStart_ = NA;
>>     firstMatchEnd_ = NA;
>>     int bufPos = 0;
>>     int state = synched_ ? CDATA_OUT : CDATA_UNK;
>>     int s = 0;
>>
>>     while (match.find(bufPos)) {
>>       int input;
>>       if (match.group(1) != null) {
>>         input = CDATA_BEGIN;
>>       } else if (match.group(2) != null) {
>>         input = CDATA_END;
>>         firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
>>       } else {
>>         input = RECORD_MAYBE;
>>       }
>>       if (input == RECORD_MAYBE) {
>>         if (firstMatchStart_ == NA) {
>>           firstMatchStart_ = match.start();
>>           firstMatchEnd_ = match.end();
>>         }
>>       }
>>       state = nextState(state, input, match.start());
>>       if (state == RECORD_ACCEPT) {
>>         break;
>>       }
>>       bufPos = match.end();
>>       s++;
>>     }
>>     if (state != CDATA_UNK) {
>>       synched_ = true;
>>     }
>>     boolean matched = (firstMatchStart_ != NA)
>>         && (state == RECORD_ACCEPT || state == CDATA_UNK);
>>     if (matched) {
>>       int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
>>       bin_.reset();
>>
>>       for (long skiplen = endPos; skiplen > 0; ) {
>>         skiplen -= bin_.skip(skiplen); // Skip succeeds as we have read this buffer
>>       }
>>
>>       pos_ += endPos;
>>       if (outBufOrNull != null) {
>>         outBufOrNull.writeBytes(sbuf.substring(0, endPos));
>>       }
>>     }
>>     return matched;
>>   }
>>
>>   // states
>>   final static int CDATA_IN = 10;
>>   final static int CDATA_OUT = 11;
>>   final static int CDATA_UNK = 12;
>>   final static int RECORD_ACCEPT = 13;
>>   // inputs
>>   final static int CDATA_BEGIN = 20;
>>   final static int CDATA_END = 21;
>>   final static int RECORD_MAYBE = 22;
>>
>>   /* also updates firstMatchStart_; */
>>   int nextState(int state, int input, int bufPos) {
>>     switch (state) {
>>     case CDATA_UNK:
>>     case CDATA_OUT:
>>       switch (input) {
>>       case CDATA_BEGIN:
>>         return CDATA_IN;
>>       case CDATA_END:
>>         if (state == CDATA_OUT) {
>>           //System.out.println("buggy XML " + bufPos);
>>         }
>>         return CDATA_OUT;
>>       case RECORD_MAYBE:
>>         return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
>>       }
>>       break;
>>     case CDATA_IN:
>>       return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
>>     }
>>     throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_);
>>   }
>>
>>   Pattern makePatternCDataOrMark(String escapedMark) {
>>     StringBuffer pat = new StringBuffer();
>>     addGroup(pat, StreamUtil.regexpEscape("CDATA["));   // CDATA_BEGIN
>>     addGroup(pat, StreamUtil.regexpEscape("]]>"));      // CDATA_END
>>     addGroup(pat, escapedMark);                         // RECORD_MAYBE
>>     return Pattern.compile(pat.toString());
>>   }
>>
>>   void addGroup(StringBuffer pat, String escapedGroup) {
>>     if (pat.length() > 0) {
>>       pat.append("|");
>>     }
>>     pat.append("(");
>>     pat.append(escapedGroup);
>>     pat.append(")");
>>   }
>>
>>   boolean fastReadUntilMatch(String textPat, boolean includePat,
>>       DataOutputBuffer outBufOrNull) throws IOException {
>>     byte[] cpat = textPat.getBytes("UTF-8");
>>     int m = 0;
>>     boolean match = false;
>>     int msup = cpat.length;
>>     int LL = 120000 * 10;
>>
>>     bin_.mark(LL); // large number to invalidate mark
>>     while (true) {
>>       int b = bin_.read();
>>       if (b == -1) break;
>>
>>       byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
>>       if (c == cpat[m]) {
>>         m++;
>>         if (m == msup) {
>>           match = true;
>>           break;
>>         }
>>       } else {
>>         bin_.mark(LL); // rest mark so we could jump back if we found a match
>>         if (outBufOrNull != null) {
>>           outBufOrNull.write(cpat, 0, m);
>>           outBufOrNull.write(c);
>>         }
>>         pos_ += m + 1; // skip m chars, +1 for 'c'
>>         m = 0;
>>       }
>>     }
>>     if (!includePat && match) {
>>       bin_.reset();
>>     } else if (outBufOrNull != null) {
>>       outBufOrNull.write(cpat);
>>       pos_ += msup;
>>     }
>>     return match;
>>   }
>>
>>   String checkJobGet(String prop) throws IOException {
>>     String val = conf_.get(prop);
>>     if (val == null) {
>>       throw new IOException("JobConf: missing required property: " + prop);
>>     }
>>     return val;
>>   }
>>
>>   String beginMark_;
>>   String endMark_;
>>
>>   Pattern beginPat_;
>>   Pattern endPat_;
>>
>>   boolean slowMatch_;
>>   int lookAhead_;   // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
>>   int maxRecSize_;
>>
>>   BufferedInputStream bin_; // Wrap FSDataInputStream for efficient backward seeks
>>   long pos_;                // Keep track on position with respect encapsulated FSDataInputStream
>>
>>   final static int NA = -1;
>>   int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
>>   int firstMatchEnd_ = 0;
>>
>>   boolean synched_;
>> }
>>
>>
>> StreamBaseRecordReader
>>
>> /**
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.hadoop.streaming;
>>
>> import java.io.*;
>>
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.io.Writable;
>> import org.apache.hadoop.io.WritableComparable;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.FSDataInputStream;
>> import org.apache.hadoop.mapred.Reporter;
>> import org.apache.hadoop.mapred.RecordReader;
>> import org.apache.hadoop.mapred.FileSplit;
>> import org.apache.hadoop.mapred.JobConf;
>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>> import org.apache.commons.logging.*;
>>
>> /**
>>  * Shared functionality for hadoopStreaming formats.
>>  * A custom reader can be defined to be a RecordReader with the constructor below
>>  * and is selected with the option bin/hadoopStreaming -inputreader ...
>>  * @see StreamXmlRecordReader
>>  */
>> public abstract class StreamBaseRecordReader implements RecordReader<Text, Text> {
>>
>>   protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
>>
>>   // custom JobConf properties for this class are prefixed with this namespace
>>   final static String CONF_NS = "stream.recordreader.";
>>
>>   public StreamBaseRecordReader(FSDataInputStream in, FileSplit split,
>>       TaskAttemptContext context, Configuration conf, FileSystem fs)
>>       throws IOException {
>>     in_ = in;
>>     split_ = split;
>>     start_ = split_.getStart();
>>     length_ = split_.getLength();
>>     end_ = start_ + length_;
>>     splitName_ = split_.getPath().getName();
>>     this.context_ = context;
>>     conf_ = conf;
>>     fs_ = fs;
>>
>>     statusMaxRecordChars_ = conf.getInt(CONF_NS + "statuschars", 200);
>>   }
>>
>>   /// RecordReader API
>>
>>   /** Read a record. Implementation should call numRecStats at the end. */
>>   public abstract boolean next(Text key, Text value) throws IOException;
>>
>>   /** Returns the current position in the input. */
>>   public synchronized long getPos() throws IOException {
>>     return in_.getPos();
>>   }
>>
>>   /** Close this to future operations. */
>>   public synchronized void close() throws IOException {
>>     in_.close();
>>   }
>>
>>   public float getProgress() throws IOException {
>>     if (end_ == start_) {
>>       return 1.0f;
>>     } else {
>>       return ((float) (in_.getPos() - start_)) / ((float) (end_ - start_));
>>     }
>>   }
>>
>>   public Text createKey() {
>>     return new Text();
>>   }
>>
>>   public Text createValue() {
>>     return new Text();
>>   }
>>
>>   /// StreamBaseRecordReader API
>>
>>   /** Implementation should seek forward in_ to the first byte of the next record.
>>    *  The initial byte offset in the stream is arbitrary.
>>    */
>>   public abstract void seekNextRecordBoundary() throws IOException;
>>
>>   void numRecStats(byte[] record, int start, int len) throws IOException {
>>     numRec_++;
>>     if (numRec_ == nextStatusRec_) {
>>       String recordStr = new String(record, start,
>>           Math.min(len, statusMaxRecordChars_), "UTF-8");
>>       nextStatusRec_ += 100; //*= 10;
>>       String status = getStatus(recordStr);
>>       LOG.info(status);
>>       context_.setStatus(status);
>>     }
>>   }
>>
>>   long lastMem = 0;
>>
>>   String getStatus(CharSequence record) {
>>     long pos = -1;
>>     try {
>>       pos = getPos();
>>     } catch (IOException io) {
>>     }
>>     String recStr;
>>     if (record.length() > statusMaxRecordChars_) {
>>       recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
>>     } else {
>>       recStr = record.toString();
>>     }
>>     String unqualSplit = split_.getPath().getName() + ":" + split_.getStart()
>>         + "+" + split_.getLength();
>>     String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos
>>         + " " + unqualSplit + " Processing record=" + recStr;
>>     status += " " + splitName_;
>>     return status;
>>   }
>>
>>   FSDataInputStream in_;
>>   FileSplit split_;
>>   long start_;
>>   long end_;
>>   long length_;
>>   String splitName_;
>>   TaskAttemptContext context_;
>>   Configuration conf_;
>>   FileSystem fs_;
>>   int numRec_ = 0;
>>   int nextStatusRec_ = 1;
>>   int statusMaxRecordChars_;
>>
>> }
>>
>>
>>
>> On Tue, Jun 19, 2012 at 5:58 PM, Mohammad Tariq <donta...@gmail.com> wrote:
>>>
>>> But I have downloaded "hadoop-streaming-0.20.205.0.jar" and it
>>> contains StreamXmlRecordReader.class file. This means it should
>>> support StreamInputFormat.
>>>
>>> Regards,
>>>  Mohammad Tariq
>>>
>>>
>>> On Tue, Jun 19, 2012 at 5:54 PM, Mohammad Tariq <donta...@gmail.com> wrote:
>>> > Thanks Madhu.
>>> > I'll do that.
>>> >
>>> > Regards,
>>> >  Mohammad Tariq
>>> >
>>> >
>>> > On Tue, Jun 19, 2012 at 5:43 PM, madhu phatak <phatak....@gmail.com> wrote:
>>> >> Seems like StreamInputFormat is not yet ported to the new API. That's
>>> >> why you are not able to set it as the InputFormatClass. You can file a
>>> >> jira for this issue.
>>> >>
>>> >>
>>> >> On Tue, Jun 19, 2012 at 4:49 PM, Mohammad Tariq <donta...@gmail.com> wrote:
>>> >>>
>>> >>> My driver function looks like this -
>>> >>>
>>> >>>   public static void main(String[] args) throws IOException,
>>> >>>       InterruptedException, ClassNotFoundException {
>>> >>>     // TODO Auto-generated method stub
>>> >>>
>>> >>>     Configuration conf = new Configuration();
>>> >>>     Job job = new Job();
>>> >>>     conf.set("stream.recordreader.class",
>>> >>>         "org.apache.hadoop.streaming.StreamXmlRecordReader");
>>> >>>     conf.set("stream.recordreader.begin", "<info>");
>>> >>>     conf.set("stream.recordreader.end", "</info>");
>>> >>>     job.setInputFormatClass(StreamInputFormat.class);
>>> >>>     job.setOutputKeyClass(Text.class);
>>> >>>     job.setOutputValueClass(IntWritable.class);
>>> >>>     FileInputFormat.addInputPath(job, new Path("/mapin/demo.xml"));
>>> >>>     FileOutputFormat.setOutputPath(job, new Path("/mapout/demo"));
>>> >>>     job.waitForCompletion(true);
>>> >>>   }
>>> >>>
>>> >>> Could you please point out my mistake??
>>> >>>
>>> >>> Regards,
>>> >>>  Mohammad Tariq
>>> >>>
>>> >>>
>>> >>> On Tue, Jun 19, 2012 at 4:35 PM, Mohammad Tariq <donta...@gmail.com> wrote:
>>> >>> > Hello Madhu,
>>> >>> >
>>> >>> >          Thanks for the response. Actually I was trying to use the
>>> >>> > new API (Job). Have you tried that? I was not able to set the
>>> >>> > InputFormat using the Job API.
>>> >>> >
>>> >>> > Regards,
>>> >>> >  Mohammad Tariq
>>> >>> >
>>> >>> >
>>> >>> > On Tue, Jun 19, 2012 at 4:28 PM, madhu phatak <phatak....@gmail.com> wrote:
>>> >>> >> Hi,
>>> >>> >>  Set the following properties in the driver class:
>>> >>> >>
>>> >>> >>   jobConf.set("stream.recordreader.class",
>>> >>> >>       "org.apache.hadoop.streaming.StreamXmlRecordReader");
>>> >>> >>   jobConf.set("stream.recordreader.begin", "start-tag");
>>> >>> >>   jobConf.set("stream.recordreader.end", "end-tag");
>>> >>> >>
>>> >>> >>   jobConf.setInputFormat(StreamInputFormat.class);
>>> >>> >>
>>> >>> >> In the Mapper, the xml record will come as the key of type Text, so your
>>> >>> >> mapper will look like
>>> >>> >>
>>> >>> >>   public class MyMapper<K,V> implements Mapper<Text,Text,K,V>
>>> >>> >>
>>> >>> >>
>>> >>> >> On Tue, Jun 19, 2012 at 2:49 AM, Mohammad Tariq <donta...@gmail.com> wrote:
>>> >>> >>>
>>> >>> >>> Hello list,
>>> >>> >>>
>>> >>> >>>      Could anyone who has written MapReduce jobs to process xml
>>> >>> >>> documents stored in their cluster using "StreamXmlRecordReader" share
>>> >>> >>> his/her experience?...or provide me some pointers addressing
>>> >>> >>> that. Many thanks.
>>> >>> >>>
>>> >>> >>> Regards,
>>> >>> >>>  Mohammad Tariq
>>> >>> >>
>>> >>> >>
>>> >>> >>
>>> >>> >> --
>>> >>> >> https://github.com/zinnia-phatak-dev/Nectar
>>> >>> >>
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> https://github.com/zinnia-phatak-dev/Nectar
>>> >>
>>
>>
>>
>> --
>> https://github.com/zinnia-phatak-dev/Nectar
>>
>
>
>
> --
> https://github.com/zinnia-phatak-dev/Nectar
>
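
For anyone picking this thread up with the new API, here is a minimal mapper sketch that would pair with the ported StreamInputFormat and the driver settings above. It is illustrative and untested; the class name XmlRecordMapper and the "info-records" output key are assumptions, not something from the thread. It relies on each <info>...</info> fragment arriving as the Text key with an empty Text value, which is what StreamXmlRecordReader.next(key, value) produces:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper: the whole XML record arrives as the key, the value is empty.
    public class XmlRecordMapper extends Mapper<Text, Text, Text, IntWritable> {

      private static final IntWritable ONE = new IntWritable(1);
      private final Text outKey = new Text();

      @Override
      protected void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        // key holds one complete <info>...</info> fragment, as configured by
        // stream.recordreader.begin/end in the driver.
        String xmlRecord = key.toString();

        // Illustrative processing only: count non-empty records. Real code would
        // parse xmlRecord with an XML parser and emit whatever fields it needs.
        if (!xmlRecord.isEmpty()) {
          outKey.set("info-records");
          context.write(outKey, ONE);
        }
      }
    }

In the driver it would be registered with job.setMapperClass(XmlRecordMapper.class) alongside the existing configuration.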