Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/85#discussion_r15570952
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java ---
    @@ -424,28 +448,77 @@ protected DefaultJoin(DataSet<I1> input1, DataSet<I2> input2,
                                Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
                {
                        super(input1, input2, keys1, keys2,
    -                           (JoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultJoinFunction<I1, I2>(),
    +                           (FlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
                                new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint);
                }

                /**
    -            * Finalizes a Join transformation by applying a {@link JoinFunction} to each pair of joined elements.<br/>
    +            * Finalizes a Join transformation by applying a {@link org.apache.flink.api.java.functions.FlatJoinFunction} to each pair of joined elements.<br/>
                 * Each JoinFunction call returns exactly one element.
                 *
                 * @param function The JoinFunction that is called for each pair of joined elements.
                 * @return An EquiJoin that represents the joined result DataSet
                 *
    -            * @see JoinFunction
    +            * @see org.apache.flink.api.java.functions.FlatJoinFunction
                 * @see org.apache.flink.api.java.operators.JoinOperator.EquiJoin
                 * @see DataSet
                 */
    -           public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function) {
    +           public <R> EquiJoin<I1, I2, R> with(FlatJoinable<I1, I2, R> function) {
                        if (function == null) {
                                throw new NullPointerException("Join function must not be null.");
                        }
    +                   if (FunctionUtils.isLambdaFunction(function)) {
    +                           throw new UnsupportedLambdaExpressionException();
    +                   }
                        TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
                        return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
                }
    +
    +           public <R> EquiJoin<I1, I2, R> with (Joinable<I1, I2, R> function) {
    +                   if (function == null) {
    +                           throw new NullPointerException("Join function must not be null.");
    +                   }
    +                   if (FunctionUtils.isLambdaFunction(function)) {
    +                           throw new UnsupportedLambdaExpressionException();
    +                   }
    +                   FlatJoinable generatedFunction = new GeneratedFlatJoinFunction<I1, I2, R> (function);
    +                   TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
    +                   return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
    +           }
    +
    +           private static class GeneratedFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<Joinable<IN1,IN2,OUT>> implements FlatJoinable<IN1, IN2, OUT> {
    --- End diff --
    
    Minor comment: I personally find that the name GeneratedFlatJoinFunction sounds like a piece of generated code. Since it is a static wrapper function, I would rather call it WrappingFlatJoinFunction.
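
    To illustrate why "wrapping" describes the class better than "generated", here is a minimal standalone sketch of the adapter idea. The `Collector`, `Joinable`, and `FlatJoinable` interfaces below are simplified stand-ins written for this example, not the actual Flink types, and the real class in the PR also extends `WrappingFunction`, which is omitted here. The sketch uses lambdas for brevity even though the `with` methods in the diff reject them.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperSketch {

    // Hypothetical stand-in for a result collector (assumption, not Flink's interface).
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical stand-in: a join that returns exactly one result per pair.
    interface Joinable<IN1, IN2, OUT> {
        OUT join(IN1 first, IN2 second);
    }

    // Hypothetical stand-in: a join that emits any number of results via a collector.
    interface FlatJoinable<IN1, IN2, OUT> {
        void join(IN1 first, IN2 second, Collector<OUT> out);
    }

    // A hand-written static adapter: it wraps a Joinable and exposes it as a FlatJoinable.
    // Nothing here is produced by code generation, hence the suggested name.
    static final class WrappingFlatJoinFunction<IN1, IN2, OUT>
            implements FlatJoinable<IN1, IN2, OUT> {

        private final Joinable<IN1, IN2, OUT> wrapped;

        WrappingFlatJoinFunction(Joinable<IN1, IN2, OUT> wrapped) {
            if (wrapped == null) {
                throw new NullPointerException("Join function must not be null.");
            }
            this.wrapped = wrapped;
        }

        @Override
        public void join(IN1 first, IN2 second, Collector<OUT> out) {
            // Delegate to the wrapped function and emit its single result.
            out.collect(wrapped.join(first, second));
        }
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        FlatJoinable<Integer, String, String> flat =
                new WrappingFlatJoinFunction<Integer, String, String>(
                        (id, name) -> id + ":" + name);
        flat.join(1, "a", results::add);
        flat.join(2, "b", results::add);
        System.out.println(results); // prints [1:a, 2:b]
    }
}
```

    The wrapper only delegates and forwards one result per call, which is the behavior the name WrappingFlatJoinFunction is meant to convey.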

