[ https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327284#comment-16327284 ]
ASF GitHub Bot commented on FLINK-6116: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4649#discussion_r161797751 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java --- @@ -60,17 +60,23 @@ */ private StreamPartitioner<?> outputPartitioner; + /** + * The unique id for differentiating edges between the same source and target. + */ + private final int edgeSubId; + public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber, - List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) { + List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag, int edgeSubId) { this.sourceVertex = sourceVertex; this.targetVertex = targetVertex; this.typeNumber = typeNumber; this.selectedNames = selectedNames; this.outputPartitioner = outputPartitioner; this.outputTag = outputTag; + this.edgeSubId = edgeSubId; --- End diff -- rename `edgeSubId` to `uniqueId`? Since it's not a "sub" id of the `edgeId`, but rather a component of it. > Watermarks don't work when unioning with same DataStream > -------------------------------------------------------- > > Key: FLINK-6116 > URL: https://issues.apache.org/jira/browse/FLINK-6116 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.2.0, 1.3.0 > Reporter: Aljoscha Krettek > Priority: Critical > > In this example job we don't get any watermarks in the {{WatermarkObserver}}: > {code} > public class WatermarkTest { > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); > env.getConfig().setAutoWatermarkInterval(1000); > env.setParallelism(1); > DataStreamSource<String> input = env.addSource(new > SourceFunction<String>() { > @Override > public void run(SourceContext<String> ctx) throws > Exception { > while (true) { > ctx.collect("hello!"); > Thread.sleep(800); > } > } > @Override > public void cancel() { > } > }); > input.union(input) > .flatMap(new IdentityFlatMap()) > .transform("WatermarkOp", > BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver()); > env.execute(); > } > public static class WatermarkObserver > extends AbstractStreamOperator<String> > implements OneInputStreamOperator<String, String> { > @Override > public void processElement(StreamRecord<String> element) throws > Exception { > System.out.println("GOT ELEMENT: " + element); > } > @Override > public void processWatermark(Watermark mark) throws Exception { > super.processWatermark(mark); > System.out.println("GOT WATERMARK: " + mark); > } > } > private static class IdentityFlatMap > extends RichFlatMapFunction<String, String> { > @Override > public void flatMap(String value, Collector<String> out) throws > Exception { > out.collect(value); > } > } > } > {code} > When commenting out the `union` it works. -- This message was sent by Atlassian JIRA (v7.6.3#76005)