Your example use case is legitimate, and it is indeed something that we 
sacrifice here. 
Allow me to paraphrase:
```
KTable table = 
....windowedBy(TimeWindows.of(10).withGrace(100)).count(Materialized.as("count"));
table.suppress(untilWindowCloses(overrideGrace(10))).to("output");
```
This was a point of discussion early on in the KIP. The downside of this API is 
that querying "count" and observing "output", we will see divergent results, 
since "count" will permit some records that the suppression drops (say, any 
record that arrives more than 10ms later than its window).

We felt that if the operator's job is to emit only "final result" of the 
window's aggregation, then that's exactly what it should do. Redefining the 
window parameters is out of scope.

However, to your second comment, I didn't follow. We don't _just_ offer 
`untilWindowCloses`. You could alternatively do:

```
KTable table = 
....windowedBy(TimeWindows.of(10).withGrace(100)).count(Materialized.as("count"));
table.suppress(untilTimeElapses(10)).to("output");
```

This won't give you "final results", since it will still emit updates if they 
are needed.

Final thought: Like I said, I don't think it's unreasonable what you proposed, 
but I think it's better to start with something simple and safe. If people are 
asking for this API later on, we can always add it.

[ Full content available at: https://github.com/apache/kafka/pull/5567 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to