Github user StephanEwen commented on the issue:
    @tweise @tzulitai 
    I would suggest solving this the following way, which should be both 
simple and cover our cases:
      - We extend the current periodic watermark generators for idleness. We 
can do that, for example, by maintaining a record counter and remembering the 
last counter value and a System.nanoTime() timestamp each time the method that 
decides whether to generate a watermark is called. If no record came for too 
long, return a special watermark object that indicates "idle". Or change the 
return type to return either 'none', 'idle', or 'watermark'.
      - The Kinesis Consumer needs per-shard watermarks, the same way the Kafka 
Consumer does. That part needs to be added to the Kinesis consumer anyway.
    That way, we automatically get per-shard idleness in Kinesis and 
per-partition idleness in Kafka without doing anything specific for the source.
    We can then also remove the idleness logic from the source context, since 
it would be duplicated there.
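
To make the first bullet concrete, here is a minimal sketch of what such an idleness-aware periodic generator could look like. It is not Flink API: the class `IdleAwareWatermarkGenerator`, the `Kind` enum, the `Result` type, and the method names are all hypothetical, and the clock is injected as a `LongSupplier` only so the logic can be exercised without waiting (in production it would be `System::nanoTime`).

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of a periodic watermark generator that detects idleness. */
final class IdleAwareWatermarkGenerator {

    /** The three outcomes suggested above: 'none', 'idle', or 'watermark'. */
    enum Kind { NONE, IDLE, WATERMARK }

    static final class Result {
        final Kind kind;
        final long timestamp; // only meaningful when kind == WATERMARK

        Result(Kind kind, long timestamp) {
            this.kind = kind;
            this.timestamp = timestamp;
        }
    }

    private final long idleTimeoutNanos;
    private final LongSupplier nanoClock; // assumption: System::nanoTime in production

    private long recordCount;                        // incremented per record
    private long countAtLastCheck;                   // counter value seen at the last check
    private long lastActivityNanos;                  // last check at which the counter had moved
    private long maxTimestamp = Long.MIN_VALUE;      // highest event timestamp seen
    private long lastWatermark = Long.MIN_VALUE;     // last watermark returned

    IdleAwareWatermarkGenerator(long idleTimeoutNanos, LongSupplier nanoClock) {
        this.idleTimeoutNanos = idleTimeoutNanos;
        this.nanoClock = nanoClock;
        this.lastActivityNanos = nanoClock.getAsLong();
    }

    /** Called for every record of the partition or shard. */
    void onRecord(long eventTimestamp) {
        recordCount++;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically to decide whether to generate a watermark. */
    Result onPeriodicCheck() {
        long now = nanoClock.getAsLong();

        if (recordCount != countAtLastCheck) {
            // Records arrived since the last check: remember counter and time.
            countAtLastCheck = recordCount;
            lastActivityNanos = now;
            if (maxTimestamp > lastWatermark) {
                lastWatermark = maxTimestamp;
                return new Result(Kind.WATERMARK, maxTimestamp);
            }
            return new Result(Kind.NONE, 0L);
        }

        // No new records: report idle once the timeout has elapsed.
        if (now - lastActivityNanos >= idleTimeoutNanos) {
            return new Result(Kind.IDLE, 0L);
        }
        return new Result(Kind.NONE, 0L);
    }
}
```

With one such generator per Kafka partition or Kinesis shard, the consumer would only need to treat an `IDLE` result as "exclude this partition or shard from the minimum watermark" until a record arrives again. The simple "max timestamp" watermark policy here is a placeholder for whatever the user's generator actually computes.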


