[
https://issues.apache.org/jira/browse/CRUNCH-278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13793967#comment-13793967
]
Gabriel Reid commented on CRUNCH-278:
-------------------------------------
I think there's still something I'm not totally getting, or I'm looking at
this from the wrong angle.
Consider two variants of the same situation; I think we need to be able to
specify how each one should be handled.
In this example, we want the DoFns leading up to the toBundle() call to be run
in a separate job:
{code:java}
PTable<K,V> hugeTable = pipeline.read(...);
PTable<K,V> muchSmallerTable = hugeTable.parallelDo(myFilterFn);
// Now MapsideJoinStrategy will call muchSmallerTable.toBundle(), but we want
// myFilterFn to have run in a separate MR job leading up to this MapsideJoin
// because it's reducing a huge amount of data into something that fits in memory
PTable<K, Pair<U,V>> joined = new MapsideJoinStrategy().join(left, muchSmallerTable);
{code}
and in this example we want the DoFns to be run in memory while directly
reading in smallTable from the Source:
{code:java}
PTable<K,V> smallTable = pipeline.read(...);
PTable<K,V> filteredSmallTable = smallTable.parallelDo(myFilterFn);
// Now MapsideJoinStrategy will call filteredSmallTable.toBundle(), and we want
// myFilterFn to be run in each of the mappers while materializing the contents
// of smallTable, because there's no need to kick off a separate MR job for that
// due to the small size of the data
PTable<K, Pair<U,V>> joined = new MapsideJoinStrategy().join(left,
filteredSmallTable);
{code}
I think the point is that the API needs a way to mark a spot where we can say
"everything from here on in will be run in memory". We could do that with
something on ParallelDoOptions, but I think we'd run into the same problem
again: it's hard to define what will (or even what should) happen if you
write a PCollection to storage when it has in-memory operations defined
somewhere further upstream in the Pipeline.
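To make the distinction concrete, here is a minimal sketch of the kind of
marker I mean. It is a toy model, not Crunch code: Step, Plan, split and the
inMemoryFromHere flag are all hypothetical names made up for illustration, and
it only covers a linear chain of DoFns upstream of a single toBundle() call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these types or flags exist in Crunch.
final class InMemoryBoundarySketch {

  /** One DoFn in a linear chain upstream of a toBundle() call. */
  static final class Step {
    final String name;
    // Hypothetical marker: "everything from here on in will be run in memory".
    final boolean inMemoryFromHere;

    Step(String name, boolean inMemoryFromHere) {
      this.name = name;
      this.inMemoryFromHere = inMemoryFromHere;
    }
  }

  /** Where each step ends up once the marker has been taken into account. */
  static final class Plan {
    final List<String> separateJob = new ArrayList<>();
    final List<String> inMapper = new ArrayList<>();
  }

  /** Steps before the first marker go to a separate job; the rest run in memory. */
  static Plan split(List<Step> steps) {
    Plan plan = new Plan();
    boolean inMemory = false;
    for (Step step : steps) {
      // Once the marker has been seen, it applies to every later step as well.
      inMemory = inMemory || step.inMemoryFromHere;
      if (inMemory) {
        plan.inMapper.add(step.name);
      } else {
        plan.separateJob.add(step.name);
      }
    }
    return plan;
  }

  public static void main(String[] args) {
    // hugeTable case: no marker on myFilterFn, so it runs in its own MR job.
    Plan huge = split(Arrays.asList(new Step("myFilterFn", false)));
    System.out.println("separate job: " + huge.separateJob + ", in mapper: " + huge.inMapper);

    // smallTable case: marker on myFilterFn, so it runs while materializing.
    Plan small = split(Arrays.asList(new Step("myFilterFn", true)));
    System.out.println("separate job: " + small.separateJob + ", in mapper: " + small.inMapper);
  }
}
```

This only captures the "where do we draw the line" part. It says nothing about
what should happen when a PCollection downstream of the marker is written to
storage, which is the part that is still hard to define.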
FWIW, I'm pretty much convinced that the MaterializedPCollection isn't the way
to go for this.
> Improvements to MapsideJoin code
> --------------------------------
>
> Key: CRUNCH-278
> URL: https://issues.apache.org/jira/browse/CRUNCH-278
> Project: Crunch
> Issue Type: Bug
> Components: Core, MapReduce Patterns
> Reporter: Josh Wills
> Assignee: Josh Wills
> Attachments: CRUNCH-278.patch
>
>
> The fact that we have special-case code in the MapsideJoinStrategy for the
> in-memory and MR-based Pipeline instances has always bugged me, so I set out
> to eliminate the distinction between the two impls by creating a new
> interface, ReadableSourceBundle<T>, that encapsulates the MR and in-memory
> specific logic for doing mapside joins in order to remove the special-case
> code in MapsideJoinStrategy and hopefully make other implementations that use
> our mapside-join patterns much easier to test.