Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/85#discussion_r15570952
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
---
@@ -424,28 +448,77 @@ protected DefaultJoin(DataSet<I1> input1, DataSet<I2>
input2,
Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
{
super(input1, input2, keys1, keys2,
- (JoinFunction<I1, I2, Tuple2<I1, I2>>) new
DefaultJoinFunction<I1, I2>(),
+ (FlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new
DefaultFlatJoinFunction<I1, I2>(),
new TupleTypeInfo<Tuple2<I1,
I2>>(input1.getType(), input2.getType()), hint);
}
/**
- * Finalizes a Join transformation by applying a {@link
JoinFunction} to each pair of joined elements.<br/>
+ * Finalizes a Join transformation by applying a {@link
org.apache.flink.api.java.functions.FlatJoinFunction} to each pair of joined
elements.<br/>
* Each JoinFunction call returns exactly one element.
*
* @param function The JoinFunction that is called for each
pair of joined elements.
* @return An EquiJoin that represents the joined result DataSet
*
- * @see JoinFunction
+ * @see org.apache.flink.api.java.functions.FlatJoinFunction
* @see
org.apache.flink.api.java.operators.JoinOperator.EquiJoin
* @see DataSet
*/
- public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R>
function) {
+ public <R> EquiJoin<I1, I2, R> with(FlatJoinable<I1, I2, R>
function) {
if (function == null) {
throw new NullPointerException("Join function
must not be null.");
}
+ if (FunctionUtils.isLambdaFunction(function)) {
+ throw new
UnsupportedLambdaExpressionException();
+ }
TypeInformation<R> returnType =
TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
return new EquiJoin<I1, I2, R>(getInput1(),
getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
}
+
+ public <R> EquiJoin<I1, I2, R> with (Joinable<I1, I2, R>
function) {
+ if (function == null) {
+ throw new NullPointerException("Join function
must not be null.");
+ }
+ if (FunctionUtils.isLambdaFunction(function)) {
+ throw new
UnsupportedLambdaExpressionException();
+ }
+ FlatJoinable generatedFunction = new
GeneratedFlatJoinFunction<I1, I2, R> (function);
+ TypeInformation<R> returnType =
TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
+ return new EquiJoin<I1, I2, R>(getInput1(),
getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType,
getJoinHint());
+ }
+
+ private static class GeneratedFlatJoinFunction<IN1, IN2, OUT>
extends WrappingFunction<Joinable<IN1,IN2,OUT>> implements FlatJoinable<IN1,
IN2, OUT> {
--- End diff --
Minor comment: I personally find that GeneratedFunction sounds like a code
generated piece of code. Since it is a static wrapper function, I would
actually call it WrappingFlatJoinFunction.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---