[ 
https://issues.apache.org/jira/browse/CRUNCH-278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13793967#comment-13793967
 ] 

Gabriel Reid commented on CRUNCH-278:
-------------------------------------

I think there's still something here that I'm not quite getting, or I'm 
looking at this from the wrong angle.

Taking two different cases of the same kind of thing, I think we need to be 
able to distinguish how we want them to be dealt with, as follows.

In this example, we want the DoFns leading up to the toBundle() call to be run 
in a separate job:
{code:java}
PTable<K,V> hugeTable = pipeline.read(...);
PTable<K,V> muchSmallerTable = hugeTable.parallelDo(myFilterFn);
// Now MapsideJoinStrategy will call muchSmallerTable.toBundle(), but we want
// myFilterFn to have run in a separate MR job leading up to this MapsideJoin
// because it's reducing a huge amount of data into something that fits in memory
PTable<K, Pair<U,V>> joined = new MapsideJoinStrategy<K,U,V>().join(left, muchSmallerTable);
{code}

And in this example, we want the DoFns to be run in memory while directly 
reading in smallTable from the Source:
{code:java}
PTable<K,V> smallTable = pipeline.read(...);
PTable<K,V> filteredSmallTable = smallTable.parallelDo(myFilterFn);
// Now MapsideJoinStrategy will call filteredSmallTable.toBundle(), and we want
// myFilterFn to be run in each of the mappers while materializing the contents 
// of smallTable, because there's no need to kick off a separate MR job for that
// due to the small size of the data
PTable<K, Pair<U,V>> joined = new MapsideJoinStrategy<K,U,V>().join(left, filteredSmallTable);
{code}

I think the point is that the API needs a way to mark a spot where we can say 
"everything from here on in will be run in memory". We could do that with 
something on ParallelDoOptions, but I think we'd run into the same problem 
again: it's hard to define what will (or even what should) happen if you 
write a PCollection to storage when it has in-memory operations defined 
somewhere further upstream in the Pipeline.
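
To make the idea of such a marker concrete, here is a toy sketch in plain 
Java. It does not use Crunch at all: ToyTable, read(), filter() and 
fromHereInMemory() are hypothetical names invented for illustration, and the 
sketch only records which filters would fall on which side of the marker. 
It deliberately does not answer what a write to storage should do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: ToyTable and fromHereInMemory() are hypothetical and are
// not part of the Crunch API.
final class InMemoryBoundarySketch {

    static final class ToyTable<T> {
        private final List<T> source;
        // Filters that would run in an MR job before the join.
        private final List<Predicate<T>> jobStages;
        // Filters that would run in each mapper while loading the data.
        private final List<Predicate<T>> inMemoryStages;
        private final boolean inMemory;

        private ToyTable(List<T> source, List<Predicate<T>> jobStages,
                         List<Predicate<T>> inMemoryStages, boolean inMemory) {
            this.source = source;
            this.jobStages = jobStages;
            this.inMemoryStages = inMemoryStages;
            this.inMemory = inMemory;
        }

        static <T> ToyTable<T> read(List<T> source) {
            return new ToyTable<T>(source, new ArrayList<Predicate<T>>(),
                    new ArrayList<Predicate<T>>(), false);
        }

        // The marker: every filter added after this call is an in-memory stage.
        ToyTable<T> fromHereInMemory() {
            return new ToyTable<T>(source, jobStages, inMemoryStages, true);
        }

        // Records the filter on whichever side of the marker we are on.
        ToyTable<T> filter(Predicate<T> fn) {
            List<Predicate<T>> job = new ArrayList<Predicate<T>>(jobStages);
            List<Predicate<T>> mem = new ArrayList<Predicate<T>>(inMemoryStages);
            (inMemory ? mem : job).add(fn);
            return new ToyTable<T>(source, job, mem, inMemory);
        }

        int jobStageCount() { return jobStages.size(); }

        int inMemoryStageCount() { return inMemoryStages.size(); }

        // Applies every recorded filter; the result is the same on either
        // side of the marker, only the place of execution would differ.
        List<T> toBundle() {
            List<T> out = new ArrayList<T>();
            for (T item : source) {
                boolean keep = true;
                for (Predicate<T> p : jobStages) keep = keep && p.test(item);
                for (Predicate<T> p : inMemoryStages) keep = keep && p.test(item);
                if (keep) out.add(item);
            }
            return out;
        }
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        // First case: no marker, so the filter belongs to a separate job.
        ToyTable<Integer> huge = ToyTable.read(data).filter(x -> x % 2 == 0);
        // Second case: marker first, so the filter runs while loading.
        ToyTable<Integer> small =
                ToyTable.read(data).fromHereInMemory().filter(x -> x % 2 == 0);
        System.out.println(huge.jobStageCount() + " " + huge.inMemoryStageCount());
        System.out.println(small.jobStageCount() + " " + small.inMemoryStageCount());
    }
}
```

Both tables produce the same bundle; the marker only changes where the 
filter is recorded, which is the distinction between the two cases above.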

FWIW, I'm pretty much convinced that the MaterializedPCollection isn't the way 
to go for this.

> Improvements to MapsideJoin code
> --------------------------------
>
>                 Key: CRUNCH-278
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-278
>             Project: Crunch
>          Issue Type: Bug
>          Components: Core, MapReduce Patterns
>            Reporter: Josh Wills
>            Assignee: Josh Wills
>         Attachments: CRUNCH-278.patch
>
>
> The fact that we have special-case code in the MapsideJoinStrategy for the 
> in-memory and MR-based Pipeline instances has always bugged me, so I set out 
> to eliminate the distinction between the two impls by creating a new 
> interface, ReadableSourceBundle<T>, that encapsulates the MR and in-memory 
> specific logic for doing mapside joins in order to remove the special-case 
> code in MapsideJoinStrategy and hopefully make other implementations that use 
> our mapside-join patterns much easier to test.



--
This message was sent by Atlassian JIRA
(v6.1#6144)
