[ 
https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458307#comment-16458307
 ] 

Raghu Angadi commented on BEAM-591:
-----------------------------------

See the following methods added to `KafkaIO.Read` in the 3 pull requests attached to this 
jira:
 * {{withLogAppendTime()}}
 * {{withCreateTime()}}
 * {{withProcessingTime()}}
 * {{withTimestampPolicyFactory()}}


> Better handling of watermark in KafkaIO
> ---------------------------------------
>
>                 Key: BEAM-591
>                 URL: https://issues.apache.org/jira/browse/BEAM-591
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Raghu Angadi
>            Assignee: Raghu Angadi
>            Priority: Major
>             Fix For: 2.4.0
>
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Right now the default watermark in KafkaIO is the same as the timestamp of the record. 
> The main problem with this is that the watermark does not change if there aren't any 
> new records on the topic. This can hold up many open windows. 
> The record timestamp by default is set to processing time (i.e. when the 
> runner reads a record from the Kafka reader).
> A user can provide functions to calculate the watermark and record timestamps. 
> There are a few concerns with the current design:
> * What should happen when a Kafka topic is idle:
>   ** In the default case, I think the watermark should advance to the current time.
>   ** What should happen when the user has provided a function to calculate the record 
> timestamp? 
>    *** Should the watermark stay the same as the record timestamp?
>    *** The same question applies when the user has provided their own watermark function.
> * Are the current semantics of the user-provided watermark function correct?
>   ** -it is run once for each record read-.
>   ** -Should it instead be run inside {{getWatermark()}} called by the runner 
> (we could still provide the last user record, and its timestamp)-.
>   ** It does run inside {{getWatermark()}}. Should we pass the current record 
> timestamp in addition to the record?
>  
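
To make the idle-topic question in the description concrete, here is a minimal sketch in plain Java. It is not KafkaIO code and does not use any Beam API: the class {{IdleAwareWatermark}}, its methods, and the {{idleThreshold}} parameter are hypothetical names chosen for illustration. It assumes the watermark follows the latest record timestamp while records arrive, jumps to the current time once no record has been seen for {{idleThreshold}}, and never moves backwards.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of Beam or KafkaIO. */
final class IdleAwareWatermark {
    // How long a topic must be silent before it is treated as idle (assumed parameter).
    private final Duration idleThreshold;
    // Largest record timestamp observed so far.
    private Instant lastRecordTimestamp = Instant.MIN;
    // Wall-clock time at which the most recent record was read; null until the first record.
    private Instant lastRecordSeenAt = null;
    // Last watermark handed out, kept so that the watermark never regresses.
    private Instant watermark = Instant.MIN;

    IdleAwareWatermark(Duration idleThreshold) {
        this.idleThreshold = idleThreshold;
    }

    /** Call once for each record read from the topic. */
    void onRecord(Instant recordTimestamp, Instant now) {
        if (recordTimestamp.isAfter(lastRecordTimestamp)) {
            lastRecordTimestamp = recordTimestamp;
        }
        lastRecordSeenAt = now;
    }

    /** Call whenever the runner asks for the current watermark. */
    Instant getWatermark(Instant now) {
        boolean idle = lastRecordSeenAt == null
            || Duration.between(lastRecordSeenAt, now).compareTo(idleThreshold) >= 0;
        // Active topic: follow the record timestamp. Idle topic: advance to the current time.
        Instant candidate = idle ? now : lastRecordTimestamp;
        if (candidate.isAfter(watermark)) {
            watermark = candidate;
        }
        return watermark;
    }
}
```

With processing-time timestamps, advancing to the current time is safe because no later record can carry an earlier timestamp. With user-provided record timestamps, a record that arrives after an idle period may fall behind the watermark and be treated as late, which is the trade-off the open questions above are about.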



