Check out this patch - it uses a StAX parser and is more extensible.

Alan Ho

----- Original Message ----
From: Colin Evans <[EMAIL PROTECTED]>
To: [email protected]
Sent: Tuesday, March 4, 2008 11:27:34 AM
Subject: Re: map/reduce function on xml string

    Here's the code.  If folks are interested, I can submit it as a patch as 
well.



Prasan Ary wrote:
Colin,
Is it possible that you share some of the code with us?

thx,
Prasan

Colin Evans <[EMAIL PROTECTED]> wrote:
We ended up subclassing TextInputFormat and adding a custom RecordReader that 
starts and ends record reads on tags. The StreamXmlRecordReader class is a good 
reference for this.

Prasan Ary wrote:
Hi All,
I am writing a Java implementation of my map/reduce function on Hadoop.
The input is an XML file, and the map function has to process well-formed 
XML records. So far I have been unable to split the XML file at XML record 
boundaries to feed into my map function.
Can anybody point me to resources that explain how to force a file split at 
a desired boundary?

thx,
Pra.




-----Inline Attachment Follows-----

package com.metaweb.hadoop.util;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;

import java.io.IOException;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormat extends TextInputFormat {
    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    public void configure(JobConf jobConf) {
        super.configure(jobConf);
    }

    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
        return new XmlRecordReader((FileSplit) inputSplit, jobConf);
    }

    public static class XmlRecordReader implements RecordReader {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();

        public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
            // read the record delimiters from the job configuration
            startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
            endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(jobConf);
            fsin = fs.open(split.getPath());
            fsin.seek(start);
        }

        public boolean next(WritableComparable key, Writable value) throws IOException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            ((Text) key).set(Long.toString(fsin.getPos()));
                            ((Text) value).set(buffer.getData(), 0, buffer.getLength());
                            return true;
                        }
                    }
                    finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        public WritableComparable createKey() {
            return new Text();
        }

        public Writable createValue() {
            return new Text();
        }

        public long getPos() throws IOException {
            return fsin.getPos();
        }

        public void close() throws IOException {
            fsin.close();
        }

        public float getProgress() throws IOException {
            return ((float) (fsin.getPos() - start)) / ((float) (end - start));
        }

        /////////////////////////////////////////////////

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) return false;
                // save to buffer:
                if (withinBlock) buffer.write(b);

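                // check if we're matching (compare as bytes so non-ASCII tags can match):
                if ((byte) b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else {
                    // on a mismatch, the current byte may itself begin a new match
                    i = ((byte) b == match[0]) ? 1 : 0;
                }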
                // see if we've passed the stop point:
                if(!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }
    }
}
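
For anyone who wants to try the tag-matching idea without a Hadoop cluster, here is a minimal standalone sketch. It is not part of the attachment: the class name TagRecordDemo and the method extractRecords are made up for illustration, it reads from an in-memory stream instead of an FSDataInputStream, and it leaves out the split start/end handling entirely. On a mismatch it also re-checks whether the current byte begins a new match, which is a small variation I am assuming is desirable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: same scanning idea as the record reader, minus Hadoop.
public class TagRecordDemo {

    /**
     * Reads bytes until the sequence `match` has been consumed.
     * When `out` is non-null, every byte read is also copied into it.
     * Returns false if the stream ends before a full match.
     */
    public static boolean readUntilMatch(InputStream in, byte[] match,
                                         ByteArrayOutputStream out) throws IOException {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) return false;          // end of input
            if (out != null) out.write(b);      // inside a record: keep the byte
            if ((byte) b == match[i]) {
                i++;
                if (i >= match.length) return true;
            } else {
                // the mismatching byte may itself start a new match
                i = ((byte) b == match[0]) ? 1 : 0;
            }
        }
    }

    /** Returns every startTag...endTag block found in `xml`, in order. */
    public static List<String> extractRecords(String xml, String startTag,
                                              String endTag) throws IOException {
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        List<String> records = new ArrayList<>();
        // skip ahead to each start tag, then capture through the matching end tag
        while (readUntilMatch(in, start, null)) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            buf.write(start, 0, start.length);
            if (!readUntilMatch(in, end, buf)) break;   // unterminated record: drop it
            records.add(new String(buf.toByteArray(), StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        String xml = "<doc><rec>a</rec>junk<rec>b</rec></doc>";
        for (String r : extractRecords(xml, "<rec>", "</rec>")) {
            System.out.println(r);
        }
    }
}
```

Like the attachment, this treats the tags as plain byte strings, so it will not cope with nested records of the same tag or with start tags that carry attributes.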







