bounkong khamphousone created KAFKA-13739:
---------------------------------------------
Summary: Sliding window without grace not working
Key: KAFKA-13739
URL: https://issues.apache.org/jira/browse/KAFKA-13739
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.1.0
Reporter: bounkong khamphousone
Hi everyone! I would like to understand why KafkaStreams DSL offer the ability
to express a SlidingWindow with no grace period but seems that it doesn't work.
[confluent's
site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
state that grace period is required and with the deprecated method, it's
default to 24 hours.
Doing a basic sliding window with a count, if I set grace period to 1 ms,
expected output is done. Based on the sliding window documentation, lower and
upper bounds are inclusive.
If I set grace period to 0 ms, I can see that record is not skipped at
KStreamSlidingWindowAggregate(l.126) but when we try to create the window and
push the event in KStreamSlidingWindowAggregate#createWindows we call the
method updateWindowAndForward(l.417). This method (l.468) check that
{{{}windowEnd > closeTime{}}}.
closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} (Sliding
window configuration)
windowEnd is defined as {{{}inputRecordTimestamp{}}}.
For a first event with a record timestamp, we can assume that
observedStreamTime is equal to inputRecordTimestamp.
Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMS) which
results to {{{}inputRecordTimestamp{}}}.
If we go back to the check done in {{updateWindowAndForward}} method, then we
have inputRecordTimestamp > inputRecordTimestamp which is always false. The
record is then skipped for record's own window.
Stating that lower and upper bounds are inclusive, I would have expected the
event to be pushed in the store and forwarded. Hence, the check would be
{{{}windowEnd >= closeTime{}}}.
Is it a bug or is it intended ?
Thanks in advance for your explanations!
Best regards!
--
This message was sent by Atlassian Jira
(v8.20.1#820001)