[ https://issues.apache.org/jira/browse/FLINK-31632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai updated FLINK-31632: ---------------------------------------- Fix Version/s: 1.16.2 1.18.0 1.17.1 > watermark aligned idle source can't resume > ------------------------------------------ > > Key: FLINK-31632 > URL: https://issues.apache.org/jira/browse/FLINK-31632 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.17.0, 1.16.1, 1.15.4 > Reporter: haishui > Assignee: haishui > Priority: Critical > Labels: pull-request-available > Fix For: 1.16.2, 1.18.0, 1.17.1 > > > > {code:java} > WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy > .<String>forBoundedOutOfOrderness(Duration.ofMillis(0)) > .withTimestampAssigner((element, recordTimestamp) -> > Long.parseLong(element)) > .withWatermarkAlignment("group", Duration.ofMillis(10), > Duration.ofSeconds(2)) > .withIdleness(Duration.ofSeconds(10)); > DataStreamSource<String> s1 = env.fromSource(kafkaSource("topic1"), > watermarkStrategy, "S1"); > DataStreamSource<String> s2 = env.fromSource(kafkaSource("topic2"), > watermarkStrategy, "S2");{code} > send "0" to kafka topic1 and topic2 > > After 10s, source1 and source2 is idle,and logs are > > {code:java} > 09:44:30,403 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0 > 09:44:30,404 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0 > 09:44:32,019 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - > Distributing maxAllowedWatermark=9 to subTaskIds=[0] > 09:44:32,019 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - > Distributing maxAllowedWatermark=9 to subTaskIds=[0] > 09:44:32,417 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0 > 09:44:32,418 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0 > 09:44:34,028 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - > Distributing maxAllowedWatermark=9 to subTaskIds=[0] > 09:44:34,028 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - > Distributing maxAllowedWatermark=9 to subTaskIds=[0] > 09:44:34,423 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 > 15:12:55.807) from subTaskId=0 > 09:44:34,424 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 > 15:12:55.807) from subTaskId=0 > 09:44:36,023 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - > Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0] > 09:44:36,023 INFO > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - > Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0] > 09:44:36,433 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 > 15:12:55.807) from subTaskId=0 > 09:44:36,433 DEBUG > org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New > reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 > 15:12:55.807) from subTaskId=0 {code} > send message to topic1 or topic2 now, the message can't be consumed。 > > the reason is: > when a source is marked idle, the lastEmittedWatermark = Long.MAX_VALUE and > currentMaxDesiredWatermark = Long.MAX_VALUE + maxAllowedWatermarkDrift in > org.apache.flink.streaming.api.operators.SourceOperator. > currentMaxDesiredWatermark is negative and always less than > lastEmittedWatermark > operatingMode always is WAITING_FOR_ALIGNMENT -- This message was sent by Atlassian Jira (v8.20.10#820010)