[ https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006582#comment-16006582 ]

Aljoscha Krettek commented on FLINK-6116:
-----------------------------------------

It seems that the downstream operator has two input channels: elements from 
both unioned inputs are always received on one channel, while watermarks 
are always received on the other. If you add some print statements to 
{{StreamInputProcessor}}, you get this:
{code}
NEW CHANNEL: 1
IN WM: Watermark @ 1494514601000 channel 1
IN WM: Watermark @ 1494514601000 channel 1
NEW CHANNEL: 0
GOT ELEMENT: Record @ 1494514601175 : hello!
GOT ELEMENT: Record @ 1494514601175 : hello!
NEW CHANNEL: 1
NEW CHANNEL: 0
GOT ELEMENT: Record @ 1494514601978 : hello!
GOT ELEMENT: Record @ 1494514601978 : hello!
NEW CHANNEL: 1
IN WM: Watermark @ 1494514602000 channel 1
IN WM: Watermark @ 1494514602000 channel 1
{code}
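Note that it doesn't print {{GOT WATERMARK}}. The input processor only forwards 
the minimum watermark across all of its input channels, and because we never 
receive a watermark on channel 0, that minimum never advances and no watermark 
is ever forwarded to the operator.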
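The behavior described above comes down to how watermarks from several input channels are combined: the input processor remembers the latest watermark seen on each channel and only forwards a new watermark once the minimum across all channels advances. Below is a simplified, standalone sketch of that rule. {{ChannelWatermarkTracker}} and {{onWatermark}} are hypothetical names; this models the logic and is not Flink's actual {{StreamInputProcessor}} code.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of per-channel watermark tracking.
 * It remembers the last watermark seen on every input channel and
 * forwards a watermark only when the minimum across all channels advances.
 */
public class ChannelWatermarkTracker {
    private final long[] channelWatermarks;
    private long lastForwarded = Long.MIN_VALUE;

    public ChannelWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Channels that have never seen a watermark hold the minimum back.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the watermark to forward, or null if the combined watermark did not advance. */
    public Long onWatermark(int channel, long timestamp) {
        if (timestamp > channelWatermarks[channel]) {
            channelWatermarks[channel] = timestamp;
        }
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        if (min > lastForwarded) {
            lastForwarded = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        // Two channels, watermarks only ever arrive on channel 1 (as in the log).
        ChannelWatermarkTracker unioned = new ChannelWatermarkTracker(2);
        System.out.println(unioned.onWatermark(1, 1494514601000L)); // null
        System.out.println(unioned.onWatermark(1, 1494514602000L)); // null

        // One channel (no union): each advancing watermark is forwarded.
        ChannelWatermarkTracker single = new ChannelWatermarkTracker(1);
        System.out.println(single.onWatermark(0, 1494514601000L)); // 1494514601000
    }
}
```

With two channels and watermarks arriving only on channel 1, as in the log above, channel 0 stays at {{Long.MIN_VALUE}}, so the minimum never moves and nothing is forwarded. With a single channel, each watermark is forwarded as soon as it arrives.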

> Watermarks don't work when unioning with same DataStream
> --------------------------------------------------------
>
>                 Key: FLINK-6116
>                 URL: https://issues.apache.org/jira/browse/FLINK-6116
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> In this example job, we don't get any watermarks in the {{WatermarkObserver}}:
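> {code}
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
>
> public class WatermarkTest {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // Ingestion time: the source assigns timestamps and emits watermarks automatically
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.getConfig().setAutoWatermarkInterval(1000);
>         env.setParallelism(1);
>
>         DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
>             private volatile boolean running = true;
>
>             @Override
>             public void run(SourceContext<String> ctx) throws Exception {
>                 // Emit one element roughly every 800 ms until cancelled
>                 while (running) {
>                     ctx.collect("hello!");
>                     Thread.sleep(800);
>                 }
>             }
>
>             @Override
>             public void cancel() {
>                 running = false;
>             }
>         });
>
>         // Unioning the stream with itself is what triggers the problem
>         input.union(input)
>                 .flatMap(new IdentityFlatMap())
>                 .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>
>         env.execute();
>     }
>
>     /** Prints every element and watermark that reaches this operator. */
>     public static class WatermarkObserver
>             extends AbstractStreamOperator<String>
>             implements OneInputStreamOperator<String, String> {
>         @Override
>         public void processElement(StreamRecord<String> element) throws Exception {
>             System.out.println("GOT ELEMENT: " + element);
>         }
>
>         @Override
>         public void processWatermark(Watermark mark) throws Exception {
>             super.processWatermark(mark);
>             System.out.println("GOT WATERMARK: " + mark);
>         }
>     }
>
>     /** Forwards every input element unchanged. */
>     private static class IdentityFlatMap extends RichFlatMapFunction<String, String> {
>         @Override
>         public void flatMap(String value, Collector<String> out) throws Exception {
>             out.collect(value);
>         }
>     }
> }
> {code}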
> When the {{union}} is commented out, it works.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)