Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5634 @tweise @tzulitai I would suggest to solve this the following way, which should be both simple and cover our cases: - We extend the current periodic watermark generators for idleness. We can do that for example by maintaining a record counter and remembering the last counter and a System.nanoTime() timestamp each time the call whether to generate a watermark is called. If no record came for too long, return a special watermark object that indicated "idle". Or change the return type to return either 'none', 'idle', or 'watermark' - The Kinesis Concumer needs per-shard watermarks, same way as the Kafka Consumer does. That part needs to be added to the Kinesis consumer anyways. That way, we automatically get per-shard idleness in Kinesis and per-partition idleness in Kafka without doing anything specific for the source connectors. We can then also remove the idleness logic from the source context - it would be duplicate there.