Hi,
I have a question regarding the join operation. Consider the following
dummy example:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStreamSource<Integer> sourceStream = env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);

sourceStream.join(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
    .apply(new JoinFunction<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
            return paramIN1;
        }
    })
    .print();

This correctly gives me the elements that match in both streams. However,
my requirement is to write both the matched and the unmatched elements to
a sink (S3).
How do I get the unmatched elements from each stream?
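
To make the requirement concrete, here is a minimal plain-Java sketch of the three outputs I am after for the two inputs above. It is not Flink code: it ignores windowing and assumes that ElementSelector keys each element by its own value. The class and method names (OuterJoinSketch, matched, unmatched) are made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: no Flink, no windows, identity key assumed.
class OuterJoinSketch {

    // Elements of `left` whose value also occurs in `right`, in `left` order.
    static List<Integer> matched(List<Integer> left, List<Integer> right) {
        Set<Integer> keys = new HashSet<>(right);
        List<Integer> out = new ArrayList<>();
        for (Integer value : left) {
            if (keys.contains(value)) {
                out.add(value);
            }
        }
        return out;
    }

    // Elements of `left` whose value does NOT occur in `right`, in `left` order.
    static List<Integer> unmatched(List<Integer> left, List<Integer> right) {
        Set<Integer> keys = new HashSet<>(right);
        List<Integer> out = new ArrayList<>();
        for (Integer value : left) {
            if (!keys.contains(value)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18);
        List<Integer> dest = Arrays.asList(20, 30, 40, 50, 60, 10);

        // What the join already gives me.
        System.out.println("matched:          " + matched(source, dest));   // [10, 20, 30]
        // What I additionally need to write to the sink.
        System.out.println("unmatched source: " + unmatched(source, dest)); // [23, 25, 33, 102, 18]
        System.out.println("unmatched dest:   " + unmatched(dest, source)); // [40, 50, 60]
    }
}
```

In the real job the same three groups would have to be produced per window rather than over the whole input.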
Regards,
Vinay Patil