[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591613#comment-14591613
 ] 

Fabian Hueske commented on FLINK-2105:
--------------------------------------

Hi, 
I'm commenting on your assumptions first:

bq. In the case where one input side is broadcast and you try to perform a 
sort-merge outer join on the same side that was broadcast (or a full outer 
join), you don't know whether to emit (x, null) or whether there is maybe a 
matching key x on the right side on some other node.

A left outer join can be executed using a repartition-repartition or 
forward-broadcast ship strategy (for a right outer join the sides are switched; a 
full outer join is only possible with repartition-repartition).
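
To make the quoted problem concrete, here is a tiny, self-contained Java sketch 
(not Flink runtime code; the class and the inputs are made up for illustration): 
if the outer side of a left outer join is broadcast, a single parallel instance 
cannot tell locally whether an unmatched left record should be padded with null.

{code:java}
import java.util.*;

// Toy illustration only: key 1 has its match on node B only, key 3 has no match anywhere.
public class BroadcastOuterJoinProblem {
    public static void main(String[] args) {
        List<Integer> broadcastLeft = Arrays.asList(1, 3);   // the full left side, sent to every node
        List<Integer> rightOnNodeA  = Arrays.asList(2);      // right-side partition on node A
        List<Integer> rightOnNodeB  = Arrays.asList(1);      // right-side partition on node B

        for (List<Integer> localRight : Arrays.asList(rightOnNodeA, rightOnNodeB)) {
            for (Integer l : broadcastLeft) {
                if (localRight.contains(l)) {
                    System.out.println("(" + l + ", " + l + ")");
                } else {
                    // No local match, but another node might have one. Emitting (l, null)
                    // here is not safe: key 1 would wrongly be padded on node A, and
                    // key 3 would be padded (and thus duplicated) on both nodes.
                    System.out.println("(" + l + ", null)   <-- possibly wrong or duplicated");
                }
            }
        }
    }
}
{code}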

bq. Similarly, the same problem occurs if you were to perform a sort-merge 
based cogroup, with one side being broadcast.

CoGroup is always executed as repartition-repartition (as you observed later).

bq. Is there even a case in the current implementation where you would 
broadcast one side and perform a sort-merge join as opposed to a hash join with 
the broadcast side as the build side? This scenario wouldn't make a lot of 
sense IMO; I don't think we found the option for that in the source code 
either. (E.g. there is no BROADCAST_SORT_MERGE in the JoinHint enum.)

That can be beneficial if a sorted result can be reused. Broadcast-forward and 
HybridHash are more common, though.

bq. there must be some component in the flink runtime which decides which 
partitioning makes sense for which operator and operator strategy. For example, 
if the optimizer chooses the left side shall be broadcast, then the MatchDriver 
should perform a HYBRIDHASH_BUILD_FIRST join, and so on

bq. keeping track of which side was broadcast, repartitioned, sorted or grouped 
doesn't appear to be the responsibility of the Driver implementation or, in the 
case of the MatchDriver, the iterator implementations that perform the 
sort-merge or hash joins, correct?

The optimizer takes care of choosing the execution strategies. It tracks the 
physical properties (sorting, partitioning) of intermediate results, partitions 
the data in an appropriate way, and chooses the local and driver strategies. All 
of that is already decided and fixed when the program is executed. The driver 
does not decide anything and only does what the optimizer told it to do.
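
As an illustration only (the names below are hypothetical, not the actual 
MatchDriver code), a driver can be thought of as a plain dispatch on a strategy 
that was fixed at plan time; it never inspects or re-partitions the data itself:

{code:java}
// Illustrative sketch with made-up names, not the Flink runtime API.
enum LocalJoinStrategy { SORT_MERGE, HYBRIDHASH_BUILD_FIRST, HYBRIDHASH_BUILD_SECOND }

class JoinDriverSketch {
    // The strategy is chosen by the optimizer; the driver only executes it.
    void run(LocalJoinStrategy strategy) {
        switch (strategy) {
            case SORT_MERGE:
                // inputs are assumed to arrive partitioned and sorted as promised by the plan
                runSortMergeJoin();
                break;
            case HYBRIDHASH_BUILD_FIRST:
                runHashJoin(true);   // build the hash table on the first input
                break;
            case HYBRIDHASH_BUILD_SECOND:
                runHashJoin(false);  // build the hash table on the second input
                break;
        }
    }

    private void runSortMergeJoin() { /* local algorithm only */ }
    private void runHashJoin(boolean buildFirstInput) { /* local algorithm only */ }
}
{code}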

I would recommend not thinking about partitioning yet and focusing on the local 
join algorithm. You can safely assume that the data is partitioned in a suitable 
way and work on the local join algorithm. It is important that you cover your 
implementation with a lot of unit tests to make sure it works as expected. Once 
the local algorithm is done, it needs to be integrated into a Driver and all the 
optimizer integration needs to be done.
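
For the local algorithm itself, a minimal sketch of a sort-merge full outer join 
over two inputs that are assumed to be already sorted on the key might look like 
this (plain Java, not the Flink iterator API; plain integers stand in for keys):

{code:java}
import java.util.*;

public class SortMergeFullOuterJoinSketch {

    // Merge two key-sorted inputs, padding the missing side with null.
    static List<String> fullOuterJoin(List<Integer> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i).compareTo(right.get(j));
            if (cmp == 0) {
                out.add("(" + left.get(i) + ", " + right.get(j) + ")");
                i++; j++;                                   // match: advance both sides
            } else if (cmp < 0) {
                out.add("(" + left.get(i) + ", null)");     // left-outer padding
                i++;
            } else {
                out.add("(null, " + right.get(j) + ")");    // right-outer padding
                j++;
            }
        }
        while (i < left.size())  { out.add("(" + left.get(i++) + ", null)"); }  // left remainder
        while (j < right.size()) { out.add("(null, " + right.get(j++) + ")"); } // right remainder
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fullOuterJoin(Arrays.asList(1, 2, 4), Arrays.asList(2, 3, 4)));
        // -> [(1, null), (2, 2), (null, 3), (4, 4)]
    }
}
{code}

A real implementation additionally has to join whole groups of records that share 
the same key (the per-key cross product), support the left and right variants, 
and, as noted above, should be covered by unit tests for cases like empty inputs, 
duplicate keys, and keys present on only one side.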

> Implement Sort-Merge Outer Join algorithm
> -----------------------------------------
>
>                 Key: FLINK-2105
>                 URL: https://issues.apache.org/jira/browse/FLINK-2105
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Ricky Pogalz
>            Priority: Minor
>             Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment. 
> This issue proposes to implement a sort-merge outer join algorithm that can 
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator 
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also 
> {{MatchDriver}} class).
> The Reusing and NonReusing variants differ in whether object instances are 
> reused or new objects are created. I would start with the NonReusing variant 
> which is safer from a user's point of view and should also be easier to 
> implement.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)