[ 
https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438443#comment-15438443
 ] 

Daniel Halperin commented on BEAM-591:
--------------------------------------

May be worth checking out PubSubIO. It has two modes -- processing-time 
timestamps, and message-time timestamps. But it keeps watermark tracking to 
itself.

> Better handling watermark in KafkaIO
> ------------------------------------
>
>                 Key: BEAM-591
>                 URL: https://issues.apache.org/jira/browse/BEAM-591
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Raghu Angadi
>            Assignee: Raghu Angadi
>
> Right now default watermark in KafkaIO is same as timestamp of the record. 
> The main problem with this is that watermark does not change if there n't any 
> new records on the topic. This can hold up many open windows. 
> The record timestamp by default is set to processing time (i.e. when the 
> runner reads a record from Kafka reader).
> A user can provide functions to calculate watermark and record timestamps. 
> There are a few concerns with current design:
> * What should happen when a kafka topic is idle:
>   ** in default case, I think watermark should advance to current time.
>   ** What should happen when user has provided a function to calculate record 
> timestamp? 
>    *** Should the watermark stay same as record timestamp?
>    *** same when user has provided own watermark function? 
> * Are the current semantics of user provided watermark function correct?
>   ** -it is run once for each record read-.
>   ** -Should it instead be run inside {{getWatermark()}} called by the runner 
> (we could still provide the last user record, and its timestamp)-.
>   ** It does run inside {{getWatermark()}}. should we pass current record 
> timestamp in addition to the record?
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to