[
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14647531#comment-14647531
]
ASF GitHub Bot commented on FLINK-2105:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/907#discussion_r35863850
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java ---
@@ -138,63 +122,23 @@ public void abort() {
* key, and then calls the match stub with the cross product of the values.
*
* @throws Exception Forwards all exceptions from the user code and the I/O system.
- *
* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
*/
@Override
- public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
- throws Exception
- {
- if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
- // consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
- while (this.iterator1.nextKey());
- while (this.iterator2.nextKey());
-
- return false;
- }
+ public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+ throws Exception;
- final TypePairComparator<T1, T2> comparator = this.comp;
- comparator.setReference(this.iterator1.getCurrent());
- T2 current2 = this.iterator2.getCurrent();
-
- // zig zag
- while (true) {
- // determine the relation between the (possibly composite) keys
- final int comp = comparator.compareToReference(current2);
-
- if (comp == 0) {
- break;
- }
-
- if (comp < 0) {
- if (!this.iterator2.nextKey()) {
- return false;
- }
- current2 = this.iterator2.getCurrent();
- }
- else {
- if (!this.iterator1.nextKey()) {
- return false;
- }
- comparator.setReference(this.iterator1.getCurrent());
- }
- }
-
- // here, we have a common key! call the match function with the cross product of the
- // values
- final NonReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
- final NonReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
-
+ protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
--- End diff --
Rename the parameter: matchFunction -> joinFunction
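
For illustration only, here is a minimal sketch of what a cross-product helper with the renamed parameter might look like. It is not the code from the pull request: the nested `Collector` and `FlatJoinFunction` interfaces are hypothetical stand-ins that mirror the shape of the Flink interfaces so the example compiles on its own, and buffering the second input in a list is an assumption of this sketch, not necessarily what the PR does.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class CrossMatchingGroupSketch {

    /** Hypothetical stand-in for Flink's Collector, kept local so the sketch is self-contained. */
    interface Collector<O> {
        void collect(O record);
    }

    /** Hypothetical stand-in for Flink's FlatJoinFunction. */
    interface FlatJoinFunction<T1, T2, O> {
        void join(T1 first, T2 second, Collector<O> out) throws Exception;
    }

    /** Calls joinFunction once for every pair (v1, v2) of the two value groups. */
    static <T1, T2, O> void crossMatchingGroup(
            Iterator<T1> values1,
            Iterator<T2> values2,
            FlatJoinFunction<T1, T2, O> joinFunction,
            Collector<O> collector) throws Exception {

        // Assumption of this sketch: buffer the second side, because a plain
        // Iterator can only be traversed once.
        List<T2> buffered = new ArrayList<>();
        while (values2.hasNext()) {
            buffered.add(values2.next());
        }

        while (values1.hasNext()) {
            T1 v1 = values1.next();
            for (T2 v2 : buffered) {
                joinFunction.join(v1, v2, collector);
            }
        }
    }
}
```

With the name `joinFunction`, the parameter reads consistently with its type, `FlatJoinFunction`, and with the `join` method it invokes.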
> Implement Sort-Merge Outer Join algorithm
> -----------------------------------------
>
> Key: FLINK-2105
> URL: https://issues.apache.org/jira/browse/FLINK-2105
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Reporter: Fabian Hueske
> Assignee: Ricky Pogalz
> Priority: Minor
> Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment.
> This issue proposes to implement a sort-merge outer join algorithm that can
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterators
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}; see also
> the {{MatchDriver}} class).
> The Reusing and NonReusing variants differ in whether object instances are
> reused or new objects are created. I would start with the NonReusing variant,
> which is safer from a user's point of view and should also be easier to
> implement.
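
The algorithm proposed in the issue description can be sketched roughly as follows. This is a simplified, non-authoritative illustration over in-memory lists that are already sorted by key; it does not use Flink's runtime classes. The `Row` type, the `JoinType` enum, and the string output format are assumptions made up for this example. The zig-zag over the two inputs is the same as in a regular sort-merge join; the difference is that a key found on only one side is emitted with `null` for the missing side instead of being skipped.

```java
import java.util.ArrayList;
import java.util.List;

class SortMergeOuterJoinSketch {

    enum JoinType { LEFT, RIGHT, FULL }

    /** Hypothetical record type: an integer key plus a payload. */
    static final class Row {
        final int key;
        final String value;
        Row(int key, String value) { this.key = key; this.value = value; }
    }

    static Row row(int key, String value) { return new Row(key, value); }

    private static String pair(String left, String right) {
        return "(" + left + "," + right + ")";
    }

    /** Joins two inputs that are sorted by key; null marks the missing side. */
    static List<String> join(List<Row> left, List<Row> right, JoinType type) {
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;

        while (i < left.size() && j < right.size()) {
            int cmp = Integer.compare(left.get(i).key, right.get(j).key);
            if (cmp < 0) {
                // Key exists only on the left side.
                if (type != JoinType.RIGHT) out.add(pair(left.get(i).value, null));
                i++;
            } else if (cmp > 0) {
                // Key exists only on the right side.
                if (type != JoinType.LEFT) out.add(pair(null, right.get(j).value));
                j++;
            } else {
                // Common key: find both groups and emit their cross product.
                int key = left.get(i).key;
                int iEnd = i;
                int jEnd = j;
                while (iEnd < left.size() && left.get(iEnd).key == key) iEnd++;
                while (jEnd < right.size() && right.get(jEnd).key == key) jEnd++;
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(pair(left.get(a).value, right.get(b).value));
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }

        // Drain whichever input still has rows; they cannot have a partner.
        if (type != JoinType.RIGHT) {
            for (; i < left.size(); i++) out.add(pair(left.get(i).value, null));
        }
        if (type != JoinType.LEFT) {
            for (; j < right.size(); j++) out.add(pair(null, right.get(j).value));
        }
        return out;
    }
}
```

Because this sketch creates a new output object for every pair and never mutates its inputs, it corresponds in spirit to the NonReusing variant; a Reusing variant would have to copy values before advancing the underlying iterators.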