At this time I don't see any way to do this in map/reduce. I guess I have to go back to plan A: writing a primitive Java tool, scaled vertically with Java executors (a thread pool) on each machine and horizontally across multiple machines.
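For the record, plan A looks roughly like the sketch below. It is only a minimal sketch: FileA (the ~600K-record file) is held in memory, a slice of FileB is streamed through a fixed thread pool, score() stands in for the real heuristic, and the class name and argument layout are made up. A real tool would also batch the FileB lines to keep the task queue bounded and would split FileB across machines.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CrossCompareTool {

    // Placeholder for the real comparison heuristic; returns a score.
    static int score(String recordA, String recordB) {
        return Math.abs(recordA.hashCode() - recordB.hashCode()) % 100;
    }

    // args: <FileA> <slice of FileB for this machine> <output file>
    public static void main(String[] args) throws Exception {
        final List<String> fileA =
                Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);

        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        final PrintWriter out =
                new PrintWriter(new BufferedWriter(new FileWriter(args[2])));

        try (BufferedReader b = Files.newBufferedReader(Paths.get(args[1]),
                                                        StandardCharsets.UTF_8)) {
            String line;
            while ((line = b.readLine()) != null) {
                final String recordB = line;
                // One task per FileB record; a real tool would batch these.
                pool.submit(new Runnable() {
                    public void run() {
                        StringBuilder sb = new StringBuilder();
                        for (String recordA : fileA) {
                            sb.append(recordA).append('\t')
                              .append(recordB).append('\t')
                              .append(score(recordA, recordB)).append('\n');
                        }
                        synchronized (out) { out.print(sb); }
                    }
                });
            }
        }

        pool.shutdown();
        pool.awaitTermination(7, TimeUnit.DAYS);
        out.close();
    }
}

Rough sketches of the map/reduce approaches suggested in the quoted thread below (the chunk-pair file with NLineInputFormat, the distributed-cache pass, and the Bloom-filter semi-join) follow after the quoted thread.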
pmg wrote:
>
> First make smaller chunks of your big files (small enough that one chunk
> can be stored in memory). Hadoop's block size is set to 64MB by default.
> If this seems ok according to the RAM you have, then simply run an
> Identity Mapper only job for both Files A and B. The output will be
> smaller files with the names part-0001, part-0002 etc. For simplicity,
> let us call the chunks of File A A1, A2, A3... and the chunks of B
> B1, B2, B3.
>
>>> I am planning to run this on Amazon Elastic MapReduce with large-CPU
>>> instances, so I think RAM would not be a problem.
>
> I can have smaller input files outside map/reduce, so I guess we don't
> have to run this phase to get the small file chunks A1, A2, A3... and
> B1, B2, B3.
>
> Create a file (or write a program that will generate this file) that
> contains the cross product of these chunks:
> A1 B1
> A1 B2
> A1 B3
> ..
> A2 B1
> A2 B2
> A2 B3
> ..
>
>>> Correct me if I am wrong. The actual FileA that gets divided into
>>> chunks A1, A2... has around 600K records. FileB that gets divided into
>>> B1, B2... has around 2 million records. So I guess we are looking at an
>>> output the size of the Cartesian product of 600K * 2 million records.
>>> We are looking at petabytes of data. This would be a hard sell :)
>
>
> Tarandeep wrote:
>>
>> Hey, I think I got your question wrong. My solution won't let you
>> achieve what you intended; your example made it clear.
>>
>> Since it is a cross product, the contents of one of the files has to be
>> in memory for iteration, but since the size is big, that might not be
>> possible. So how about this solution? It will scale too.
>>
>> First make smaller chunks of your big files (small enough that one chunk
>> can be stored in memory). Hadoop's block size is set to 64MB by default.
>> If this seems ok according to the RAM you have, then simply run an
>> Identity Mapper only job for both Files A and B. The output will be
>> smaller files with the names part-0001, part-0002 etc. For simplicity,
>> let us call the chunks of File A A1, A2, A3... and the chunks of B
>> B1, B2, B3.
>>
>> Create a file (or write a program that will generate this file) that
>> contains the cross product of these chunks:
>> A1 B1
>> A1 B2
>> A1 B3
>> ..
>> A2 B1
>> A2 B2
>> A2 B3
>> ..
>>
>> Now run a map-only job (no reducer). Use NLineInputFormat and set N = 1.
>> Give this file as input to your job. NLineInputFormat will give each
>> mapper one line from this file. So, for example, let's say a mapper got
>> the line A1 B3, which means: take the cross product of the contents of
>> chunk A1 and chunk B3.
>>
>> You can read one of the chunks completely and store it in memory as a
>> list or array, and then read the second chunk and do the comparison.
>>
>> Now, as you would have guessed, instead of creating chunks you can
>> actually calculate offsets in the files (at intervals of, say, 64MB) and
>> achieve the same effect. HDFS allows seeking to an offset in a file, so
>> that will work too.
>>
>> -Tarandeep
>>
>>
>>
>> On Fri, Jun 19, 2009 at 4:33 PM, pmg <parmod.me...@gmail.com> wrote:
>>
>>>
>>> Thanks Tarandeep for the prompt reply.
>>>
>>> Let me give you an example of the structure of FileA and FileB.
>>>
>>> FileA
>>> -------
>>>
>>> 123 ABC 1
>>>
>>>
>>> FileB
>>> -----
>>> 123 ABC 2
>>> 456 BNF 3
>>>
>>> Both files are tab delimited. Every record from FileA is not simply
>>> compared with each record in FileB; there is a heuristic I am going to
>>> run for the comparison, and I score the results along with the output.
>>> So my output file is like this:
>>>
>>> Output
>>> --------
>>>
>>> 123 ABC 1 123 ABC 2 10
>>> 123 ABC 1 456 BNF 3 20
>>>
>>> The first three columns in the output file are from FileA, the next
>>> three columns are from FileB, and the last column is their comparison
>>> score.
>>>
>>> So basically you are saying we can use two map/reduce jobs, one for
>>> FileA and the other for FileB:
>>>
>>> map (FileA) -> reduce (FileA) -> map (FileB) -> reduce (FileB)
>>>
>>> For the first file, FileA, I map the records to <K,V> pairs. (I can't
>>> use a Bloom filter, because the comparison of each record from FileA
>>> with every record in FileB is not a straight comparison; they are
>>> compared using a heuristic, scored for their quantitative similarity,
>>> and stored.)
>>>
>>> In the FileA reduce I store the output in the distributed cache. Once
>>> this is done, I map FileB in the second job, and in the FileB reduce I
>>> read FileA from the distributed cache, run my heuristic for every <K,V>
>>> from FileB, and store my result.
>>>
>>> thanks
>>>
>>>
>>> Tarandeep wrote:
>>> >
>>> > Oh, my bad, I was not clear.
>>> >
>>> > For FileB, you will be running a second map/reduce job. In the
>>> > mapper, you can use the Bloom filter created in the first map/reduce
>>> > job (if you wish to use it) to eliminate the lines whose keys don't
>>> > match. The mapper will emit a key,value pair, where the key is the
>>> > field on which you want to do the comparison and the value is the
>>> > whole line.
>>> >
>>> > When the key,value pairs go to the reducers, you have the lines from
>>> > FileB sorted on the field you want to use for the comparison. Now you
>>> > can read the contents of FileA (note that if you ran the first job
>>> > with N reducers, you will have N partitions of FileA, and you want to
>>> > read only the partition meant for this reducer). The content of FileA
>>> > is also sorted on the field, so now you can easily compare the lines
>>> > from the two files.
>>> >
>>> > CloudBase (cloudbase.sourceforge.net) has code for doing a join this
>>> > way.
>>> >
>>> > Let me know if you need more clarification.
>>> >
>>> > -Tarandeep
>>> >
>>> > On Fri, Jun 19, 2009 at 3:45 PM, pmg <parmod.me...@gmail.com> wrote:
>>> >
>>> >>
>>> >> Thanks Tarandeep.
>>> >>
>>> >> Correct me if I am wrong: when I map FileA, the mapper creates
>>> >> key,value pairs and sends them across to the reducer. If so, then
>>> >> how can I compare when FileB is not even mapped yet?
>>> >>
>>> >>
>>> >> Tarandeep wrote:
>>> >> >
>>> >> > On Fri, Jun 19, 2009 at 2:41 PM, pmg <parmod.me...@gmail.com>
>>> >> > wrote:
>>> >> >
>>> >> >>
>>> >> >> For the sake of simplification I have reduced my input to two
>>> >> >> files: 1. FileA and 2. FileB.
>>> >> >>
>>> >> >> As I said earlier, I want to compare every record of FileA
>>> >> >> against every record in FileB. I know this is n^2, but this is
>>> >> >> the process. I wrote a simple InputFormat and RecordReader. It
>>> >> >> seems each file is read serially, one after another. How can my
>>> >> >> record reader have a reference to both files at the same time so
>>> >> >> that I can create the cross list of FileA and FileB for the
>>> >> >> mapper?
>>> >> >>
>>> >> >> Basically, the way I see it is to give the mapper one record from
>>> >> >> FileA and all records from FileB so that the mapper can do the
>>> >> >> n^2 comparison and forward the results to the reducer.
>>> >> >
>>> >> >
>>> >> > It will be hard (and inefficient) to do this in the Mapper using
>>> >> > some custom input format.
>>> >> > What you can do is use the semi-join technique:
>>> >> >
>>> >> > Since FileA is smaller, run a map/reduce job that will output a
>>> >> > key,value pair where the key is the field (or set of fields) on
>>> >> > which you want to do the comparison and the value is the whole
>>> >> > line.
>>> >> >
>>> >> > The reducer is simply an Identity reducer which writes the files.
>>> >> > So your FileA has been partitioned on the field(s). You can also
>>> >> > create a Bloom filter on this field and store it in the
>>> >> > DistributedCache.
>>> >> >
>>> >> > Now read FileB, load the Bloom filter into memory and see if the
>>> >> > field from each line of FileB is present in the Bloom filter; if
>>> >> > yes, emit the key,value pair, else not.
>>> >> >
>>> >> > At the reducers, you get the contents of FileB partitioned just
>>> >> > like the contents of FileA were partitioned, and at a particular
>>> >> > reducer you get the lines sorted on the field you want to use for
>>> >> > the comparison. At this point you read the contents of FileA that
>>> >> > reached this reducer, and since its contents were sorted as well,
>>> >> > you can quickly go over the two lists.
>>> >> >
>>> >> > -Tarandeep
>>> >> >
>>> >> >>
>>> >> >>
>>> >> >> thanks
>>> >> >>
>>> >> >>
>>> >> >>
>>> >> >> pmg wrote:
>>> >> >> >
>>> >> >> > Thanks Owen. Are there any examples that I can look at?
>>> >> >> >
>>> >> >> >
>>> >> >> >
>>> >> >> > owen.omalley wrote:
>>> >> >> >>
>>> >> >> >> On Jun 18, 2009, at 10:56 AM, pmg wrote:
>>> >> >> >>
>>> >> >> >>> Each line from FileA gets compared with every line from
>>> >> >> >>> FileB1, FileB2 etc. FileB1, FileB2 etc. are in a different
>>> >> >> >>> input directory.
>>> >> >> >>
>>> >> >> >> In the general case, I'd define an InputFormat that takes two
>>> >> >> >> directories, computes the input splits for each directory and
>>> >> >> >> generates a new list of InputSplits that is the cross-product
>>> >> >> >> of the two lists. So instead of FileSplit, it would use a
>>> >> >> >> FileSplitPair that gives the FileSplit for dir1 and the
>>> >> >> >> FileSplit for dir2, and the record reader would return a
>>> >> >> >> TextPair with left and right records (i.e. lines). Clearly,
>>> >> >> >> you read the first line of split1 and cross it with each line
>>> >> >> >> from split2, then move to the second line of split1 and
>>> >> >> >> process each line from split2, etc.
>>> >> >> >>
>>> >> >> >> You'll need to ensure that you don't overwhelm the system with
>>> >> >> >> too many input splits (i.e. maps). Also don't forget that
>>> >> >> >> N^2/M grows much faster with the size of the input (N) than
>>> >> >> >> the M machines can handle in a fixed amount of time.
>>> >> >> >>
>>> >> >> >>> Two input directories:
>>> >> >> >>>
>>> >> >> >>> 1. input1 directory with a single file of 600K records - FileA
>>> >> >> >>> 2. input2 directory segmented into different files with
>>> >> >> >>> 2 million records - FileB1, FileB2 etc.
>>> >> >> >>
>>> >> >> >> In this particular case, it would be right to load all of
>>> >> >> >> FileA into memory and process the chunks of FileB/part-*. Then
>>> >> >> >> it would be much faster than needing to re-read the file over
>>> >> >> >> and over again, but otherwise it would be the same.
>>> >> >> >>
>>> >> >> >> -- Owen

--
View this message in context: http://www.nabble.com/multiple-file-input-tp24095358p24126978.html
Sent from the Hadoop core-user mailing list archive at Nabble.com.
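For reference, here are sketches of the approaches quoted above. First, Tarandeep's chunk-pair idea needs the small helper he mentions that writes the cross product of the chunks, one pair per line. A minimal sketch, assuming the A and B chunks already sit in two HDFS directories and using full chunk paths instead of the A1/B1 shorthand (the class name and argument layout are made up):

import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PairFileGenerator {
    // args: <dir with FileA chunks> <dir with FileB chunks> <output pair file>
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Note: you may want to skip non-data entries such as _logs here.
        FileStatus[] aChunks = fs.listStatus(new Path(args[0]));
        FileStatus[] bChunks = fs.listStatus(new Path(args[1]));

        PrintWriter out = new PrintWriter(fs.create(new Path(args[2])));
        try {
            for (FileStatus a : aChunks) {
                for (FileStatus b : bChunks) {
                    // One "Ai Bj" pair per line; NLineInputFormat hands each line to a mapper.
                    out.println(a.getPath() + " " + b.getPath());
                }
            }
        } finally {
            out.close();
        }
    }
}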
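Each line of that pair file then drives one mapper in the map-only job Tarandeep describes. A rough sketch against the newer org.apache.hadoop.mapreduce API, with score() standing in for the real heuristic; in the driver you would use NLineInputFormat with one line per split and zero reducers:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ChunkPairMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Placeholder for the real comparison heuristic.
    private static int score(String a, String b) {
        return Math.abs(a.hashCode() - b.hashCode()) % 100;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line names one chunk of FileA and one chunk of FileB.
        String[] pair = value.toString().trim().split("\\s+");
        Path chunkA = new Path(pair[0]);
        Path chunkB = new Path(pair[1]);
        FileSystem fs = FileSystem.get(context.getConfiguration());

        // Load the (small) FileA chunk into memory.
        List<String> aRecords = new ArrayList<String>();
        BufferedReader ra = new BufferedReader(new InputStreamReader(fs.open(chunkA)));
        try {
            for (String line; (line = ra.readLine()) != null; ) {
                aRecords.add(line);
            }
        } finally {
            ra.close();
        }

        // Stream the FileB chunk and emit one scored line per (A record, B record) pair.
        BufferedReader rb = new BufferedReader(new InputStreamReader(fs.open(chunkB)));
        try {
            for (String b; (b = rb.readLine()) != null; ) {
                for (String a : aRecords) {
                    context.write(new Text(a + "\t" + b + "\t" + score(a, b)),
                                  NullWritable.get());
                }
            }
        } finally {
            rb.close();
        }
    }
}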
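pmg's distributed-cache plan, combined with Owen's note that all of FileA can fit in memory, can also be done map-side in a single pass over FileB: ship FileA to every task, load it in setup(), and score each FileB record against it, with no reduce phase needed. A sketch assuming the older DistributedCache API and a made-up cache setup; score() is still a placeholder:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CachedFileAMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final List<String> fileA = new ArrayList<String>();

    // Placeholder for the real comparison heuristic.
    private static int score(String a, String b) {
        return Math.abs(a.hashCode() - b.hashCode()) % 100;
    }

    @Override
    protected void setup(Context context) throws IOException {
        // The driver (not shown) would have added FileA to the cache, e.g.
        //   DistributedCache.addCacheFile(new URI("/data/fileA"), conf);   // assumed path
        Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        BufferedReader in = new BufferedReader(new FileReader(cached[0].toString()));
        try {
            for (String line; (line = in.readLine()) != null; ) {
                fileA.add(line);
            }
        } finally {
            in.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String recordB = value.toString();
        // Emit "<FileA record> \t <FileB record> \t <score>", matching the output
        // layout shown in the thread.
        for (String recordA : fileA) {
            context.write(new Text(recordA + "\t" + recordB + "\t" + score(recordA, recordB)),
                          NullWritable.get());
        }
    }
}

The reducer-side variant pmg describes works the same way, just with the scoring loop moved into the reducer.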
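Finally, the FileB mapper in Tarandeep's semi-join might look roughly like this. It assumes the first job over FileA serialized a Hadoop org.apache.hadoop.util.bloom.BloomFilter of the comparison field to an HDFS path (the path here is made up), and that the comparison field is the first tab-delimited column:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class FileBSemiJoinMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    private final BloomFilter filter = new BloomFilter();

    @Override
    protected void setup(Context context) throws IOException {
        // Deserialize the filter written by the FileA job. "/data/fileA.bloom" is an
        // assumed name; it could equally be shipped via the DistributedCache.
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream in = fs.open(new Path("/data/fileA.bloom"));
        try {
            filter.readFields(in);
        } finally {
            in.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // FileB is tab delimited; assume the first column is the comparison field.
        String line = value.toString();
        String field = line.split("\t", 2)[0];

        // Skip lines whose key cannot possibly appear in FileA.
        if (filter.membershipTest(new Key(field.getBytes("UTF-8")))) {
            context.write(new Text(field), new Text(line));
        }
    }
}

The reducer then sees the surviving FileB lines grouped and sorted on that field and can merge them against the matching FileA partition, as described above.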