At this time I don't see any way to do this in map/reduce. I guess I have to
go back to plan A: writing a primitive Java tool that scales up within a
machine using Java executors and scales out by running across multiple
machines.
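
Roughly what I have in mind for that tool, as an untested sketch (CrossCompare,
score() and the slice-per-thread layout are placeholders, not an existing API):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: compare every FileA record against every FileB record, with one
// thread per slice of FileA; each machine would get its own slice of FileB.
// score() stands in for the real heuristic.
public class CrossCompare {

    static int score(String a, String b) { return 0; /* heuristic goes here */ }

    public static void main(String[] args) throws Exception {
        final List<String> fileA = readLines(args[0]); // ~600K records, fits in RAM
        final List<String> fileB = readLines(args[1]); // this machine's share of FileB
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int slice = (fileA.size() + threads - 1) / threads;
        for (int t = 0; t < threads; t++) {
            final int from = t * slice;
            final int to = Math.min(fileA.size(), from + slice);
            pool.submit(new Runnable() {
                public void run() {
                    for (int i = from; i < to; i++)
                        for (String b : fileB)
                            System.out.println(fileA.get(i) + "\t" + b + "\t"
                                    + score(fileA.get(i), b));
                }
            });
        }
        pool.shutdown();
    }

    static List<String> readLines(String path) throws IOException {
        List<String> lines = new ArrayList<String>();
        BufferedReader in = new BufferedReader(new FileReader(path));
        for (String line; (line = in.readLine()) != null; ) lines.add(line);
        in.close();
        return lines;
    }
}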



pmg wrote:
> 
> First make smaller chunks of your big files (small enough that one chunk
> can be stored in memory). Hadoop's block size is set to 64MB by default.
> If this seems OK according to the RAM you have, then simply run an
> Identity Mapper only job for both files A and B. The output will be
> smaller files with the names part-0001, part-0002 etc. For simplicity
> let us call the chunks of File A A1, A2, A3... and the chunks of B
> B1, B2, B3
> 
>>> I am planning to run this on Amazon Elastic MapReduce with large CPU
>>> instances, so I think RAM would not be a problem.
> I can create the smaller input files outside map/reduce, so I guess we
> don't have to run this phase to get the small file chunks A1, A2, A3...
> and B1, B2, B3
> 
> Create a file (or write a program that will generate this file) that
> contains the cross product of these chunks-
> A1 B1
> A1 B2
> A1 B3
> ..
> A2 B1
> A2 B2
> A2 B3
> ..
> 
>>> Correct me if I am wrong: the actual FileA that gets divided into chunks
>>> A1, A2... has around 600K records. FileB that gets divided into B1,
>>> B2... has around 2 million records. So I guess we are looking at a
>>> Cartesian product of 600K * 2 million records. We are looking at
>>> petabytes of data. This would be a hard sell :)
> 
> 
> Tarandeep wrote:
>> 
>> Hey, I think I got your question wrong. My solution won't let you achieve
>> what you intended. Your example made it clear.
>> 
>> Since it is a cross product, the contents of one of the files has to be
>> in memory for iteration, but since the files are big that might not be
>> possible. So how about this solution, which will scale too-
>> 
>> First make smaller chunks of your big files (small enough that one chunk
>> can be stored in memory). Hadoop's block size is set to 64MB by default.
>> If this seems OK according to the RAM you have, then simply run an
>> Identity Mapper only job for both files A and B. The output will be
>> smaller files with the names part-0001, part-0002 etc. For simplicity
>> let us call the chunks of File A A1, A2, A3... and the chunks of B
>> B1, B2, B3
>> 
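>> Something like this would do the chunking, as an untested sketch against
>> the old mapred API (it is not literally IdentityMapper, since I drop the
>> byte-offset key so the chunks keep only the original lines; the paths
>> are placeholders):
>> 
>> import java.io.IOException;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.NullWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapred.*;
>> 
>> // Map-only pass-through job: rewrites one big file as part-NNNNN chunks,
>> // one chunk per map task / input block. Run once for FileA, once for FileB.
>> public class ChunkFile {
>> 
>>   public static class PassThroughMapper extends MapReduceBase
>>       implements Mapper<LongWritable, Text, Text, NullWritable> {
>>     public void map(LongWritable offset, Text line,
>>                     OutputCollector<Text, NullWritable> out, Reporter reporter)
>>         throws IOException {
>>       out.collect(line, NullWritable.get());   // emit the line unchanged
>>     }
>>   }
>> 
>>   public static void main(String[] args) throws Exception {
>>     JobConf conf = new JobConf(ChunkFile.class);
>>     conf.setJobName("chunk-file");
>>     conf.setMapperClass(PassThroughMapper.class);
>>     conf.setNumReduceTasks(0);                               // map-only
>>     conf.setOutputKeyClass(Text.class);
>>     conf.setOutputValueClass(NullWritable.class);
>>     FileInputFormat.setInputPaths(conf, new Path(args[0]));  // e.g. /input/FileA
>>     FileOutputFormat.setOutputPath(conf, new Path(args[1])); // e.g. /chunks/A
>>     JobClient.runJob(conf);
>>   }
>> }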
>> Create a file (or write a program that will generate this file) that
>> contains the cross product of these chunks-
>> A1 B1
>> A1 B2
>> A1 B3
>> ..
>> A2 B1
>> A2 B2
>> A2 B3
>> ..
>> 
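>> Generating that pair file is a trivial nested loop; a possible sketch
>> (the chunk counts are just command-line arguments):
>> 
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.io.PrintWriter;
>> 
>> // Writes every "Ai Bj" pair, one per line, to pairs.txt.
>> public class MakePairs {
>>   public static void main(String[] args) throws IOException {
>>     int aChunks = Integer.parseInt(args[0]);   // number of A chunks
>>     int bChunks = Integer.parseInt(args[1]);   // number of B chunks
>>     PrintWriter out = new PrintWriter(new FileWriter("pairs.txt"));
>>     for (int i = 1; i <= aChunks; i++)
>>       for (int j = 1; j <= bChunks; j++)
>>         out.println("A" + i + " B" + j);
>>     out.close();
>>   }
>> }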
>> Now run a Map only job (no reducer). Use NLineInputFormat and set N = 1.
>> Give this pair file as input to your job. NLineInputFormat will give
>> each mapper one line from the file. So, for example, let's say a mapper
>> got the line "A1 B3", which means: take the cross product of the
>> contents of chunk A1 and chunk B3.
>> 
>> You can read one of the chunks completely and store it in memory as a
>> list or array, and then read the second chunk and do the comparison.
>> 
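>> A rough, untested sketch of that mapper with the old mapred API (the
>> chunk base directory, the "cross.chunk.dir" property name and score()
>> are made up for illustration):
>> 
>> import java.io.BufferedReader;
>> import java.io.IOException;
>> import java.io.InputStreamReader;
>> import java.util.ArrayList;
>> import java.util.List;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.mapred.*;
>> 
>> // Each map task gets one line like "A1 B3" via NLineInputFormat (N=1),
>> // loads chunk A1 into memory, then streams chunk B3 and emits every
>> // scored pair.
>> public class CrossChunkMapper extends MapReduceBase
>>     implements Mapper<LongWritable, Text, Text, Text> {
>> 
>>   private JobConf job;
>> 
>>   public void configure(JobConf job) { this.job = job; }
>> 
>>   public void map(LongWritable offset, Text pairLine,
>>                   OutputCollector<Text, Text> out, Reporter reporter)
>>       throws IOException {
>>     String[] chunks = pairLine.toString().split("\\s+");   // e.g. "A1 B3"
>>     FileSystem fs = FileSystem.get(job);
>>     Path base = new Path(job.get("cross.chunk.dir"));      // made-up property
>> 
>>     List<String> aLines = new ArrayList<String>();          // whole A chunk in RAM
>>     BufferedReader a = new BufferedReader(
>>         new InputStreamReader(fs.open(new Path(base, chunks[0]))));
>>     for (String line; (line = a.readLine()) != null; ) aLines.add(line);
>>     a.close();
>> 
>>     BufferedReader b = new BufferedReader(                  // stream the B chunk
>>         new InputStreamReader(fs.open(new Path(base, chunks[1]))));
>>     for (String bLine; (bLine = b.readLine()) != null; )
>>       for (String aLine : aLines)
>>         out.collect(new Text(aLine + "\t" + bLine),
>>                     new Text(String.valueOf(score(aLine, bLine))));
>>     b.close();
>>   }
>> 
>>   private int score(String a, String b) { return 0; /* heuristic placeholder */ }
>> }
>> 
>> In the driver you would set
>> conf.setInputFormat(org.apache.hadoop.mapred.lib.NLineInputFormat.class),
>> conf.setInt("mapred.line.input.format.linespermap", 1), and
>> conf.setNumReduceTasks(0), with the pair file as the job input.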
>> Now, as you would have guessed, instead of creating chunks, you can
>> actually calculate offsets in the files (after an interval of say 64MB)
>> and can achieve the same effect. HDFS allows seeking to an offset in a
>> file so that will work too.
>> 
>> -Tarandeep
>> 
>> 
>> 
>> On Fri, Jun 19, 2009 at 4:33 PM, pmg <parmod.me...@gmail.com> wrote:
>> 
>>>
>>> Thanks Tarandeep for the prompt reply.
>>>
>>> Let me give you an example structure of FileA and FileB
>>>
>>> FileA
>>> -------
>>>
>>> 123 ABC 1
>>>
>>>
>>> FileB
>>> -----
>>> 123 ABC 2
>>> 456 BNF 3
>>>
>>> Both files are tab delimited. Every record from FileA is not simply
>>> compared with each record in FileB; there's a heuristic I am going to
>>> run for the comparison, and the results are scored along with the
>>> output. So my output file is like this:
>>>
>>> Output
>>> --------
>>>
>>> 123 ABC 1 123 ABC 2 10
>>> 123 ABC 1 456 BNF 3 20
>>>
>>> The first three columns in the output file are from FileA, the next
>>> three columns are from FileB, and the last column is their comparison
>>> score.
>>>
>>> So basically you are saying we can use two map/reduce jobs, one for
>>> FileA and the other for FileB:
>>>
>>> map (FileA) -> reduce (FileA) -> map (FileB) -> reduce (FileB)
>>>
>>> For the first file, FileA, I map the records to <K,V> pairs (I can't
>>> use a Bloom filter because the comparison between each record from
>>> FileA and every record in FileB is not a straight comparison - they are
>>> compared using a heuristic, scored quantitatively, and stored).
>>>
>>> In the FileA reducer I store the output in the distributed cache. Once
>>> this is done, I map FileB in the second job, and in the FileB reducer I
>>> read FileA from the distributed cache, run my heuristic for every <K,V>
>>> from FileB, and store my result.
>>>
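>>> Roughly what I have in mind for that second job's reducer, as an
>>> untested sketch (the cached file name and score() are placeholders):
>>>
>>> import java.io.BufferedReader;
>>> import java.io.FileReader;
>>> import java.io.IOException;
>>> import java.util.ArrayList;
>>> import java.util.Iterator;
>>> import java.util.List;
>>> import org.apache.hadoop.filecache.DistributedCache;
>>> import org.apache.hadoop.fs.Path;
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.mapred.*;
>>>
>>> // Job 1 writes FileA's records; the driver of job 2 then adds that output
>>> // to the DistributedCache, e.g.
>>> //   DistributedCache.addCacheFile(new URI("/cache/fileA/part-00000"), conf);
>>> // This reducer scores every FileB record against the cached FileA records.
>>> public class ScoreReducer extends MapReduceBase
>>>     implements Reducer<Text, Text, Text, Text> {
>>>
>>>   private List<String> fileA = new ArrayList<String>();
>>>
>>>   public void configure(JobConf job) {
>>>     try {
>>>       Path[] cached = DistributedCache.getLocalCacheFiles(job);
>>>       BufferedReader in = new BufferedReader(new FileReader(cached[0].toString()));
>>>       for (String line; (line = in.readLine()) != null; ) fileA.add(line);
>>>       in.close();
>>>     } catch (IOException e) {
>>>       throw new RuntimeException(e);
>>>     }
>>>   }
>>>
>>>   public void reduce(Text key, Iterator<Text> values,
>>>                      OutputCollector<Text, Text> out, Reporter reporter)
>>>       throws IOException {
>>>     while (values.hasNext()) {
>>>       String b = values.next().toString();            // a record from FileB
>>>       for (String a : fileA)                          // every record from FileA
>>>         out.collect(new Text(a + "\t" + b),
>>>                     new Text(String.valueOf(score(a, b))));
>>>     }
>>>   }
>>>
>>>   private int score(String a, String b) { return 0; /* heuristic placeholder */ }
>>> }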
>>> thanks
>>>
>>>
>>> Tarandeep wrote:
>>> >
>>> > oh my bad, I was not clear-
>>> >
>>> > For FileB, you will be running a second map reduce job. In the
>>> > mapper, you can use the Bloom filter created in the first map reduce
>>> > job (if you wish to use it) to eliminate the lines whose keys don't
>>> > match. The mapper will emit a key,value pair, where the key is the
>>> > field on which you want to do the comparison and the value is the
>>> > whole line.
>>> >
>>> > When the key,value pairs go to the reducers, you have the lines from
>>> > FileB sorted on the field you want to use for the comparison. Now you
>>> > can read the contents of FileA (note that if you ran the first job
>>> > with N reducers, you will have N partitions of FileA and you want to
>>> > read only the partition meant for this reducer). The content of FileA
>>> > is also sorted on the field, so now you can easily compare the lines
>>> > from the two files.
>>> >
>>> > CloudBase (cloudbase.sourceforge.net) has code for doing a join in
>>> > this fashion.
>>> >
>>> > Let me know if you need more clarification.
>>> >
>>> > -Tarandeep
>>> >
>>> > On Fri, Jun 19, 2009 at 3:45 PM, pmg <parmod.me...@gmail.com> wrote:
>>> >
>>> >>
>>> >> Thanks Tarandeep
>>> >>
>>> >> Correct me if I am wrong: when I map FileA, the mapper creates
>>> >> key,value pairs and sends them across to the reducer. If so, then
>>> >> how can I compare when FileB is not even mapped yet?
>>> >>
>>> >>
>>> >> Tarandeep wrote:
>>> >> >
>>> >> > On Fri, Jun 19, 2009 at 2:41 PM, pmg <parmod.me...@gmail.com> wrote:
>>> >> >
>>> >> >>
>>> >> >> For the sake of simplification I have reduced my input to two
>>> >> >> files: 1. FileA 2. FileB
>>> >> >>
>>> >> >> As I said earlier, I want to compare every record of FileA
>>> >> >> against every record in FileB. I know this is n^2, but this is
>>> >> >> the process. I wrote a simple InputFormat and RecordReader. It
>>> >> >> seems each file is read serially, one after another. How can my
>>> >> >> record reader have a reference to both files at the same time so
>>> >> >> that I can create the cross list of FileA and FileB for the
>>> >> >> mapper?
>>> >> >>
>>> >> >> Basically, the way I see it, the mapper should get one record
>>> >> >> from FileA and all records from FileB so that the mapper can do
>>> >> >> the n^2 comparison and forward the results to the reducer.
>>> >> >
>>> >> >
>>> >> > It will be hard (and inefficient) to do this in the Mapper using
>>> >> > some custom input format. What you can do is use the semi-join
>>> >> > technique:
>>> >> >
>>> >> > Since FileA is smaller, run a map reduce job that outputs a
>>> >> > key,value pair where the key is the field (or set of fields) on
>>> >> > which you want to do the comparison and the value is the whole
>>> >> > line.
>>> >> >
>>> >> > The reducer is simply an Identity reducer which writes the files.
>>> >> > So your FileA has been partitioned on the field(s). You can also
>>> >> > create a Bloom filter on this field and store it in the
>>> >> > Distributed Cache.
>>> >> >
>>> >> > Now read FileB, load the Bloom filter into memory and see if the
>>> >> > field from each line of FileB is present in the Bloom filter; if
>>> >> > yes, emit the key,value pair, else not.
>>> >> >
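>>> >> > A rough, untested sketch of that FileB mapper using Hadoop's
>>> >> > built-in org.apache.hadoop.util.bloom.BloomFilter (the join-field
>>> >> > position and the "fileA.bloom" file name are assumptions):
>>> >> >
>>> >> > import java.io.DataInputStream;
>>> >> > import java.io.FileInputStream;
>>> >> > import java.io.IOException;
>>> >> > import org.apache.hadoop.io.LongWritable;
>>> >> > import org.apache.hadoop.io.Text;
>>> >> > import org.apache.hadoop.mapred.*;
>>> >> > import org.apache.hadoop.util.bloom.BloomFilter;
>>> >> > import org.apache.hadoop.util.bloom.Key;
>>> >> >
>>> >> > // Loads a Bloom filter built over FileA's join field and only emits
>>> >> > // FileB lines whose field might match. Assumes the serialized filter
>>> >> > // was shipped via the DistributedCache and symlinked into the task's
>>> >> > // working directory as "fileA.bloom"; the join field is column 0.
>>> >> > public class FileBFilterMapper extends MapReduceBase
>>> >> >     implements Mapper<LongWritable, Text, Text, Text> {
>>> >> >
>>> >> >   private BloomFilter filter = new BloomFilter();
>>> >> >
>>> >> >   public void configure(JobConf job) {
>>> >> >     try {
>>> >> >       DataInputStream in =
>>> >> >           new DataInputStream(new FileInputStream("fileA.bloom"));
>>> >> >       filter.readFields(in);
>>> >> >       in.close();
>>> >> >     } catch (IOException e) {
>>> >> >       throw new RuntimeException(e);
>>> >> >     }
>>> >> >   }
>>> >> >
>>> >> >   public void map(LongWritable offset, Text line,
>>> >> >                   OutputCollector<Text, Text> out, Reporter reporter)
>>> >> >       throws IOException {
>>> >> >     String field = line.toString().split("\t")[0];  // join field
>>> >> >     if (filter.membershipTest(new Key(field.getBytes())))
>>> >> >       out.collect(new Text(field), line);  // key = field, value = line
>>> >> >   }
>>> >> > }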
>>> >> > At the reducers, you get the contents of FileB partitioned just
>>> >> > like the contents of FileA were partitioned, and at a particular
>>> >> > reducer you get the lines sorted on the field you want to use for
>>> >> > the comparison. At this point you read the contents of FileA that
>>> >> > reached this reducer and, since its contents were sorted as well,
>>> >> > you can quickly go over the two lists.
>>> >> >
>>> >> > -Tarandeep
>>> >> >
>>> >> >>
>>> >> >>
>>> >> >> thanks
>>> >> >>
>>> >> >>
>>> >> >>
>>> >> >> pmg wrote:
>>> >> >> >
>>> >> >> > Thanks Owen. Are there any examples that I can look at?
>>> >> >> >
>>> >> >> >
>>> >> >> >
>>> >> >> > owen.omalley wrote:
>>> >> >> >>
>>> >> >> >> On Jun 18, 2009, at 10:56 AM, pmg wrote:
>>> >> >> >>
>>> >> >> >>> Each line from FileA gets compared with every line from
>>> >> >> >>> FileB1, FileB2 etc. etc. FileB1, FileB2 etc. are in a
>>> >> >> >>> different input directory
>>> >> >> >>
>>> >> >> >> In the general case, I'd define an InputFormat that takes two
>>> >> >> >> directories, computes the input splits for each directory and
>>> >> >> >> generates a new list of InputSplits that is the cross-product
>>> >> >> >> of the two lists. So instead of FileSplit, it would use a
>>> >> >> >> FileSplitPair that gives the FileSplit for dir1 and the
>>> >> >> >> FileSplit for dir2, and the record reader would return a
>>> >> >> >> TextPair with left and right records (i.e. lines). Clearly,
>>> >> >> >> you read the first line of split1 and cross it by each line
>>> >> >> >> from split2, then move to the second line of split1 and
>>> >> >> >> process each line from split2, etc.
>>> >> >> >>
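>>> >> >> >> A very rough, untested sketch of just the getSplits()
>>> >> >> >> cross-product part in the old mapred API (the "cross.left.dir"
>>> >> >> >> and "cross.right.dir" property names are made up, and the
>>> >> >> >> record reader that pairs up the lines is left out):
>>> >> >> >>
>>> >> >> >> import java.io.DataInput;
>>> >> >> >> import java.io.DataOutput;
>>> >> >> >> import java.io.IOException;
>>> >> >> >> import org.apache.hadoop.io.Text;
>>> >> >> >> import org.apache.hadoop.mapred.*;
>>> >> >> >>
>>> >> >> >> public class CrossProductInputFormat implements InputFormat<Text, Text> {
>>> >> >> >>
>>> >> >> >>   // Minimal pairing of two FileSplits; InputSplits must be Writable.
>>> >> >> >>   public static class FileSplitPair implements InputSplit {
>>> >> >> >>     FileSplit left = new FileSplit(null, 0, 0, (String[]) null);
>>> >> >> >>     FileSplit right = new FileSplit(null, 0, 0, (String[]) null);
>>> >> >> >>     public FileSplitPair() {}
>>> >> >> >>     FileSplitPair(FileSplit l, FileSplit r) { left = l; right = r; }
>>> >> >> >>     public long getLength() throws IOException {
>>> >> >> >>       return left.getLength() + right.getLength();
>>> >> >> >>     }
>>> >> >> >>     public String[] getLocations() throws IOException {
>>> >> >> >>       return left.getLocations();
>>> >> >> >>     }
>>> >> >> >>     public void write(DataOutput out) throws IOException {
>>> >> >> >>       left.write(out); right.write(out);
>>> >> >> >>     }
>>> >> >> >>     public void readFields(DataInput in) throws IOException {
>>> >> >> >>       left.readFields(in); right.readFields(in);
>>> >> >> >>     }
>>> >> >> >>   }
>>> >> >> >>
>>> >> >> >>   // Pair every split of the left directory with every split of the right.
>>> >> >> >>   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
>>> >> >> >>     TextInputFormat text = new TextInputFormat();
>>> >> >> >>     text.configure(job);
>>> >> >> >>     JobConf leftJob = new JobConf(job);
>>> >> >> >>     FileInputFormat.setInputPaths(leftJob, job.get("cross.left.dir"));
>>> >> >> >>     InputSplit[] left = text.getSplits(leftJob, numSplits);
>>> >> >> >>     JobConf rightJob = new JobConf(job);
>>> >> >> >>     FileInputFormat.setInputPaths(rightJob, job.get("cross.right.dir"));
>>> >> >> >>     InputSplit[] right = text.getSplits(rightJob, numSplits);
>>> >> >> >>     InputSplit[] pairs = new InputSplit[left.length * right.length];
>>> >> >> >>     int i = 0;
>>> >> >> >>     for (InputSplit l : left)
>>> >> >> >>       for (InputSplit r : right)
>>> >> >> >>         pairs[i++] = new FileSplitPair((FileSplit) l, (FileSplit) r);
>>> >> >> >>     return pairs;
>>> >> >> >>   }
>>> >> >> >>
>>> >> >> >>   public RecordReader<Text, Text> getRecordReader(InputSplit split,
>>> >> >> >>       JobConf job, Reporter reporter) throws IOException {
>>> >> >> >>     throw new IOException("TextPair record reader not shown in this sketch");
>>> >> >> >>   }
>>> >> >> >> }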
>>> >> >> >> You'll need to ensure that you don't overwhelm the system
>>> >> >> >> with either too many input splits (i.e. maps). Also don't
>>> >> >> >> forget that N^2/M grows much faster with the size of the
>>> >> >> >> input (N) than the M machines can handle in a fixed amount of
>>> >> >> >> time.
>>> >> >> >>
>>> >> >> >>> Two input directories
>>> >> >> >>>
>>> >> >> >>> 1. input1 directory with a single file of 600K records - FileA
>>> >> >> >>> 2. input2 directory segmented into different files with 2
>>> >> >> >>> million records - FileB1, FileB2 etc.
>>> >> >> >>
>>> >> >> >> In this particular case, it would be right to load all of
>>> >> >> >> FileA into memory and process the chunks of FileB/part-*. Then
>>> >> >> >> it would be much faster than needing to re-read the file over
>>> >> >> >> and over again, but otherwise it would be the same.
>>> >> >> >>
>>> >> >> >> -- Owen
>>> >> >> >>
>>> >> >> >>
>>> >> >> >
>>> >> >> >
>>> >> >>
>>> >> >>
>>> >> >>
>>> >> >
>>> >> >
>>> >>
>>> >>
>>> >>
>>> >
>>> >
>>>
>>>
>>>
>> 
>> 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/multiple-file-input-tp24095358p24126978.html
Sent from the Hadoop core-user mailing list archive at Nabble.com.
