Hi Jark,

I am able to get the non-matching elements in a stream.

Of course, we can collect the matching elements in the same stream as well.
However, I want to perform additional operations on the joined stream before
writing it to S3, so I would have to add a separate join operator for the
same two streams, right?
Correct me if I am wrong.
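A single coGroup may be enough here. Because `coGroup` sees both sides for every key, it can emit the matched records and the unmatched ones into one output stream with a tag on each record. The downstream split then becomes two filters rather than a second join. Below is a Flink-free sketch in plain Java. It makes three assumptions: `List<String>` stands in for Flink's `Collector`, the `MATCHED:`/`UNMATCHED:` string prefixes are hypothetical tags (a Tuple2 or POJO field would be more natural in a real job), and `TaggedOutput` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch; List<String> stands in for Flink's Collector (assumption).
class TaggedOutput {

    // Mimics one call of CoGroupFunction.coGroup(first, second, out) for a single key:
    // every record is emitted with a hypothetical tag prefix.
    static void coGroup(Iterable<Integer> first, Iterable<Integer> second, List<String> out) {
        boolean hasFirst = first.iterator().hasNext();
        boolean hasSecond = second.iterator().hasNext();
        if (hasFirst && hasSecond) {
            // Inner-join part: one output per pair, keeping the left value
            // (like the JoinFunction that returns paramIN1).
            for (Integer a : first) {
                for (Integer b : second) {
                    out.add("MATCHED:" + a);
                }
            }
        } else {
            // At most one side is non-empty here: its elements are unmatched.
            for (Integer a : first) out.add("UNMATCHED:" + a);
            for (Integer b : second) out.add("UNMATCHED:" + b);
        }
    }

    // Downstream split: the equivalent of a filter() on the single coGrouped stream.
    static List<String> withTag(List<String> tagged, String tag) {
        List<String> result = new ArrayList<>();
        for (String record : tagged) {
            if (record.startsWith(tag + ":")) {
                result.add(record.substring(tag.length() + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        coGroup(List.of(10), List.of(10), out); // key 10 on both sides
        coGroup(List.of(23), List.of(), out);   // key 23 only in the source stream
        coGroup(List.of(), List.of(40), out);   // key 40 only in the destination stream
        System.out.println(withTag(out, "MATCHED"));   // [10]
        System.out.println(withTag(out, "UNMATCHED")); // [23, 40]
    }
}
```

The matched branch can then go through the extra processing while the unmatched branch goes straight to S3, and both come from one coGroup operator.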

I have pasted the dummy code that collects the non-matching records (I
have to apply this to the actual data; correct me if I am doing something wrong).

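import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, Integer>() {

        private static final long serialVersionUID = 6408179761497497475L;

        @Override
        public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                Collector<Integer> out) throws Exception {
            // Check emptiness via the iterator: spliterator().getExactSizeIfKnown()
            // returns -1 (not 0) if the Iterable does not report its size.
            boolean firstEmpty = !first.iterator().hasNext();
            boolean secondEmpty = !second.iterator().hasNext();
            if (firstEmpty) {
                // Key only present in destStream: emit all of its elements.
                for (Integer value : second) {
                    out.collect(value);
                }
            } else if (secondEmpty) {
                // Key only present in sourceStream: emit all of its elements.
                for (Integer value : first) {
                    out.collect(value);
                }
            }
        }
    }).print();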
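The `spliterator().getExactSizeIfKnown() == 0` check only works if the `Iterable` passed to `coGroup` reports its size. For an `Iterable` that relies on the default `spliterator()`, the method returns -1 even when it is empty. Whether Flink's iterables report their size is an implementation detail, so `iterator().hasNext()` is the safer test. Emitting only `iterator().next()` also drops every unmatched element after the first one for the same key. The plain-Java sketch below shows both points. The class and method names are hypothetical, and `List<Integer>` stands in for the `Collector`.

```java
import java.util.ArrayList;
import java.util.List;

class EmptinessCheck {

    // Fragile: getExactSizeIfKnown() is -1 (not 0) when the spliterator is not SIZED,
    // e.g. the default Iterable.spliterator() built from iterator().
    static boolean isEmptyBySize(Iterable<Integer> it) {
        return it.spliterator().getExactSizeIfKnown() == 0;
    }

    // Robust: works for any Iterable.
    static boolean isEmpty(Iterable<Integer> it) {
        return !it.iterator().hasNext();
    }

    // Emits all unmatched elements of a key, not only the first one.
    static void emitUnmatched(Iterable<Integer> first, Iterable<Integer> second, List<Integer> out) {
        if (isEmpty(first)) {
            for (Integer v : second) out.add(v);
        } else if (isEmpty(second)) {
            for (Integer v : first) out.add(v);
        }
    }

    public static void main(String[] args) {
        List<Integer> backing = new ArrayList<>();
        Iterable<Integer> unsized = backing::iterator; // default spliterator: size unknown
        System.out.println(isEmptyBySize(backing)); // true  (ArrayList reports its size)
        System.out.println(isEmptyBySize(unsized)); // false (getExactSizeIfKnown() == -1)
        System.out.println(isEmpty(unsized));       // true
    }
}
```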

Regards,
Vinay Patil


On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.pa...@gmail.com>
wrote:

> You are right; I debugged it for all elements, and I can do that now.
> Thanks a lot.
>
> Regards,
> Vinay Patil
>
> On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong...@alibaba-inc.com>
> wrote:
>
>> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
>> Collector<Integer> out)`, when both iter1 and iter2 are non-empty, they
>> contain matched elements from both streams.
>> When one of iter1 and iter2 is empty, the elements in the other one are unmatched.
>>
>>
>> - Jark Wu (wuchong)
>>
>> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.pa...@gmail.com> wrote:
>> >
>> > Hi Matthias,
>> >
>> > I did not understand; even if we use CoGroup, we have to apply it on a key:
>> >
>> > sourceStream.coGroup(destStream)
>> >     .where(new ElementSelector())
>> >     .equalTo(new ElementSelector())
>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>> >         private static final long serialVersionUID = 6408179761497497475L;
>> >
>> >         @Override
>> >         public void coGroup(Iterable<Integer> paramIterable,
>> >                 Iterable<Integer> paramIterable1,
>> >                 Collector<Integer> paramCollector) throws Exception {
>> >             Iterator<Integer> iterator = paramIterable.iterator();
>> >             while (iterator.hasNext()) {
>> >                 Integer value = iterator.next(); // advance, otherwise the loop never ends
>> >             }
>> >         }
>> >     });
>> >
>> > When I debug this, only the matched elements from both streams come into
>> > the coGroup function.
>> >
>> > What I want is to check for unmatched elements from both streams and
>> > write them to the sink.
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > *+91-800-728-4749*
>> >
>> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mj...@apache.org>
>> > wrote:
>> >
>> >> You need to do an outer join. However, there is no built-in support for
>> >> outer joins yet.
>> >>
>> >> You can use a window CoGroup to implement the outer join as your own
>> >> operator.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
>> >>> Hi,
>> >>>
>> >>> I have a question regarding the join operation; consider the following
>> >>> dummy example:
>> >>>
>> >>> StreamExecutionEnvironment env =
>> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>> >>> DataStreamSource<Integer> sourceStream =
>> >>>     env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
>> >>> DataStreamSource<Integer> destStream =
>> >>>     env.fromElements(20, 30, 40, 50, 60, 10);
>> >>>
>> >>> sourceStream.join(destStream)
>> >>>     .where(new ElementSelector())
>> >>>     .equalTo(new ElementSelector())
>> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
>> >>>
>> >>>         private static final long serialVersionUID = 1L;
>> >>>
>> >>>         @Override
>> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
>> >>>             return paramIN1;
>> >>>         }
>> >>>     }).print();
>> >>>
>> >>> I correctly get the elements that match in both streams; however, my
>> >>> requirement is to write both these matched elements and the unmatched
>> >>> elements to the sink (S3).
>> >>>
>> >>> How do I get the unmatched elements from each stream?
>> >>>
>> >>> Regards,
>> >>> Vinay Patil
>> >>>
>> >>
>> >>
>>
>>
>
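Here is what a full outer join built on coGroup would produce for the example data above. The code is a plain-Java simulation of a single window. It assumes that `ElementSelector` returns the element itself as the key and that all elements land in the same window, which is not guaranteed with 10 ms ingestion-time windows. `OuterJoinSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

class OuterJoinSketch {

    // Groups values by key; the identity key is an assumption about ElementSelector.
    static Map<Integer, List<Integer>> groupByKey(List<Integer> values) {
        Map<Integer, List<Integer>> groups = new LinkedHashMap<>();
        for (Integer v : values) {
            groups.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        return groups;
    }

    // Simulates one window of sourceStream.coGroup(destStream): for every key seen on
    // either side, both (possibly empty) groups are handed to the outer-join logic.
    static List<String> fullOuterJoin(List<Integer> source, List<Integer> dest) {
        Map<Integer, List<Integer>> left = groupByKey(source);
        Map<Integer, List<Integer>> right = groupByKey(dest);
        LinkedHashSet<Integer> keys = new LinkedHashSet<>(left.keySet());
        keys.addAll(right.keySet());

        List<String> out = new ArrayList<>();
        for (Integer key : keys) {
            List<Integer> l = left.getOrDefault(key, List.of());
            List<Integer> r = right.getOrDefault(key, List.of());
            if (!l.isEmpty() && !r.isEmpty()) {
                // Both sides present: emit every pair (inner-join part).
                for (Integer a : l) {
                    for (Integer b : r) out.add("matched " + a + "/" + b);
                }
            } else if (!l.isEmpty()) {
                for (Integer a : l) out.add("source only " + a);
            } else {
                for (Integer b : r) out.add("dest only " + b);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(10, 20, 23, 25, 30, 33, 102, 18);
        List<Integer> dest = List.of(20, 30, 40, 50, 60, 10);
        fullOuterJoin(source, dest).forEach(System.out::println);
    }
}
```

Under those assumptions, keys 10, 20 and 30 are matched. Keys 23, 25, 33, 102 and 18 appear only in the source stream, and 40, 50 and 60 appear only in the destination stream.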
