[
https://issues.apache.org/jira/browse/FLINK-2576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14941060#comment-14941060
]
ASF GitHub Bot commented on FLINK-2576:
---------------------------------------
Github user jkovacs commented on a diff in the pull request:
https://github.com/apache/flink/pull/1138#discussion_r41014523
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends
FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2,
OUT, FT> {
+
+ public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+ private OuterJoinType outerJoinType;
+
+ public OuterJoinOperatorBase(UserCodeWrapper<FT> udf,
BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name,
OuterJoinType outerJoinType) {
+ super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+ this.outerJoinType = outerJoinType;
+ }
+
+ public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1,
IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name,
OuterJoinType outerJoinType) {
+ super(new UserCodeObjectWrapper<FT>(udf), operatorInfo,
keyPositions1, keyPositions2, name);
+ this.outerJoinType = outerJoinType;
+ }
+
+ public OuterJoinOperatorBase(Class<? extends FT> udf,
BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name,
OuterJoinType outerJoinType) {
+ super(new UserCodeClassWrapper<FT>(udf), operatorInfo,
keyPositions1, keyPositions2, name);
+ this.outerJoinType = outerJoinType;
+ }
+
+ public void setOuterJoinType(OuterJoinType outerJoinType) {
+ this.outerJoinType = outerJoinType;
+ }
+
+ public OuterJoinType getOuterJoinType() {
+ return outerJoinType;
+ }
+
+ @Override
+ protected List<OUT> executeOnCollections(List<IN1> leftInput, List<IN2>
rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig)
throws Exception {
+ TypeInformation<IN1> leftInformation =
getOperatorInfo().getFirstInputType();
+ TypeInformation<IN2> rightInformation =
getOperatorInfo().getSecondInputType();
+ TypeInformation<OUT> outInformation =
getOperatorInfo().getOutputType();
+
+ TypeComparator<IN1> leftComparator = buildComparatorFor(0,
executionConfig, leftInformation);
+ TypeComparator<IN2> rightComparator = buildComparatorFor(1,
executionConfig, rightInformation);
+
+ TypeSerializer<IN1> leftSerializer =
leftInformation.createSerializer(executionConfig);
+ TypeSerializer<IN2> rightSerializer =
rightInformation.createSerializer(executionConfig);
+
+ OuterJoinListIterator<IN1, IN2> outerJoinIterator =
+ new OuterJoinListIterator<>(leftInput,
leftSerializer, leftComparator,
+ rightInput, rightSerializer,
rightComparator, outerJoinType);
+
+ // --------------------------------------------------------------------
+ // Run UDF
+ // --------------------------------------------------------------------
+ FlatJoinFunction<IN1, IN2, OUT> function =
userFunction.getUserCodeObject();
+
+ FunctionUtils.setFunctionRuntimeContext(function,
runtimeContext);
+ FunctionUtils.openFunction(function, this.parameters);
+
+
+ List<OUT> result = new ArrayList<>();
+ Collector<OUT> collector = new CopyingListCollector<>(result,
outInformation.createSerializer(executionConfig));
+
+ while (outerJoinIterator.next()) {
+ IN1 left = outerJoinIterator.getLeft();
+ IN2 right = outerJoinIterator.getRight();
+ function.join(left == null ? null :
leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right),
collector);
+ }
+
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> TypeComparator<T> buildComparatorFor(int input,
ExecutionConfig executionConfig, TypeInformation<T> typeInformation) {
+ TypeComparator<T> comparator;
+ if (typeInformation instanceof AtomicType) {
+ comparator = ((AtomicType<T>)
typeInformation).createComparator(true, executionConfig);
+ } else if (typeInformation instanceof CompositeType) {
+ int[] keyPositions = getKeyColumns(input);
+ boolean[] orders = new boolean[keyPositions.length];
+ Arrays.fill(orders, true);
+
+ comparator = ((CompositeType<T>)
typeInformation).createComparator(keyPositions, orders, 0, executionConfig);
+ } else {
+ throw new RuntimeException("Type information for input of type " + typeInformation.getClass()
+ .getCanonicalName() + " is not supported. Could not generate a comparator.");
+ }
+ return comparator;
+ }
+
+ private static class OuterJoinListIterator<IN1, IN2> {
+
+
+ private static enum MatchStatus {
+ NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED,
FIRST_EMPTY, SECOND_EMPTY
+ }
+
+ private OuterJoinType outerJoinType;
+
+ private ListKeyGroupedIterator<IN1> leftGroupedIterator;
+ private ListKeyGroupedIterator<IN2> rightGroupedIterator;
+ private Iterable<IN1> currLeftSubset;
+ private ResettableIterator currLeftIterator;
+ private Iterable<IN2> currRightSubset;
+ private ResettableIterator currRightIterator;
+
+ private MatchStatus matchStatus;
+ private GenericPairComparator<IN1, IN2> pairComparator;
+
+ private IN1 leftReturn;
+ private IN2 rightReturn;
+
+ public OuterJoinListIterator(List<IN1> leftInput,
TypeSerializer<IN1> leftSerializer, final TypeComparator<IN1> leftComparator,
+ List<IN2> rightInput, TypeSerializer<IN2>
rightSerializer, final TypeComparator<IN2> rightComparator,
+ OuterJoinType outerJoinType) {
+ this.outerJoinType = outerJoinType;
+ pairComparator = new
GenericPairComparator<>(leftComparator, rightComparator);
+ leftGroupedIterator = new
ListKeyGroupedIterator<>(leftInput, leftSerializer, leftComparator);
+ rightGroupedIterator = new
ListKeyGroupedIterator<>(rightInput, rightSerializer, rightComparator);
+ // ----------------------------------------------------------------
+ // Sort
+ // ----------------------------------------------------------------
+ Collections.sort(leftInput, new Comparator<IN1>() {
+ @Override
+ public int compare(IN1 o1, IN1 o2) {
+ return leftComparator.compare(o1, o2);
+ }
+ });
+
+ Collections.sort(rightInput, new Comparator<IN2>() {
+ @Override
+ public int compare(IN2 o1, IN2 o2) {
+ return rightComparator.compare(o1, o2);
+ }
+ });
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private boolean next() throws IOException {
+ boolean hasMoreElements;
+ if ((currLeftIterator == null ||
!currLeftIterator.hasNext()) && (currRightIterator == null ||
!currRightIterator.hasNext())) {
+ hasMoreElements = nextGroups(outerJoinType);
+ if (hasMoreElements) {
+ if (outerJoinType !=
OuterJoinType.LEFT) {
+ currLeftIterator = new
ListIteratorWrapper(currLeftSubset.iterator());
+ }
+ leftReturn = (IN1)
currLeftIterator.next();
+ if (outerJoinType !=
OuterJoinType.RIGHT) {
+ currRightIterator = new
ListIteratorWrapper(currRightSubset.iterator());
+ }
+ rightReturn = (IN2)
currRightIterator.next();
+ return true;
+ } else {
+ //no more elements
+ return false;
+ }
+ } else if (currLeftIterator.hasNext() &&
!currRightIterator.hasNext()) {
--- End diff --
Technically true, but judging from the control flow I believe that scenario is
impossible: either both iterators are reassigned to something non-null at the
same time, or both remain null and the method returns `false` (no more
elements). @r-pogalz, can you confirm that, or can we make this more explicit?
> Add outer joins to API and Optimizer
> ------------------------------------
>
> Key: FLINK-2576
> URL: https://issues.apache.org/jira/browse/FLINK-2576
> Project: Flink
> Issue Type: Sub-task
> Components: Java API, Optimizer, Scala API
> Reporter: Ricky Pogalz
> Priority: Minor
> Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala) and
> to the optimizer of Flink.
> Initially, the execution strategy should be a sort-merge outer join
> (FLINK-2105) but can later be extended to hash joins for left/right outer
> joins.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)