Hi all,

I've been given a task to extract data from a big file into HDFS.
The input is a 1 GB text file containing millions of lines. A line starting with # marks the beginning of a record; subsequent lines that don't start with # are the data of that record.
E.g.:
# 1 2 A3 LOCS 43
4 FS 23 ....
5 SDF ....
# 3 4 D8
9 FS 45 ...
# 8 DFD 9
1 FS LL
2 LI O

The above file contains 3 records.

The actual file contains around 1.5 million records.
The task is to extract those records, each into its own text file, and store them in HDFS.

I've written a MapReduce program to do this job, but it doesn't run as fast as I expected. Furthermore, it eats up all system resources on the NameNode machine after a few hours.
I've tried restarting the program a few times, but it still can't finish the job.

The following describes what I did:

First, I wrote a standalone program to split the file into 100 smaller files, each containing an equal number of records (not lines). Then I use those 100 split files as the input for my MapReduce program. A rough sketch of the splitter is below.
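(A minimal sketch of that splitter, in plain Java. The class name, the "chunk-" file naming, and the two-pass record counting are just for illustration, not exactly my production code; it assumes the file starts with a '#' line, as in the example above.)

import java.io.*;

// Splits a record-oriented text file into numChunks smaller files,
// each holding roughly the same number of records. A record starts
// at a line beginning with '#'.
public class RecordSplitter {

    // first pass: count how many records the file holds
    static long countRecords(File f) throws IOException {
        BufferedReader in = new BufferedReader(new FileReader(f));
        long n = 0;
        for (String line; (line = in.readLine()) != null; )
            if (line.startsWith("#")) n++;
        in.close();
        return n;
    }

    public static void main(String[] args) throws IOException {
        File input = new File(args[0]);
        int numChunks = Integer.parseInt(args[1]); // e.g. 100
        long perChunk = (countRecords(input) + numChunks - 1) / numChunks;

        // second pass: copy lines, rolling to a new chunk file every
        // perChunk records (only '#' lines increment the record count)
        BufferedReader in = new BufferedReader(new FileReader(input));
        PrintWriter out = null;
        long rec = 0;
        int chunk = 0;
        for (String line; (line = in.readLine()) != null; ) {
            if (line.startsWith("#") && rec++ % perChunk == 0) {
                if (out != null) out.close();
                out = new PrintWriter(new FileWriter("chunk-" + chunk++));
            }
            out.println(line);
        }
        if (out != null) out.close();
        in.close();
    }
}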
Here are my configuration and pseudocode for the MapReduce program:

_Configuration:_
Running Hadoop 0.13.0 on 3 machines:

*machine 1*: Pentium D 3.2 GHz, 2 GB RAM. Running as the namenode and the jobtracker (1 GB each).
*machine 2*: Dual Core AMD Opteron(tm) Processor 170, 2 GB RAM. Running as a datanode and a tasktracker (1 GB each) with configuration:
   mapred.tasktracker.tasks.maximum = 4
   mapred.child.java.opts = -Xmx150m
*machine 3*: Pentium 4 HT 3.0 GHz, 2 GB RAM. Running as a datanode and a tasktracker (1 GB each) with configuration:
   mapred.tasktracker.tasks.maximum = 4
   mapred.child.java.opts = -Xmx150m

The MapReduce program is launched from machine 1.

_Pseudocode:_

CustomRecordReader:

constructor(file)
begin
   this.file = file;
   this.done = false;
end

function next(key, value)
begin
   if done then return false;  // each split yields exactly one record
   ((Text) key).set(file.toUri().toString());
   done = true;
   return true;
end
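(In real code that looks roughly like this; it's written against the classic org.apache.hadoop.mapred API, and I haven't checked that the signatures match 0.13 exactly.)

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

// Emits exactly one (fileUri, empty) pair per split, then signals EOF.
public class CustomRecordReader implements RecordReader<Text, Text> {
    private final Path file;
    private boolean done = false;

    public CustomRecordReader(Path file) { this.file = file; }

    public boolean next(Text key, Text value) throws IOException {
        if (done) return false;            // second call: no more records
        key.set(file.toUri().toString());  // the mapper only needs the name
        value.clear();
        done = true;
        return true;
    }

    public Text createKey()    { return new Text(); }
    public Text createValue()  { return new Text(); }
    public long getPos()       { return done ? 1 : 0; }
    public float getProgress() { return done ? 1.0f : 0.0f; }
    public void close()        { }
}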

CustomInputFormat:

function getSplits(jobConf, numSplits)
begin
   splits = empty list;
   inputPaths = jobConf.getInputPaths(); // returns only one path: the directory containing the 100 split files
   fs = FileSystem.get(jobConf);
   for each path in inputPaths
   begin
      for each file in fs.listPaths(path)
      begin
         splits.add(new FileSplit(file, 0, 1, jobConf)); // length 1: I just want the name of the file
      end
   end
   return splits;
end

function getRecordReader(split, jobConf, reporter)
begin
   return new CustomRecordReader(((FileSplit) split).getPath());
end
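(Put together, the input format is roughly the following; same caveat about the classic API, and listPaths was deprecated in later releases in favour of listStatus.)

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// One split per chunk file. Each split covers a single byte because
// only the file name matters; the mapper re-reads the whole file.
public class CustomInputFormat implements InputFormat<Text, Text> {

    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        List<InputSplit> splits = new ArrayList<InputSplit>();
        FileSystem fs = FileSystem.get(job);
        for (Path dir : job.getInputPaths()) {    // the one input directory
            for (Path file : fs.listPaths(dir)) { // the 100 chunk files
                splits.add(new FileSplit(file, 0, 1, job));
            }
        }
        return splits.toArray(new InputSplit[splits.size()]);
    }

    public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job,
                                                    Reporter reporter) {
        return new CustomRecordReader(((FileSplit) split).getPath());
    }
}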

Mapper:

function map(key, value, out, reporter)
begin
   file = new Path(((Text) key).toString());
   recCount = 0;
   open the file for reading
   for each set of lines that forms a record
   begin
      recCount++
      out.collect(recCount, record lines as one string)
   end
   close the file
end
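(Roughly, in Java; classic API again, and the parsing follows the '#' convention from the example above.)

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Reads the chunk file named by the key and emits one
// (recCount, record) pair per '#'-delimited record.
public class RecordMapper extends MapReduceBase
        implements Mapper<Text, Text, LongWritable, Text> {

    private JobConf job;

    public void configure(JobConf job) { this.job = job; }

    public void map(Text key, Text value,
                    OutputCollector<LongWritable, Text> out, Reporter reporter)
            throws IOException {
        Path file = new Path(key.toString());
        FileSystem fs = file.getFileSystem(job);
        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(file)));

        long recCount = 0;
        StringBuilder record = new StringBuilder();
        for (String line; (line = in.readLine()) != null; ) {
            if (line.startsWith("#") && record.length() > 0) {
                // a new record begins: flush the one we've accumulated
                out.collect(new LongWritable(recCount++), new Text(record.toString()));
                record.setLength(0);
            }
            record.append(line).append('\n');
        }
        if (record.length() > 0) // flush the last record
            out.collect(new LongWritable(recCount), new Text(record.toString()));
        in.close();
    }
}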
Reducer:

function reduce(key, values, out, reporter)
begin
   // there may be more than one value, since the same recCount can be produced by multiple mappers
   for each value in values
   begin
      recordLines = value
      xmlRecord = convertToXml(recordLines)
      fileTemp = save xmlRecord to a temp file
      copy fileTemp to HDFS using fileSystem.copyFromLocalFile
      // I have to save xmlRecord to a temp file because when I use SequenceFile.Writer, the text that ends up in the HDFS file is not plain text
      // Is there another solution? (One possibility is sketched below.)
   end
end
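(On that question: one alternative I'm considering, as an untested sketch, is to skip the local temp file and write each record straight into HDFS with FileSystem.create(), which yields a plain-text file. The /records output path and the convertToXml stub are placeholders.)

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Writes each record as its own plain-text XML file directly in HDFS,
// avoiding the local temp file + copyFromLocalFile round trip.
public class RecordReducer extends MapReduceBase
        implements Reducer<LongWritable, Text, Text, Text> {

    private JobConf job;
    private int fileSeq = 0; // disambiguates values that share a key

    public void configure(JobConf job) { this.job = job; }

    public void reduce(LongWritable key, Iterator<Text> values,
                       OutputCollector<Text, Text> out, Reporter reporter)
            throws IOException {
        FileSystem fs = FileSystem.get(job);
        while (values.hasNext()) {
            String xmlRecord = convertToXml(values.next().toString());
            Path dst = new Path("/records/" + key.get() + "-" + fileSeq++ + ".xml");
            FSDataOutputStream os = fs.create(dst);
            os.write(xmlRecord.getBytes("UTF-8"));
            os.close();
        }
    }

    private String convertToXml(String recordLines) {
        // placeholder for the real conversion
        return "<record>" + recordLines + "</record>";
    }
}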


Driver:
Input format: CustomInputFormat
Output format: NullOutputFormat
Number of mappers: 7
Number of reducers: 17
Speculative execution: true
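(In code, the driver is roughly the following; classic API again, and in 0.13 the input path may need to be set via JobConf.setInputPath instead of FileInputFormat.setInputPaths.)

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class ExtractRecordsDriver {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(ExtractRecordsDriver.class);
        job.setJobName("extract-records");

        job.setInputFormat(CustomInputFormat.class);
        job.setOutputFormat(NullOutputFormat.class); // the reducer writes its own files
        job.setMapperClass(RecordMapper.class);
        job.setReducerClass(RecordReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setNumMapTasks(7);
        job.setNumReduceTasks(17);
        job.setSpeculativeExecution(true);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        JobClient.runJob(job);
    }
}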

Sorry for my lengthy post.
Any suggestions and comments are highly appreciated; I hope our discussion brings more understanding of Hadoop and MapReduce.

Cheers,

Trung
