Xu Mingmin created BEAM-3699:
--------------------------------

             Summary: RecordTimestamp should be the default Watermark in KafkaIO
                 Key: BEAM-3699
                 URL: https://issues.apache.org/jira/browse/BEAM-3699
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-extensions
            Reporter: Xu Mingmin
            Assignee: Xu Mingmin
             Fix For: 2.4.0


Currently, KafkaIO resolves the watermark in the following order of precedence:

{code}
getWatermarkFn().apply(curRecord)
  getTimestampFn().apply(record)
    Instant.now()
{code}

I propose changing it as shown below, so that {{KafkaRecord.timestamp}} is used
when neither {{WatermarkFn()}} nor {{TimestampFn()}} is set:
{code}
getWatermarkFn().apply(curRecord)
  getTimestampFn().apply(record)
    KafkaRecord(Beam.KafkaIO).timestamp
{code}

This is equivalent to:
{code}
getWatermarkFn().apply(curRecord)
  getTimestampFn().apply(record)
    KafkaRawRecord(Kafka_client).timestamp
      Instant.now()
{code}
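
A minimal, dependency-free sketch of the proposed order, for illustration only. {{WatermarkFallback}}, {{Record}} and {{resolve}} are hypothetical names, not KafkaIO's actual classes or methods. The sketch uses {{java.time.Instant}} rather than the Instant type used by Beam, and it assumes that a negative timestamp means the Kafka client record carries no timestamp:

```java
import java.time.Instant;
import java.util.function.Function;

// Hypothetical sketch of the proposed fallback order; not KafkaIO's actual code.
class WatermarkFallback {

    // Stand-in for a Kafka record.
    // Assumption: timestampMillis < 0 means "no timestamp available".
    static final class Record {
        final long timestampMillis;

        Record(long timestampMillis) {
            this.timestampMillis = timestampMillis;
        }
    }

    // Resolves the watermark for a record:
    //   1. watermarkFn, if configured
    //   2. timestampFn, if configured
    //   3. the record's own timestamp, if it has one
    //   4. the supplied "now" as the last resort
    static Instant resolve(
            Function<Record, Instant> watermarkFn, // null when not configured
            Function<Record, Instant> timestampFn, // null when not configured
            Record record,
            Instant now) {
        if (watermarkFn != null) {
            return watermarkFn.apply(record);
        }
        if (timestampFn != null) {
            return timestampFn.apply(record);
        }
        if (record.timestampMillis >= 0) {
            return Instant.ofEpochMilli(record.timestampMillis);
        }
        return now;
    }
}
```

Passing {{now}} as a parameter instead of calling {{Instant.now()}} inside the method keeps the sketch deterministic, which makes the fallback order easy to check in a unit test.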

[~rangadi] any comments?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
