Thanks, Ivan! That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.
I agree that hopping and sliding windows would actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can't do much about that except document the issue.

Thanks,
John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> Hi John!
>
> I think that your proposal is just fantastic, it simplifies things a lot!
>
> I also felt uncomfortable about the fact that the proposed `distinct()` is not placed alongside `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option to me because of the issue with the unneeded repartitioning.
>
> The bold idea that we can just CANCEL the repartitioning didn't come to my mind.
>
> What seemed to me a single problem is in fact two unrelated problems: the `distinct` operation and cancelling the unneeded repartitioning.
>
> > what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?
>
> I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but to add a new operator `KStream#cancelRepartitioning()` that resets the `keyChangingOperation` flag for the upstream node. Of course, a "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!
>
> As for `distinct()`: if we use the `XXXWindowedKStream` facilities, then the changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all.
>
> We can now define `distinct` as an operation that returns only the first record that falls into a new window, and filters out all the other records that fall into an already existing window.
> BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).toStream().filterNot((k, v) -> STOP.equals(v))`. ;-)
>
> Consider the following example (record times are in seconds):
>
> // three bursts of variously ordered records
> 4, 5, 6
> 23, 22, 24
> 34, 33, 32
> // 'late arrivals'
> 7, 22, 35
>
> 1. 'Epoch-aligned deduplication' using tumbling windows:
>
> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
>
> produces
>
> (key@[00000/10000], 4)
> (key@[20000/30000], 23)
> (key@[30000/40000], 34)
>
> -- that is, one record per epoch-aligned window.
>
> 2. Hopping and sliding windows do not make much sense here, because they produce multiple overlapping windows, so one record can be multiplied, while we want deduplication.
>
> 3. SessionWindows work for 'data-aligned deduplication'.
>
> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
>
> produces only
>
> ([key@4000/4000], 4)
> ([key@23000/23000], 23)
>
> because all the records with timestamps greater than 7 s are stuck together in one session. Setting the inactivity gap to 9 seconds will return three records:
>
> ([key@4000/4000], 4)
> ([key@23000/23000], 23)
> ([key@34000/34000], 34)
>
> WDYT? If you like this variant, I will rewrite KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we choose for it).
>
> Regards,
>
> Ivan
>
>
> On 24.05.2021 22:32, John Roesler wrote:
> > Hey there, Ivan!
> >
> > In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.
> >
> > It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`.
> > This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? The docs on that parameter would of course spell out all the "rights and responsibilities" of setting it.
> >
> > In that case, we could indeed get back to `selectKey(A).windowBy(B).distinct(...)`, where we get to compose the key mapper and the windowing function without having to carve out a separate domain just for `distinct()`. All the rest of the KStream operations would also benefit.
> >
> > What do you think?
> >
> > Thanks,
> > John
> >
> > On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote:
> >> Hello everyone,
> >>
> >> Let me revive the discussion for KIP-655. Now I have some time again and I'm eager to finalize this.
> >>
> >> Based on what was already discussed, I think that we can split the discussion into three topics for our convenience.
> >>
> >> The three topics are:
> >>
> >> - idExtractor (how should we extract the deduplication key for the record)
> >>
> >> - timeWindows (what time windows should we use)
> >>
> >> - miscellaneous (naming etc.)
> >>
> >> ---- idExtractor ----
> >>
> >> Original proposal: use a (k, v) -> f(k, v) mapper, defaulting to (k, v) -> k. The drawback here is that we must warn the user to choose a function that produces different IDs for records from different partitions; otherwise, identical IDs might not be co-partitioned (and, as a result, not deduplicated). Additional concern: what should we do when this function returns null?
> >>
> >> Matthias proposed key-only deduplication: that is, no idExtractor at all, and if we want to use `distinct` for a particular identifier, we must call `selectKey()` first.
> >> The drawback of this approach is that we will always have repartitioning after the key selection, while in practice repartitioning will not always be necessary (for example, when the data stream is such that different values imply different keys).
> >>
> >> So here we have a 'safety vs. performance' trade-off. But the 'safe' variant is also not very convenient for developers, since we're forcing them to change the structure of their records.
> >>
> >> A 'golden mean' here might be a composite ID whose first component equals k and whose second component equals some f(v) (f defaults to v -> null, and a null value returned by f(v) means 'deduplicate by the key only'). The nuance here is that we will have serializers only for the types of k and f(v), and we must correctly serialize the tuple (k, f(v)), but of course this is doable.
> >>
> >> What do you think?
> >>
> >> ---- timeWindows ----
> >>
> >> Originally I proposed TimeWindows only because they solved my particular case :-) but I agree with Matthias' and Sophie's objections.
> >>
> >> I like Sophie's point: we need both epoch-aligned and data-aligned windows. IMO this is absolutely correct: "data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval."
> >>
> >> I just cannot agree right away with Sophie's .groupByKey().windowedBy(...).distinct() proposal, as it implies key-only deduplication -- see the previous topic.
> >>
> >> Epoch-aligned windows are very simple: they should forward only one record per enumerated time window. TimeWindows are exactly what we want here.
> >> I mentioned both tumbling and hopping windows in the KIP just because both are possible for TimeWindows, but indeed I don't see any real use case for hopping windows; only tumbling windows make sense IMO.
> >>
> >> For data-aligned windows, the SlidingWindows interface seems to be a nearly valid choice. Nearly. It should forward a record once when it's first seen, and then not again for any identical records that fall into the next N time units. However, we cannot reuse SlidingWindows as is, because, just as Matthias noted, SlidingWindows go backward in time, while we need windows that go forward in time and are not opened while records fall into an already existing window. We definitely should make our own implementation; maybe we should call it ExpirationWindow? WDYT?
> >>
> >>
> >> ---- miscellaneous ----
> >>
> >> Persistent/in-memory stores. Matthias proposed passing a Materialized parameter next to DistinctParameters (and this is necessary, because we will need to provide a serializer for the extracted id). This is an absolutely valid point; I agree and I will fix it in the KIP.
> >>
> >> Naming. Sophie noted that the Streams DSL operators are typically named as verbs, so she proposes `deduplicate` in favour of `distinct`. I think that while it's important to stick to the naming conventions, it is also important to think of the experience of those who come from different stacks/technologies. People who are familiar with SQL and the Java Streams API know for sure what 'distinct' means, while data deduplication in general is a more complex task, and thus `deduplicate` might be misleading. But I'm ready to be convinced if the majority thinks otherwise.
> >>
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >>
> >>
> >> On 14.09.2020 21:31, Sophie Blee-Goldman wrote:
> >>> Hey all,
> >>>
> >>> I'm not convinced either epoch-aligned or data-aligned will fit all possible use cases.
> >>> Both seem totally reasonable to me: data-aligned is useful for example when you know that a large number of updates to a single key will occur in short bursts, and epoch-aligned when you specifically want to get just a single update per discrete time interval.
> >>>
> >>> Going a step further, though, what if you want just a single update per calendar month, or per year with accounting for leap years? Neither of those is served that well by the existing Windows specification for windowed aggregations, a well-known limitation of the current API. There is actually a KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> going on in parallel to fix this exact issue and make the windowing interface much more flexible. Maybe instead of re-implementing this windowing interface in a similarly limited fashion for the Distinct operator, we could leverage it here and get all the benefits coming with KIP-645.
> >>>
> >>> Specifically, I'm proposing to remove the TimeWindows/etc config from the DistinctParameters class, and move the distinct() method from the KStream interface to the TimeWindowedKStream interface. Since it's semantically similar to a kind of windowed aggregation, it makes sense to align it with the existing windowing framework, ie:
> >>>
> >>> inputStream
> >>>     .groupByKey()
> >>>     .windowedBy()
> >>>     .distinct()
> >>>
> >>> Then we could use data-aligned windows if SlidingWindows is specified in the windowedBy(), and epoch-aligned (or some other kind of enumerable window) if a Windows is specified in windowedBy() (or an EnumerableWindowDefinition once KIP-645 is implemented to replace Windows).
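To make the epoch-aligned vs data-aligned distinction concrete, here is a small plain-Java simulation of the windowed `distinct()` semantics Ivan describes further up the thread (May 26): forward only the first record of a new window. It replays his three-bursts example for a single key. It is a sketch, not Kafka Streams code: the class and method names are hypothetical, tumbling windows are modeled as starting at `t - t % size`, and session merging assumes a record joins every session whose `[start - gap, end + gap]` range covers it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of a windowed distinct() for one key (timestamps in seconds).
// Nothing here uses the Kafka Streams API.
public class DistinctSimulation {

    // Epoch-aligned (tumbling) windows: forward a record only if it is the
    // first one seen in its window [start, start + size).
    static List<Long> tumblingDistinct(long[] times, long size) {
        Set<Long> seenWindowStarts = new HashSet<>();
        List<Long> out = new ArrayList<>();
        for (long t : times) {
            long start = t - (t % size); // epoch-aligned window start
            if (seenWindowStarts.add(start)) {
                out.add(t);
            }
        }
        return out;
    }

    // Data-aligned (session) windows: a record merges with every session whose
    // [start - gap, end + gap] range covers it; it is forwarded only if it opens
    // a brand-new session.
    static List<Long> sessionDistinct(long[] times, long gap) {
        List<long[]> sessions = new ArrayList<>(); // each entry is {start, end}
        List<Long> out = new ArrayList<>();
        for (long t : times) {
            long start = t;
            long end = t;
            boolean merged = false;
            for (int i = sessions.size() - 1; i >= 0; i--) { // reverse order: safe removal
                long[] s = sessions.get(i);
                if (t >= s[0] - gap && t <= s[1] + gap) {
                    start = Math.min(start, s[0]);
                    end = Math.max(end, s[1]);
                    sessions.remove(i);
                    merged = true;
                }
            }
            sessions.add(new long[] {start, end});
            if (!merged) {
                out.add(t);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35};
        System.out.println(tumblingDistinct(times, 10)); // [4, 23, 34]
        System.out.println(sessionDistinct(times, 10));  // [4, 23]
        System.out.println(sessionDistinct(times, 9));   // [4, 23, 34]
    }
}
```

With a 9 s gap, the record at 33 later bridges the [22, 24] and [34, 34] sessions, but 34 has already been forwarded by then, which is why three records come out.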
> >>>
> >>> *SlidingWindows*: should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. This includes out-of-order records, ie if you have a SlidingWindows of size 10s and process records at time 15s, 20s, 14s then you would just forward the one at 15s. Presumably, if you're using SlidingWindows, you don't care about what falls into exact time boxes, you just want to deduplicate. If you do care about exact time boxing then you should use...
> >>>
> >>> *EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one record per enumerated time window. If you get records at 15s, 20s, 14s where the windows are enumerated at [5,14], [15,24], etc then you forward the record at 15s and also the record at 14s.
> >>>
> >>> Just an idea: not sure if the impedance mismatch would throw users off, since the semantics of the distinct windows are slightly different than in the aggregations. But if we don't fit this into the existing windowed framework, then we shouldn't use any existing Windows-type classes at all, imo. ie we should create a new DistinctWindows config class, similar to how stream-stream joins get their own JoinWindows class.
> >>>
> >>> I also think that non-windowed deduplication could be useful, in which case we would want to also have the distinct() operator on the KStream interface.
> >>>
> >>>
> >>> One quick note regarding the naming: it seems like the Streams DSL operators are typically named as verbs rather than adjectives, for example #suppress or #aggregate. I get that there's some precedent for 'distinct' specifically, but maybe something like 'deduplicate' would be more appropriate for the Streams API.
> >>>
> >>> WDYT?
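Sophie's 15s/20s/14s example can be replayed in plain Java to show where the two readings diverge. This is a sketch with hypothetical class and method names. The data-aligned reading assumes a record counts as a duplicate when it lies strictly less than `size` away from an already forwarded record, in either direction (so the out-of-order 14s is suppressed); the enumerated windows `[5,14]`, `[15,24]`, ... are modeled with an offset of 5.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical comparison of the two window semantics, for a single deduplication ID.
public class SophieWindowsExample {

    // Data-aligned ("SlidingWindows") reading: forward a record only if no previously
    // forwarded record lies within `size` of it, before or after it.
    static List<Long> dataAligned(long[] times, long size) {
        List<Long> forwarded = new ArrayList<>();
        for (long t : times) {
            boolean duplicate = false;
            for (long f : forwarded) {
                if (Math.abs(t - f) < size) {
                    duplicate = true;
                    break;
                }
            }
            if (!duplicate) {
                forwarded.add(t);
            }
        }
        return forwarded;
    }

    // Enumerated windows of length `size` starting at `offset`:
    // forward only the first record seen in each window.
    static List<Long> enumerated(long[] times, long size, long offset) {
        Set<Long> seenWindows = new HashSet<>();
        List<Long> forwarded = new ArrayList<>();
        for (long t : times) {
            long index = Math.floorDiv(t - offset, size); // which window t falls into
            if (seenWindows.add(index)) {
                forwarded.add(t);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        long[] times = {15, 20, 14};
        System.out.println(dataAligned(times, 10));   // [15]
        System.out.println(enumerated(times, 10, 5)); // [15, 14]
    }
}
```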
> >>>
> >>>
> >>> On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
> >>>
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for your review! It made me think deeper, and indeed I understood that I was missing some important details.
> >>>>
> >>>> To simplify, let me explain my particular use case first so I can refer to it later.
> >>>>
> >>>> We have a system that collects information about ongoing live sporting events from different sources. The information sources have their IDs, and these IDs are the keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. The event ID is extracted from the message.
> >>>>
> >>>> We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities). The requirements are:
> >>>>
> >>>> 1) each new event ID should be written to the database as soon as possible
> >>>>
> >>>> 2) although it's ok and sometimes even desired to repeat the notification about an already known event ID, we wouldn't like our database to be bothered by the same event ID more often than once in a given period of time (say, 15 minutes).
> >>>>
> >>>> With this example in mind, let me answer your questions.
> >>>>
> >>>> > (1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.
> >>>>
> >>>> Of course, with 'key-only' deduplication + auto-repartitioning we will never cause problems with co-partitioning. But in practice, we often don't need repartitioning even if the 'dedup ID' is different from the key, as in my example above. So here we have a sort of 'performance vs safety' tradeoff.
> >>>>
> >>>> The 'golden middle way' here can be the following: we can form a deduplication ID as KEY + separator + idExtractor(VALUE). In case idExtractor is not provided, we deduplicate by key only (as in the original proposal). Then idExtractor transforms only the value (and not the key), and its result is appended to the key. Records from different partitions will inherently have different deduplication IDs, and all the data will be co-partitioned. As with any stateful operation, we will repartition the topic in case the key was changed upstream, but only in this case, thus avoiding unnecessary repartitioning. My example above fits this perfectly.
> >>>>
> >>>> > (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.
> >>>>
> >>>> Can't think of any use cases. As often happens, it just came with a copy-paste from StackOverflow -- see Michael Noll's answer here:
> >>>>
> >>>> https://stackoverflow.com/questions/55803210/how-to-handle-duplicate-messages-using-kafka-streaming-dsl-functions
> >>>>
> >>>> But, jokes aside, we'll have to decide what to do with nulls. If we accept the above proposal of having the deduplication ID as KEY + postfix, then null can be treated as no postfix at all. If we don't accept this approach, then treating nulls as 'no deduplication' seems to be a reasonable assumption (we can't get or put null as a key to a KV store, so a record with a null ID is always going to look 'new' to us).
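Ivan's KEY + separator + idExtractor(VALUE) scheme (the same idea as the tuple (k, f(v)) in his May 2021 message) can be sketched at the byte level. Assumptions: the key and the extracted ID are already serialized (UTF-8 strings here for brevity), the `DedupId` class and its methods are hypothetical, and a 4-byte length prefix on the key is used in place of a separator so the key/ID boundary stays unambiguous. Because the dedup ID starts with the key, two records with the same dedup ID always share a key and therefore a partition, which is the co-partitioning argument above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical sketch: deduplication ID = record key + optional idExtractor(value).
public class DedupId {

    // Layout: [4-byte key length][key bytes][extracted-ID bytes, possibly empty].
    // A null extracted ID means "deduplicate by the key only".
    // Note: in this sketch an empty extracted ID yields the same bytes as null.
    static byte[] compose(byte[] key, byte[] extractedId) {
        byte[] suffix = extractedId == null ? new byte[0] : extractedId;
        return ByteBuffer.allocate(4 + key.length + suffix.length)
                .putInt(key.length)
                .put(key)
                .put(suffix)
                .array();
    }

    // Applies the (optional) extractor to the value only; the key is always included.
    static <V> byte[] dedupId(String key, V value, Function<V, String> idExtractor) {
        String id = idExtractor == null ? null : idExtractor.apply(value);
        return compose(key.getBytes(StandardCharsets.UTF_8),
                id == null ? null : id.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Hypothetical values of the form "eventId:payload"; the key is the source ID.
        Function<String, String> eventId = v -> v.split(":")[0];
        byte[] a = dedupId("sourceA", "e1:first", eventId);
        byte[] b = dedupId("sourceA", "e1:second", eventId);
        byte[] c = dedupId("sourceB", "e1:first", eventId);
        System.out.println(Arrays.equals(a, b)); // true: same source, same event
        System.out.println(Arrays.equals(a, c)); // false: different sources are different entities
    }
}
```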
> >>>>
> >>>>
> >>>> > (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?
> >>>>
> >>>> Agree. It's probably not what we want. I once thought that reusing TimeWindow was a clever idea; now I don't.
> >>>>
> >>>> Do we need epoch alignment in our use case? No, we don't, and I don't know if anyone is going to need it. Epoch alignment is good for aggregation, but deduplication is a different story.
> >>>>
> >>>> Let me describe the semantics the way I see them now, and tell me what you think:
> >>>>
> >>>> - the only parameter that defines the deduplication logic is the 'expiration period'
> >>>>
> >>>> - when a deduplication ID arrives and we cannot find it in the store, we forward the message downstream and store the ID + its timestamp.
> >>>>
> >>>> - when an out-of-order ID arrives with an older timestamp and we find a 'fresher' record, we do nothing and don't forward the message (??? OR NOT? In what case would we want to forward an out-of-order message?)
> >>>>
> >>>> - when an ID with a fresher timestamp arrives, we check if it falls into the expiration period and either forward it or not, but in both cases we update the timestamp of the message in the store
> >>>>
> >>>> - the WindowStore retention mechanism should clean up very old records in order not to run out of space.
> >>>>
> >>>> > (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?
> >>>>
> >>>> Fully agree! Users might also want to change the retention time.
> >>>>
> >>>> > (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`?
> >>>> > It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.
> >>>>
> >>>> Oh, I can explain. We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods or the code won't compile: see e.g. https://twitter.com/inponomarev/status/1265053286933159938 and the following discussion with Tagir Valeev.
> >>>>
> >>>> When we came across similar difficulties in KIP-418, we finally decided to add all the necessary overloads to the parameter class. So I just reproduced that approach here.
> >>>>
> >>>> > (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high-level overview of how you intend to implement it (that would be easier to grok, compared to reading the PR).
> >>>>
> >>>> Well, as with any operation at the KStreamImpl level, I'm building a store and a processor node.
> >>>>
> >>>> The KStreamDistinct class is going to be the ProcessorSupplier, with the logic regarding the forwarding/muting of the records located in KStreamDistinct.KStreamDistinctProcessor#process
> >>>>
> >>>> ----
> >>>>
> >>>> Matthias, if you are still reading this :-) a gentle reminder: my PR for the already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-)
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>> On 03.09.2020 4:20, Matthias J. Sax wrote:
> >>>>> Thanks for the KIP Ivan. Having a built-in deduplication operator is for sure a good addition.
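Ivan's remark above, that builder methods need explicit type parameters or the code won't compile, can be illustrated with a generic static factory. The `Params` class below is hypothetical and is not the KIP-655 `DistinctParameters` API; it only demonstrates the Java rule involved: the first call in a method chain is inferred without regard to the assignment target, so its type parameters default to `Object`.

```java
import java.util.function.BiFunction;

// Hypothetical parameter class illustrating the type-inference issue with builders.
public class ParamsExample {

    static final class Params<K, V> {
        final BiFunction<K, V, ?> idExtractor;

        private Params(BiFunction<K, V, ?> idExtractor) {
            this.idExtractor = idExtractor;
        }

        // Generic static factory: K and V can only be inferred from the target type.
        static <K, V> Params<K, V> keyOnly() {
            return new Params<>(null);
        }

        // Builder-style step returning a new instance with the extractor set.
        Params<K, V> withIdExtractor(BiFunction<K, V, ?> extractor) {
            return new Params<>(extractor);
        }
    }

    public static void main(String[] args) {
        // Does NOT compile: keyOnly() is inferred as Params<Object, Object> before
        // the chain's target type is considered, so v is an Object here.
        // Params<String, String> bad = Params.keyOnly().withIdExtractor((k, v) -> v.length());

        // Compiles: explicit type witnesses on the first call of the chain.
        Params<String, String> p =
                Params.<String, String>keyOnly().withIdExtractor((k, v) -> v.length());
        System.out.println(p.idExtractor.apply("key", "abc")); // 3
    }
}
```

Overloads on the parameter class, the approach Ivan took from KIP-418, sidestep this because each overload receives all the typed arguments in a single call.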
> >>>>>
> >>>>> Couple of questions:
> >>>>>
> >>>>> (1) Using the `idExtractor` has the issue that data might not be co-partitioned as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors.
> >>>>>
> >>>>> (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature.
> >>>>>
> >>>>> (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window?
> >>>>>
> >>>>> It would be helpful if you could describe potential use cases in more detail. -- I am mainly wondering about hopping windows. Each record would always fall into multiple windows and thus would be emitted multiple times, ie, each time the window closes. Is this really a valid use case?
> >>>>>
> >>>>> It seems that for de-duplication, one wants to have some "expiration time", ie, for each ID, deduplicate all consecutive records with the same ID and emit the first record after the "expiration time" passed. In terms of a window, this would mean that the window starts at `r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are aligned to the epoch though. While `SlidingWindows` also align to the data, for the aggregation use-case they go backward in time, while we need a window that goes forward in time. It's an open question if we can re-purpose `SlidingWindows` -- it might be ok to make the alignment (into the past vs into the future) an operator-dependent behavior?
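Matthias's concern about hopping windows (echoed in John's May 2021 reply) can be checked by listing the epoch-aligned windows that contain a given timestamp. This is a hypothetical helper, not Kafka's own implementation; it assumes windows `[start, start + size)` whose starts are non-negative multiples of `advance`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: epoch-aligned hopping windows [start, start + size)
// containing timestamp t, with window starts at multiples of `advance`.
public class HoppingWindows {

    static List<long[]> windowsFor(long t, long size, long advance) {
        List<long[]> windows = new ArrayList<>();
        // First candidate start: the smallest multiple of advance greater than t - size.
        long first = Math.max(0, Math.floorDiv(t - size, advance) + 1) * advance;
        for (long start = first; start <= t; start += advance) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Tumbling (advance == size): each record falls into exactly one window.
        System.out.println(windowsFor(7, 10, 10).size()); // 1
        // Hopping (size 10, advance 5): the record at 7 is in [0,10) and [5,15),
        // so a per-window distinct() would emit it twice.
        System.out.println(windowsFor(7, 10, 5).size());  // 2
    }
}
```

In general a hopping window of size S and advance A places each record in S / A windows (when A divides S), which is the duplicating effect discussed in the thread.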
> >>>>>
> >>>>> (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store?
> >>>>>
> >>>>> (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`? It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.
> >>>>>
> >>>>> (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high-level overview of how you intend to implement it (that would be easier to grok, compared to reading the PR).
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 8/23/20 4:29 PM, Ivan Ponomarev wrote:
> >>>>>> Sorry, I forgot to add the [DISCUSS] tag to the topic
> >>>>>>
> >>>>>> On 24.08.2020 2:27, Ivan Ponomarev wrote:
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> I'd like to start a discussion for KIP-655.
> >>>>>>>
> >>>>>>> KIP-655:
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API
> >>>>>>>
> >>>>>>> I also opened a proof-of-concept PR for you to experiment with the API:
> >>>>>>>
> >>>>>>> PR#9210: https://github.com/apache/kafka/pull/9210
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>>
> >>>>>>> Ivan Ponomarev
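The "expiration period" semantics Ivan lists in his September 2020 reply (and Matthias's "expiration time" idea) can be sketched as a small per-ID state machine. This is illustrative only: `ExpirationDedup` is a hypothetical name, a `HashMap` stands in for the WindowStore, retention cleanup is omitted, records with a timestamp equal to the stored one are treated as duplicates, and null IDs follow the "no deduplication" option Ivan mentions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of expiration-period deduplication (not the KIP-655 implementation).
public class ExpirationDedup<ID> {
    private final long expirationPeriod;
    private final Map<ID, Long> lastSeen = new HashMap<>(); // ID -> latest stored timestamp

    public ExpirationDedup(long expirationPeriod) {
        this.expirationPeriod = expirationPeriod;
    }

    // Returns true if the record should be forwarded downstream.
    public boolean process(ID id, long timestamp) {
        if (id == null) {
            return true; // null IDs cannot be stored, so they always look "new"
        }
        Long stored = lastSeen.get(id);
        if (stored == null) {
            lastSeen.put(id, timestamp); // first occurrence: forward and remember
            return true;
        }
        if (timestamp <= stored) {
            return false; // out-of-order (or same-time) record: drop, keep the fresher timestamp
        }
        // Fresher record: forward only if the expiration period has passed since the
        // stored timestamp; in both cases refresh the stored timestamp.
        boolean forward = timestamp - stored >= expirationPeriod;
        lastSeen.put(id, timestamp);
        return forward;
    }

    public static void main(String[] args) {
        ExpirationDedup<String> dedup = new ExpirationDedup<>(15); // e.g. 15 minutes
        for (long t : new long[] {0, 10, 20, 40, 30}) {
            System.out.println(t + " -> " + dedup.process("e1", t));
        }
        // 0 -> true, 10 -> false, 20 -> false, 40 -> true, 30 -> false
    }
}
```

Because the stored timestamp is refreshed even when a record is suppressed, a steady stream of duplicates arriving less than one period apart is never forwarded again under this rule.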