I believe PIG, and I know Cascading use a kind of 'spillable' list
that can be re-iterated across. PIG's version is a bit more
sophisticated last I looked.
that said, if you were using either one of them, you wouldn't need to
write your own many-to-many join.
cheers,
ckw
On May 28, 2009, at 8:14 AM, Todd Lipcon wrote:
One last possible trick to consider:
If you were to subclass SequenceFileRecordReader, you'd have access
to its
seek method, allowing you to rewind the reducer input. You could then
implement a block hash join with something like the following
pseudocode:
ahash = new HashMap<Key, Val>();
while (i have ram available) {
read a record
if the record is from table B, break
put the record into ahash
}
nextAPos = reader.getPos()
while (current record is an A record) {
skip to next record
}
firstBPos = reader.getPos()
while (current record has current key) {
read and join against ahash
process joined result
}
if firstBPos > nextAPos {
seek(nextAPos)
go back to top
}
On Thu, May 28, 2009 at 8:05 AM, Todd Lipcon <t...@cloudera.com>
wrote:
Hi Stuart,
It seems to me like you have a few options.
Option 1: Just use a lot of RAM. Unless you really expect many
millions of
entries on both sides of the join, you might be able to get away with
buffering despite its inefficiency.
Option 2: Use LocalDirAllocator to find some local storage to spill
all of
the left table's records to disk in a MapFile format. Then as you
iterate
over the right table, do lookups in the MapFile. This is really the
same as
option 1, except that you're using disk as an extension of RAM.
Option 3: Convert this to a map-side merge join. Basically what you
need to
do is sort both tables by the join key, and partition them with the
same
partitioner into the same number of columns. This way you have an
equal
number of part-NNNNN files for both tables, and within each part-
NNNNN file
they're ordered by join key. In each map task, you open both tableA/
part-N
and tableB/part-N and do a sequential merge to perform the join. I
believe
the CompositeInputFormat class helps with this, though I've never
used it.
Option 4: Perform the join in several passes. Whichever table is
smaller,
break into pieces that fit in RAM. Unless my relational algebra is
off, A
JOIN B = A JOIN (B1 UNION B2) = (A JOIN B1 UNION A JOIN B2) if B =
B1 UNION
B2.
Hope that helps
-Todd
On Thu, May 28, 2009 at 5:02 AM, Stuart White <stuart.whi...@gmail.com
>wrote:
I need to do a reduce-side join of two datasets. It's a many-to-
many
join; that is, each dataset can can multiple records with any given
key.
Every description of a reduce-side join I've seen involves
constructing your keys out of your mapper such that records from one
dataset will be presented to the reducers before records from the
second dataset. I should "hold on" to the value from the one
dataset
and remember it as I iterate across the values from the second
dataset.
This seems like it only works well for one-to-many joins (when one
of
your datasets will only have a single record with any given key).
This scales well because you're only remembering one value.
In a many-to-many join, if you apply this same algorithm, you'll
need
to remember all values from one dataset, which of course will be
problematic (and won't scale) when dealing with large datasets with
large numbers of records with the same keys.
Does an efficient algorithm exist for a many-to-many reduce-side
join?
--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com