Hey Wilson,

The MapFunction should act as a wrapper for the join function. Create a
class extending RichMapFunction and pass the join function via the
constructor. Then delegate the open/close calls to it, with the map
function looking something like this:

map(Tuple2<...> tuple) {
    return joinFunction.join(tuple.f0, tuple.f1);
}
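
To make the whole translation a bit more concrete, here is a minimal
plain-Java sketch with no Flink dependency. Pair and Join are hypothetical
stand-ins for Tuple2 and the user's join function, OuterJoinMap plays the
role of the RichMapFunction wrapper, and pairs() mimics what the CoGroup
function would do for a single key, assuming a full outer join that pads an
empty side with null. Treat it as an illustration of the idea, not as the
actual Flink classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class OuterJoinSketch {

    /** Hypothetical stand-in for a two-field tuple (not Flink's Tuple2). */
    static final class Pair<A, B> {
        final A f0;
        final B f1;
        Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    /** Hypothetical stand-in for the user's join function with lifecycle hooks. */
    interface Join<A, B, R> {
        default void open() {}
        default void close() {}
        R join(A left, B right);
    }

    /** Wrapper: takes the join function via the constructor and delegates to it. */
    static final class OuterJoinMap<A, B, R> {
        private final Join<A, B, R> joinFunction;

        OuterJoinMap(Join<A, B, R> joinFunction) { this.joinFunction = joinFunction; }

        void open() { joinFunction.open(); }
        void close() { joinFunction.close(); }

        /** Unpacks the pair and calls the wrapped join function. */
        R map(Pair<A, B> tuple) { return joinFunction.join(tuple.f0, tuple.f1); }
    }

    /** CoGroup step for one key: Cartesian product, or null padding if a side is empty. */
    static <A, B> List<Pair<A, B>> pairs(List<A> left, List<B> right) {
        List<Pair<A, B>> out = new ArrayList<>();
        if (left.isEmpty()) {
            for (B r : right) out.add(new Pair<A, B>(null, r));
        } else if (right.isEmpty()) {
            for (A l : left) out.add(new Pair<A, B>(l, null));
        } else {
            for (A l : left) for (B r : right) out.add(new Pair<A, B>(l, r));
        }
        return out;
    }

    /** Runs both steps for one key with a join function that concatenates both sides. */
    static List<String> run(List<Integer> left, List<Double> right) {
        OuterJoinMap<Integer, Double, String> mapper =
                new OuterJoinMap<Integer, Double, String>((l, r) -> l + "|" + r);
        List<String> result = new ArrayList<>();
        mapper.open();
        for (Pair<Integer, Double> p : pairs(left, right)) {
            result.add(mapper.map(p));
        }
        mapper.close();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2), Arrays.asList(0.5)));
        System.out.println(run(Collections.<Integer>emptyList(), Arrays.asList(0.5)));
    }
}
```

In the real operator, the wrapper would extend RichMapFunction and the
pairing would live in the CoGroupFunction; the delegation pattern stays the
same.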

Regards,
Chesnay

On 21.12.2014 14:11, Zihong Cao wrote:
> Hi Fabian,
>
> Your response is very helpful! To make sure I understand it correctly,
> here is my pseudo-code:
>
> class OuterJoinCoGroupFunction implements CoGroupFunction<Tuple2<String, Integer>,
>         Tuple2<String, Double>, Tuple2<Integer, Double>> {
>
>     @Override
>     public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
>             Iterable<Tuple2<String, Double>> dVals,
>             Collector<Tuple2<Integer, Double>> out) {
>         Set<Integer> ints = new HashSet<Integer>();
>
>         for (Tuple2<String, Integer> val : iVals) {
>             ints.add(val.f1);
>         }
>
>         // no Integer value for this key: pad with null
>         if (ints.isEmpty()) {
>             ints.add(null);
>         }
>
>         for (Tuple2<String, Double> val : dVals) {
>             for (Integer i : ints) {
>                 out.collect(new Tuple2<Integer, Double>(i, val.f1));
>             }
>         }
>     }
> }
> The code above tries to build the matching pairs, and if one of the groups is
> empty, I append a null value to it. However, I don’t really understand how
> to implement the OuterJoinMapFunction.
>
> I am also puzzled about how Reduce/GroupReduce is translated into Map ->
> Reduce. Where can I find materials or the source code for this?
>
> Best, 
> Wilson. 
>
>> On 15 Dec 2014, at 5:17 PM, Fabian Hueske <fhue...@apache.org> wrote:
>>
>> That's a good point.
>>
>> You can implement an outer join using the available runtime. This way you
>> do not need to touch the optimizer and runtime but only the API layer.
>> This basically means adding syntactic sugar to the available API. The API
>> will translate the outer join into a CoGroup which builds all pairs of
>> joining elements and a Map which applies the join function to each joined
>> pair.
>>
>> It could look like this:
>>
>> DataSet<TypeX> in1;
>> DataSet<TypeY> in2;
>> in1.outerJoin(in2).where(...).equalTo(...).with(new MyJoinFunction());
>>
>> which would be translated into
>>
>> in1.coGroup(in2).where(...).equalTo(...).with(new
>> OuterJoinCoGroupFunction()).map(new OuterJoinMapFunction(new MyJoinFunction()));
>>
>> OJCoGroupFunction and OJMapFunction are functions that you need to
>> implement.
>> OJCoGroupFunction does what Stephan said (it builds pairs of matching
>> elements) and returns a Tuple2<TypeX, TypeY>.
>> OJMapFunction unpacks the Tuple2<TypeX, TypeY> and calls the user's Join
>> function (MyJoinFunction).
>>
>> There are a few operators implemented this way. For example, have a look at
>> the Reduce/GroupReduce with KeySelectors, which are translated into Map ->
>> Reduce (or Map -> GroupReduce).
>>
>> Let us know if you have any questions!
>>
>> Cheers, Fabian
>>
>>
>> 2014-12-13 16:25 GMT+01:00 Stephan Ewen <se...@apache.org>:
>>> Hi Wilson!
>>>
>>> You can start by mocking an outer join operator using a special CoGroup
>>> function. If one of the two sides for a group is empty, you have the case
>>> where you need to append null values. Otherwise, you build the Cartesian
>>> product within the group.
>>>
>>> For a proper through-the-stack implementation (not sure if that is needed,
>>> but may be nice to have), have a look here:
>>>
>>>
>>> http://flink.incubator.apache.org/docs/0.7-incubating/internal_add_operator.html
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Sat, Dec 13, 2014 at 3:19 AM, Zihong Cao <wilsonca...@gmail.com> wrote:
>>>> Hi,
>>>>
>>>> I am trying to pick up the outer join operator. However, as Fabian
>>>> mentioned to me, this task would require touching many different
>>>> components of the system, so it would be a challenging job for me.
>>>> Therefore I would need some help :-)
>>>>
>>>> I might need to walk through some components such as the Compiler/Optimizer
>>>> and Runtime (as Fabian mentioned to me), so where should I start to get
>>>> familiar with them?
>>>> One more thing: is the outer join operator implementation similar to the
>>>> pure join operator?
>>>>
>>>> Best,
>>>> Wilson Cao
>
