Hey Wilson, the MapFunction should act as a wrapper for the join function. create a class extending RichMapFunction, and pass the joinfunction via the constructor. then you delegate open/close calls to it, with the map function looking something like this:
map(Tuple2<...> tuple) { return joinFunction.join(tuple.f0, tuple.f1); } Regards, Chesnay On 21.12.2014 14:11, Zihong Cao wrote: > Hi Fabian, > > It is very helpful of your response! But in order to make sure I understand > correctly, I put my pseudo-code here first: > > class OuterJoinCoGroupFunction implements CoGroupFunction<Tuple2<String, > Integer>, Tuple2<String, Double>, Double>{ > > @Override > public void coGroup(Iterable<Tuple2<String, Integer> > iVals, > Iterable<Tuple2<String, Double> > dVals, Collector<Tuple2<Integer, Double> > out){ > Set<Integer> ints = new HashSet<Integer>(); > > for (Tuple2<String, Integer > > val : iVals){ > ints.add(val.f1); > } > > if(ints.isEmpty()){ > ints.add(NULL); > } > > for (Tuple2<String, Double> val : dVals){ > for (Integer i : ints){ > out.collect(new Tuple2(i, val.f1)); > } > } > } > } > The code above try to builds the matching pairs and if one of the group is > empty, I append the NULL value to it. However, I don’t really understand how > to implement the OuterJoinMapFunction. > > I am also puzzled about how the Reduce/GroupReduce translated into Map -> > Reduce. Where can I found the materials or the source code about this? > > Best, > Wilson. > >> 在 2014年12月15日,下午5:17,Fabian Hueske <fhue...@apache.org> 写道: >> >> That's a good point. >> >> You can implement an outer join using the available runtime. This way you >> do not need to touch the optimizer and runtime but only the API layer. >> This basically means to add syntactic sugar to the available API. The API >> will translate the outer join into a CoGroup which builds all pairs of >> joining elements and a Map which applies the join function to each joined >> pair. >> >> It could look like this: >> >> DataSet<TypeX> in1; >> DataSet<TypeY> in2; >> in1.outerJoin(in2).where(...).equalTo(...).with(new MyJoinFunction) >> >> which would be translated into >> >> in1.coGroup(in2).where(...).equalTo(...).with(new >> OuterJoinCoGroupFunction).map(new OuterJoinMapFunction(MyJoinFunction)); >> >> OJCoGroupFunction and OJMapFunction are functions that you need to >> implement. >> OJCoGroupFunction does what Stephan said (it builds pairs of matching >> elements) and returns a Tuple2<TypeX, TypeY>. >> OJMapFunction unpacks the Tuple2<TypeX, TypeY> and calls the user's Join >> function (MyJoinFunction). >> >> There are a few operators implemented this way. For example have a look at >> the Reduce/GroupReduce with KeySelectors which are translated into Map -> >> Reduce (or Map -> GroupReduce). >> >> Let us know, if you have any questions! >> >> Cheers, Fabian >> >> >> 2014-12-13 16:25 GMT+01:00 Stephan Ewen <se...@apache.org>: >>> Hi Wilson! >>> >>> You can start by mocking an outer join operator using a special CoGroup >>> function. If one of the two sides for a group is empty, you have the case >>> where you need to append null values. Otherwise, you build the Cartesian >>> produce within the group. >>> >>> For a proper through-the-stack implementation (not sure if that is needed, >>> but may be nice to have), have a look here: >>> >>> >>> http://flink.incubator.apache.org/docs/0.7-incubating/internal_add_operator.html >>> >>> Greetings, >>> Stephan >>> >>> >>> On Sat, Dec 13, 2014 at 3:19 AM, Zihong Cao <wilsonca...@gmail.com> wrote: >>>> Hi, >>>> >>>> I am trying to pick up the outer join operator. However, as Fabian >>>> mentioned to me, that this task would require to touch many different >>>> components of the system, it would be a challenge job for me. Therefore I >>>> would need some help:-) >>>> >>>> I might need to walk through some features like Compiler/Optimizer and >>>> Runtime(as Fabian mentioned to me), so where should I start to get >>> familiar? >>>> One more thing, is the outer join operator implementation similar to the >>>> pure join operator? >>>> >>>> Best, >>>> Wilson Cao >