egalpin commented on code in PR #22183:
URL: https://github.com/apache/beam/pull/22183#discussion_r917888945
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2366,6 +2361,11 @@ protected BulkIOBaseFn(BulkIO bulkSpec) {
this.spec = bulkSpec;
}
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
Review Comment:
Yes, this error arises because multiple elements from the same bundle and
window are buffered and then output later, but each element has a different
timestamp even though they share the same window and bundle. `checkTimestamp`
only compares the output timestamp passed as an argument against the timestamp
of the _current_ element. So when buffering values that all belong to the same
window, the element being processed at the moment the buffer reaches its
target size may have a later timestamp than some of the buffered elements, and
outputting those earlier-timestamped elements then fails `checkTimestamp`.
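To make the failure mode concrete, here is a minimal, self-contained Java model. It is a hypothetical simplification, not Beam's code: `BufferedOutputSkew`, `passesCheck`, and `countRejected` are illustrative names, timestamps are plain `long` millis rather than Joda `Instant`s, and the check assumes only the rule described above (an output timestamp must not be earlier than the current element's timestamp minus the allowed skew).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the buffering problem; NOT Beam's implementation.
 * It only mimics the rule: outputTs >= currentElementTs - allowedSkew.
 */
public class BufferedOutputSkew {

  static boolean passesCheck(long outputTs, long currentTs, long allowedSkewMillis) {
    long lowerBound;
    try {
      lowerBound = Math.subtractExact(currentTs, allowedSkewMillis);
    } catch (ArithmeticException e) {
      // Skew so large the bound underflows: treat it as "no lower bound".
      lowerBound = Long.MIN_VALUE;
    }
    return outputTs >= lowerBound;
  }

  /**
   * Buffers elements and, once the batch is full, "outputs" each buffered element
   * with its ORIGINAL timestamp, while the check compares against the CURRENT
   * element's timestamp. Returns how many outputs the check would reject.
   */
  static int countRejected(long[] elementTimestamps, int batchSize, long allowedSkewMillis) {
    List<Long> buffer = new ArrayList<>();
    int rejected = 0;
    for (long currentTs : elementTimestamps) {
      buffer.add(currentTs);
      if (buffer.size() >= batchSize) {
        for (long bufferedTs : buffer) {
          if (!passesCheck(bufferedTs, currentTs, allowedSkewMillis)) {
            rejected++;
          }
        }
        buffer.clear();
      }
    }
    return rejected;
  }

  public static void main(String[] args) {
    long[] timestamps = {100L, 200L, 300L}; // same window, same bundle
    System.out.println("skew=0: rejected " + countRejected(timestamps, 3, 0L));
    // prints "skew=0: rejected 2"
    System.out.println("skew=MAX: rejected " + countRejected(timestamps, 3, Long.MAX_VALUE));
    // prints "skew=MAX: rejected 0"
  }
}
```

With zero skew, the flush triggered by the element at 300 rejects the outputs for 100 and 200; with an effectively unbounded skew, as in the `Duration.millis(Long.MAX_VALUE)` override in the diff, nothing is rejected.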
Agreed, it would be great to be able to relax the strictness of
`checkTimestamp` if that can be done in a way that preserves the original
intent of the method (like Jan said, ensuring "not to output elements that
change from on_time to late (or droppable).")