Hi, another idea that came to my mind. Instead of using a compacted topic, the buffer could use a non-compacted topic and regularly delete records before a given offset, as Streams does for repartition topics.

Best,
Bruno

On 05.06.23 21:48, Bruno Cadonna wrote:
Hi Victoria,

that is a good point! I think the topic needs to be a compacted topic to be able to get rid of records that are evicted from the buffer. So the key might be something composed of the record key, the timestamp, and a sequence number to distinguish between records with the same key and the same timestamp. Just an idea! Maybe Walker comes up with something better.

Best,
Bruno

On 05.06.23 20:38, Victoria Xia wrote:
Hi Walker,

Thanks for the latest updates! The KIP looks great. Just one question about the changelog topic for the join buffer: The KIP says "When a failure occurs the buffer will try to recover from an OffsetCheckpoint if possible. If not it will reload the buffer from a compacted change-log topic." This is a new changelog topic that will be introduced specifically for the join buffer, right? Why is the changelog topic compacted? What are the keys? I am confused because the buffer contains records from the stream side of the join, for which multiple records with the same key should be treated as separate updates that must all be tracked in the buffer, rather than as updates which replace each other.

Thanks,
Victoria

On Mon, Jun 5, 2023 at 1:47 AM Bruno Cadonna <cado...@apache.org> wrote:
Hi Walker,

Thanks once more for the updates to the KIP! Do you also plan to expose metrics for the buffer?

Best,
Bruno

On 02.06.23 17:16, Walker Carlson wrote:
Hello Bruno,

I think this covers your questions. Let me know what you think.

2. We can use a changelog topic. I think we can treat it like any other store and recover in the usual manner. Also, the implementation is on disk.

3. The description is in the public interfaces description. I will copy it into the proposed changes as well. This is a bit of an implementation detail that I didn't want to add into the KIP, but the record will be added to the buffer to keep the stream time consistent; it will just be ejected immediately. If this causes performance issues we will skip this step and track stream time separately.
I will update the KIP to say that stream time advances when a stream record enters the node. Also, yes, updated.

5. No, there is no difference right now; everything gets processed as it comes in and tries to find a record for its timestamp.

Walker

On Fri, Jun 2, 2023 at 6:41 AM Bruno Cadonna <cado...@apache.org> wrote:
Hi Walker,

Thanks for the updates!

2. It is still not clear to me how a failure is handled. I do not understand what you mean by "recover from an OffsetCheckpoint". My understanding is that the buffer needs to be replicated into its own Kafka topic. The input topic is not enough. The offset of a record is added to the offsets to commit once the record is streamed through the subtopology. That means once the record is added to the buffer its offset is added to the offsets to commit -- independently of whether the record was evicted from the buffer and sent to the join node or not.

Now, let's assume the following scenario:
1. A record is read from the input topic and added to the buffer, but not evicted to be processed by the join node.
2. When the processing of the subtopology finishes, the offset of the record is added to the offsets to commit.
3. A commit happens.
4. A failure happens.

After the failure the buffer is empty, but the record will not be read anymore from the input topic since its offset has already been committed. The record is lost.

One solution to avoid the loss is to recreate the buffer from a compacted Kafka topic as we do for suppression buffers. I do not think we need any offset checkpoint here since we keep the buffer in memory, right? Or do you plan to back the buffer with a persistent store? Even in that case, a compacted Kafka topic would be needed.

3. From the KIP it is still not clear to me what happens if a record is outside of the grace period. I guess the record that falls outside of the grace period will not be added to the buffer, but will be sent to the join node.
Since it is outside of the grace period it will also not increase stream time and it will not trigger an eviction. Also, the head of the buffer will not contain a record that needs to be evicted, since the timestamp of the head record will be within the interval of stream time minus grace period. Is this correct? Please add such a description to the KIP.

Furthermore, I think there is a mistake in the text: "... will dequeue when the record timestamp is greater than stream time plus the grace period". I guess that should be "... will dequeue when the record timestamp is less than (or equal to?) stream time minus the grace period".

5. What is the difference between not setting the grace period and setting it to zero? If there is a difference, why is there a difference?

Best,
Bruno

On 01.06.23 23:58, Walker Carlson wrote:
Hey Bruno,

Thanks for the feedback.

1) I will add this to the KIP, but stream time only advances when the buffer receives a new record.

2) You are correct, I will add a failure section to the KIP. Since the records won't change in the buffer from when they are read from the topic, they are replicated already.

3) I see that I'm outvoted on the dropping of records thing. We will pass them on and try to join them if possible. This might cause some null results, but increasing the table history retention should help with that.

4) I can add some to the KIP. But it's pretty directly adding whatever the grace period is to the latency. I don't see a way around it.

Walker

On Thu, Jun 1, 2023 at 5:23 AM Bruno Cadonna <cado...@apache.org> wrote:
Hi Walker,

thanks for the KIP! Here is my feedback:

1. It is still not clear to me when stream time for the buffer advances. What is the event that lets stream time advance? In the discussion, I do not understand what you mean by "The segment store already has an observed stream time, we advance based on that. That should only advance based on records that enter the store." Where does this segment store come from?
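The corrected dequeue rule discussed above can be sketched as a small standalone model. This is an illustrative sketch only, not Kafka Streams code; the names `JoinBufferModel`, `BufferedRecord`, and `add` are invented for the example. Records are held in timestamp order and dequeued once their timestamp is at most stream time minus the grace period:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of the proposed join buffer's eviction rule (hypothetical names,
// not the actual Kafka Streams implementation).
public class JoinBufferModel {
    public record BufferedRecord(String key, long timestamp) {}

    // Buffer ordered by record timestamp, smallest first.
    private final PriorityQueue<BufferedRecord> buffer =
            new PriorityQueue<>(Comparator.comparingLong(BufferedRecord::timestamp));
    private final long gracePeriodMs;
    private long streamTime = Long.MIN_VALUE;

    public JoinBufferModel(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    /** Adds a record, advances stream time, and returns records ready to join. */
    public List<BufferedRecord> add(BufferedRecord r) {
        streamTime = Math.max(streamTime, r.timestamp());
        buffer.add(r);
        List<BufferedRecord> evicted = new ArrayList<>();
        // Dequeue while the head's timestamp is <= stream time - grace period.
        while (!buffer.isEmpty()
                && buffer.peek().timestamp() <= streamTime - gracePeriodMs) {
            evicted.add(buffer.poll());
        }
        return evicted;
    }
}
```

With a grace period of 10, a record at timestamp 1 stays buffered until a later record pushes stream time to 11 or beyond; a record outside the grace period on arrival would, per the discussion above, bypass the buffer entirely (not modeled here).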
Anyways, I think it would be great to also state how stream time advances in the KIP.

2. How does the buffer behave in case of a failure? I think I understand that the buffer will use an implementation of TimeOrderedKeyValueBuffer and therefore the records in the buffer will be replicated to a topic in Kafka, but I am not completely sure. Could you elaborate on this in the KIP?

3. I agree with Matthias about dropping late records. We use grace periods in scenarios where records are grouped, like in windowed aggregations and windowed joins. The stream buffer you propose does not really group any records. It rather delays records and reorders them. I am not sure if grace period is the right naming/concept to apply here. Instead of dropping records that fall outside of the buffer's time interval, the join should skip the buffer and try to join the record immediately. In the end, a stream-table join is an unwindowed join, i.e., no grouping is applied to the records. What do you and other folks think about this proposal?

4. How does the proposed buffer affect processing latency? Could you please add some words about this to the KIP?

Best,
Bruno

On 31.05.23 01:49, Walker Carlson wrote:
Thanks for all the additional comments. I will either address them here or update the KIP accordingly.

I mentioned a follow-up KIP to add extra features before and in the responses. I will try to briefly summarize what options and optimizations I plan to include. If a concern is not covered in this list, I for sure talk about it below.

* Allowing non-versioned tables to still use the stream buffer
* Automatically materializing tables instead of forcing the user to do it
* Configurable in-memory buffer
* Order the records in offset order or in time order
* Non-memory-use buffer (offset order, delayed pull from stream)
* Time synced between stream and table side (maybe)
* Do not drop late records and process them as they come in instead

First, Victoria.

1) (One of your nits covers this, but you are correct, it doesn't make sense, so I removed that part of the example.) For those examples with the "bad" join results I said that without buffering the stream it would look like that, but that was incomplete. If the lookup was simply looking at the latest version of the table when the stream records came in, then those results were possible. If we are using the point-in-time lookup that versioned tables give us, then you are correct, the future results are not possible.

2) I'll get to this later, as Matthias brought up something related.

To your additional thoughts, I agree that we need to call those things out in the documentation. I'm writing up a follow-up KIP with a lot of the ideas we have discussed so that we can improve this feature beyond the base implementation if it's needed.

I addressed the nits in the KIP. I somehow missed the stream-table join processor improvement; it makes your first question make a lot more sense. Table history retention is a much cleaner way to describe it.

As to your mention of syncing the time for the table and stream: Matthias mentioned that as well, so I will address both here. I plan to bring that up in the future, but for now we will leave it out. I suppose it will be more useful after the table history retention is separable from the table grace period.

To address Matthias' comments.

You are correct in saying the in-memory store shouldn't cause any semantic concerns. My concern would be more with whether we limited the number of records in the buffer and what we would do if we hit said limits (emitting those records might be an issue; throwing an error and halting would not). I think we can leave this discussion to the follow-up KIP along with a few other options.

I will go through your proposals now.

- don't support non-versioned KTables
Sure, we can always expand this later on.
Will include as part of the improvement KIP.

- if grace period is added, users need to explicitly materialize the table as versioned (either directly, or upstream. Upstream only works if downstream tables "inherit" versioned semantics -- cf KIP-914)
Again, that works for me for now; if we find a use we can always add it later.

- the table's history retention time must be larger than the grace period (should be easy to check at runtime, when we build the topology)
Agreed.

- because switching from non-versioned to versioned stores is not backward compatible (cf KIP-914), users need to take care of this themselves, and this also implies that adding a grace period is not a backward compatible change (even if only via indirect means)
Sure, this works.

As to the dropping of late records, I'm not sure. On one hand I like not dropping things. But on the other I struggle to see how a user can filter out late records that might have incomplete join results. The point-in-time lookup will aggressively expire old data, and if new data has replaced it, it will return null outside of the retention. This seems like it could corrupt the integrity of the join output. Seeing that we drop late records on the table side as well, I would think it makes sense to drop late records on the stream buffer. I could be convinced otherwise, I suppose; I could see adding this as an option in a follow-up KIP. It would be very easy to implement either way. For now, unless anyone else objects, I'm going to stick with dropping the records for the sake of getting this KIP passed. It is functionally a small change to make and we can update it later if you feel strongly about it.

For the ordering: I have to say that it would be more complicated to implement it in offset order, if the goal is to get as many of the records validly joined as possible. Because we would process as things left the buffer, a sufficiently early record could hold up records that would otherwise be valid past the table history retention.
To fix this we could process by timestamp, then store in a second queue and emit by offset, but that would be a lot more complicated. If we didn't care about missing some valid joins, we could just have no store and pull from the topic at a delay, only caring about the timestamp of the next offset. For now I want to stick with the timestamp ordering, as it makes much more sense to me, but I would propose we add both of the other options I have laid out here in the follow-up KIP.

Lastly, I think having an empty store with zero grace period would be super simple and not costly, so we might as well make it even if nothing gets entered.

I hope that addresses all your concerns,
Walker

On Thu, May 25, 2023 at 9:50 AM Matthias J. Sax <mj...@apache.org> wrote:
Walker,

thanks for the updates. The KIP itself reads fine (of course Victoria made good comments about some phrases), but there are a couple of things from your latest reply I don't understand, and that I still think need some more discussion.

Lucas asked about an in-memory option and `WindowStoreSupplier`, and you mention "semantic concerns". There should not be any semantic difference from the underlying buffer implementation, so I am not sure what you mean here (also the relationship to suppress() is unclear to me)? -- I am ok with not making it configurable for now. We can always do it via a follow-up KIP, and keep interface changes limited for now.

Does it really make sense to allow a grace period if the table is non-versioned? You also say: "If table is not materialized it will materialize it as versioned." -- What history retention time would we pick for this case (also asked by Victoria)? Or should we rather not support this and force the user to materialize the table explicitly, and thus explicitly pick a history retention time? It's a tradeoff between usability and guiding users that there will be a significant impact on disk usage.
There are also compatibility concerns: If the table is not explicitly materialized in the old program, we would already need to materialize it also in the old program (of course, we would use a non-versioned store so far). Thus, if somebody adds a grace period, we cannot just switch the store type, as it would be a breaking change, potentially requiring an application reset, or following the upgrade path for versioned state stores, and also changing the program to explicitly materialize using a versioned store. Also note that we might not materialize the actual join table, but only an upstream table, and use `ValueGetter` to access the upstream data.

To this end, as you already mentioned, history retention of the table should be at least the grace period. You proposed to include this in a follow-up KIP, but I am wondering if it's a fundamental requirement and thus we should put a check in place right away and reject an invalid configuration? (It's always easier to lift restrictions than to introduce them later.) This would also imply that a non-versioned table cannot be supported, because it does not have a history retention that is larger than the grace period, and it maybe also answers the requirement about materialization: as we already always materialize something on the table side as a non-versioned store right now, it seems difficult to migrate the store to a versioned store. I.e., it might be ok to push the burden onto the user and say: if you start using a grace period, you also need to manually switch from non-versioned to versioned KTables. Doing stuff automatically under the hood is very complex for this case; if we push the burden onto the user, it might be ok to not complicate this KIP significantly.

To summarize the last two paragraphs, I would propose to:
- don't support non-versioned KTables
- if grace period is added, users need to explicitly materialize the table as versioned (either directly, or upstream.
Upstream only works if downstream tables "inherit" versioned semantics -- cf KIP-914)
- the table's history retention time must be larger than the grace period (should be easy to check at runtime, when we build the topology)
- because switching from non-versioned to versioned stores is not backward compatible (cf KIP-914), users need to take care of this themselves, and this also implies that adding a grace period is not a backward compatible change (even if only via indirect means)

About dropping late records: I am wondering if we should never drop a stream-side record for a left join, even if it's late? In general, one thing I have observed over the years is that it's easier to keep stuff and let users filter explicitly downstream (or make it configurable), instead of dropping proactively, because users have no good way to resurrect records that were already dropped.

For ordering, it sounds reasonable to me to only start with one implementation, and maybe make it configurable as a follow-up. However, I am wondering if starting with offset order might be the better option, as it seems to align more with what we do so far? So instead of storing records ordered by timestamp, we could just store them ordered by offset, and still "poll" from the buffer based on the head record's timestamp. Or would this complicate the implementation significantly?

I also think it's ok to not "sync" stream time between the table and the stream in this KIP, but we should consider doing this as a follow-up change (not sure if we would need a KIP or not for a change like this).

About increasing/decreasing the grace period: what you describe makes sense to me. If decreased, the next record would just trigger emitting a lot of records, and for an increase, the buffer would just need to "fill up" again. For reprocessing, getting a different result with a different grace period is expected, so that's ok IMHO. -- There seems to be one special corner case: grace period zero. For this case, we actually don't need any store, and the stream side could be stateless.
I think it can have the same behavior, but if we want to "add / remove" the store dynamically, we need to add specific code for it. For example, even if we start up with a grace period of zero, we would need to check if there is a local store, and still emit everything in it, before we can ditch the store (not sure if that's even easily done at all). Or: we would need to have a store for _all_ cases, even if grace period is zero (the store would be empty all the time though), to avoid super complex code?

-Matthias

On 5/25/23 10:53 AM, Lucas Brutschy wrote:
Hi Walker,

thanks for your responses. That makes sense. I guess there is always the option to make the implementation more configurable later on, if users request it. Also thanks for the clarifications. From my side, the KIP is good to go.

Cheers,
Lucas

On Wed, May 24, 2023 at 11:54 PM Victoria Xia <victoria....@confluent.io.invalid> wrote:
Thanks for the updates, Walker! Looks great, though I do have a couple questions about the latest updates:

1. The new example says that without stream-side buffering, "ex" and "fy" are possible join results. How could those join results happen? The example versioned table suggests that table record "x" has timestamp 2, and table record "y" has timestamp 3. If stream record "e" has timestamp 1, then it can never be joined against record "x", and similarly for stream record "f" with timestamp 2 being joined against "y".

2. I see in your replies above that "If table is not materialized it will materialize it as versioned" but I don't see this called out in the KIP -- seems worth calling out. Also, what will the history retention for the versioned table be? Will it be the same as the join grace period, or will it be greater?

And some additional thoughts:

Sounds like there are a few things users should watch out for when enabling the stream-side buffer:
- Records will get "stuck" if there are no newer records to advance stream time.
- If there are large gaps between the timestamps of stream-side records, then it's possible that versioned store history retention will have expired by the time a record is evicted from the join buffer, leading to a join "miss." For example, if the join grace period and table history retention are both 10, and records come in the order:
  table side t0 with ts=0
  stream side s1 with ts=1 <-- enters buffer
  table side t10 with ts=10
  table side t20 with ts=20
  stream side s21 with ts=21 <-- evicts record s1 from buffer, but versioned store no longer contains data for ts=1 due to history retention having elapsed
This will result in the join result (s1, null) even though it should've been (s1, t0), due to t0 having been expired from the versioned store already.
- Out-of-order records from the stream side will be reordered, and late records will be dropped.

I don't think any of these are reasons to not go forward with this KIP, but it'd be good to call them out in the eventual documentation to decrease the chance users get tripped up.

> We could maybe do an improvement later to advance stream time from the table side as well, but that might be debatable as we might get more late records.

Yes, the likelihood of late records increases but also the likelihood of "join misses" due to versioned store history retention having elapsed decreases, which feels important for certain use cases. Either way, agreed that it can be a discussion for the future, as incorporating this would substantially complicate the implementation.

Also a couple nits:
- The KIP currently says "We recently added versioned tables which allow the table side of the join [...] but it is not taken advantage of in joins," but this doesn't seem true?
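The "join miss" timeline above can be reproduced with a toy model of a single-key versioned store. This is a hypothetical sketch (the class `JoinMissExample` and its methods are invented names; real versioned stores are keyed and segmented); it only shows why the point-in-time lookup for s1 returns null once history retention has elapsed:

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model: versions of ONE table key, with a point-in-time lookup that
// fails once the requested timestamp falls outside history retention.
public class JoinMissExample {
    private final TreeMap<Long, String> versions = new TreeMap<>(); // validFrom -> value
    private final long historyRetention;
    private long streamTime = Long.MIN_VALUE;

    public JoinMissExample(long historyRetention) {
        this.historyRetention = historyRetention;
    }

    public void put(long ts, String value) {
        streamTime = Math.max(streamTime, ts);
        versions.put(ts, value);
    }

    /** Point-in-time lookup; returns null once asOf is older than retention allows. */
    public String get(long asOf) {
        if (asOf < streamTime - historyRetention) {
            return null; // history already expired
        }
        Map.Entry<Long, String> e = versions.floorEntry(asOf);
        return e == null ? null : e.getValue();
    }
}
```

With retention 10: right after t0 arrives, `get(1)` returns "t0"; but once t10 and t20 have advanced the table's stream time to 20, `get(1)` returns null, which is exactly the (s1, null) result described above.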
If the table of a stream-table join is versioned, then the DSL's stream-table join processor will automatically perform timestamped lookups into the table, in order to take advantage of the new timestamp-aware store to provide better join semantics.
- The KIP mentions "grace period" for versioned stores in a number of places but I think you actually mean "history retention"? The two happen to be the same today (it is not an option for users to configure the two separately) but this need not be true in the future. "History retention" governs how far back in time reads may occur, which is the relevant parameter for performing lookups as part of the stream-table join. "Grace period" in the context of versioned stores refers to how far back in time out-of-order writes may occur, which probably isn't directly relevant for introducing a stream-side buffer, though it's also possible I've overlooked something. (As a bonus, switching from "table grace period" in the KIP to "table history retention" also helps to clarify/distinguish that it's a different parameter from the "join grace period," which I could see being confusing to readers. :) )

Cheers,
Victoria

On Thu, May 18, 2023 at 1:43 PM Walker Carlson <wcarl...@confluent.io.invalid> wrote:
Hey all,

Thanks for the comments, they gave me a lot to think about. I'll try to address them all in order. I have made some updates to the KIP related to them, but I mention where below.

Lucas

Good idea about the example. I added a simple one.

1) I have thought about including options for the underlying buffer configuration, one of which might be adding an in-memory option. My biggest concern is about the semantic guarantees. This isn't like suppress or with windows, where producing incomplete results is relatively harmless. Here we would possibly be producing incorrect results. I also would like to keep the interface changes as simple as I can.
Making more than this change to Joined, I feel, could make this more complicated than it needs to be. If we really want to, I could see adding a grace() option with a BufferConfig in there or something, but I would rather not.

2) The buffer will be independent of whether the table is versioned or not. If the table is not materialized it will materialize it as versioned. It might make sense to do a follow-up KIP where we force the retention period of the versioned store to be greater than whatever the max of the stream buffer is.

Victoria

1) Yes, records will exit in timestamp order, not in offset order.

2) Late records will be dropped (late as in out of the grace period). From my understanding that is the point of a grace period, no? Doesn't the same thing happen with versioned stores?

3) The segment store already has an observed stream time; we advance based on that. That should only advance based on records that enter the store. So yes, only stream-side records. We could maybe do an improvement later to advance stream time from the table side as well, but that might be debatable as we might get more late records. Anyways, I would rather have that as a separate discussion.

> in memory option?

We can do that; for the buffer I plan to use the TimeOrderedKeyValueBuffer interface, which already has an in-memory implementation, so it would be simple. I said more in my answer to Lucas's question. The concern I have with buffer configs or in-memory is complicating the interface. Also semantic guarantees, but in-memory shouldn't affect that.

Matthias

1) Fixed out-of-order vs late terminology in the KIP.

2) I was referring to having a stream. So after this KIP we can have a buffered stream or a normal one. For the table we can use a versioned table or a normal table.

3) Good call-out. I clarified this as "If the table side uses a materialized version store, it can store multiple versions of each record within its defined grace period."
and modified the rest of the paragraph a bit.

4) I get preserving offset ordering, but if the stream is buffered to join on timestamp instead of offset, doesn't it already seem like we care more about time in this case? If we end up adding more options it might make sense to do this. Maybe offset-order processing can be a follow-up? I'll add a section for this in Rejected Alternatives. I think it makes sense to do something like this, but maybe in a follow-up.

5) I hadn't thought about this. I suppose if they changed this in an upgrade, the next record would either evict a lot of records (if the grace period decreased) or there would be a pause until the new grace period is reached. Increasing is a bit more problematic, especially if the table grace period and retention time stay the same. If the data is reprocessed after a change like that then there would be different results, but I feel like that would be expected after such a change. What do you think should happen?

Hopefully this answers your questions!

Walker

On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <mj...@apache.org> wrote:
Thanks for the KIP! Also some questions/comments from my side:

10) Notation: you use the term "late data" but I think you mean out-of-order. We reserve the term "late" for records that arrive after the grace period has passed, and thus, "late == out-of-order data that is dropped".

20) "There is only one option from the stream side and only recently is there a second option on the table side." What are those options? Victoria already asked about the table side, but I am also not sure what option you mean for the stream side?

30) "If the table side uses a materialized version store the value is the latest by stream time rather than by offset within its defined grace period." The phrase "the value is the latest by stream time" is confusing -- in the end, a versioned store holds multiple versions, not just one.

40) I am also wondering about ordering.
In general, KS tries to preserve offset order during processing (with some exceptions, when offset-order preservation is not clearly defined). Given that the stream-side buffer is really just a "linear buffer", we could easily preserve offset order. But I also see a benefit of re-ordering and emitting out-of-order data right away when read (instead of blocking it behind in-order records that are not ready yet). -- It might even be a possibility to let users pick an emit strategy, e.g. "EmitStrategy.preserveOffsets" (name just a placeholder). The KIP should explain this in more detail and also discuss different options and mention them in "Rejected alternatives" in case we don't want to include them.

50) What happens when users change the grace period? Especially when they turn it on/off (but also increasing/decreasing is an interesting point)? I think we should try to support this if possible; the "Compatibility" section needs to cover switching on/off in more detail.

-Matthias

On 5/2/23 2:06 PM, Victoria Xia wrote:
Cool KIP, Walker! Thanks for sharing this proposal. A few clarifications:

1. Is the order that records exit the buffer in necessarily the same as the order that records enter the buffer in, or no? Based on the description in the KIP, it sounds like the answer is no, i.e., records will exit the buffer in increasing timestamp order, which means that they may be reordered (even for the same key) compared to the input order.

2. What happens if the join grace period is nonzero, and a stream-side record arrives with a timestamp that is older than the current stream time minus the grace period? Will this record trigger a join result, or will it be dropped? Based on the description for what happens when the join grace period is set to zero, it sounds like the late record will be dropped, even if the join grace period is nonzero. Is that true?

3. What could cause stream time to advance, for purposes of removing records from the join buffer?
For example, will new records arriving on the table side of the join cause stream time to advance? From the KIP it sounds like only stream-side records will advance stream time -- does that mean that the join processor itself will have to track this stream time?

Also +1 to Lucas's question about what options will be available for configuring the join buffer. Will users have the option to choose whether they want the buffer to be in-memory vs persistent?

- Victoria

On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
Hi Walker,

thanks for the KIP! We definitely need this. I have two questions:

- Have you considered allowing the customization of the underlying buffer implementation? As I can see, `StreamJoined` lets you customize the underlying store via a `WindowStoreSupplier`. Would it make sense for `Joined` to have this as well? I can imagine one may want to limit the number of records in the buffer, for example. If we hit the maximum, the only option would be to drop semantic guarantees, but users may still want to do this.

- With "second option on the table side" you are referring to versioned tables, right? Will the buffer on the stream side behave any differently whether the table side is versioned or not?

Finally, I think a simple example in the motivation section could help non-experts understand the KIP.

Best,
Lucas

On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson <wcarl...@confluent.io.invalid> wrote:
Hello everybody,

I have a stream proposal to improve the stream-table join by adding a grace period and buffer to the stream side of the join to allow processing in timestamp order, matching the recent improvements of the versioned tables. Please take a look here <https://cwiki.apache.org/confluence/x/lAs0Dw> and share your thoughts.

best,
Walker
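The ordering behavior debated throughout the thread (records enter the buffer in offset order but exit in timestamp order once the grace period elapses) can be illustrated with a small standalone model. `ReorderDemo` and `runThrough` are invented names for this sketch; it is not Kafka Streams code, and it models the timestamp-ordered variant Walker settled on, not the offset-ordered alternative Matthias raised:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model: feed records (offset, timestamp) through a timestamp-ordered
// buffer with a grace period, and collect them in the order they are emitted.
public class ReorderDemo {
    public record Rec(long offset, long ts) {}

    public static List<Rec> runThrough(List<Rec> input, long grace) {
        PriorityQueue<Rec> buffer =
                new PriorityQueue<>(Comparator.comparingLong(Rec::ts));
        List<Rec> output = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts());
            buffer.add(r);
            // Emit everything whose timestamp is at most streamTime - grace.
            while (!buffer.isEmpty() && buffer.peek().ts() <= streamTime - grace) {
                output.add(buffer.poll());
            }
        }
        return output;
    }
}
```

For example, with a grace period of 5 and input timestamps 5, 1, 3, 20 (offsets 0..3), the out-of-order records at timestamps 1 and 3 are emitted before the record at timestamp 5, i.e., output is in timestamp order rather than offset order, matching Victoria's first question and Walker's answer.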