[ 
https://issues.apache.org/jira/browse/FLINK-2576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948555#comment-14948555
 ] 

ASF GitHub Bot commented on FLINK-2576:
---------------------------------------

Github user r-pogalz commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1138#discussion_r41505034
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
 ---
    @@ -0,0 +1,314 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.operators.base;
    +
    +import org.apache.commons.collections.ResettableIterator;
    +import org.apache.commons.collections.iterators.ListIteratorWrapper;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.functions.util.CopyingListCollector;
    +import org.apache.flink.api.common.functions.util.FunctionUtils;
    +import org.apache.flink.api.common.operators.BinaryOperatorInformation;
    +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
    +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
    +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
    +import org.apache.flink.api.common.operators.util.UserCodeWrapper;
    +import org.apache.flink.api.common.typeinfo.AtomicType;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.CompositeType;
    +import org.apache.flink.api.common.typeutils.GenericPairComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +
    +public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends 
FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, 
OUT, FT> {
    +
    +   public static enum OuterJoinType {LEFT, RIGHT, FULL}
    +
    +   private OuterJoinType outerJoinType;
    +
    /**
     * Creates an outer join operator from an already wrapped user function.
     *
     * @param udf the wrapped {@link FlatJoinFunction}
     * @param operatorInfo type information of the two inputs and the output
     * @param keyPositions1 key field positions of the first (left) input
     * @param keyPositions2 key field positions of the second (right) input
     * @param name the operator name
     * @param outerJoinType the outer join type to execute
     */
    public OuterJoinOperatorBase(
            UserCodeWrapper<FT> udf,
            BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
            int[] keyPositions1,
            int[] keyPositions2,
            String name,
            OuterJoinType outerJoinType) {
        super(udf, operatorInfo, keyPositions1, keyPositions2, name);
        this.outerJoinType = outerJoinType;
    }
    +
    /**
     * Creates an outer join operator from a user function instance.
     *
     * <p>The instance is wrapped in a {@link UserCodeObjectWrapper}, and construction is
     * delegated to the wrapper-based constructor.
     *
     * @param udf the {@link FlatJoinFunction} instance
     * @param operatorInfo type information of the two inputs and the output
     * @param keyPositions1 key field positions of the first (left) input
     * @param keyPositions2 key field positions of the second (right) input
     * @param name the operator name
     * @param outerJoinType the outer join type to execute
     */
    public OuterJoinOperatorBase(
            FT udf,
            BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
            int[] keyPositions1,
            int[] keyPositions2,
            String name,
            OuterJoinType outerJoinType) {
        this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2,
                name, outerJoinType);
    }
    +
    /**
     * Creates an outer join operator from a user function class.
     *
     * <p>The class is wrapped in a {@link UserCodeClassWrapper}, and construction is delegated
     * to the wrapper-based constructor.
     *
     * @param udf the {@link FlatJoinFunction} implementation class
     * @param operatorInfo type information of the two inputs and the output
     * @param keyPositions1 key field positions of the first (left) input
     * @param keyPositions2 key field positions of the second (right) input
     * @param name the operator name
     * @param outerJoinType the outer join type to execute
     */
    public OuterJoinOperatorBase(
            Class<? extends FT> udf,
            BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
            int[] keyPositions1,
            int[] keyPositions2,
            String name,
            OuterJoinType outerJoinType) {
        this(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2,
                name, outerJoinType);
    }
    +
    +   public void setOuterJoinType(OuterJoinType outerJoinType) {
    +           this.outerJoinType = outerJoinType;
    +   }
    +
    +   public OuterJoinType getOuterJoinType() {
    +           return outerJoinType;
    +   }
    +
    +   @Override
    +   protected List<OUT> executeOnCollections(List<IN1> leftInput, List<IN2> 
rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) 
throws Exception {
    +           TypeInformation<IN1> leftInformation = 
getOperatorInfo().getFirstInputType();
    +           TypeInformation<IN2> rightInformation = 
getOperatorInfo().getSecondInputType();
    +           TypeInformation<OUT> outInformation = 
getOperatorInfo().getOutputType();
    +
    +           TypeComparator<IN1> leftComparator = buildComparatorFor(0, 
executionConfig, leftInformation);
    +           TypeComparator<IN2> rightComparator = buildComparatorFor(1, 
executionConfig, rightInformation);
    +
    +           TypeSerializer<IN1> leftSerializer = 
leftInformation.createSerializer(executionConfig);
    +           TypeSerializer<IN2> rightSerializer = 
rightInformation.createSerializer(executionConfig);
    +
    +           OuterJoinListIterator<IN1, IN2> outerJoinIterator =
    +                           new OuterJoinListIterator<>(leftInput, 
leftSerializer, leftComparator,
    +                                           rightInput, rightSerializer, 
rightComparator, outerJoinType);
    +
    +           // 
--------------------------------------------------------------------
    +           // Run UDF
    +           // 
--------------------------------------------------------------------
    +           FlatJoinFunction<IN1, IN2, OUT> function = 
userFunction.getUserCodeObject();
    +
    +           FunctionUtils.setFunctionRuntimeContext(function, 
runtimeContext);
    +           FunctionUtils.openFunction(function, this.parameters);
    +
    +
    +           List<OUT> result = new ArrayList<>();
    +           Collector<OUT> collector = new CopyingListCollector<>(result, 
outInformation.createSerializer(executionConfig));
    +
    +           while (outerJoinIterator.next()) {
    +                   IN1 left = outerJoinIterator.getLeft();
    +                   IN2 right = outerJoinIterator.getRight();
    +                   function.join(left == null ? null : 
leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), 
collector);
    +           }
    +
    +           return result;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private <T> TypeComparator<T> buildComparatorFor(int input, 
ExecutionConfig executionConfig, TypeInformation<T> typeInformation) {
    +           TypeComparator<T> comparator;
    +           if (typeInformation instanceof AtomicType) {
    +                   comparator = ((AtomicType<T>) 
typeInformation).createComparator(true, executionConfig);
    +           } else if (typeInformation instanceof CompositeType) {
    +                   int[] keyPositions = getKeyColumns(input);
    +                   boolean[] orders = new boolean[keyPositions.length];
    +                   Arrays.fill(orders, true);
    +
    +                   comparator = ((CompositeType<T>) 
typeInformation).createComparator(keyPositions, orders, 0, executionConfig);
    +           } else {
    +                   throw new RuntimeException("Type information for input 
of type " + typeInformation.getClass()
    +                                   .getCanonicalName() + " is not 
supported. Could not generate a comparator.");
    +           }
    +           return comparator;
    +   }
    +
    +   private static class OuterJoinListIterator<IN1, IN2> {
    +
    +
    +           private static enum MatchStatus {
    +                   NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED, 
FIRST_EMPTY, SECOND_EMPTY
    +           }
    +
    +           private OuterJoinType outerJoinType;
    +
    +           private ListKeyGroupedIterator<IN1> leftGroupedIterator;
    +           private ListKeyGroupedIterator<IN2> rightGroupedIterator;
    +           private Iterable<IN1> currLeftSubset;
    +           private ResettableIterator currLeftIterator;
    +           private Iterable<IN2> currRightSubset;
    +           private ResettableIterator currRightIterator;
    +
    +           private MatchStatus matchStatus;
    +           private GenericPairComparator<IN1, IN2> pairComparator;
    +
    +           private IN1 leftReturn;
    +           private IN2 rightReturn;
    +
    /**
     * Sets up key-grouped iteration over both inputs.
     *
     * <p>Side effect: {@code leftInput} and {@code rightInput} are sorted in place with their
     * key comparators. The grouped iterators are created before the sort, as in the original
     * ordering. NOTE(review): this presumably relies on {@code ListKeyGroupedIterator} not
     * reading the list eagerly; confirm.
     *
     * @param leftInput records of the first input
     * @param leftSerializer serializer for the first input
     * @param leftComparator key comparator for the first input
     * @param rightInput records of the second input
     * @param rightSerializer serializer for the second input
     * @param rightComparator key comparator for the second input
     * @param outerJoinType the outer join type to produce
     */
    public OuterJoinListIterator(List<IN1> leftInput, TypeSerializer<IN1> leftSerializer,
            final TypeComparator<IN1> leftComparator,
            List<IN2> rightInput, TypeSerializer<IN2> rightSerializer,
            final TypeComparator<IN2> rightComparator,
            OuterJoinType outerJoinType) {
        this.outerJoinType = outerJoinType;
        this.pairComparator = new GenericPairComparator<>(leftComparator, rightComparator);
        this.leftGroupedIterator =
                new ListKeyGroupedIterator<>(leftInput, leftSerializer, leftComparator);
        this.rightGroupedIterator =
                new ListKeyGroupedIterator<>(rightInput, rightSerializer, rightComparator);

        // ----------------------------------------------------------------
        // Sort both inputs by key (left first, then right)
        // ----------------------------------------------------------------
        sortByKey(leftInput, leftComparator);
        sortByKey(rightInput, rightComparator);
    }

    /** Sorts {@code records} in place using {@code keyComparator}. */
    private static <T> void sortByKey(List<T> records, final TypeComparator<T> keyComparator) {
        Collections.sort(records, new Comparator<T>() {
            @Override
            public int compare(T first, T second) {
                return keyComparator.compare(first, second);
            }
        });
    }
    +
    +           @SuppressWarnings("unchecked")
    +           private boolean next() throws IOException {
    +                   boolean hasMoreElements;
    +                   if ((currLeftIterator == null || 
!currLeftIterator.hasNext()) && (currRightIterator == null || 
!currRightIterator.hasNext())) {
    +                           hasMoreElements = nextGroups(outerJoinType);
    +                           if (hasMoreElements) {
    +                                   if (outerJoinType != 
OuterJoinType.LEFT) {
    +                                           currLeftIterator = new 
ListIteratorWrapper(currLeftSubset.iterator());
    +                                   }
    +                                   leftReturn = (IN1) 
currLeftIterator.next();
    +                                   if (outerJoinType != 
OuterJoinType.RIGHT) {
    +                                           currRightIterator = new 
ListIteratorWrapper(currRightSubset.iterator());
    +                                   }
    +                                   rightReturn = (IN2) 
currRightIterator.next();
    +                                   return true;
    +                           } else {
    +                                   //no more elements
    +                                   return false;
    +                           }
    +                   } else if (currLeftIterator.hasNext() && 
!currRightIterator.hasNext()) {
    --- End diff --
    
    @jkovacs is right: `currLeftIterator` and `currRightIterator` cannot be null at 
this point, because they are just wrappers around the subsets and are always 
assigned. If a subset is null and `hasNext()` is called, the wrapper returns 
false. 
    There are also tests in `OuterJoinOperatorBaseTest` that cover the cases 
where one side of the outer join is empty.


> Add outer joins to API and Optimizer
> ------------------------------------
>
>                 Key: FLINK-2576
>                 URL: https://issues.apache.org/jira/browse/FLINK-2576
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Java API, Optimizer, Scala API
>            Reporter: Ricky Pogalz
>            Priority: Minor
>             Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala) and 
> to the Flink optimizer.
> Initially, the execution strategy should be a sort-merge outer join 
> (FLINK-2105), but it can later be extended to hash joins for left/right outer 
> joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to