[
https://issues.apache.org/jira/browse/FLINK-24032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Paul updated FLINK-24032:
--------------------------------
Description:
In the following scenario, the sink will never receive watermarks
{code:java}
env.readFile(...)
.assignTimestampsAndWatermarks(format, file)
.rebalance()
.addSink(...);
{code}
I also noticed that when changing the code to the following the watermarks flow
to the sink. Moreover, if the complete pipeline is chained, remove the
`.rebalance()`, the sink can also receive the watermarks.
{code:java}
env.readFile(...)
.assignTimestampsAndWatermarks(format, file)
.rebalance()
.process(new ProcessFunction() {...})
.addSink(...);
{code}
An example test case is accessible here
[https://github.com/fapaul/flink/blob/9b749ac80cd128a7f288da45db313bafa39d8008/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java#L68]
was:
In the following scenario, the sink will never receive watermarks
{code:java}
env.readFile(...)
.assignTimestampsAndWatermarks(format, file)
.rebalance()
.addSink(...);
{code}
I also noticed that when changing the code to the following the watermarks flow
to the sink
{code:java}
env.readFile(...)
.assignTimestampsAndWatermarks(format, file)
.rebalance()
.process(new ProcessFunction() {...})
.addSink(...);
{code}
An example test case is accessible here
[https://github.com/fapaul/flink/blob/9b749ac80cd128a7f288da45db313bafa39d8008/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java#L68]
> StreamSink does not receive periodic watermarks
> -----------------------------------------------
>
> Key: FLINK-24032
> URL: https://issues.apache.org/jira/browse/FLINK-24032
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.14.0
> Reporter: Fabian Paul
> Priority: Blocker
>
> In the following scenario, the sink will never receive watermarks
>
> {code:java}
> env.readFile(...)
> .assignTimestampsAndWatermarks(format, file)
> .rebalance()
> .addSink(...);
> {code}
>
> I also noticed that when changing the code to the following the watermarks
> flow to the sink. Moreover, if the complete pipeline is chained, remove the
> `.rebalance()`, the sink can also receive the watermarks.
> {code:java}
> env.readFile(...)
> .assignTimestampsAndWatermarks(format, file)
> .rebalance()
> .process(new ProcessFunction() {...})
> .addSink(...);
> {code}
> An example test case is accessible here
> [https://github.com/fapaul/flink/blob/9b749ac80cd128a7f288da45db313bafa39d8008/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java#L68]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)