sjmittal commented on issue #28760:
URL: https://github.com/apache/beam/issues/28760#issuecomment-1747979048

   @psolomin I understand your point about why `KinesisIO` implements 
`getCurrentTimestamp` the way it does.
   I have created a PR, https://github.com/apache/beam/pull/28763, in which we 
propose to alter this behavior slightly.
   
   In `KinesisIO`, the `getCurrentTimestamp` method fetches the event timestamp 
for the record and, as a fallback, uses `getApproximateArrivalTimestamp`.
   
   ```
   public Instant getCurrentTimestamp() throws NoSuchElementException {
     Instant timestamp = shardReadersPool.getEventTimestamp(currentRecord.get());
     if (timestamp == null) {
       return currentRecord.get().getApproximateArrivalTimestamp();
     }
     return timestamp;
   }
   ```
   
   A solution like this supports both event-time and processing-time 
semantics in stream processing.
   If we always use the arrival time as the current timestamp, then 
aggregations based on an event time extracted from the Kinesis record's 
payload produce incorrect results when records arrive out of order. 
One workaround is to set an arbitrarily high allowed lateness, but that may 
not work reliably either, because the lateness actually needed varies.
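
To make the out-of-order problem concrete, here is a minimal, self-contained sketch in plain Java. It does not use Beam or the AWS SDK: `WindowingDemo`, `DemoRecord`, `windowStart`, `countPerWindow`, and the sample timestamps are all hypothetical names and values chosen for illustration, and `java.time.Instant` stands in for the Joda `Instant` used in the snippet above. It counts the same three records per one-minute fixed window, once keyed by event time and once by arrival time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

final class WindowingDemo {
  // Hypothetical stand-in for a Kinesis record; not a Beam or AWS type.
  static final class DemoRecord {
    final Instant eventTime;   // timestamp carried in the payload
    final Instant arrivalTime; // approximate arrival time at the stream

    DemoRecord(Instant eventTime, Instant arrivalTime) {
      this.eventTime = eventTime;
      this.arrivalTime = arrivalTime;
    }
  }

  // Start of the fixed window of the given size that contains ts.
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs);
  }

  // Counts records per fixed window, using the supplied timestamp choice.
  static Map<Instant, Integer> countPerWindow(
      List<DemoRecord> records, Function<DemoRecord, Instant> timestampFn, Duration size) {
    Map<Instant, Integer> counts = new TreeMap<>();
    for (DemoRecord r : records) {
      counts.merge(windowStart(timestampFn.apply(r), size), 1, Integer::sum);
    }
    return counts;
  }

  // Three made-up records; the last one arrives out of order.
  static List<DemoRecord> sample() {
    Instant t0 = Instant.parse("2023-10-05T00:00:00Z");
    return List.of(
        new DemoRecord(t0.plusSeconds(10), t0.plusSeconds(12)),
        new DemoRecord(t0.plusSeconds(70), t0.plusSeconds(71)),
        // Produced in the first minute, but arrives during the second minute.
        new DemoRecord(t0.plusSeconds(50), t0.plusSeconds(75)));
  }

  public static void main(String[] args) {
    Duration oneMinute = Duration.ofMinutes(1);
    System.out.println(
        "by event time:   " + countPerWindow(sample(), r -> r.eventTime, oneMinute));
    System.out.println(
        "by arrival time: " + countPerWindow(sample(), r -> r.arrivalTime, oneMinute));
  }
}
```

Keyed by event time, the first window holds two records and the second holds one. Keyed by arrival time, the late record is counted in the second window, so the counts flip to one and two. That is the kind of incorrect aggregate described above.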
   
   Let me know what you think.
   
   

