Kayla,
Here is the custom xml reader.
---------------------------------------------------------
Colin Evans <[EMAIL PROTECTED]> wrote:
Date: Tue, 04 Mar 2008 11:27:34 -0800
From: Colin Evans <[EMAIL PROTECTED]>
To: [email protected]
Subject: Re: map/reduce function on xml string
Here's the code. If folks are interested, I can submit it as a patch as well.
Prasan Ary wrote:
Colin, Is it possible that you share some of the code with us? thx,
Prasan Colin Evans <[EMAIL PROTECTED]> wrote: We ended up subclassing
TextInputFormat and adding a custom RecordReader that starts and ends record
reads on tags. The StreamXmlRecordReader class is a good reference for this.
Prasan Ary wrote:
Hi All, I am writing a Java implementation for my map/reduce function on
Hadoop. The input is an XML file, and the map function has to process
well-formed XML records. So far I have been unable to split the XML file at XML
record boundaries to feed into my map function. Can anybody point me to
resources where forcing a file split at a desired boundary is explained? thx,
Pra. --------------------------------- Be a better friend, newshound, and
know-it-all with Yahoo! Mobile. Try it now.
--------------------------------- Looking for last minute
shopping deals? Find them fast with Yahoo! Search.
package com.metaweb.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
/**
 * Reads records that are delimited by a specific begin/end tag.
 *
 * <p>The tags are taken from the job configuration properties named by
 * {@link #START_TAG_KEY} and {@link #END_TAG_KEY} and are matched as raw
 * UTF-8 byte sequences against the input stream.
 */
public class XmlInputFormat extends TextInputFormat {
    /** Job property holding the tag that opens a record. */
    public static final String START_TAG_KEY = "xmlinput.start";
    /** Job property holding the tag that closes a record. */
    public static final String END_TAG_KEY = "xmlinput.end";

    public void configure(JobConf jobConf) {
        super.configure(jobConf);
    }

    /**
     * Creates an {@link XmlRecordReader} over the given split.
     *
     * @param inputSplit the split to read; cast to {@code FileSplit}
     * @param jobConf    job configuration supplying the start/end tags
     * @param reporter   unused
     * @throws IOException if the underlying file cannot be opened or positioned
     */
    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf,
                                        Reporter reporter) throws IOException {
        return new XmlRecordReader((FileSplit) inputSplit, jobConf);
    }

    /**
     * Record reader that emits one record per start-tag/end-tag pair.
     *
     * <p>The key is the decimal string of the stream position just past the
     * end tag; the value is the bytes from the start tag through the end tag,
     * inclusive. A record whose start tag begins inside the split is read to
     * its end tag even when that lies beyond the split's end offset.
     */
    public static class XmlRecordReader implements RecordReader {
        private final byte[] startTag;
        private final byte[] endTag;
        // Knuth-Morris-Pratt failure tables, computed once per tag.
        private final int[] startTagFailure;
        private final int[] endTagFailure;
        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();

        /**
         * Opens the split's file and seeks to the start of the split.
         *
         * @throws IllegalArgumentException if either tag property is missing or empty
         * @throws IOException              if the file cannot be opened or positioned
         */
        public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
            startTag = readTag(jobConf, START_TAG_KEY);
            endTag = readTag(jobConf, END_TAG_KEY);
            startTagFailure = buildFailureTable(startTag);
            endTagFailure = buildFailureTable(endTag);

            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(jobConf);
            fsin = fs.open(file);
            try {
                fsin.seek(start);
            } catch (IOException e) {
                // Do not leak the open stream when positioning fails.
                fsin.close();
                throw e;
            }
        }

        /**
         * Reads the next record into {@code key} and {@code value}, both of
         * which are cast to {@code Text}.
         *
         * @return {@code true} if a complete record was read; {@code false}
         *         when the split is exhausted or the file ends mid-record
         */
        public boolean next(WritableComparable key, Writable value) throws IOException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, startTagFailure, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, endTagFailure, true)) {
                            ((Text) key).set(Long.toString(fsin.getPos()));
                            ((Text) value).set(buffer.getData(), 0, buffer.getLength());
                            return true;
                        }
                    } finally {
                        // The buffer is reused across records; always clear it.
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        public WritableComparable createKey() {
            return new Text();
        }

        public Writable createValue() {
            return new Text();
        }

        public long getPos() throws IOException {
            return fsin.getPos();
        }

        public void close() throws IOException {
            fsin.close();
        }

        /**
         * Returns the fraction of the split consumed so far, in [0, 1].
         * An empty split reports 0 rather than NaN, and the value is capped
         * at 1 because the last record may be read past the split end.
         */
        public float getProgress() throws IOException {
            if (end == start) {
                return 0.0f;
            }
            float fraction = ((float) (fsin.getPos() - start)) / ((float) (end - start));
            return Math.min(1.0f, fraction);
        }

        /////////////////////////////////////////////////

        /** Fetches a tag from the job configuration as UTF-8 bytes, rejecting null/empty. */
        private static byte[] readTag(JobConf jobConf, String key) throws IOException {
            String tag = jobConf.get(key);
            if (tag == null || tag.length() == 0) {
                throw new IllegalArgumentException("Missing or empty job property: " + key);
            }
            return tag.getBytes("utf-8");
        }

        /**
         * Builds the KMP failure table for {@code pattern}: entry {@code i} is
         * the length of the longest proper prefix of {@code pattern[0..i]}
         * that is also a suffix of it.
         */
        private static int[] buildFailureTable(byte[] pattern) {
            int[] failure = new int[pattern.length];
            int k = 0;
            for (int i = 1; i < pattern.length; i++) {
                while (k > 0 && pattern[i] != pattern[k]) {
                    k = failure[k - 1];
                }
                if (pattern[i] == pattern[k]) {
                    k++;
                }
                failure[i] = k;
            }
            return failure;
        }

        /**
         * Consumes bytes from the stream until {@code match} has been read.
         *
         * @param match       non-empty byte sequence to look for
         * @param failure     KMP failure table for {@code match}
         * @param withinBlock when {@code true}, every byte read is appended to
         *                    the record buffer and the split end is ignored;
         *                    when {@code false}, the search gives up once the
         *                    position reaches the split end with no partial
         *                    match in progress
         * @return {@code true} if {@code match} was found; {@code false} on
         *         end of file or on passing the stop point
         */
        private boolean readUntilMatch(byte[] match, int[] failure, boolean withinBlock)
                throws IOException {
            int matched = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) {
                    return false;
                }
                // save to buffer:
                if (withinBlock) {
                    buffer.write(b);
                }
                // read() yields 0..255 while tag bytes are signed; compare as bytes
                // so that non-ASCII UTF-8 tag bytes can match.
                byte current = (byte) b;
                // On mismatch fall back to the longest prefix still matching, so a
                // tag that follows a partial match (e.g. "<<tag>") is not skipped.
                while (matched > 0 && current != match[matched]) {
                    matched = failure[matched - 1];
                }
                if (current == match[matched]) {
                    matched++;
                    if (matched == match.length) {
                        return true;
                    }
                }
                // see if we've passed the stop point:
                if (!withinBlock && matched == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }
    }
}
---------------------------------
Be a better friend, newshound, and know-it-all with Yahoo! Mobile. Try it now.
package com.metaweb.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
/**
 * Reads records that are delimited by a specific begin/end tag.
 *
 * <p>The tags are taken from the job configuration properties named by
 * {@link #START_TAG_KEY} and {@link #END_TAG_KEY} and are matched as raw
 * UTF-8 byte sequences against the input stream.
 */
public class XmlInputFormat extends TextInputFormat {
    /** Job property holding the tag that opens a record. */
    public static final String START_TAG_KEY = "xmlinput.start";
    /** Job property holding the tag that closes a record. */
    public static final String END_TAG_KEY = "xmlinput.end";

    public void configure(JobConf jobConf) {
        super.configure(jobConf);
    }

    /**
     * Creates an {@link XmlRecordReader} over the given split.
     *
     * @param inputSplit the split to read; cast to {@code FileSplit}
     * @param jobConf    job configuration supplying the start/end tags
     * @param reporter   unused
     * @throws IOException if the underlying file cannot be opened or positioned
     */
    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf,
                                        Reporter reporter) throws IOException {
        return new XmlRecordReader((FileSplit) inputSplit, jobConf);
    }

    /**
     * Record reader that emits one record per start-tag/end-tag pair.
     *
     * <p>The key is the decimal string of the stream position just past the
     * end tag; the value is the bytes from the start tag through the end tag,
     * inclusive. A record whose start tag begins inside the split is read to
     * its end tag even when that lies beyond the split's end offset.
     */
    public static class XmlRecordReader implements RecordReader {
        private final byte[] startTag;
        private final byte[] endTag;
        // Knuth-Morris-Pratt failure tables, computed once per tag.
        private final int[] startTagFailure;
        private final int[] endTagFailure;
        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();

        /**
         * Opens the split's file and seeks to the start of the split.
         *
         * @throws IllegalArgumentException if either tag property is missing or empty
         * @throws IOException              if the file cannot be opened or positioned
         */
        public XmlRecordReader(FileSplit split, JobConf jobConf) throws
                IOException {
            startTag = readTag(jobConf, START_TAG_KEY);
            endTag = readTag(jobConf, END_TAG_KEY);
            startTagFailure = buildFailureTable(startTag);
            endTagFailure = buildFailureTable(endTag);

            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(jobConf);
            fsin = fs.open(file);
            try {
                fsin.seek(start);
            } catch (IOException e) {
                // Do not leak the open stream when positioning fails.
                fsin.close();
                throw e;
            }
        }

        /**
         * Reads the next record into {@code key} and {@code value}, both of
         * which are cast to {@code Text}.
         *
         * @return {@code true} if a complete record was read; {@code false}
         *         when the split is exhausted or the file ends mid-record
         */
        public boolean next(WritableComparable key, Writable value) throws
                IOException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, startTagFailure, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, endTagFailure, true)) {
                            ((Text) key).set(Long.toString(fsin.getPos()));
                            ((Text) value).set(buffer.getData(), 0,
                                    buffer.getLength());
                            return true;
                        }
                    } finally {
                        // The buffer is reused across records; always clear it.
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        public WritableComparable createKey() {
            return new Text();
        }

        public Writable createValue() {
            return new Text();
        }

        public long getPos() throws IOException {
            return fsin.getPos();
        }

        public void close() throws IOException {
            fsin.close();
        }

        /**
         * Returns the fraction of the split consumed so far, in [0, 1].
         * An empty split reports 0 rather than NaN, and the value is capped
         * at 1 because the last record may be read past the split end.
         */
        public float getProgress() throws IOException {
            if (end == start) {
                return 0.0f;
            }
            float fraction = ((float) (fsin.getPos() - start)) / ((float) (end - start));
            return Math.min(1.0f, fraction);
        }

        /////////////////////////////////////////////////

        /** Fetches a tag from the job configuration as UTF-8 bytes, rejecting null/empty. */
        private static byte[] readTag(JobConf jobConf, String key) throws IOException {
            String tag = jobConf.get(key);
            if (tag == null || tag.length() == 0) {
                throw new IllegalArgumentException("Missing or empty job property: " + key);
            }
            return tag.getBytes("utf-8");
        }

        /**
         * Builds the KMP failure table for {@code pattern}: entry {@code i} is
         * the length of the longest proper prefix of {@code pattern[0..i]}
         * that is also a suffix of it.
         */
        private static int[] buildFailureTable(byte[] pattern) {
            int[] failure = new int[pattern.length];
            int k = 0;
            for (int i = 1; i < pattern.length; i++) {
                while (k > 0 && pattern[i] != pattern[k]) {
                    k = failure[k - 1];
                }
                if (pattern[i] == pattern[k]) {
                    k++;
                }
                failure[i] = k;
            }
            return failure;
        }

        /**
         * Consumes bytes from the stream until {@code match} has been read.
         *
         * @param match       non-empty byte sequence to look for
         * @param failure     KMP failure table for {@code match}
         * @param withinBlock when {@code true}, every byte read is appended to
         *                    the record buffer and the split end is ignored;
         *                    when {@code false}, the search gives up once the
         *                    position reaches the split end with no partial
         *                    match in progress
         * @return {@code true} if {@code match} was found; {@code false} on
         *         end of file or on passing the stop point
         */
        private boolean readUntilMatch(byte[] match, int[] failure, boolean withinBlock)
                throws IOException {
            int matched = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) {
                    return false;
                }
                // save to buffer:
                if (withinBlock) {
                    buffer.write(b);
                }
                // read() yields 0..255 while tag bytes are signed; compare as bytes
                // so that non-ASCII UTF-8 tag bytes can match.
                byte current = (byte) b;
                // On mismatch fall back to the longest prefix still matching, so a
                // tag that follows a partial match (e.g. "<<tag>") is not skipped.
                while (matched > 0 && current != match[matched]) {
                    matched = failure[matched - 1];
                }
                if (current == match[matched]) {
                    matched++;
                    if (matched == match.length) {
                        return true;
                    }
                }
                // see if we've passed the stop point:
                if (!withinBlock && matched == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }
    }
}