You need to do an outer-join. However, there is no build-in support for outer-joins yet.
You can use Window-CoGroup to implement the outer-join as an own operator.
-Matthias
On 06/13/2016 06:53 PM, Vinay Patil wrote:
> Hi,
>
> I have a question regarding the join operation, consider the following
> dummy example:
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> DataStreamSource<Integer> sourceStream =
> env.fromElements(10,20,23,25,30,33,102,18);
> DataStreamSource<Integer> destStream = env.fromElements(20,30,40,50,60,10);
>
> sourceStream.join(destStream)
> .where(new ElementSelector())
> .equalTo(new ElementSelector())
> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> .apply(new JoinFunction<Integer, Integer, Integer>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> return paramIN1;
> }
> }).print();
>
> I perfectly get the elements that are matching in both the streams, however
> my requirement is to write these matched elements and also the unmatched
> elements to sink(S3)
>
> How do I get the unmatched elements from each stream ?
>
> Regards,
> Vinay Patil
>
signature.asc
Description: OpenPGP digital signature
