Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40439650 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java --- @@ -281,233 +315,225 @@ protected boolean udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) { } @Override - protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + protected AbstractJoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + String name = getName() != null ? getName() : "Join at " + joinLocationName; + + JoinOperatorBaseBuilder<OUT> builder = new JoinOperatorBaseBuilder<OUT>(name, joinType) + .withParallelism(getParallelism()) + .withPartitioner(getPartitioner()) + .withJoinHint(getJoinHint()) + .withResultType(getResultType()); + + final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys; + if (requiresTupleUnwrapping) { + if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { + // Both join sides have a key selector function, so we need to do the + // tuple wrapping/unwrapping on both sides. 
+ + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; + + builder = builder + .withUdf(new TupleUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else if (keys2 instanceof Keys.SelectorFunctionKeys) { + // The right side of the join needs the tuple wrapping/unwrapping + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; + + builder = builder + .withUdf(new TupleRightUnwrappingJoiner<>(function)) + .withInput1(input1, getInput1Type(), keys1) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else { + // The left side of the join needs the tuple wrapping/unwrapping - String name = getName() != null ? getName() : "Join at "+joinLocationName; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; - final JoinOperatorBase<?, ?, OUT, ?> translated; - - if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { - // Both join sides have a key selector function, so we need to do the - // tuple wrapping/unwrapping on both sides. 
+ builder = builder + .withUdf(new TupleLeftUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withInput2(input2, getInput2Type(), keys2); + } + } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + // Neither side needs the tuple wrapping/unwrapping - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; - - PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> po = - translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, - getInput1Type(), getInput2Type(), getResultType(), name, input1, input2); - - // set parallelism - po.setParallelism(this.getParallelism()); - - translated = po; + builder = builder + .withUdf(function) + .withInput1(input1, getInput1Type(), keys1) + .withInput2(input2, getInput2Type(), keys2); + } else { + throw new UnsupportedOperationException("Unrecognized or incompatible key types."); } - else if (keys2 instanceof Keys.SelectorFunctionKeys) { - // The right side of the join needs the tuple wrapping/unwrapping - int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions(); + return builder.build(); + } - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = - (Keys.SelectorFunctionKeys<I2, ?>) keys2; - PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> po = - translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2, - function, getInput1Type(), getInput2Type(), getResultType(), name, - input1, input2); + private static final class JoinOperatorBaseBuilder<OUT> { + + private final String name; + private final JoinType joinType; + + private int parallelism; + private FlatJoinFunction udf; --- End diff -- There are a few compiler warnings here complaining about "raw use of parameterized classes".
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---