Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1138#discussion_r40439495
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java 
---
    @@ -281,233 +315,225 @@ protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) {
                }
     
                @Override
    -           protected JoinOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
    +           protected AbstractJoinOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
    +                   String name = getName() != null ? getName() : "Join at 
" + joinLocationName;
    +
    +                   JoinOperatorBaseBuilder<OUT> builder = new 
JoinOperatorBaseBuilder<OUT>(name, joinType)
    +                                   .withParallelism(getParallelism())
    +                                   .withPartitioner(getPartitioner())
    +                                   .withJoinHint(getJoinHint())
    +                                   .withResultType(getResultType());
    +
    +                   final boolean requiresTupleUnwrapping = keys1 
instanceof Keys.SelectorFunctionKeys || keys2 instanceof 
Keys.SelectorFunctionKeys;
    +                   if (requiresTupleUnwrapping) {
    +                           if (keys1 instanceof Keys.SelectorFunctionKeys 
&& keys2 instanceof Keys.SelectorFunctionKeys) {
    +                                   // Both join sides have a key selector 
function, so we need to do the
    +                                   // tuple wrapping/unwrapping on both 
sides.
    +
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I1, ?> 
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I2, ?> 
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    +
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleUnwrappingJoiner<>(function))
    +                                                   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
    +                                                   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
    +                           } else if (keys2 instanceof 
Keys.SelectorFunctionKeys) {
    +                                   // The right side of the join needs the 
tuple wrapping/unwrapping
    +
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I2, ?> 
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    +
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleRightUnwrappingJoiner<>(function))
    +                                                   .withInput1(input1, 
getInput1Type(), keys1)
    +                                                   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
    +                           } else {
    +                                   // The left side of the join needs the 
tuple wrapping/unwrapping
     
    -                   String name = getName() != null ? getName() : "Join at 
"+joinLocationName;
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I1, ?> 
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
     
    -                   final JoinOperatorBase<?, ?, OUT, ?> translated;
    -                   
    -                   if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 
instanceof Keys.SelectorFunctionKeys) {
    -                           // Both join sides have a key selector 
function, so we need to do the
    -                           // tuple wrapping/unwrapping on both sides.
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleLeftUnwrappingJoiner<>(function))
    +                                                   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
    +                                                   .withInput2(input2, 
getInput2Type(), keys2);
    +                           }
    +                   } else if (keys1 instanceof Keys.ExpressionKeys && 
keys2 instanceof Keys.ExpressionKeys) {
    +                           // Neither side needs the tuple 
wrapping/unwrapping
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 
= (Keys.SelectorFunctionKeys<I1, ?>) keys1;
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 
= (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    -                           
    -                           PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> 
po =
    -                                           
translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, 
    -                                           getInput1Type(), 
getInput2Type(), getResultType(), name, input1, input2);
    -                           
    -                           // set parallelism
    -                           po.setParallelism(this.getParallelism());
    -                           
    -                           translated = po;
    +                           builder = builder
    +                                           .withUdf(function)
    +                                           .withInput1(input1, 
getInput1Type(), keys1)
    +                                           .withInput2(input2, 
getInput2Type(), keys2);
    +                   } else {
    +                           throw new 
UnsupportedOperationException("Unrecognized or incompatible key types.");
                        }
    -                   else if (keys2 instanceof Keys.SelectorFunctionKeys) {
    -                           // The right side of the join needs the tuple 
wrapping/unwrapping
     
    -                           int[] logicalKeyPositions1 = 
keys1.computeLogicalKeyPositions();
    +                   return builder.build();
    +           }
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
    -                                           (Keys.SelectorFunctionKeys<I2, 
?>) keys2;
     
    -                           PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> 
po =
    -                                           
translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2,
    -                                                           function, 
getInput1Type(), getInput2Type(), getResultType(), name,
    -                                                           input1, input2);
    +           private static final class JoinOperatorBaseBuilder<OUT> {
    +
    +                   private final String name;
    +                   private final JoinType joinType;
    +
    +                   private int parallelism;
    +                   private FlatJoinFunction udf;
    +                   private TypeInformation<OUT> resultType;
    +
    +                   private Operator input1;
    +                   private TypeInformation input1Type;
    +                   private Keys<?> keys1;
    +
    +                   private Operator input2;
    +                   private TypeInformation input2Type;
    +                   private Keys<?> keys2;
     
    -                           // set parallelism
    -                           po.setParallelism(this.getParallelism());
    +                   private Partitioner<?> partitioner;
    +                   private JoinHint joinHint;
     
    -                           translated = po;
    +                   public JoinOperatorBaseBuilder(String name, JoinType 
joinType) {
    +                           this.name = name;
    +                           this.joinType = joinType;
                        }
    -                   else if (keys1 instanceof Keys.SelectorFunctionKeys) {
    -                           // The left side of the join needs the tuple 
wrapping/unwrapping
     
    +                   public JoinOperatorBaseBuilder<OUT> with() {
    +                           return this;
    +                   }
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 =
    -                                           (Keys.SelectorFunctionKeys<I1, 
?>) keys1;
    +                   public <I1, K> JoinOperatorBaseBuilder<OUT> 
withUnwrappingLeftInput(
    --- End diff --
    
    Rename `withUnwrappingLeftInput()` to `withWrappedInput1()` for consistency with `withInput1()`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to