egalpin commented on code in PR #22183:
URL: https://github.com/apache/beam/pull/22183#discussion_r917888945


##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2366,6 +2361,11 @@ protected BulkIOBaseFn(BulkIO bulkSpec) {
         this.spec = bulkSpec;
       }
 
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return Duration.millis(Long.MAX_VALUE);

Review Comment:
   Yes. The error arises because multiple elements from the same bundle and
window are buffered and only output later, yet each of those elements keeps its
own timestamp even though they share a window and a bundle. `checkTimestamp`
only compares the output timestamp passed as an argument against the timestamp
of the _current_ element (minus `getAllowedTimestampSkew()`, which defaults to
zero). When buffering values that all belong to the same window, the buffer is
flushed while processing whichever element makes it reach the desired size, so
`checkTimestamp` can fail if any buffered element from that window has an
earlier timestamp than that current element.
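   A rough plain-Java model of that failure mode, for illustration only: it is
not Beam code, and `TimestampSkewModel`, `passesCheck` and `countFailures` are
hypothetical names. It assumes the check rejects any output timestamp earlier
than the current element's timestamp minus the allowed skew (treating an
overflowing subtraction as "no lower bound"), and that the buffer is flushed
while processing the element that fills it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the timestamp check; timestamps are epoch millis. */
final class TimestampSkewModel {

  /** Returns true if outputTs would pass the modeled check for an element stamped currentTs. */
  static boolean passesCheck(long currentTs, long outputTs, long allowedSkewMillis) {
    long lowerBound;
    try {
      // Lower bound = current element timestamp minus the allowed skew.
      lowerBound = Math.subtractExact(currentTs, allowedSkewMillis);
    } catch (ArithmeticException e) {
      // If the subtraction overflows, treat it as having no lower bound at all.
      lowerBound = Long.MIN_VALUE;
    }
    return outputTs >= lowerBound;
  }

  /**
   * Buffers elements (given by their timestamps) and flushes once the buffer reaches
   * batchSize, re-emitting each buffered element at its own timestamp. Returns how many
   * of those outputs would fail the modeled check.
   */
  static int countFailures(long[] elementTimestamps, int batchSize, long allowedSkewMillis) {
    List<Long> buffer = new ArrayList<>();
    int failures = 0;
    for (long current : elementTimestamps) {
      buffer.add(current);
      if (buffer.size() >= batchSize) {
        // The flush happens while processing `current`, so every output is checked against it.
        for (long buffered : buffer) {
          if (!passesCheck(current, buffered, allowedSkewMillis)) {
            failures++;
          }
        }
        buffer.clear();
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    long[] sameWindow = {1_000L, 2_000L, 3_000L}; // one window, increasing timestamps
    System.out.println(countFailures(sameWindow, 3, 0L)); // prints 2
    System.out.println(countFailures(sameWindow, 3, Long.MAX_VALUE)); // prints 0
  }
}
```

   With zero skew, the two buffered elements stamped earlier than the element
that triggers the flush fail; with a skew of `Long.MAX_VALUE`, as in the
override in this diff, none do in this model.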
   
   Agreed, it would be great to be able to relax the strictness of
`checkTimestamp` if that can be done in a way that preserves the original
intent of the method (which, as Jan said, is "not to output elements that
change from on_time to late (or droppable)").



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
