[
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589892#comment-14589892
]
Johann Kovacs commented on FLINK-2105:
--------------------------------------
@[~aalexandrov] and watchers: Based on our understanding of Flink internals and
[~fhueske]'s hints, we decided for now to implement the outer join operator as a
special case of the sort-merge inner join, as implemented in {{MatchDriver}}
and the iterator classes mentioned in the issue description.
During the discussion, the question came up of how and where to handle, or
protect against, cases where the partitioning of the inputs is invalid for the
selected operator and execution strategy (see the examples below). The broader
question is probably how the optimizer works internally and how to extend it,
which may be answered by completing
[these|https://cwiki.apache.org/confluence/display/FLINK/Adding+a+new+Operator+Step-by-step]
[wiki|https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals]
pages (hint hint ;)).
Our assumptions and conclusions so far follow below. Please let us know if any
of those are wrong:
* If one input side is broadcast and you perform a sort-merge outer join on the
_same side that was broadcast_ (or a full outer join), you cannot know whether
to emit (x, null) or whether a matching key x exists on the right side on some
other node.
* Similarly, the same problem occurs if you were to perform a sort-merge based
cogroup, with one side being broadcast.
* Is there even a case in the current implementation where you would broadcast
one side and perform a sort-merge join, as opposed to a hash join with the
broadcast side as the build side? This scenario wouldn't make much sense IMO,
and we didn't find an option for it in the source code either.
(E.g., there is no {{BROADCAST_SORT_MERGE}} in the {{JoinHint}} enum.)
* Similarly, CoGroup is currently only implemented with a sort-merge strategy,
which knows nothing about the actual partitioning of the data, so it only works
correctly when both sides are repartitioned by key.
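To make the first point concrete, here is a minimal, purely illustrative sketch (plain Java, *not* Flink's actual iterator code) of a sort-merge left outer join over two key-sorted arrays. The branch that emits (x, null) is exactly the decision that becomes unsafe when the right side was broadcast, i.e. when matches for x may live on another node:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of a sort-merge LEFT outer join over two
// key-sorted inputs with unique keys per side. Not Flink code.
public class SortMergeLeftOuterJoinSketch {
    public static List<String> join(int[] left, int[] right) {
        List<String> out = new ArrayList<>();
        int l = 0, r = 0;
        while (l < left.length) {
            if (r < right.length && right[r] < left[l]) {
                r++; // unmatched right key: dropped in a left outer join
            } else if (r < right.length && right[r] == left[l]) {
                out.add("(" + left[l] + "," + right[r] + ")");
                l++;
                r++;
            } else {
                // No matching right key *on this node*. Emitting (x, null)
                // here is only correct if the right side is fully
                // co-partitioned here and not broadcast/scattered elsewhere.
                out.add("(" + left[l] + ",null)");
                l++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(join(new int[]{1, 2, 4}, new int[]{2, 3, 4}));
    }
}
```

With left = [1, 2, 4] and right = [2, 3, 4] this yields (1,null), (2,2), (4,4); if the right side had been broadcast, the (1,null) emission would be wrong whenever some other node holds a right-side record with key 1.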
Because of that we assume that:
* there must be some component in the Flink runtime which decides which
partitioning makes sense for which operator and execution strategy. For
example, if the optimizer decides that the left side shall be broadcast, then
the {{MatchDriver}} should perform a {{HYBRIDHASH_BUILD_FIRST}} join, and so on
* similarly, if it encounters a CoGroup operator, both sides need to be
repartitioned
* we believe this is somehow part of the optimizer/dag package, but would
appreciate a hint where to look for this.
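The assumption above could be sketched as a compatibility table. All names below are hypothetical and only illustrate the kind of decision we assume the optimizer makes; they are not Flink's actual optimizer classes:

```java
import java.util.EnumSet;
import java.util.Map;

// Purely hypothetical sketch of pairing an input shipping strategy with a
// valid execution strategy. The premise (an assumption from our reading of
// the code): sort-merge requires both sides partitioned by key, while a
// broadcast input implies a hash join building on the broadcast side.
public class StrategyCompatibilitySketch {
    public enum Ship { BROADCAST_FIRST, BROADCAST_SECOND, PARTITION_BOTH }
    public enum Exec { HYBRIDHASH_BUILD_FIRST, HYBRIDHASH_BUILD_SECOND, SORT_MERGE }

    static final Map<Ship, EnumSet<Exec>> VALID = Map.of(
        Ship.BROADCAST_FIRST,  EnumSet.of(Exec.HYBRIDHASH_BUILD_FIRST),
        Ship.BROADCAST_SECOND, EnumSet.of(Exec.HYBRIDHASH_BUILD_SECOND),
        Ship.PARTITION_BOTH,   EnumSet.allOf(Exec.class)
    );

    public static boolean isValid(Ship ship, Exec exec) {
        return VALID.get(ship).contains(exec);
    }

    public static void main(String[] args) {
        // Broadcast left + sort-merge is exactly the invalid combination
        // discussed above; repartitioning both sides permits sort-merge.
        System.out.println(isValid(Ship.BROADCAST_FIRST, Exec.SORT_MERGE));
        System.out.println(isValid(Ship.PARTITION_BOTH, Exec.SORT_MERGE));
    }
}
```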
Because of _this_:
* keeping track of which side was broadcast, repartitioned, sorted or grouped
doesn't appear to be the responsibility of the Driver implementation or, in the
case of the {{MatchDriver}}, the iterator implementations that perform the
sort-merge or hash joins, correct?
* Similarly, for example, the {{BuildFirst-}} and
{{BuildSecondHashMatchIterators}} don't actually check which side was broadcast
and which was repartitioned. Apparently they just assume the optimizer did its
job correctly when constructing the data flow graph and simply do as they are
told. The same goes for the CoGroup driver and iterator.
* I would assume we can make the same assumption for our outer join
implementation?
Thanks
> Implement Sort-Merge Outer Join algorithm
> -----------------------------------------
>
> Key: FLINK-2105
> URL: https://issues.apache.org/jira/browse/FLINK-2105
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Reporter: Fabian Hueske
> Assignee: Ricky Pogalz
> Priority: Minor
> Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment.
> This issue proposes to implement a sort-merge outer join algorithm that can
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also
> {{MatchDriver}} class)
> The Reusing and NonReusing variants differ in whether object instances are
> reused or new objects are created. I would start with the NonReusing variant
> which is safer from a user's point of view and should also be easier to
> implement.