Do you ever close your DataOutputBuffer? -- Kris J.
On Tue, Aug 18, 2009 at 7:35 AM, Wasim Bari <[email protected]> wrote:
>
> Hi,
>     I tried another way to implement the InputFileFormat, one which returns
> <Key, MultipleLines> as a record to the mapper.
>
> I used this logic: a LineRecordReader reads the file line by line and keeps
> storing those lines in a buffer. When I encounter an empty string, I set the
> buffer as the value and return the result. Please see the attached code.
>
> But I get a Java heap error. Apparently it is caused by the buffer writes,
> yet the data is not that big and I am unable to find the solution.
>
> Please have a look and guide me.
>
> Regards,
>
> ============================================
> package initial;
>
> import java.io.IOException;
>
> import org.apache.hadoop.io.DataOutputBuffer;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.FileSplit;
> import org.apache.hadoop.mapred.InputSplit;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.RecordReader;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.mapred.TextInputFormat;
> import org.apache.log4j.Logger;
>
> @SuppressWarnings("deprecation")
> public class PTextInputFormat1 extends TextInputFormat {
>
>     public void configure(JobConf jobConf) {
>         super.configure(jobConf);
>     }
>
>     public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit,
>             JobConf jobConf, Reporter reporter) throws IOException {
>         return new PTextRecordReader((FileSplit) inputSplit, jobConf);
>     }
>
>     public static class PTextRecordReader implements RecordReader<LongWritable, Text> {
>
>         private static final Logger sLogger = Logger.getLogger(PTextRecordReader.class);
>
>         private DataOutputBuffer buffer = new DataOutputBuffer();
>         private JobConf job;
>         private FileSplit FSplit;
>         private long start;
>         private long end;
>         private int count;
>         org.apache.hadoop.mapred.LineRecordReader lineRecordReader;
>
>         public PTextRecordReader(FileSplit split, JobConf jobConf) throws IOException {
>             FSplit = split;
>             start = split.getStart();
>             job = jobConf;
>             lineRecordReader = new org.apache.hadoop.mapred.LineRecordReader(job, FSplit);
>             end = start + split.getLength();
>         }
>
>         public boolean next(LongWritable key, Text value) throws IOException {
>             if (lineRecordReader.next(key, value)) {
>                 while (value.toString().length() != 0) {
>                     buffer.write(value.getBytes());
>                     numberOfLines++;
>                 }
>                 key.set(count++);
>                 value.set(buffer.getData(), 0, buffer.getLength());
>                 buffer.reset();
>                 return true;
>             }
>             buffer.reset();
>             return false;
>         }
>
>         public LongWritable createKey() {
>             return new LongWritable();
>         }
>
>         public Text createValue() {
>             return new Text();
>         }
>
>         public long getStart() {
>             return start;
>         }
>
>         public long getEnd() {
>             return end;
>         }
>
>         public long getPos() throws IOException {
>             return lineRecordReader.getPos();
>         }
>
>         public float getProgress() throws IOException {
>             return lineRecordReader.getProgress();
>         }
>
>         @Override
>         public void close() throws IOException {
>             lineRecordReader.close();
>         }
>     }
> }
>
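
As far as I can tell, the heap error comes from the inner while loop rather than from the buffer itself: the loop condition tests value, but nothing inside the loop reads the next line, so value never changes and the same bytes are appended to the DataOutputBuffer until memory runs out. Below is a rough, untested sketch of how next() could advance the reader inside the loop; it reuses the fields from your class (buffer, lineRecordReader, count), leaves out the numberOfLines counter, and assumes you still want one record per blank-line-separated block.

    public boolean next(LongWritable key, Text value) throws IOException {
        // key and value are reused across calls, so start every logical
        // record with an empty buffer.
        buffer.reset();

        LongWritable lineKey = new LongWritable();
        Text line = new Text();

        // Nothing left in the split: signal end of input.
        if (!lineRecordReader.next(lineKey, line)) {
            return false;
        }

        // Accumulate lines until the first empty line or the end of the split,
        // advancing the underlying reader on every iteration.
        while (line.getLength() != 0) {
            buffer.write(line.getBytes(), 0, line.getLength());
            buffer.write('\n');                       // keep line boundaries in the record
            if (!lineRecordReader.next(lineKey, line)) {
                break;                                // split exhausted mid-record
            }
        }

        key.set(count++);
        value.set(buffer.getData(), 0, buffer.getLength());
        return true;
    }

Note the write(bytes, 0, line.getLength()) form: Text's backing array can be longer than the valid length, so writing value.getBytes() without a length can also copy stale bytes into the record.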
