Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1138#discussion_r40439511
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
---
@@ -281,233 +315,225 @@ protected boolean
udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) {
}
@Override
- protected JoinOperatorBase<?, ?, OUT, ?>
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
+ protected AbstractJoinOperatorBase<?, ?, OUT, ?>
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
+ String name = getName() != null ? getName() : "Join at
" + joinLocationName;
+
+ JoinOperatorBaseBuilder<OUT> builder = new
JoinOperatorBaseBuilder<OUT>(name, joinType)
+ .withParallelism(getParallelism())
+ .withPartitioner(getPartitioner())
+ .withJoinHint(getJoinHint())
+ .withResultType(getResultType());
+
+ final boolean requiresTupleUnwrapping = keys1
instanceof Keys.SelectorFunctionKeys || keys2 instanceof
Keys.SelectorFunctionKeys;
+ if (requiresTupleUnwrapping) {
+ if (keys1 instanceof Keys.SelectorFunctionKeys
&& keys2 instanceof Keys.SelectorFunctionKeys) {
+ // Both join sides have a key selector
function, so we need to do the
+ // tuple wrapping/unwrapping on both
sides.
+
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I1, ?>
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I2, ?>
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+ builder = builder
+ .withUdf(new
TupleUnwrappingJoiner<>(function))
+
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+ } else if (keys2 instanceof
Keys.SelectorFunctionKeys) {
+ // The right side of the join needs the
tuple wrapping/unwrapping
+
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I2, ?>
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+ builder = builder
+ .withUdf(new
TupleRightUnwrappingJoiner<>(function))
+ .withInput1(input1,
getInput1Type(), keys1)
+
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+ } else {
+ // The left side of the join needs the
tuple wrapping/unwrapping
- String name = getName() != null ? getName() : "Join at
"+joinLocationName;
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I1, ?>
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
- final JoinOperatorBase<?, ?, OUT, ?> translated;
-
- if (keys1 instanceof Keys.SelectorFunctionKeys && keys2
instanceof Keys.SelectorFunctionKeys) {
- // Both join sides have a key selector
function, so we need to do the
- // tuple wrapping/unwrapping on both sides.
+ builder = builder
+ .withUdf(new
TupleLeftUnwrappingJoiner<>(function))
+
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+ .withInput2(input2,
getInput2Type(), keys2);
+ }
+ } else if (keys1 instanceof Keys.ExpressionKeys &&
keys2 instanceof Keys.ExpressionKeys) {
+ // Neither side needs the tuple
wrapping/unwrapping
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I1, ?> selectorKeys1
= (Keys.SelectorFunctionKeys<I1, ?>) keys1;
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I2, ?> selectorKeys2
= (Keys.SelectorFunctionKeys<I2, ?>) keys2;
-
- PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?>
po =
-
translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function,
- getInput1Type(),
getInput2Type(), getResultType(), name, input1, input2);
-
- // set parallelism
- po.setParallelism(this.getParallelism());
-
- translated = po;
+ builder = builder
+ .withUdf(function)
+ .withInput1(input1,
getInput1Type(), keys1)
+ .withInput2(input2,
getInput2Type(), keys2);
+ } else {
+ throw new
UnsupportedOperationException("Unrecognized or incompatible key types.");
}
- else if (keys2 instanceof Keys.SelectorFunctionKeys) {
- // The right side of the join needs the tuple
wrapping/unwrapping
- int[] logicalKeyPositions1 =
keys1.computeLogicalKeyPositions();
+ return builder.build();
+ }
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
- (Keys.SelectorFunctionKeys<I2,
?>) keys2;
- PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?>
po =
-
translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2,
- function,
getInput1Type(), getInput2Type(), getResultType(), name,
- input1, input2);
+ private static final class JoinOperatorBaseBuilder<OUT> {
+
+ private final String name;
+ private final JoinType joinType;
+
+ private int parallelism;
+ private FlatJoinFunction udf;
+ private TypeInformation<OUT> resultType;
+
+ private Operator input1;
+ private TypeInformation input1Type;
+ private Keys<?> keys1;
+
+ private Operator input2;
+ private TypeInformation input2Type;
+ private Keys<?> keys2;
- // set parallelism
- po.setParallelism(this.getParallelism());
+ private Partitioner<?> partitioner;
+ private JoinHint joinHint;
- translated = po;
+ public JoinOperatorBaseBuilder(String name, JoinType
joinType) {
+ this.name = name;
+ this.joinType = joinType;
}
- else if (keys1 instanceof Keys.SelectorFunctionKeys) {
- // The left side of the join needs the tuple
wrapping/unwrapping
+ public JoinOperatorBaseBuilder<OUT> with() {
+ return this;
+ }
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 =
- (Keys.SelectorFunctionKeys<I1,
?>) keys1;
+ public <I1, K> JoinOperatorBaseBuilder<OUT>
withUnwrappingLeftInput(
+ Operator<I1> input1,
+ Keys.SelectorFunctionKeys<I1, ?>
rawKeys1,
+ TypeInformation<I1> inputType1) {
+ TypeInformation<Tuple2<K, I1>> typeInfoWithKey1
= new TupleTypeInfo<>(rawKeys1.getKeyType(), inputType1);
- int[] logicalKeyPositions2 =
keys2.computeLogicalKeyPositions();
+ MapOperatorBase<I1, Tuple2<K, I1>,
MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+ createKeyMapper(rawKeys1,
inputType1, input1, "Key Extractor 1");
+
+ return this.withInput1(keyMapper1,
typeInfoWithKey1, rawKeys1);
+ }
- PlanLeftUnwrappingJoinOperator<I1, I2, OUT, ?>
po =
-
translateSelectorFunctionJoinLeft(selectorKeys1, logicalKeyPositions2, function,
-
getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
+ public <I2, K> JoinOperatorBaseBuilder<OUT>
withUnwrappingRightInput(
--- End diff --
Suggestion: rename `withUnwrappingRightInput()` to `withWrappedInput2()` for consistency with `withInput2()`?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---