Sophie Blee-Goldman created KAFKA-9368:
------------------------------------------
Summary: Preserve stream-time across rebalances/restarts
Key: KAFKA-9368
URL: https://issues.apache.org/jira/browse/KAFKA-9368
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Sophie Blee-Goldman
Stream-time is used to decide whether to process out-of-order records or to
drop them if they are late (i.e., timestamp < stream-time - grace-period). It
is currently tracked on a per-processor basis, such that each node has its own
local view of stream-time based on the maximum timestamp it has processed.
During rebalances and restarts, stream-time is initialized as UNKNOWN (i.e., -1)
for all processors in tasks that are newly created (or migrated). In effect, we
forget the current stream-time in this case, which may lead to
non-deterministic behavior: if we stop processing right before a late record,
that record would have been dropped had we continued processing, but it is not
dropped after the rebalance/restart. Let's look at an example with a grace
period of 5ms for a tumbling window of 5ms and the following records
(timestamps in parentheses):
{code:java}
r1(0) r2(5) r3(11) r4(2){code}
In the example, stream-time advances as 0, 5, 11, 11, and thus record `r4` is
dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or
rebalance after processing `r3` but before processing `r4`, we would
reinitialize stream-time as -1, and thus would process `r4` on restart/after
rebalance. The problem is that, from a global point of view, stream-time then
advances differently: 0, 5, 11, 2.
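The scenario above can be reproduced with a small simulation. This is a minimal sketch, not Kafka Streams code: it assumes the simplified lateness rule stated in this ticket (late if timestamp < stream-time - grace-period) and models a rebalance/restart simply as replacing the processor with a fresh one whose stream-time is UNKNOWN (-1). The class name `StreamTimeReset` and its methods are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Simulates one processor's local stream-time under the simplified lateness
// rule from this ticket (late if timestamp < stream-time - grace). Not Kafka code.
public class StreamTimeReset {
    static final long UNKNOWN = -1L;  // stream-time of a newly created/migrated task
    static final long GRACE_MS = 5L;

    static final class Result {
        final List<String> dropped = new ArrayList<>();   // names of records dropped as late
        final List<Long> streamTimes = new ArrayList<>(); // stream-time after each record
    }

    private long streamTime = UNKNOWN;

    // Advances stream-time; returns true if the record is processed, false if dropped.
    boolean process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        return timestamp >= streamTime - GRACE_MS;
    }

    // Runs all records; a fresh processor (stream-time UNKNOWN) replaces the old one
    // right before record index restartBefore, or never if restartBefore is -1.
    static Result run(long[] timestamps, int restartBefore) {
        Result result = new Result();
        StreamTimeReset processor = new StreamTimeReset();
        for (int i = 0; i < timestamps.length; i++) {
            if (i == restartBefore) {
                processor = new StreamTimeReset(); // stream-time is forgotten here
            }
            if (!processor.process(timestamps[i])) {
                result.dropped.add("r" + (i + 1));
            }
            result.streamTimes.add(processor.streamTime);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] records = {0L, 5L, 11L, 2L}; // r1(0) r2(5) r3(11) r4(2)
        Result continuous = run(records, -1);
        Result restarted = run(records, 3); // restart after r3, before r4
        System.out.println("continuous: stream-time " + continuous.streamTimes + ", dropped " + continuous.dropped);
        System.out.println("restarted:  stream-time " + restarted.streamTimes + ", dropped " + restarted.dropped);
    }
}
{code}

Running `main` prints `continuous: stream-time [0, 5, 11, 11], dropped [r4]` followed by `restarted:  stream-time [0, 5, 11, 2], dropped []`, matching the two stream-time sequences described above.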
Of course, this is a corner case: if we stopped processing one record earlier
(i.e., after processing `r2` but before processing `r3`), stream-time would
still advance correctly from a global point of view, because `r3` would
re-establish stream-time at 11 before `r4` is processed.
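To see that only one restart point changes the outcome, the sketch below tries a restart before each of `r2`, `r3`, and `r4` and compares the dropped records against uninterrupted processing. It uses the same assumptions as before (the ticket's simplified lateness rule, a restart modeled as resetting stream-time to -1); `RestartPoints` and `dropped` are hypothetical names.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Compares every restart point against uninterrupted processing.
public class RestartPoints {
    static final long UNKNOWN = -1L;
    static final long GRACE_MS = 5L;

    // Returns the names of records dropped as late when stream-time is reset
    // right before record index restartBefore (-1 means no restart).
    static List<String> dropped(long[] ts, int restartBefore) {
        List<String> dropped = new ArrayList<>();
        long streamTime = UNKNOWN;
        for (int i = 0; i < ts.length; i++) {
            if (i == restartBefore) {
                streamTime = UNKNOWN; // simulated rebalance/restart
            }
            streamTime = Math.max(streamTime, ts[i]);
            if (ts[i] < streamTime - GRACE_MS) {
                dropped.add("r" + (i + 1));
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] records = {0L, 5L, 11L, 2L}; // r1(0) r2(5) r3(11) r4(2)
        List<String> baseline = dropped(records, -1);
        for (int k = 1; k < records.length; k++) {
            List<String> withRestart = dropped(records, k);
            System.out.printf("restart after r%d: dropped %s (%s)%n",
                    k, withRestart, withRestart.equals(baseline) ? "same" : "DIFFERENT");
        }
    }
}
{code}

Restarting after `r1` or `r2` still drops `r4`, because `r3` advances stream-time to 11 again; only a restart after `r3` lets `r4` through.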
Note that in previous versions the maximum partition-time was actually used as
stream-time. This changed in 2.3 due to KAFKA-7895/[PR
6278|https://github.com/apache/kafka/pull/6278], and could potentially change
yet again in future versions (cf. KAFKA-8769). Partition-time, in contrast, is
preserved as of 2.4 thanks to KAFKA-7994.
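One possible direction, sketched under stated assumptions: checkpoint stream-time together with the committed position and restore it when a task is recreated. This is only similar in spirit to how partition-time is preserved as of 2.4 and does not reproduce the KAFKA-7994 mechanism. `PreservedStreamTime`, `commitStore`, the task id `"0_0"`, and the `offset:streamTime` encoding are all hypothetical; the in-memory map stands in for whatever durable storage a real fix would use, and the sketch keeps one stream-time per task rather than per processor.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: persist stream-time next to the committed offset so a
// recreated task restores it instead of starting from UNKNOWN. Not Kafka API.
public class PreservedStreamTime {
    static final long UNKNOWN = -1L;
    static final long GRACE_MS = 5L;

    // Stand-in for durable storage: task id -> "nextOffset:streamTime"
    static final Map<String, String> commitStore = new HashMap<>();

    final String taskId;
    long nextOffset;
    long streamTime;

    PreservedStreamTime(String taskId) {
        this.taskId = taskId;
        String checkpoint = commitStore.get(taskId);
        if (checkpoint == null) {          // brand-new task: nothing to restore
            nextOffset = 0L;
            streamTime = UNKNOWN;
        } else {                           // restore position and stream-time together
            String[] parts = checkpoint.split(":");
            nextOffset = Long.parseLong(parts[0]);
            streamTime = Long.parseLong(parts[1]);
        }
    }

    // Same simplified lateness rule as in the ticket; returns false if dropped.
    boolean process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        nextOffset++;
        return timestamp >= streamTime - GRACE_MS;
    }

    void commit() {
        commitStore.put(taskId, nextOffset + ":" + streamTime);
    }

    public static void main(String[] args) {
        long[] records = {0L, 5L, 11L, 2L}; // r1(0) r2(5) r3(11) r4(2)
        PreservedStreamTime task = new PreservedStreamTime("0_0");
        for (int i = 0; i < 3; i++) {
            task.process(records[i]);
        }
        task.commit(); // commit after r3, then simulate a rebalance/restart

        PreservedStreamTime restarted = new PreservedStreamTime("0_0");
        boolean accepted = restarted.process(records[(int) restarted.nextOffset]);
        System.out.println("r4 accepted after restart: " + accepted);
    }
}
{code}

Because the restored stream-time is 11, `r4` is dropped after the restart exactly as it would have been without one, and `main` prints `r4 accepted after restart: false`.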