You are right, debugged it for all elements , I can do that now.
Thanks a lot.

Regards,
Vinay Patil

On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com>
wrote:

> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> Collector<Integer> out)` ,   when both iter1 and iter2 are not empty, it
> means they are matched elements from both stream.
> When one of iter1 and iter2 is empty , it means that they are unmatched.
>
>
> - Jark Wu (wuchong)
>
> > 在 2016年6月14日,下午12:46,Vinay Patil <vinay18.pa...@gmail.com> 写道:
> >
> > Hi Matthias ,
> >
> > I did not get you, even if we use Co-Group we have to apply it on a key
> >
> > sourceStream.coGroup(destStream)
> > .where(new ElementSelector())
> > .equalTo(new ElementSelector())
> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > private static final long serialVersionUID = 6408179761497497475L;
> >
> > @Override
> > public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer>
> > paramIterable1,
> > Collector<Integer> paramCollector) throws Exception {
> > Iterator<Integer> iterator = paramIterable.iterator();
> > while(iterator.hasNext()) {
> > }
> > }
> > });
> >
> > when I debug this ,only the matched element from both stream will come in
> > the coGroup function.
> >
> > What I want is how do I check for unmatched elements from both streams
> and
> > write it to sink.
> >
> > Regards,
> > Vinay Patil
> >
> > *+91-800-728-4749*
> >
> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> You need to do an outer-join. However, there is no build-in support for
> >> outer-joins yet.
> >>
> >> You can use Window-CoGroup to implement the outer-join as an own
> operator.
> >>
> >>
> >> -Matthias
> >>
> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> >>> Hi,
> >>>
> >>> I have a question regarding the join operation, consider the following
> >>> dummy example:
> >>>
> >>> StreamExecutionEnvironment env =
> >>> StreamExecutionEnvironment.getExecutionEnvironment();
> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> >>> DataStreamSource<Integer> sourceStream =
> >>> env.fromElements(10,20,23,25,30,33,102,18);
> >>> DataStreamSource<Integer> destStream =
> >> env.fromElements(20,30,40,50,60,10);
> >>>
> >>> sourceStream.join(destStream)
> >>> .where(new ElementSelector())
> >>> .equalTo(new ElementSelector())
> >>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> >>> .apply(new JoinFunction<Integer, Integer, Integer>() {
> >>>
> >>> private static final long serialVersionUID = 1L;
> >>>
> >>> @Override
> >>> public Integer join(Integer paramIN1, Integer paramIN2) throws
> Exception
> >> {
> >>> return paramIN1;
> >>> }
> >>> }).print();
> >>>
> >>> I perfectly get the elements that are matching in both the streams,
> >> however
> >>> my requirement is to write these matched elements and also the
> unmatched
> >>> elements to sink(S3)
> >>>
> >>> How do I get the unmatched elements from each stream ?
> >>>
> >>> Regards,
> >>> Vinay Patil
> >>>
> >>
> >>
>
>

Reply via email to