Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1138#discussion_r40439650
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java 
---
    @@ -281,233 +315,225 @@ protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) {
                }
     
                @Override
    -           protected JoinOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
    +           protected AbstractJoinOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
    +                   String name = getName() != null ? getName() : "Join at 
" + joinLocationName;
    +
    +                   JoinOperatorBaseBuilder<OUT> builder = new 
JoinOperatorBaseBuilder<OUT>(name, joinType)
    +                                   .withParallelism(getParallelism())
    +                                   .withPartitioner(getPartitioner())
    +                                   .withJoinHint(getJoinHint())
    +                                   .withResultType(getResultType());
    +
    +                   final boolean requiresTupleUnwrapping = keys1 
instanceof Keys.SelectorFunctionKeys || keys2 instanceof 
Keys.SelectorFunctionKeys;
    +                   if (requiresTupleUnwrapping) {
    +                           if (keys1 instanceof Keys.SelectorFunctionKeys 
&& keys2 instanceof Keys.SelectorFunctionKeys) {
    +                                   // Both join sides have a key selector 
function, so we need to do the
    +                                   // tuple wrapping/unwrapping on both 
sides.
    +
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I1, ?> 
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I2, ?> 
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    +
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleUnwrappingJoiner<>(function))
    +                                                   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
    +                                                   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
    +                           } else if (keys2 instanceof 
Keys.SelectorFunctionKeys) {
    +                                   // The right side of the join needs the 
tuple wrapping/unwrapping
    +
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I2, ?> 
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    +
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleRightUnwrappingJoiner<>(function))
    +                                                   .withInput1(input1, 
getInput1Type(), keys1)
    +                                                   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
    +                           } else {
    +                                   // The left side of the join needs the 
tuple wrapping/unwrapping
     
    -                   String name = getName() != null ? getName() : "Join at 
"+joinLocationName;
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I1, ?> 
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
     
    -                   final JoinOperatorBase<?, ?, OUT, ?> translated;
    -                   
    -                   if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 
instanceof Keys.SelectorFunctionKeys) {
    -                           // Both join sides have a key selector 
function, so we need to do the
    -                           // tuple wrapping/unwrapping on both sides.
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleLeftUnwrappingJoiner<>(function))
    +                                                   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
    +                                                   .withInput2(input2, 
getInput2Type(), keys2);
    +                           }
    +                   } else if (keys1 instanceof Keys.ExpressionKeys && 
keys2 instanceof Keys.ExpressionKeys) {
    +                           // Neither side needs the tuple 
wrapping/unwrapping
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 
= (Keys.SelectorFunctionKeys<I1, ?>) keys1;
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 
= (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    -                           
    -                           PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> 
po =
    -                                           
translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, 
    -                                           getInput1Type(), 
getInput2Type(), getResultType(), name, input1, input2);
    -                           
    -                           // set parallelism
    -                           po.setParallelism(this.getParallelism());
    -                           
    -                           translated = po;
    +                           builder = builder
    +                                           .withUdf(function)
    +                                           .withInput1(input1, 
getInput1Type(), keys1)
    +                                           .withInput2(input2, 
getInput2Type(), keys2);
    +                   } else {
    +                           throw new 
UnsupportedOperationException("Unrecognized or incompatible key types.");
                        }
    -                   else if (keys2 instanceof Keys.SelectorFunctionKeys) {
    -                           // The right side of the join needs the tuple 
wrapping/unwrapping
     
    -                           int[] logicalKeyPositions1 = 
keys1.computeLogicalKeyPositions();
    +                   return builder.build();
    +           }
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
    -                                           (Keys.SelectorFunctionKeys<I2, 
?>) keys2;
     
    -                           PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> 
po =
    -                                           
translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2,
    -                                                           function, 
getInput1Type(), getInput2Type(), getResultType(), name,
    -                                                           input1, input2);
    +           private static final class JoinOperatorBaseBuilder<OUT> {
    +
    +                   private final String name;
    +                   private final JoinType joinType;
    +
    +                   private int parallelism;
    +                   private FlatJoinFunction udf;
    --- End diff --
    
    There are a few compiler warning here complaining about "raw use of 
parameterized classes".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to