Thanks, Ivan!

That sounds like a great plan to me. Two smaller KIPs are easier to agree on 
than one big one. 

I agree hopping and sliding windows will actually have a duplicating effect. We 
can avoid adding distinct() to the sliding window interface, but hopping 
windows are just a different parameterization of epoch-aligned windows. It 
seems we can’t do much about that except document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> Hi John!
> 
> I think that your proposal is just fantastic, it simplifies things a lot!
> 
> I also felt uncomfortable due to the fact that the proposed `distinct()` 
> is not somewhere near `count()` and `reduce(..)`. But 
> `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like 
> a correct option for  me because of the issue with the unneeded 
> repartitioning.
> 
> The bold idea that we can just CANCEL the repartitioning didn't came to 
> my mind.
> 
> What seemed to me a single problem is in fact two unrelated problems: 
> `distinct` operation and cancelling the unneeded repartitioning.
> 
>  > what if we introduce a parameter to `selectKey()` that specifies that 
> the caller asserts that the new key does _not_ change the data partitioning?
> 
> I think a more elegant solution would be not to add a new parameter to 
> `selectKey` and all the other key-changing operations (`map`, 
> `transform`, `flatMap`, ...), but add a new operator 
> `KStream#cancelRepartitioning()` that resets `keyChangingOperation` flag 
> for the upstream node. Of course, "use it only if you know what you're 
> doing" warning is to be added. Well, it's a topic for a separate KIP!
> 
> Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, then 
> changes to the API are minimally invasive: we're just adding 
> `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and 
> that's all.
> 
> We can now define `distinct` as an operation that returns only a first 
> record that falls into a new window, and filters out all the other 
> records that fall into an already existing window. BTW, we can mock the 
> behaviour of such an operation with `TopologyTestDriver` using 
> `reduce((l, r) -> STOP)`.filterNot((k, v)->STOP.equals(v)).  ;-)
> 
> Consider the following example (record times are in seconds):
> 
> //three bursts of variously ordered records
> 4, 5, 6
> 23, 22, 24
> 34, 33, 32
> //'late arrivals'
> 7, 22, 35
> 
> 
> 1. 'Epoch-aligned deduplication' using tumbling windows:
> 
> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> 
> produces
> 
> (key@[00000/10000], 4)
> (key@[20000/30000], 23)
> (key@[30000/40000], 34)
> 
> -- that is, one record per epoch-aligned window.
> 
> 2. Hopping and sliding windows do not make much sense here, because they 
> produce multiple intersected windows, so that one record can be 
> multiplied, but we want deduplication.
> 
> 3. SessionWindows work for 'data-aligned deduplication'.
> 
> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
>  
> 
> 
> produces only
> 
> ([key@4000/4000], 4)
> ([key@23000/23000], 23)
> 
> because all the records bigger than 7 are stuck together in one session. 
> Setting inactivity gap to 9 seconds will return three records:
> 
> ([key@4000/4000], 4)
> ([key@23000/23000], 23)
> ([key@34000/34000], 34)
> 
> WDYT? If you like this variant, I will re-write KIP-655 and propose a 
> separate KIP for `cancelRepartitioning` (or whatever name we will choose 
> for it).
> 
> Regards,
> 
> Ivan
> 
> 
> 24.05.2021 22:32, John Roesler пишет:
> > Hey there, Ivan!
> > 
> > In typical fashion, I'm going to make a somewhat outlandish
> > proposal. I'm hoping that we can side-step some of the
> > complications that have arisen. Please bear with me.
> > 
> > It seems like `distinct()` is not fundamentally unlike other windowed
> > "aggregation" operations. Your concern about unnecessary
> > repartitioning seems to apply just as well to `count()` as to `distinct()`.
> > This has come up before, but I don't remember when: what if we
> > introduce a parameter to `selectKey()` that specifies that the caller
> > asserts that the new key does _not_ change the data partitioning?
> > The docs on that parameter would of course spell out all the "rights
> > and responsibilities" of setting it.
> > 
> > In that case, we could indeed get back to
> > `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the
> > key mapper and the windowing function without having to carve out
> > a separate domain just for `distinct()`. All the rest of the KStream
> > operations would also benefit.
> > 
> > What do you think?
> > 
> > Thanks,
> > John
> > 
> > On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> >> Hello everyone,
> >>
> >> let me revive the discussion for KIP-655. Now I have some time again and
> >> I'm eager to finalize this.
> >>
> >> Based on what was already discussed, I think that we can split the
> >> discussion into three topics for our convenience.
> >>
> >> The three topics are:
> >>
> >> - idExtractor  (how should we extract the deduplication key for the record)
> >>
> >> - timeWindows (what time windows should we use)
> >>
> >> - miscellaneous (naming etc.)
> >>
> >> ---- idExtractor ----
> >>
> >> Original proposal: use (k, v) -> f(k, v) mapper, defaulting to (k, v) ->
> >> k.  The drawback here is that we must warn the user to choose such a
> >> function that sets different IDs for records from different partitions,
> >> otherwise same IDs might be not co-partitioned (and not deduplicated as
> >> a result). Additional concern: what should we do when this function
> >> returns null?
> >>
> >> Matthias proposed key-only deduplication: that is, no idExtractor at
> >> all, and if we want to use `distinct` for a particular identifier, we
> >> must `selectKey()` before. The drawback of this approach is that we will
> >> always have repartitioning after the key selection, while in practice
> >> repartitioning will not always be necessary (for example, when the data
> >> stream is such that different values infer different keys).
> >>
> >> So here we have a 'safety vs. performance' trade-off. But 'safe' variant
> >> is also not very convenient for developers, since we're forcing them to
> >> change the structure of their records.
> >>
> >> A 'golden mean' here might be using composite ID with its first
> >> component equals to k and its second component equals to some f(v) (f
> >> defaults to v -> null, and null value returned by f(v) means
> >> 'deduplicate by the key only'). The nuance here is that we will have
> >> serializers only for types of k and f(v), and we must correctly
> >> serialize a tuple (k, f(v)), but of course this is doable.
> >>
> >> What do you think?
> >>
> >> ---- timeWindows ----
> >>
> >> Originally I proposed TimeWindows only just because they solved my
> >> particular case :-) but agree with Matthias' and Sophie's objections.
> >>
> >> I like the Sophie's point: we need both epoch-aligned and data-aligned
> >> windows. IMO this is absolutely correct: "data-aligned is useful for
> >> example when you know that a large number of updates to a single key
> >> will occur in short bursts, and epoch-aligned when you specifically want
> >> to get just a single update per discrete time interval."
> >>
> >> I just cannot agree right away with Sophie's
> >> .groupByKey().windowedBy(...).distinct() proposal, as it implies  the
> >> key-only deduplication -- see the previous topic.
> >>
> >> Epoch-aligned windows are very simple: they should forward only one
> >> record per enumerated time window. TimeWindows are exactly what we want
> >> here. I mentioned in the KIP both tumbling and hopping windows just
> >> because both are possible for TimeWindows, but indeed I don't see any
> >> real use case for hopping windows, only tumbling windows make sence IMO.
> >>
> >> For data-aligned windows SlidingWindow interface seems to be a nearly
> >> valid choice. Nearly. It should forward a record once when it's first
> >> seen, and then not again for any identical records that fall into the
> >> next N timeUnits.  However, we cannot reuse SlidingWindow as is, because
> >> just as Matthias noted, SlidingWindows go backward in time, while we
> >> need a windows that go forward in time, and are not opened while records
> >> fall into an already existing window. We definitely should make our own
> >> implementation, maybe we should call it ExpirationWindow? WDYT?
> >>
> >>
> >> ---- miscellaneous ----
> >>
> >> Persistent/in-memory stores. Matthias proposed to pass Materialized
> >> parameter next to DistinctParameters (and this is necessary, because we
> >> will need to provide a serializer for extracted id). This is absolutely
> >> valid point, I agree and I will fix it in the KIP.
> >>
> >> Naming. Sophie noted that the Streams DSL operators are typically named
> >> as verbs, so she proposes `deduplicate` in favour of `distinct`. I think
> >> that while it's important to stick to the naming conventions, it is also
> >> important to think of the experience of those who come from different
> >> stacks/technologies. People who are familiar with SQL and Java Streams
> >> API must know for sure what does 'distinct' mean, while data
> >> deduplication in general is a more complex task and thus `deduplicate`
> >> might be misleading. But I'm ready to be convinced if the majority
> >> thinks otherwise.
> >>
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >>
> >>
> >> 14.09.2020 21:31, Sophie Blee-Goldman пишет:
> >>> Hey all,
> >>>
> >>> I'm not convinced either epoch-aligned or data-aligned will fit all
> >>> possible use cases.
> >>> Both seem totally reasonable to me: data-aligned is useful for example 
> >>> when
> >>> you know
> >>> that a large number of updates to a single key will occur in short bursts,
> >>> and epoch-
> >>> aligned when you specifically want to get just a single update per 
> >>> discrete
> >>> time
> >>> interval.
> >>>
> >>> Going a step further, though, what if you want just a single update per
> >>> calendar
> >>> month, or per year with accounting for leap years? Neither of those are
> >>> serviced that
> >>> well by the existing Windows specification to windowed aggregations, a
> >>> well-known
> >>> limitation of the current API. There is actually a KIP
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> >>> going
> >>> on in parallel to fix this
> >>> exact issue and make the windowing interface much more flexible. Maybe
> >>> instead
> >>> of re-implementing this windowing interface in a similarly limited fashion
> >>> for the
> >>> Distinct operator, we could leverage it here and get all the benefits
> >>> coming with
> >>> KIP-645.
> >>>
> >>> Specifically, I'm proposing to remove the TimeWindows/etc config from the
> >>> DistinctParameters class, and move the distinct() method from the KStream
> >>> interface
> >>> to the TimeWindowedKStream interface. Since it's semantically similar to a
> >>> kind of
> >>> windowed aggregation, it makes sense to align it with the existing 
> >>> windowing
> >>> framework, ie:
> >>>
> >>> inputStream
> >>>       .groupKyKey()
> >>>       .windowedBy()
> >>>       .distinct()
> >>>
> >>> Then we could use data-aligned windows if SlidingWindows is specified in
> >>> the
> >>> windowedBy(), and epoch-aligned (or some other kind of enumerable window)
> >>> if a Windows is specified in windowedBy() (or an 
> >>> EnumerableWindowDefinition
> >>> once KIP-645 is implemented to replace Windows).
> >>>
> >>> *SlidingWindows*: should forward a record once when it's first seen, and
> >>> then not again
> >>> for any identical records that fall into the next N timeUnits. This
> >>> includes out-of-order
> >>> records, ie if you have a SlidingWindows of size 10s and process records 
> >>> at
> >>> time
> >>> 15s, 20s, 14s then you would just forward the one at 15s. Presumably, if
> >>> you're
> >>> using SlidingWindows, you don't care about what falls into exact time
> >>> boxes, you just
> >>> want to deduplicate. If you do care about exact time boxing then you 
> >>> should
> >>> use...
> >>>
> >>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one
> >>> record
> >>> per enumerated time window. If you get a records at 15s, 20s,14s where the
> >>> windows
> >>> are enumerated at [5,14], [15, 24], etc then you forward the record at 15s
> >>> and also
> >>> the record at 14s
> >>>
> >>> Just an idea: not sure if the impedance mismatch would throw users off
> >>> since the
> >>> semantics of the distinct windows are slightly different than in the
> >>> aggregations.
> >>> But if we don't fit this into the existing windowed framework, then we
> >>> shouldn't use
> >>> any existing Windows-type classes at all, imo. ie we should create a new
> >>> DistinctWindows config class, similar to how stream-stream joins get their
> >>> own
> >>> JoinWindows class
> >>>
> >>> I also think that non-windowed deduplication could be useful, in which 
> >>> case
> >>> we
> >>> would want to also have the distinct() operator on the KStream interface.
> >>>
> >>>
> >>> One quick note regarding the naming: it seems like the Streams DSL 
> >>> operators
> >>> are typically named as verbs rather than adjectives, for example. 
> >>> #suppress
> >>> or
> >>> #aggregate. I get that there's some precedent for  'distinct' 
> >>> specifically,
> >>> but
> >>> maybe something like 'deduplicate' would be more appropriate for the 
> >>> Streams
> >>> API.
> >>>
> >>> WDYT?
> >>>
> >>>
> >>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev 
> >>> <iponoma...@mail.ru.invalid>
> >>> wrote:
> >>>
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for your review! It made me think deeper, and indeed I understood
> >>>> that I was missing some important details.
> >>>>
> >>>> To simplify, let me explain my particular use case first so I can refer
> >>>> to it later.
> >>>>
> >>>> We have a system that collects information about ongoing live sporting
> >>>> events from different sources. The information sources have their IDs
> >>>> and these IDs are keys of the stream. Each source emits messages
> >>>> concerning sporting events, and we can have many messages about each
> >>>> sporing event from each source. Event ID is extracted from the message.
> >>>>
> >>>> We need a database of event IDs that were reported at least once by each
> >>>> source (important: events from different sources are considered to be
> >>>> different entities). The requirements are:
> >>>>
> >>>> 1) each new event ID should be written to the database as soon as 
> >>>> possible
> >>>>
> >>>> 2) although it's ok and sometimes even desired to repeat the
> >>>> notification about already known event ID, but we wouldn’t like our
> >>>> database to be bothered by the same event ID more often than once in a
> >>>> given period of time (say, 15 minutes).
> >>>>
> >>>> With this example in mind let me answer your questions
> >>>>
> >>>>    > (1) Using the `idExtractor` has the issue that data might not be
> >>>>    > co-partitioned as you mentioned in the KIP. Thus, I am wondering if 
> >>>> it
> >>>>    > might be better to do deduplication only on the key? If one sets a 
> >>>> new
> >>>>    > key upstream (ie, extracts the deduplication id into the key), the
> >>>>    > `distinct` operator could automatically repartition the data and 
> >>>> thus we
> >>>>    > would avoid user errors.
> >>>>
> >>>> Of course with 'key-only' deduplication + autorepartitioning we will
> >>>> never cause problems with co-partitioning. But in practice, we often
> >>>> don't need repartitioning even if 'dedup ID' is different from the key,
> >>>> like in my example above. So here we have a sort of 'performance vs
> >>>> security' tradeoff.
> >>>>
> >>>> The 'golden middle way' here can be the following: we can form a
> >>>> deduplication ID as KEY + separator + idExtractor(VALUE). In case
> >>>> idExtractor is not provided, we deduplicate by key only (as in original
> >>>> proposal). Then idExtractor transforms only the value (and not the key)
> >>>> and its result is appended to the key. Records from different partitions
> >>>> will inherently have different deduplication IDs and all the data will
> >>>> be co-partitioned. As with any stateful operation, we will repartition
> >>>> the topic in case the key was changed upstream, but only in this case,
> >>>> thus avoiding unnecessary repartitioning. My example above fits this
> >>>> perfectly.
> >>>>
> >>>>    > (2) What is the motivation for allowing the `idExtractor` to return
> >>>>    > `null`? Might be good to have some use-case examples for this 
> >>>> feature.
> >>>>
> >>>> Can't think of any use-cases. As it often happens, it's just came with a
> >>>> copy-paste from StackOverflow -- see Michael Noll's answer here:
> >>>>
> >>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> >>>>
> >>>> But, jokes aside, we'll have to decide what to do with nulls. If we
> >>>> accept the above proposal of having deduplication ID as KEY + postfix,
> >>>> then null can be treated as no postfix at all. If we don't accept this
> >>>> approach, then treating nulls as 'no-deduplication' seems to be a
> >>>> reasonable assumption (we can't get or put null as a key to a KV store,
> >>>> so a record with null ID is always going to look 'new' for us).
> >>>>
> >>>>
> >>>>    > (2) Is using a `TimeWindow` really what we want? I was wondering if 
> >>>> a
> >>>>    > `SlidingWindow` might be better? Or maybe we need a new type of 
> >>>> window?
> >>>>
> >>>> Agree. It's probably not what we want. Once I thought that reusing
> >>>> TimeWindow is a clever idea, now I don't.
> >>>>
> >>>> Do we need epoch alignment in our use case? No, we don't, and I don't
> >>>> know if anyone going to need this. Epoch alignment is good for
> >>>> aggregation, but deduplication is a different story.
> >>>>
> >>>> Let me describe the semantic the way I see it now and tell me what you
> >>>> think:
> >>>>
> >>>> - the only parameter that defines the deduplication logic is 'expiration
> >>>> period'
> >>>>
> >>>> - when a deduplication ID arrives and we cannot find it in the store, we
> >>>> forward the message downstream and store the ID + its timestamp.
> >>>>
> >>>> - when an out-of-order ID arrives with an older timestamp and we find a
> >>>> 'fresher' record, we do nothing and don't forward the message (??? OR
> >>>> NOT? In what case would we want to forward an out-of-order message?)
> >>>>
> >>>> - when an ID with fresher timestamp arrives we check if it falls into
> >>>> the expiration period and either forward it or not, but in both cases we
> >>>> update the timestamp of the message in the store
> >>>>
> >>>> - the WindowStore retention mechanism should clean up very old records
> >>>> in order not to run out of space.
> >>>>
> >>>>    > (3) `isPersistent` -- instead of using this flag, it seems better to
> >>>>    > allow users to pass in a `Materialized` parameter next to
> >>>>    > `DistinctParameters` to configure the state store?
> >>>>
> >>>> Fully agree! Users might also want to change the retention time.
> >>>>
> >>>>    > (4) I am wondering if we should really have 4 overloads for
> >>>>    > `DistinctParameters.with()`? It might be better to have one overload
> >>>>    > with all require parameters, and add optional parameters using the
> >>>>    > builder pattern? This seems to follow the DSL Grammer proposal.
> >>>>
> >>>> Oh, I can explain. We can't fully rely on the builder pattern because of
> >>>> Java type inference limitations. We have to provide type parameters to
> >>>> the builder methods or the code won't compile: see e. g. this
> >>>> https://twitter.com/inponomarev/status/1265053286933159938 and following
> >>>> discussion with Tagir Valeev.
> >>>>
> >>>> When we came across the similar difficulties in KIP-418, we finally
> >>>> decided to add all the necessary overloads to parameter class. So I just
> >>>> reproduced that approach here.
> >>>>
> >>>>    > (5) Even if it might be an implementation detail (and maybe the KIP
> >>>>    > itself does not need to mention it), can you give a high level 
> >>>> overview
> >>>>    > how you intent to implement it (that would be easier to grog, 
> >>>> compared
> >>>>    > to reading the PR).
> >>>>
> >>>> Well as with any operation on KStreamImpl level I'm building a store and
> >>>> a processor node.
> >>>>
> >>>> KStreamDistinct class is going to be the ProcessorSupplier, with the
> >>>> logic regarding the forwarding/muting of the records located in
> >>>> KStreamDistinct.KStreamDistinctProcessor#process
> >>>>
> >>>> ----
> >>>>
> >>>> Matthias, if you are still reading this :-) a gentle reminder: my PR for
> >>>> already accepted KIP-418 is still waiting for your review. I think it's
> >>>> better for me to finalize at least one  KIP before proceeding to a new
> >>>> one :-)
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>> 03.09.2020 4:20, Matthias J. Sax пишет:
> >>>>> Thanks for the KIP Ivan. Having a built-in deduplication operator is for
> >>>>> sure a good addition.
> >>>>>
> >>>>> Couple of questions:
> >>>>>
> >>>>> (1) Using the `idExtractor` has the issue that data might not be
> >>>>> co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
> >>>>> might be better to do deduplication only on the key? If one sets a new
> >>>>> key upstream (ie, extracts the deduplication id into the key), the
> >>>>> `distinct` operator could automatically repartition the data and thus we
> >>>>> would avoid user errors.
> >>>>>
> >>>>> (2) What is the motivation for allowing the `idExtractor` to return
> >>>>> `null`? Might be good to have some use-case examples for this feature.
> >>>>>
> >>>>> (2) Is using a `TimeWindow` really what we want? I was wondering if a
> >>>>> `SlidingWindow` might be better? Or maybe we need a new type of window?
> >>>>>
> >>>>> It would be helpful if you could describe potential use cases in more
> >>>>> detail. -- I am mainly wondering about hopping window? Each record would
> >>>>> always falls into multiple window and thus would be emitted multiple
> >>>>> times, ie, each time the window closes. Is this really a valid use case?
> >>>>>
> >>>>> It seems that for de-duplication, one wants to have some "expiration
> >>>>> time", ie, for each ID, deduplicate all consecutive records with the
> >>>>> same ID and emit the first record after the "expiration time" passed. In
> >>>>> terms of a window, this would mean that the window starts at `r.ts` and
> >>>>> ends at `r.ts + windowSize`, ie, the window is aligned to the data.
> >>>>> TimeWindows are aligned to the epoch though. While `SlidingWindows` also
> >>>>> align to the data, for the aggregation use-case they go backward in
> >>>>> time, while we need a window that goes forward in time. It's an open
> >>>>> question if we can re-purpose `SlidingWindows` -- it might be ok the
> >>>>> make the alignment (into the past vs into the future) an operator
> >>>>> dependent behavior?
> >>>>>
> >>>>> (3) `isPersistent` -- instead of using this flag, it seems better to
> >>>>> allow users to pass in a `Materialized` parameter next to
> >>>>> `DistinctParameters` to configure the state store?
> >>>>>
> >>>>> (4) I am wondering if we should really have 4 overloads for
> >>>>> `DistinctParameters.with()`? It might be better to have one overload
> >>>>> with all require parameters, and add optional parameters using the
> >>>>> builder pattern? This seems to follow the DSL Grammer proposal.
> >>>>>
> >>>>> (5) Even if it might be an implementation detail (and maybe the KIP
> >>>>> itself does not need to mention it), can you give a high level overview
> >>>>> how you intent to implement it (that would be easier to grog, compared
> >>>>> to reading the PR).
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> >>>>>> Sorry, I forgot to add [DISCUSS] tag to the topic
> >>>>>>
> >>>>>> 24.08.2020 2:27, Ivan Ponomarev пишет:
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I'd like to start a discussion for KIP-655.
> >>>>>>>
> >>>>>>> KIP-655:
> >>>>>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>>>>>>
> >>>>>>>
> >>>>>>> I also opened a proof-of-concept PR for you to experiment with the 
> >>>>>>> API:
> >>>>>>>
> >>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>>
> >>>>>>> Ivan Ponomarev
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> 
> 

Reply via email to