[
https://issues.apache.org/jira/browse/FLINK-2576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908117#comment-14908117
]
ASF GitHub Bot commented on FLINK-2576:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1138#discussion_r40439511
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
---
@@ -281,233 +315,225 @@ protected boolean
udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) {
}
@Override
- protected JoinOperatorBase<?, ?, OUT, ?>
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
+ protected AbstractJoinOperatorBase<?, ?, OUT, ?>
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
+ String name = getName() != null ? getName() : "Join at
" + joinLocationName;
+
+ JoinOperatorBaseBuilder<OUT> builder = new
JoinOperatorBaseBuilder<OUT>(name, joinType)
+ .withParallelism(getParallelism())
+ .withPartitioner(getPartitioner())
+ .withJoinHint(getJoinHint())
+ .withResultType(getResultType());
+
+ final boolean requiresTupleUnwrapping = keys1
instanceof Keys.SelectorFunctionKeys || keys2 instanceof
Keys.SelectorFunctionKeys;
+ if (requiresTupleUnwrapping) {
+ if (keys1 instanceof Keys.SelectorFunctionKeys
&& keys2 instanceof Keys.SelectorFunctionKeys) {
+ // Both join sides have a key selector
function, so we need to do the
+ // tuple wrapping/unwrapping on both
sides.
+
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I1, ?>
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I2, ?>
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+ builder = builder
+ .withUdf(new
TupleUnwrappingJoiner<>(function))
+
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+ } else if (keys2 instanceof
Keys.SelectorFunctionKeys) {
+ // The right side of the join needs the
tuple wrapping/unwrapping
+
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I2, ?>
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+ builder = builder
+ .withUdf(new
TupleRightUnwrappingJoiner<>(function))
+ .withInput1(input1,
getInput1Type(), keys1)
+
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
+ } else {
+ // The left side of the join needs the
tuple wrapping/unwrapping
- String name = getName() != null ? getName() : "Join at
"+joinLocationName;
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I1, ?>
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
- final JoinOperatorBase<?, ?, OUT, ?> translated;
-
- if (keys1 instanceof Keys.SelectorFunctionKeys && keys2
instanceof Keys.SelectorFunctionKeys) {
- // Both join sides have a key selector
function, so we need to do the
- // tuple wrapping/unwrapping on both sides.
+ builder = builder
+ .withUdf(new
TupleLeftUnwrappingJoiner<>(function))
+
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
+ .withInput2(input2,
getInput2Type(), keys2);
+ }
+ } else if (keys1 instanceof Keys.ExpressionKeys &&
keys2 instanceof Keys.ExpressionKeys) {
+ // Neither side needs the tuple
wrapping/unwrapping
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I1, ?> selectorKeys1
= (Keys.SelectorFunctionKeys<I1, ?>) keys1;
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I2, ?> selectorKeys2
= (Keys.SelectorFunctionKeys<I2, ?>) keys2;
-
- PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?>
po =
-
translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function,
- getInput1Type(),
getInput2Type(), getResultType(), name, input1, input2);
-
- // set parallelism
- po.setParallelism(this.getParallelism());
-
- translated = po;
+ builder = builder
+ .withUdf(function)
+ .withInput1(input1,
getInput1Type(), keys1)
+ .withInput2(input2,
getInput2Type(), keys2);
+ } else {
+ throw new
UnsupportedOperationException("Unrecognized or incompatible key types.");
}
- else if (keys2 instanceof Keys.SelectorFunctionKeys) {
- // The right side of the join needs the tuple
wrapping/unwrapping
- int[] logicalKeyPositions1 =
keys1.computeLogicalKeyPositions();
+ return builder.build();
+ }
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
- (Keys.SelectorFunctionKeys<I2,
?>) keys2;
- PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?>
po =
-
translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2,
- function,
getInput1Type(), getInput2Type(), getResultType(), name,
- input1, input2);
+ private static final class JoinOperatorBaseBuilder<OUT> {
+
+ private final String name;
+ private final JoinType joinType;
+
+ private int parallelism;
+ private FlatJoinFunction udf;
+ private TypeInformation<OUT> resultType;
+
+ private Operator input1;
+ private TypeInformation input1Type;
+ private Keys<?> keys1;
+
+ private Operator input2;
+ private TypeInformation input2Type;
+ private Keys<?> keys2;
- // set parallelism
- po.setParallelism(this.getParallelism());
+ private Partitioner<?> partitioner;
+ private JoinHint joinHint;
- translated = po;
+ public JoinOperatorBaseBuilder(String name, JoinType
joinType) {
+ this.name = name;
+ this.joinType = joinType;
}
- else if (keys1 instanceof Keys.SelectorFunctionKeys) {
- // The left side of the join needs the tuple
wrapping/unwrapping
+ public JoinOperatorBaseBuilder<OUT> with() {
+ return this;
+ }
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 =
- (Keys.SelectorFunctionKeys<I1,
?>) keys1;
+ public <I1, K> JoinOperatorBaseBuilder<OUT>
withUnwrappingLeftInput(
+ Operator<I1> input1,
+ Keys.SelectorFunctionKeys<I1, ?>
rawKeys1,
+ TypeInformation<I1> inputType1) {
+ TypeInformation<Tuple2<K, I1>> typeInfoWithKey1
= new TupleTypeInfo<>(rawKeys1.getKeyType(), inputType1);
- int[] logicalKeyPositions2 =
keys2.computeLogicalKeyPositions();
+ MapOperatorBase<I1, Tuple2<K, I1>,
MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+ createKeyMapper(rawKeys1,
inputType1, input1, "Key Extractor 1");
+
+ return this.withInput1(keyMapper1,
typeInfoWithKey1, rawKeys1);
+ }
- PlanLeftUnwrappingJoinOperator<I1, I2, OUT, ?>
po =
-
translateSelectorFunctionJoinLeft(selectorKeys1, logicalKeyPositions2, function,
-
getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
+ public <I2, K> JoinOperatorBaseBuilder<OUT>
withUnwrappingRightInput(
--- End diff --
Rename `withUnwrappingRightInput()` to `withWrappedInput2()` for consistency with `withInput2()`?
> Add outer joins to API and Optimizer
> ------------------------------------
>
> Key: FLINK-2576
> URL: https://issues.apache.org/jira/browse/FLINK-2576
> Project: Flink
> Issue Type: Sub-task
> Components: Java API, Optimizer, Scala API
> Reporter: Ricky Pogalz
> Priority: Minor
> Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala) and
> to the optimizer of Flink.
> Initially, the execution strategy should be a sort-merge outer join
> (FLINK-2105) but can later be extended to hash joins for left/right outer
> joins.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)