You are right. I debugged it for all elements, and I can do that now. Thanks a lot.
Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com> wrote:

> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> Collector<Integer> out)`, when both iter1 and iter2 are non-empty, they
> hold the matched elements from both streams.
> When one of iter1 and iter2 is empty, the elements in the other one are
> unmatched.
>
> - Jark Wu (wuchong)
>
> On June 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>
> > Hi Matthias,
> >
> > I did not get you. Even if we use CoGroup, we have to apply it on a key:
> >
> > sourceStream.coGroup(destStream)
> >     .where(new ElementSelector())
> >     .equalTo(new ElementSelector())
> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> >         private static final long serialVersionUID = 6408179761497497475L;
> >
> >         @Override
> >         public void coGroup(Iterable<Integer> paramIterable,
> >                 Iterable<Integer> paramIterable1,
> >                 Collector<Integer> paramCollector) throws Exception {
> >             Iterator<Integer> iterator = paramIterable.iterator();
> >             while (iterator.hasNext()) {
> >                 // advance the iterator; without this the loop never ends
> >                 iterator.next();
> >             }
> >         }
> >     });
> >
> > When I debug this, only the matched elements from both streams come into
> > the coGroup function.
> >
> > What I want to know is how to check for unmatched elements from both
> > streams and write them to the sink.
> >
> > Regards,
> > Vinay Patil
> >
> > *+91-800-728-4749*
> >
> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> You need to do an outer join. However, there is no built-in support for
> >> outer joins yet.
> >>
> >> You can use Window-CoGroup to implement the outer join as your own
> >> operator.
> >>
> >> -Matthias
> >>
> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> >>> Hi,
> >>>
> >>> I have a question regarding the join operation. Consider the following
> >>> dummy example:
> >>>
> >>> StreamExecutionEnvironment env =
> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> >>> DataStreamSource<Integer> sourceStream =
> >>>     env.fromElements(10,20,23,25,30,33,102,18);
> >>> DataStreamSource<Integer> destStream =
> >>>     env.fromElements(20,30,40,50,60,10);
> >>>
> >>> sourceStream.join(destStream)
> >>>     .where(new ElementSelector())
> >>>     .equalTo(new ElementSelector())
> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> >>>
> >>>         private static final long serialVersionUID = 1L;
> >>>
> >>>         @Override
> >>>         public Integer join(Integer paramIN1, Integer paramIN2)
> >>>                 throws Exception {
> >>>             return paramIN1;
> >>>         }
> >>>     }).print();
> >>>
> >>> I get the elements that match in both streams without any problem;
> >>> however, my requirement is to write these matched elements and also the
> >>> unmatched elements to the sink (S3).
> >>>
> >>> How do I get the unmatched elements from each stream?
> >>>
> >>> Regards,
> >>> Vinay Patil
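
To make Jark Wu's rule from this thread concrete (both iterables non-empty means a match, one empty means unmatched), here is a minimal plain-Java sketch of the outer-join logic. It is only a simulation and does not use the Flink API: the class `OuterJoinSketch`, the method `outerJoin`, and the `matched:` / `left-only:` / `right-only:` labels are hypothetical names chosen for illustration. It also assumes that `ElementSelector` uses the element itself as the key and that all elements fall into a single window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a coGroup used as a full outer join.
// Nothing here is Flink API; every name is hypothetical.
final class OuterJoinSketch {

    // Same shape as the coGroup callback in the thread: it is called once per
    // key with the elements of that key from each side. An empty side means
    // the other side's elements found no partner.
    static void coGroup(Iterable<Integer> left, Iterable<Integer> right,
                        List<String> out) {
        boolean hasLeft = left.iterator().hasNext();
        boolean hasRight = right.iterator().hasNext();
        if (hasLeft && hasRight) {
            // Matched: emit every left/right pair for this key.
            for (Integer l : left) {
                for (Integer r : right) {
                    out.add("matched:" + l + "," + r);
                }
            }
        } else if (hasLeft) {
            for (Integer l : left) {
                out.add("left-only:" + l);
            }
        } else {
            for (Integer r : right) {
                out.add("right-only:" + r);
            }
        }
    }

    // Groups both inputs by key (assumption: the key is the element itself)
    // and calls coGroup once for every key seen on either side.
    static List<String> outerJoin(List<Integer> source, List<Integer> dest) {
        Map<Integer, List<Integer>> leftByKey = new LinkedHashMap<>();
        Map<Integer, List<Integer>> rightByKey = new LinkedHashMap<>();
        for (Integer s : source) {
            leftByKey.computeIfAbsent(s, k -> new ArrayList<>()).add(s);
        }
        for (Integer d : dest) {
            rightByKey.computeIfAbsent(d, k -> new ArrayList<>()).add(d);
        }

        List<String> out = new ArrayList<>();
        // Keys present on the left side (matched or left-only).
        for (Map.Entry<Integer, List<Integer>> e : leftByKey.entrySet()) {
            List<Integer> right =
                rightByKey.getOrDefault(e.getKey(), Collections.emptyList());
            coGroup(e.getValue(), right, out);
        }
        // Keys present only on the right side.
        for (Map.Entry<Integer, List<Integer>> e : rightByKey.entrySet()) {
            if (!leftByKey.containsKey(e.getKey())) {
                coGroup(Collections.emptyList(), e.getValue(), out);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same sample data as sourceStream and destStream in the thread.
        List<Integer> source = Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18);
        List<Integer> dest = Arrays.asList(20, 30, 40, 50, 60, 10);
        for (String line : outerJoin(source, dest)) {
            System.out.println(line);
        }
    }
}
```

In an actual job, the body of the sketch's `coGroup` would presumably move into the `CoGroupFunction` passed to `.apply(...)`, with the results going to the `Collector` instead of a list. The grouping by key and window is then done by Flink rather than by hand.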