psolomin commented on issue #28760:
URL: https://github.com/apache/beam/issues/28760#issuecomment-1747717875

   Hi @sjmittal thank you for filling in this issue! The refs to KafkaIO are 
also quite useful too.
   
   I've checked other implementations of `UnboundedSource.UnboundedReader` and 
their `getCurrentTimestamp` methods, namely:
   
   - `PubsubReader` - uses either [publish time or custom timestamp set by 
publisher 
explicitly](https://github.com/apache/beam/blob/c2666e114971a4727919d9c95287a4ceaf486a92/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java#L273).
   
   - `UnboundedRabbitMqReader` - 
[hard-codes](https://github.com/apache/beam/blob/c2666e114971a4727919d9c95287a4ceaf486a92/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java#L571)
 to delivery timestamp attribute, uses `Instant.now()` only if that attribute 
is not present.
   
   - `SqsUnboundedReader` - [uses publish timestamp 
too](https://github.com/apache/beam/blob/6f4e2852311db381a622dec3ebf26654c9372806/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsMessage.java#L36)
   
   These implementations make me thinking that `KinesisIO` has its 
`getCurrentTimestamp` following the commonly used semantics of other readers.
   
   In the mailing used you wrote:
   
   > We wanted the current timestamp based on some custom time embedded within 
the record and not approximate arrival time and not sure how we can achieve 
that.
   
   KinesisIO outputs only byte[] of a message payload without any decoding. If 
your timestamps sit in the messages' payload, I think, this approach should 
work: 
https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
   
   Therefore I am not sure `KinesisIO` code should change.
   
   Any thoughts @aromanenko-dev @mosche ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to