Lorenzo Cagnatel created KAFKA-13678:
----------------------------------------
Summary: 2nd punctuation using STREAM_TIME does not respect scheduled interval
Key: KAFKA-13678
URL: https://issues.apache.org/jira/browse/KAFKA-13678
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.0.0
Reporter: Lorenzo Cagnatel
When a punctuator is scheduled using stream time, the first punctuation occurs
immediately, as documented, but the second one is not triggered at *t_schedule +
interval*; it can happen earlier.
For example, assume that we schedule a punctuation every 10 sec at timestamp 5
(t5). The system now works like this:
{noformat}
t5 -> schedule, punctuate, next schedule at t10
t6 -> no punctuation
t7 -> no punctuation
t8 -> no punctuation
t9 -> no punctuation
t10 -> punctuate, next schedule at t20
...{noformat}
In this example, the second punctuation occurs only 5 seconds after the first one,
breaking the scheduled interval.
From my point of view, a reasonable behaviour would be:
{noformat}
t5 -> schedule, punctuate, next schedule at t15
t6 -> no punctuation
t7 -> no punctuation
t8 -> no punctuation
t9 -> no punctuation
t10 -> no punctuation
t11 -> no punctuation
t12 -> no punctuation
t13 -> no punctuation
t14 -> no punctuation
t15 -> punctuate, next schedule at t25
...{noformat}
The origin of this problem can be found in {*}StreamTask.schedule{*}:
{code:java}
/**
 * Schedules a punctuation for the processor
 *
 * @param interval the interval in milliseconds
 * @param type     the punctuation type
 * @throws IllegalStateException if the current node is not null
 */
public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
    switch (type) {
        case STREAM_TIME:
            // align punctuation to 0L, punctuate as soon as we have data
            return schedule(0L, interval, type, punctuator);
        case WALL_CLOCK_TIME:
            // align punctuation to now, punctuate after interval has elapsed
            return schedule(time.milliseconds() + interval, interval, type, punctuator);
        default:
            throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
    }
}{code}
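where, for STREAM_TIME, it calls *schedule* with {*}startTime=0{*}. The schedule is
therefore aligned to 0L rather than to the stream time at which the punctuator was
scheduled, so the second punctuation falls on the next multiple of the interval.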
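To make the effect of the alignment concrete, here is a simplified sketch in Java. It is a hypothetical model, not Kafka's internal classes (the names `PunctuationAlignmentSketch`, `nextDueTime` and `firingTimes` are made up for illustration). It assumes that after each punctuation the next due time is the first anchor-aligned slot strictly after the current stream time, and that stream time advances one unit per step as in the timelines above. Anchoring at 0 reproduces the reported sequence; anchoring at the schedule time reproduces the proposed one.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of a STREAM_TIME punctuation schedule.
 * It is NOT Kafka's implementation; it only reproduces the timelines in this issue.
 */
public class PunctuationAlignmentSketch {

    /**
     * After punctuating at streamTime (assumed >= dueTime), returns the first
     * dueTime + k * interval (k >= 1) strictly greater than streamTime, so
     * missed intervals are skipped instead of firing back to back.
     */
    static long nextDueTime(long dueTime, long interval, long streamTime) {
        long missed = (streamTime - dueTime) / interval;
        return dueTime + (missed + 1) * interval;
    }

    /** Returns the stream times at which a punctuation would fire for the given anchor. */
    static List<Long> firingTimes(long anchor, long interval, long from, long to) {
        List<Long> fired = new ArrayList<>();
        long due = anchor;
        // advance stream time one unit at a time, as in the issue's timelines
        for (long t = from; t <= to; t++) {
            if (t >= due) {
                fired.add(t);
                due = nextDueTime(due, interval, t);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long interval = 10;
        long scheduleTime = 5;
        // current behaviour: schedule(0L, interval, ...) anchors the schedule at 0
        System.out.println("anchored at 0: " + firingTimes(0L, interval, scheduleTime, 25));
        // proposed behaviour (assumed reading): anchor at the stream time of scheduling
        System.out.println("anchored at schedule time: " + firingTimes(scheduleTime, interval, scheduleTime, 25));
    }
}
```

Running `main` prints `[5, 10, 20]` for the anchor at 0 and `[5, 15, 25]` for the anchor at the schedule time. The sketch assumes the stream time at scheduling is known; which value to use before any record has been processed is left open here.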
--
This message was sent by Atlassian Jira
(v8.20.1#820001)