If confusion is the problem, then totally agree no point adding more knobs. Perhaps you're right that users don't /really/ want processing-time semantics. Just /think/ they want them until they start considering replay/catch-up scenarios. I guess people rarely think about those from the start (I sure didn't).

Cheers,

Michał


On 16/06/17 17:54, Jay Kreps wrote:
I think the question is when do you actually /want/ processing time semantics? There are definitely times when its safe to assume the two are close enough that a little lossiness doesn't matter much but it is pretty hard to make assumptions about when the processing time is and has been hard for us to think of a use case where its actually desirable.

I think mostly what we've seen is confusion about the core concepts:

  * stream -- immutable events that occur
  * tables (including windows) -- current state of the world

If the root problem is confusion adding knobs never makes it better. If the root problem is we're missing important use cases that justify the additional knobs then i think it's good to try to really understand them. I think there could be use cases around systems that don't take updates, example would be email, twitter, and some metrics stores.

One solution that would be less complexity inducing than allowing new semantics, but might help with the use cases we need to collect, would be to add a new operator in the DSL. Something like .freezeAfter(30, TimeUnit.SECONDS) that collects all updates for a given window and both emits and enforces a single output after 30 seconds after the advancement of stream time and remembers that it is omitted, suppressing all further output (so the output is actually a KStream). This might or might not depend on wall clock time. Perhaps this is in fact what you are proposing?

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>> wrote:

    I wonder if it's a frequent enough use case that Kafka Streams
    should consider providing this out of the box - this was asked for
    multiple times, right?

    Personally, I agree totally with the philosophy of "no final
    aggregation", as expressed by Eno's post, but IMO that is
    predicated totally on event-time semantics.

    If users want processing-time semantics then, as the docs already
    point out, there is no such thing as a late-arriving record -
    every record just falls in the currently open window(s), hence the
    notion of final aggregation makes perfect sense, from the
    usability point of view.

    The single abstraction of "stream time" proves leaky in some cases
    (e.g. for punctuate method - being addressed in KIP-138). Perhaps
    this is another case where processing-time semantics warrant
    explicit handling in the api - but of course, only if there's
    sufficient user demand for this.

    What I could imagine is a new type of time window
    (ProcessingTimeWindow?), that if used in an aggregation, the
    underlying processor would force the WallclockTimestampExtractor
    (KAFKA-4144 enables that) and would use the system-time
    punctuation (KIP-138) to send the final aggregation value once the
    window has expired and could be configured to not send
    intermediate updates while the window was open.

    Of course this is just a helper for the users, since they can
    implement it all themselves using the low-level API, as Matthias
    pointed out already. Just seems there's recurring interest in this.

    Again, this only makes sense for processing time semantics. For
    event-time semantics I find the arguments for "no final
    aggregation" totally convincing.


    Cheers,

    Michał


    On 16/06/17 00:08, Matthias J. Sax wrote:
    Hi Paolo,

    This SO question might help, too:
    
https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
    
<https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable>

    For Streams, the basic model is based on "change" and we report updates
    to the "current" result immediately reducing latency to a minimum.

    Last, if you say it's going to fall into the next window, you won't get
    event time semantics but you fall back processing time semantics, that
    cannot provide exact results....

    If you really want to trade-off correctness version getting (late)
    updates and want to use processing time semantics, you should configure
    WallclockTimestampExtractor and implement a "update deduplication"
    operator using table.toStream().transform(). You can attached a state to
    your transformer and store all update there (ie, newer update overwrite
    older updates). Punctuations allow you to emit "final" results for
    windows for which "window end time" passed.


    -Matthias

    On 6/15/17 9:21 AM, Paolo Patierno wrote:
    Hi Eno,


    regarding closing window I think that it's up to the streaming application. 
I mean ...

    If I want something like I described, I know that a value outside my 5 seconds window 
will be taken into account for the next processing (in the next 5 seconds). I don't think 
I'm losing a record, I am ware that this record will fall in the next 
"processing" window. Btw I'll take a look at your article ! Thanks !


    Paolo


    Paolo Patierno
    Senior Software Engineer (IoT) @ Red Hat
    Microsoft MVP on Windows Embedded & IoT
    Microsoft Azure Advisor

    Twitter : @ppatierno<http://twitter.com/ppatierno> 
<http://twitter.com/ppatierno>
    Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
    <http://it.linkedin.com/in/paolopatierno>
    Blog : DevExperience<http://paolopatierno.wordpress.com/>
    <http://paolopatierno.wordpress.com/>


    ________________________________
    From: Eno Thereska<eno.there...@gmail.com> <mailto:eno.there...@gmail.com>
    Sent: Thursday, June 15, 2017 3:57 PM
    To:us...@kafka.apache.org <mailto:us...@kafka.apache.org>
    Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

    Hi Paolo,

    Yeah, so if you want fewer records, you should actually "not" disable 
cache. If you disable cache you'll get all the records as you described.

    About closing windows: if you close a window and a late record arrives that 
should have been in that window, you basically lose the ability to process that 
record. In Kafka Streams we are robust to that, in that we handle late arriving 
records. There is a comparison here for example when we compare it to other 
methods that depend on watermarks or 
triggers:https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
    <https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/>  
<https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/>
    <https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/>

    Eno


    On 15 Jun 2017, at 14:57, Paolo Patierno<ppatie...@live.com> 
<mailto:ppatie...@live.com>  wrote:

    Hi Emo,


    thanks for the reply !

    Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
(so disabling cache).

    Regarding the interactive query API (I'll take a look) it means that it's 
up to the application doing something like we have oob with Spark.

    May I ask what do you mean with "We don’t believe in closing windows" ? 
Isn't it much more code that user has to write for having the same result ?

    I'm exploring Kafka Streams and it's very powerful imho even because the 
usage is pretty simple but this scenario could have a lack against Spark.


    Thanks,

    Paolo.


    Paolo Patierno
    Senior Software Engineer (IoT) @ Red Hat
    Microsoft MVP on Windows Embedded & IoT
    Microsoft Azure Advisor

    Twitter : @ppatierno<http://twitter.com/ppatierno> 
<http://twitter.com/ppatierno>
    Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
    <http://it.linkedin.com/in/paolopatierno>
    Blog : DevExperience<http://paolopatierno.wordpress.com/>
    <http://paolopatierno.wordpress.com/>


    ________________________________
    From: Eno Thereska<eno.there...@gmail.com> <mailto:eno.there...@gmail.com>
    Sent: Thursday, June 15, 2017 1:45 PM
    To:us...@kafka.apache.org <mailto:us...@kafka.apache.org>
    Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

    Hi Paolo,

    That is indeed correct. We don’t believe in closing windows in Kafka 
Streams.
    You could reduce the number of downstream records by using record 
caches:http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
    
<http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>
  
<http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>
    
<http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl>.

    Alternatively you can just query the KTable whenever you want using the 
Interactive Query APIs (so when you query dictates what  data you receive), see 
thishttps://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
    
<https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>
  
<https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>
    
<https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>

    Thanks
    Eno
    On Jun 15, 2017, at 2:38 PM, Paolo Patierno<ppatie...@live.com> 
<mailto:ppatie...@live.com>  wrote:

    Hi,


    using the streams library I noticed a difference (or there is a lack of 
knowledge on my side)with Apache Spark.

    Imagine following scenario ...


    I have a source topic where numeric values come in and I want to check the 
maximum value in the latest 5 seconds but ... putting the max value into a 
destination topic every 5 seconds.

    This is what happens with reduceByWindow method in Spark.

    I'm using reduce on a KStream here that process the max value taking into 
account previous values in the latest 5 seconds but the final value is put into 
the destination topic for each incoming value.


    For example ...


    An application sends numeric values every 1 second.

    With Spark ... the source gets values every 1 second, process max in a 
window of 5 seconds, puts the max into the destination every 5 seconds (so when 
the window ends). If the sequence is 21, 25, 22, 20, 26 the output will be just 
26.

    With Kafka Streams ... the source gets values every 1 second, process max 
in a window of 5 seconds, puts the max into the destination every 1 seconds (so 
every time an incoming value arrives). Of course, if for example the sequence 
is 21, 25, 22, 20, 26 ... the output will be 21, 25, 25, 25, 26.


    Is it possible with Kafka Streams ? Or it's something to do at application 
level ?


    Thanks,

    Paolo


    Paolo Patierno
    Senior Software Engineer (IoT) @ Red Hat
    Microsoft MVP on Windows Embedded & IoT
    Microsoft Azure Advisor

    Twitter : @ppatierno<http://twitter.com/ppatierno> 
<http://twitter.com/ppatierno>
    Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
    <http://it.linkedin.com/in/paolopatierno>
    Blog : DevExperience<http://paolopatierno.wordpress.com/>
    <http://paolopatierno.wordpress.com/>

-- <http://www.openbet.com/> Michal Borowiecki
    Senior Software Engineer L4
        T:      +44 208 742 1600 <tel:+44%2020%208742%201600>

        
        +44 203 249 8448 <tel:+44%2020%203249%208448>

        
        
        E:      michal.borowie...@openbet.com
    <mailto:michal.borowie...@openbet.com>
        W:      www.openbet.com <http://www.openbet.com/>

        
        OpenBet Ltd

        Chiswick Park Building 9

        566 Chiswick High Rd

        London

        W4 5XT

        UK

        
    <https://www.openbet.com/email_promo>

    This message is confidential and intended only for the addressee.
    If you have received this message in error, please immediately
    notify the postmas...@openbet.com <mailto:postmas...@openbet.com>
    and delete it from your system as well as any copies. The content
    of e-mails as well as traffic data may be monitored by OpenBet for
    employment and security purposes. To protect the environment
    please do not print this e-mail unless necessary. OpenBet Ltd.
    Registered Office: Chiswick Park Building 9, 566 Chiswick High
    Road, London, W4 5XT, United Kingdom. A company registered in
    England and Wales. Registered no. 3134634. VAT no. GB927523612



--
Signature
<http://www.openbet.com/>         Michal Borowiecki
Senior Software Engineer L4
        T:      +44 208 742 1600

        
        +44 203 249 8448

        
        
        E:      michal.borowie...@openbet.com
        W:      www.openbet.com <http://www.openbet.com/>

        
        OpenBet Ltd

        Chiswick Park Building 9

        566 Chiswick High Rd

        London

        W4 5XT

        UK

        
<https://www.openbet.com/email_promo>

This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612

Reply via email to