Just to clarify I'll update `as(StoreSupplier, StoreSupplier)` to `with(..., ...)` and change the class name from `StreamJoined` to `StreamJoin`
-Bill On Tue, Sep 17, 2019 at 2:09 PM Bill Bejeck <bbej...@gmail.com> wrote: > Thanks for the comments, Matthias. > > I like both suggestions regarding the names and I'll update the KIP > accordingly. > > On Tue, Sep 17, 2019 at 11:22 AM Matthias J. Sax <matth...@confluent.io> > wrote: > >> Thanks Bill! >> >> Using a new configuration object was suggested by John in the original >> DISCUSS thread already. We rejected it because we wanted to avoid a new >> configuration class. However, given your analysis, it seems it's >> actually the right choice to introduce a new configuration class. >> >> Hence, overall I am +1 on the proposal. >> >> >> Some nits about names (as always :)) >> >> - `StreamJoined.as()` does not sound right for the `StoreSupplier` >> overload. Maybe it's better to call that static method `with` (not 100% >> sure) >> >> - Should we use plural instead of singular -> `Stream{s}Joined` >> -> or keep singular but call it `StreamJoin` >> `Joined` seems to refer to the input stream(s) while `Join` would >> refer to the join-operator >> >> Thoughts? >> >> >> You suggest to deprecate existing overload what I support -- can you >> list deprecated method in the "Public Interface" section? >> >> >> >> -Matthias >> >> >> >> On 9/17/19 9:39 AM, Guozhang Wang wrote: >> > Hello Bill, >> > >> > Thanks for the updated KIP. I made a pass on the StreamJoined section. >> Just >> > a quick question from user's perspective: if a user wants to provide a >> > customized store-supplier, she is forced to also provide a name via the >> > store-supplier. So there's no way to say "I want to provide my own store >> > engine but let the library decide its name", is that right? >> > >> > >> > Guozhang >> > >> > >> > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck <bbej...@gmail.com> wrote: >> > >> >> Bumping this discussion as we need to re-vote before the KIP deadline. >> >> >> >> On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbej...@gmail.com> >> wrote: >> >> >> >>> Hi All, >> >>> >> >>> While working on the implementation of KIP-479, some issues came to >> light >> >>> that the KIP as written won't work. I have updated the KIP with a >> >> solution >> >>> I believe will solve the original problem as well as address the >> >>> impediment to the initial approach. >> >>> >> >>> This update is a significant change, so please review the updated KIP >> >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join >> >> and >> >>> provide feedback. After we conclude the discussion, there will be a >> >>> re-vote. >> >>> >> >>> Thanks! >> >>> Bill >> >>> >> >>> On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <wangg...@gmail.com> >> >> wrote: >> >>> >> >>>> Hi Bill, thanks for your explanations. I'm on board with your >> decision >> >>>> too. >> >>>> >> >>>> >> >>>> Guozhang >> >>>> >> >>>> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbej...@gmail.com> >> wrote: >> >>>> >> >>>>> Thanks for the response, John. >> >>>>> >> >>>>>> If I can offer my thoughts, it seems better to just document on the >> >>>>>> Stream join javadoc for the Materialized parameter that it will not >> >>>>>> make the join result queriable. I'm not opposed to the queriable >> >> flag >> >>>>>> in general, but introducing it is a much larger consideration that >> >> has >> >>>>>> previously derailed this KIP discussion. In the interest of just >> >>>>>> closing the gap and keeping the API change small, it seems better >> to >> >>>>>> just go with documentation for now. >> >>>>> >> >>>>> I agree with your statement here. IMHO the most important goal of >> >> this >> >>>> KIP >> >>>>> is to not breaking existing users and gain some consistency of the >> >> API. >> >>>>> >> >>>>> I'll update the KIP accordingly. >> >>>>> >> >>>>> -Bill >> >>>>> >> >>>>> On Tue, Jul 16, 2019 at 11:55 AM John Roesler <j...@confluent.io> >> >>>> wrote: >> >>>>> >> >>>>>> Hi Bill, >> >>>>>> >> >>>>>> Thanks for driving this KIP toward a conclusion. I'm on board with >> >>>>>> your decision. >> >>>>>> >> >>>>>> You didn't mention whether you're still proposing to add the >> >>>>>> "queriable" flag to the Materialized config object, or just >> document >> >>>>>> that a Stream join is never queriable. Both options have come up >> >>>>>> earlier in the discussion, so it would be good to pin this down. >> >>>>>> >> >>>>>> If I can offer my thoughts, it seems better to just document on the >> >>>>>> Stream join javadoc for the Materialized parameter that it will not >> >>>>>> make the join result queriable. I'm not opposed to the queriable >> >> flag >> >>>>>> in general, but introducing it is a much larger consideration that >> >> has >> >>>>>> previously derailed this KIP discussion. In the interest of just >> >>>>>> closing the gap and keeping the API change small, it seems better >> to >> >>>>>> just go with documentation for now. >> >>>>>> >> >>>>>> Thanks again, >> >>>>>> -John >> >>>>>> >> >>>>>> On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <bbej...@gmail.com> >> >>>> wrote: >> >>>>>>> >> >>>>>>> Thanks all for the great discussion so far. >> >>>>>>> >> >>>>>>> Everyone has made excellent points, and I appreciate the detail >> >>>>> everyone >> >>>>>>> has put into their arguments. >> >>>>>>> >> >>>>>>> However, after carefully evaluating all the points made so far, >> >>>>> creating >> >>>>>> an >> >>>>>>> overload with Materialized is still my #1 option. >> >>>>>>> My reasoning for saying so is two-fold: >> >>>>>>> >> >>>>>>> 1. It's a small change, and IMHO since it's consistent with our >> >>>>>> current >> >>>>>>> API concerning state store usage, the cognitive load on users >> >>>> will >> >>>>> be >> >>>>>>> minimal. >> >>>>>>> 2. It achieves the most important goal of this KIP, namely to >> >>>> close >> >>>>>> the >> >>>>>>> gap of naming state stores independently of the join operator >> >>>> name. >> >>>>>>> >> >>>>>>> Additionally, I agree with the points made by Matthias earlier (I >> >>>>> realize >> >>>>>>> there is some overlap here). >> >>>>>>> >> >>>>>>>> - the main purpose of this KIP is to close the naming gap what >> >> we >> >>>>>> achieve >> >>>>>>>> - we can allow people to use the new in-memory store >> >>>>>>>> - we allow people to enable/disable caching >> >>>>>>>> - we unify the API >> >>>>>>>> - we decouple querying from naming >> >>>>>>>> - it's a small API change >> >>>>>>> >> >>>>>>> Although it's not a perfect solution, IMHO the positives of using >> >>>>>>> Materialize far outweigh the negatives, and from what we've >> >>>> discussed >> >>>>> so >> >>>>>>> far, anything we implement seems to involve an additional change >> >>>> down >> >>>>> the >> >>>>>>> road. >> >>>>>>> >> >>>>>>> If others are still strongly opposed to using Materialized, my >> >> other >> >>>>>>> preferences would be >> >>>>>>> >> >>>>>>> 1. Add a "withStoreName" to Joined. Although I agree with >> >>>> Guozhang >> >>>>>> that >> >>>>>>> having a parameter that only applies to one use-case would be >> >>>>> clumsy. >> >>>>>>> 2. Add a String overload for naming the store, but this would >> >> be >> >>>> my >> >>>>>>> least favorite option as IMHO this seems to be a step backward >> >>>> from >> >>>>>> why we >> >>>>>>> introduced configuration objects in the first place. >> >>>>>>> >> >>>>>>> Thanks, >> >>>>>>> Bill >> >>>>>>> >> >>>>>>> On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax < >> >>>> matth...@confluent.io >> >>>>>> >> >>>>>>> wrote: >> >>>>>>> >> >>>>>>>> Thanks for the KIP Bill! >> >>>>>>>> >> >>>>>>>> Great discussion to far. >> >>>>>>>> >> >>>>>>>> About John's idea about querying upstream stores and don't >> >>>>> materialize >> >>>>>> a >> >>>>>>>> store: I agree with Bill that this seems to be an orthogonal >> >>>>> question, >> >>>>>>>> and it might be better to treat it as an independent >> >> optimization >> >>>> and >> >>>>>>>> exclude from this KIP. >> >>>>>>>> >> >>>>>>>>> What should be the behavior if there is no store >> >>>>>>>>> configured (e.g., if Materialized with only serdes) and >> >>>> querying is >> >>>>>>>>> enabled? >> >>>>>>>> >> >>>>>>>> IMHO, this could be an error case. If one wants to query a >> >> store, >> >>>>> they >> >>>>>>>> need to provide a name -- if you don't know the name, how would >> >>>> you >> >>>>>>>> actually query the store (even if it would be possible to get >> >> the >> >>>>> name >> >>>>>>>> from the `TopologyDescription`, it seems clumsy). >> >>>>>>>> >> >>>>>>>> If we don't want to throw an error, materializing seems to be >> >> the >> >>>>> right >> >>>>>>>> option, to exclude "query optimization" from this KIP. I would >> >> be >> >>>> ok >> >>>>>>>> with this option, even if it's clumsy to get the name from >> >>>>>>>> `TopologyDescription`; hence, I would prefer to treat it as an >> >>>> error. >> >>>>>>>> >> >>>>>>>>> To get back to the current behavior, users would have to >> >>>>>>>>> add a "bytes store supplier" to the Materialized to indicate >> >>>> that, >> >>>>>>>>> yes, they really want a state store there. >> >>>>>>>> >> >>>>>>>> This sound like a quite subtle semantic difference on how to use >> >>>> the >> >>>>>>>> API. Might be hard to explain to users. I would prefer to not >> >>>>>> introduce it. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> About Guozhang's points: >> >>>>>>>> >> >>>>>>>> 1a) That is actually a good point. However, I believe we cannot >> >>>> get >> >>>>>>>> around this issue easily, and it seems ok to me, to expose the >> >>>> actual >> >>>>>>>> store type we are using. (More thoughts later.) >> >>>>>>>> >> >>>>>>>> 1b) I don't see an issue with allowing users to query all >> >> stores? >> >>>>> What >> >>>>>>>> is the rational behind it? What do we gain by not allowing it? >> >>>>>>>> >> >>>>>>>> 2) While I understand what you are saying, we also want/need to >> >>>> have >> >>>>> a >> >>>>>>>> way in the PAPI to allow users adding "internal/private" >> >>>>> non-queryable >> >>>>>>>> stores to a topology. That's possible via >> >>>>>>>> `Materialized#withQueryingDisabled()`. We could also update >> >>>>>>>> `Topology#addStateStore(StoreBuilder, boolean isQueryable, >> >>>>> String...)` >> >>>>>>>> to address this. Again, I agree with Bill that the current API >> >> is >> >>>>> built >> >>>>>>>> in a certain way, and if we want to change it, it should be a >> >>>>> separate >> >>>>>>>> KIP, as it seems to be an orthogonal concern. >> >>>>>>>> >> >>>>>>>>> Instead, we just restrict KIP-307 to NOT >> >>>>>>>>> use the Joined.name for state store names and always use >> >>>> internal >> >>>>>> names >> >>>>>>>> as >> >>>>>>>>> well, which admittedly indeed leaves a hole of not being able >> >> to >> >>>>>> cover >> >>>>>>>> all >> >>>>>>>>> internal names here >> >>>>>>>> >> >>>>>>>> I think it's important to close this gap. Naming entities seems >> >>>> to a >> >>>>>>>> binary feature: if there is a gap, the feature is more or less >> >>>>> useless, >> >>>>>>>> rendering KIP-307 void. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> I like John's detailed list of required features and what >> >>>>>>>> Materialized/WindowByteStoreSuppliers offers. My take is, that >> >>>> adding >> >>>>>>>> Materialized including the required run-time checks is the best >> >>>>> option >> >>>>>>>> we have, for the following reasons: >> >>>>>>>> >> >>>>>>>> - the main purpose of this KIP is to close the naming gap what >> >> we >> >>>>>> achieve >> >>>>>>>> - we can allow people to use the new in-memory store >> >>>>>>>> - we allow people to enable/disable caching >> >>>>>>>> - we unify the API >> >>>>>>>> - we decouple querying from naming >> >>>>>>>> - it's a small API change >> >>>>>>>> >> >>>>>>>> Adding an overload and only passing in a name, would address the >> >>>> main >> >>>>>>>> purpose of the KIP. However, it falls short on all the other >> >>>>> "goodies". >> >>>>>>>> As you mentioned, passing in `Materialized` might not be perfect >> >>>> and >> >>>>>>>> maybe we need to deprecate is at some point; but this is also >> >> true >> >>>>> for >> >>>>>>>> passing in just a name. >> >>>>>>>> >> >>>>>>>> I am also not convinced, that a `StreamJoinStore` would resolve >> >>>> all >> >>>>> the >> >>>>>>>> issues. In the end, as long as we are using a `WindowedStore` >> >>>>>>>> internally, we need to expose this "implemenation detail" to >> >>>> users to >> >>>>>>>> allow them to plug in a custom store. Adding `Materialized` seem >> >>>> to >> >>>>> be >> >>>>>>>> the best short-term fix from my point of view. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> -Matthias >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> On 6/27/19 9:56 AM, Guozhang Wang wrote: >> >>>>>>>>> Hi John, >> >>>>>>>>> >> >>>>>>>>> I actually feels better about a new interface but I'm not sure >> >>>> if >> >>>>> we >> >>>>>>>> would >> >>>>>>>>> need the full configuration of store / log / cache, now or in >> >>>> the >> >>>>>> future >> >>>>>>>>> ever for stream-stream join. >> >>>>>>>>> >> >>>>>>>>> Right now I feel that 1) we want to improve our implementation >> >>>> of >> >>>>>>>>> stream-stream join, and potentially also allow users to >> >>>> customize >> >>>>>> this >> >>>>>>>>> implementation but with a more suitable interface than the >> >>>> current >> >>>>>>>>> WindowStore interface, how to do that is less clear and >> >>>>>> execution-wise >> >>>>>>>> it's >> >>>>>>>>> (arguably..) not urgent; 2) we want to close the last gap >> >>>>>> (Stream-stream >> >>>>>>>>> join) of allowing users to specify all internal names to help >> >> on >> >>>>>> backward >> >>>>>>>>> compatibility, which is urgent. >> >>>>>>>>> >> >>>>>>>>> Therefore if we want to unblock 2) from 1) in the near term, I >> >>>> feel >> >>>>>>>>> slightly inclined to just add overload functions that takes >> >> in a >> >>>>>> store >> >>>>>>>> name >> >>>>>>>>> for stream-stream joins only -- and admittedly, in the future >> >>>> this >> >>>>>>>> function >> >>>>>>>>> maybe deprecated -- i.e. if we have to do something that we >> >> "may >> >>>>>> regret" >> >>>>>>>> in >> >>>>>>>>> the future, I'd like to pick the least intrusive option. >> >>>>>>>>> >> >>>>>>>>> About `Joined#withStoreName`: since the Joined class itself is >> >>>> also >> >>>>>> used >> >>>>>>>> in >> >>>>>>>>> other join types, I feel less comfortable to have a >> >>>>>>>> `Joined#withStoreName` >> >>>>>>>>> which is only going to be used by stream-stream join. Or >> >> maybe I >> >>>>> miss >> >>>>>>>>> something here about the "latter" case that you are referring >> >>>> to? >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> Guozhang >> >>>>>>>>> >> >>>>>>>>> On Mon, Jun 24, 2019 at 12:16 PM John Roesler < >> >>>> j...@confluent.io> >> >>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>>> Thanks Guozhang, >> >>>>>>>>>> >> >>>>>>>>>> Yep. Maybe we can consider just exactly what the join needs: >> >>>>>>>>>> >> >>>>>>>>>>> the WindowStore<Bytes, byte[]> itself is fine, if overly >> >>>> broad, >> >>>>>>>>>>> since the only two methods we need are `window.put(key, >> >> value, >> >>>>>>>>>>> context().timestamp())` and `WindowStoreIterator<V2> iter = >> >>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`. >> >>>>>>>>>> >> >>>>>>>>>> One "middle ground" would be to extract _this_ into a new >> >> store >> >>>>>>>>>> interface, which only supports these API calls, like >> >>>>>>>>>> StreamJoinStore<K, V>. This would give us the latitude we >> >> need >> >>>> to >> >>>>>>>>>> efficiently support the exact operation without concerning >> >>>>> ourselves >> >>>>>>>>>> with all the other things a WindowStore can do (which are >> >>>>>> unreachable >> >>>>>>>>>> for the join use case). It would also let us drop "store >> >>>>> duplicates" >> >>>>>>>>>> from the main WindowStore interface, since it only exists to >> >>>>> support >> >>>>>>>>>> the join use case. >> >>>>>>>>>> >> >>>>>>>>>> If we were to add a new StreamJoinStore interface, then it'd >> >> be >> >>>>>>>>>> straightforward how we could add also >> >>>>>>>>>> `Materialized.as(StreamJoinBytesStoreSupplier)` and use >> >>>>>> Materialized, >> >>>>>>>>>> or alternatively add the ability to set the bytes store on >> >>>> Joined. >> >>>>>>>>>> >> >>>>>>>>>> Personally, I'm kind of leaning toward the latter (and also >> >>>> doing >> >>>>>>>>>> `Joined#withStoreName`), since adding the new interface to >> >>>>>>>>>> Materialized then also pollutes the interface for its >> >> _actual_ >> >>>> use >> >>>>>>>>>> case of materializing a table view. Of course, to solve the >> >>>>>> immediate >> >>>>>>>>>> problem, all we need is the store name, but we might feel >> >>>> better >> >>>>>> about >> >>>>>>>>>> adding the store name to Joined if we _also_ feel like in the >> >>>>>> future, >> >>>>>>>>>> we would add store/log/cache configuration to Joined as well. >> >>>>>>>>>> >> >>>>>>>>>> -John >> >>>>>>>>>> >> >>>>>>>>>> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang < >> >>>>> wangg...@gmail.com> >> >>>>>>>> wrote: >> >>>>>>>>>>> >> >>>>>>>>>>> Hello John, >> >>>>>>>>>>> >> >>>>>>>>>>> My main concern is exactly the first point at the bottom of >> >>>> your >> >>>>>>>> analysis >> >>>>>>>>>>> here: "* configure the bytes store". I'm not sure if using a >> >>>>> window >> >>>>>>>> bytes >> >>>>>>>>>>> store would be ideal for stream-stream windowed join; e.g. >> >> we >> >>>>> could >> >>>>>>>>>>> consider two dimensional list sorted by timestamps and then >> >> by >> >>>>>> keys to >> >>>>>>>> do >> >>>>>>>>>>> the join, whereas a windowed bytes store is basically sorted >> >>>> by >> >>>>> key >> >>>>>>>>>> first, >> >>>>>>>>>>> then by timestamp. If we expose the Materialized to let user >> >>>> pass >> >>>>>> in a >> >>>>>>>>>>> windowed bytes store, then we would need to change that if >> >> we >> >>>>> want >> >>>>>> to >> >>>>>>>>>>> replace it with a different implementation interface. >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> Guozhang >> >>>>>>>>>>> >> >>>>>>>>>>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler < >> >>>> j...@confluent.io> >> >>>>>>>> wrote: >> >>>>>>>>>>> >> >>>>>>>>>>>> Hey Guozhang and Bill, >> >>>>>>>>>>>> >> >>>>>>>>>>>> For what it's worth, I agree with you both! >> >>>>>>>>>>>> >> >>>>>>>>>>>> I think it might help the discussion to look concretely at >> >>>> what >> >>>>>>>>>>>> Materialized does: >> >>>>>>>>>>>> * set a WindowBytesStoreSupplier >> >>>>>>>>>>>> * set a name >> >>>>>>>>>>>> * set the key/value serdes >> >>>>>>>>>>>> * disable/enable/configure change-logging >> >>>>>>>>>>>> * disable/enable caching >> >>>>>>>>>>>> * configure retention >> >>>>>>>>>>>> >> >>>>>>>>>>>> Further, looking into the WindowBytesStoreSupplier, the >> >>>>> interface >> >>>>>> lets >> >>>>>>>>>> you: >> >>>>>>>>>>>> * get the segment interval >> >>>>>>>>>>>> * get the window size >> >>>>>>>>>>>> * get whether "duplicates" are enabled >> >>>>>>>>>>>> * get the retention period >> >>>>>>>>>>>> * (obviously) get a WindowStore<Bytes, byte[]> >> >>>>>>>>>>>> >> >>>>>>>>>>>> We know that Materialized isn't exactly what we need for >> >>>> stream >> >>>>>> joins, >> >>>>>>>>>>>> but we can see how close Materialized is to what we need. >> >> If >> >>>> it >> >>>>> is >> >>>>>>>>>>>> close, maybe we can use it and document the gaps, and if it >> >>>> is >> >>>>> not >> >>>>>>>>>>>> close, then maybe we should just add what we need to >> >> Joined. >> >>>>>>>>>>>> Stream Join's requirements for its stores: >> >>>>>>>>>>>> * a multimap store (i.e., it keeps duplicates) for storing >> >>>>> general >> >>>>>>>>>>>> (not windowed) keyed records associated with their >> >> insertion >> >>>>>> time, and >> >>>>>>>>>>>> allows efficient time-bounded lookups and also efficient >> >>>> purges >> >>>>>> of old >> >>>>>>>>>>>> data. >> >>>>>>>>>>>> ** Note, a properly configured WindowBytesStoreSupplier >> >>>>> satisfies >> >>>>>> this >> >>>>>>>>>>>> requirement, and the interface supports the queries we need >> >>>> to >> >>>>>> verify >> >>>>>>>>>>>> the configuration at run-time >> >>>>>>>>>>>> * set a name for the store >> >>>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined) >> >>>>>>>>>>>> * logging could be configurable (set to enabled now) >> >>>>>>>>>>>> * caching could be configurable (set to enabled now) >> >>>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows) >> >>>>>>>>>>>> >> >>>>>>>>>>>> So, out of six capabilities for Materialized, there are two >> >>>> we >> >>>>>> don't >> >>>>>>>>>>>> want (serdes and retention). These would become run-time >> >>>> checks >> >>>>>> if we >> >>>>>>>>>>>> use it. >> >>>>>>>>>>>> >> >>>>>>>>>>>> A third questionable capability is to provide a >> >>>>>>>>>>>> WindowBytesStoreSupplier. Looking at whether the >> >>>>>>>>>>>> WindowBytesStoreSupplier is the right interface for Stream >> >>>> Join: >> >>>>>>>>>>>> * configuring segment interval is fine >> >>>>>>>>>>>> * should _not_ configure window size (it's determined by >> >>>>>> JoinWindows) >> >>>>>>>>>>>> * duplicates _must_ be enabled >> >>>>>>>>>>>> * retention should be _at least_ windowSize + gracePeriod, >> >>>> but >> >>>>>> note >> >>>>>>>>>>>> that (unlike for Table window stores) there is no utility >> >> in >> >>>>>> having a >> >>>>>>>>>>>> longer retention time. >> >>>>>>>>>>>> * the WindowStore<Bytes, byte[]> itself is fine, if overly >> >>>>> broad, >> >>>>>>>>>>>> since the only two methods we need are `window.put(key, >> >>>> value, >> >>>>>>>>>>>> context().timestamp())` and `WindowStoreIterator<V2> iter = >> >>>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`. >> >>>>>>>>>>>> >> >>>>>>>>>>>> Thus, flattening out the overlap for >> >> WindowBytesStoreSupplier >> >>>>>> onto the >> >>>>>>>>>>>> overlap for Materialized, we have 9 capabilities total >> >> (note >> >>>>>> retention >> >>>>>>>>>>>> is duplicated), we have 4 that we don't want: >> >>>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined) >> >>>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows) >> >>>>>>>>>>>> * should _not_ configure window size (it's determined by >> >>>>>> JoinWindows) >> >>>>>>>>>>>> * duplicates _must_ be enabled >> >>>>>>>>>>>> >> >>>>>>>>>>>> These gaps would have to be covered with run-time checks if >> >>>> we >> >>>>>> re-use >> >>>>>>>>>>>> Materialized and WindowStoreBytesStoreSupplier both. Maybe >> >>>> this >> >>>>>> sounds >> >>>>>>>>>>>> bad, but consider the other side, that we get 5 new >> >>>> capabilities >> >>>>>> we >> >>>>>>>>>>>> don't require, but are still pretty nice: >> >>>>>>>>>>>> * configure the bytes store >> >>>>>>>>>>>> * set a name for the store >> >>>>>>>>>>>> * configure caching >> >>>>>>>>>>>> * configure logging >> >>>>>>>>>>>> * configure segment interval >> >>>>>>>>>>>> >> >>>>>>>>>>>> Not sure where this nets us out, but it's food for thought. >> >>>>>>>>>>>> -John >> >>>>>>>>>>>> >> >>>>>>>>>>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang < >> >>>>> wangg...@gmail.com >> >>>>>>> >> >>>>>>>>>> wrote: >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> Hi Bill, >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> I think by giving a Materialized param into stream-stream >> >>>> join, >> >>>>>> it's >> >>>>>>>>>> okay >> >>>>>>>>>>>>> (though still ideal) to say "we still would not expose the >> >>>>> store >> >>>>>> for >> >>>>>>>>>>>>> queries", but it would sound a bit awkward to say "we >> >> would >> >>>>> also >> >>>>>>>>>> ignore >> >>>>>>>>>>>>> whatever the passed in store supplier but just use our >> >>>> default >> >>>>>> ones" >> >>>>>>>>>> -- >> >>>>>>>>>>>>> again the concern is that, if in the future we'd want to >> >>>> change >> >>>>>> the >> >>>>>>>>>>>> default >> >>>>>>>>>>>>> implementation of join algorithm which no longer rely on a >> >>>>> window >> >>>>>>>>>> store >> >>>>>>>>>>>>> with deduping enabled, then we need to change this API >> >>>> again by >> >>>>>>>>>> changing >> >>>>>>>>>>>>> the store supplier type. >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> If we do want to fill this hole for stream-stream join, I >> >>>> feel >> >>>>>> just >> >>>>>>>>>>>> adding >> >>>>>>>>>>>>> a String typed store-name would even be less >> >>>> future-intrusive >> >>>>> if >> >>>>>> we >> >>>>>>>>>>>> expect >> >>>>>>>>>>>>> this parameter to be modified later. >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> Does that makes sense? >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> Guozhang >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck < >> >>>>> bbej...@gmail.com> >> >>>>>>>>>> wrote: >> >>>>>>>>>>>>> >> >>>>>>>>>>>>>> Thanks for the comments John and Guozhang, I'll address >> >>>> each >> >>>>>> one of >> >>>>>>>>>>>> your >> >>>>>>>>>>>>>> comments in turn. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> John, >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> I'm wondering about a missing quadrant from the truth >> >>>> table >> >>>>>>>>>> involving >> >>>>>>>>>>>>>>> whether a Materialized is stored or not and querying is >> >>>>>>>>>>>>>>> enabled/disabled... What should be the behavior if there >> >>>> is >> >>>>> no >> >>>>>>>>>> store >> >>>>>>>>>>>>>>> configured (e.g., if Materialized with only serdes) and >> >>>>>> querying >> >>>>>>>>>> is >> >>>>>>>>>>>>>> enabled? >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> It seems we have two choices: >> >>>>>>>>>>>>>>> 1. we can force creation of a state store in this case, >> >> so >> >>>>> the >> >>>>>>>>>> store >> >>>>>>>>>>>>>>> can be used to serve the queries >> >>>>>>>>>>>>>>> 2. we can provide just a queriable view, basically >> >>>> letting IQ >> >>>>>>>>>> query >> >>>>>>>>>>>>>>> into the "KTableValueGetter", which would transparently >> >>>>>>>>>> construct the >> >>>>>>>>>>>>>>> query response by applying the operator logic to the >> >>>> upstream >> >>>>>>>>>> state >> >>>>>>>>>>>> if >> >>>>>>>>>>>>>>> the operator state isn't already stored. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> I agree with your assertion about a missing quadrant from >> >>>> the >> >>>>>> truth >> >>>>>>>>>>>> table. >> >>>>>>>>>>>>>> Additionally, I too like the concept of a queriable view. >> >>>>> But I >> >>>>>>>>>> think >> >>>>>>>>>>>> that >> >>>>>>>>>>>>>> goes a bit beyond the scope of this KIP and would like to >> >>>>> pursue >> >>>>>>>>>> that >> >>>>>>>>>>>>>> feature as follow-on work. Also thinking about this KIP >> >>>> some >> >>>>>>>>>> more, I'm >> >>>>>>>>>>>>>> thinking of the changes to Materialized might be a reach >> >> as >> >>>>>> well. >> >>>>>>>>>>>>>> Separating the naming from a store and its queryable >> >> state >> >>>>> seems >> >>>>>>>>>> like a >> >>>>>>>>>>>>>> complex issue in and of itself and should be treated >> >>>>>> accordingly. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> So here's what I'm thinking now. We add Materialzied to >> >>>> Join, >> >>>>>> but >> >>>>>>>>>> for >> >>>>>>>>>>>> now, >> >>>>>>>>>>>>>> we internally disable querying. I know this breaks our >> >>>>> current >> >>>>>>>>>>>> semantic >> >>>>>>>>>>>>>> approach, but I think it's crucial that we do two things >> >> in >> >>>>> this >> >>>>>>>>>> KIP >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> 1. Break the naming of the state stores from Joined to >> >>>>>>>>>>>> Materialized, so >> >>>>>>>>>>>>>> the naming of state stores follows our current pattern >> >>>> and >> >>>>>>>>>> enables >> >>>>>>>>>>>>>> upgrades >> >>>>>>>>>>>>>> from 2.3 to 2.4 >> >>>>>>>>>>>>>> 2. Offer the ability to configure the state stores of >> >>>> the >> >>>>>> join, >> >>>>>>>>>> even >> >>>>>>>>>>>>>> providing a different implementation (i.e. in-memory) >> >> if >> >>>>>>>>>> desired. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> With that in mind I'm considering changing the KIP to >> >>>> remove >> >>>>> the >> >>>>>>>>>>>> changes to >> >>>>>>>>>>>>>> Materialized, and we document very clearly that by >> >>>> providing a >> >>>>>>>>>>>> Materialized >> >>>>>>>>>>>>>> object with a name is only for naming the state store, >> >>>> hence >> >>>>> the >> >>>>>>>>>>>> changelog >> >>>>>>>>>>>>>> topics and any possible configurations of the store, but >> >>>> this >> >>>>>> store >> >>>>>>>>>>>> *will >> >>>>>>>>>>>>>> not be available for IQ.* >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> WDYT? >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Guozhang, >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> 1. About not breaking compatibility of stream-stream >> >> join >> >>>>>>>>>>>> materialized >> >>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, but >> >> after >> >>>>>>>>>> thinking >> >>>>>>>>>>>> about >> >>>>>>>>>>>>>>> it once more I'm not sure if exposing Materialized would >> >>>> be a >> >>>>>>>>>> good >> >>>>>>>>>>>>>> solution >> >>>>>>>>>>>>>>> here. My rationles: >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of >> >>>> window-store >> >>>>>> is >> >>>>>>>>>> not >> >>>>>>>>>>>>>> ideal, >> >>>>>>>>>>>>>>> and we want to modify it in the near future to be more >> >>>>>>>>>> efficient. Not >> >>>>>>>>>>>>>>> allowing users to override such state store backend >> >> gives >> >>>> us >> >>>>>> such >> >>>>>>>>>>>> freedom >> >>>>>>>>>>>>>>> (which was also considered in the original DSL design), >> >>>>> whereas >> >>>>>>>>>>>> getting a >> >>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out that >> >> freedom >> >>>>> out >> >>>>>>>>>> of the >> >>>>>>>>>>>>>>> window. >> >>>>>>>>>>>>>>> 1.b For strema-stream join, in our original design we >> >>>> intend >> >>>>> to >> >>>>>>>>>>>> "never" >> >>>>>>>>>>>>>>> want users to query the state, since it is just for >> >>>> buffering >> >>>>>> the >> >>>>>>>>>>>>>> upcoming >> >>>>>>>>>>>>>>> records from the stream. Now I know that some users may >> >>>>> indeed >> >>>>>>>>>> want >> >>>>>>>>>>>> to >> >>>>>>>>>>>>>>> query it from the debugging perspective, but still I >> >>>>> concerned >> >>>>>>>>>> about >> >>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes would be >> >> the >> >>>>> right >> >>>>>>>>>>>> solution >> >>>>>>>>>>>>>>> here. And adding Materialized object opens the door to >> >> let >> >>>>>> users >> >>>>>>>>>>>> query >> >>>>>>>>>>>>>>> about it (unless we did something intentionally to still >> >>>>>> forbids >> >>>>>>>>>> it), >> >>>>>>>>>>>>>> which >> >>>>>>>>>>>>>>> also restricts us in the future. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> 2. About the coupling between Materialized.name() and >> >>>>>> queryable: >> >>>>>>>>>>>> again I >> >>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if the >> >>>> current >> >>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at Materialized is the >> >>>> best >> >>>>>>>>>>>> approach. >> >>>>>>>>>>>>>>> Here I think I agree with John, that generally speaking >> >>>> it's >> >>>>>>>>>> better >> >>>>>>>>>>>> be a >> >>>>>>>>>>>>>>> control function on the `KTable` itself, rather than on >> >>>>>>>>>>>> `Materialized`, >> >>>>>>>>>>>>>> so >> >>>>>>>>>>>>>>> fixing it via adding functions through `Materialized` >> >>>> seems >> >>>>>> not a >> >>>>>>>>>>>> natural >> >>>>>>>>>>>>>> approach either. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> I understand your thoughts here, and up to a point, I >> >> agree >> >>>>> with >> >>>>>>>>>> you. >> >>>>>>>>>>>>>> But concerning not providing Materialized as it may >> >>>> restrict >> >>>>> us >> >>>>>> in >> >>>>>>>>>> the >> >>>>>>>>>>>>>> future for delivering different implementations, I'm >> >>>> wondering >> >>>>>> if >> >>>>>>>>>> we >> >>>>>>>>>>>> are >> >>>>>>>>>>>>>> doing some premature optimization here. >> >>>>>>>>>>>>>> My rationale for saying so >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> 1. I think the cost of not allowing the naming of >> >> state >> >>>>>> stores >> >>>>>>>>>> for >> >>>>>>>>>>>> joins >> >>>>>>>>>>>>>> is too big of a gap to leave. IMHO for joins to >> >> follow >> >>>>> the >> >>>>>>>>>> current >> >>>>>>>>>>>>>> pattern of using Materialized for naming state stores >> >>>> would >> >>>>>> be >> >>>>>>>>>> what >> >>>>>>>>>>>> most >> >>>>>>>>>>>>>> users would expect to use. As I said in my comments >> >>>>> above, I >> >>>>>>>>>> think >> >>>>>>>>>>>> we >> >>>>>>>>>>>>>> should *not include* the changes to Materialized and >> >>>>> enforce >> >>>>>>>>>> named >> >>>>>>>>>>>>>> stores for joins as unavailable for IQ. >> >>>>>>>>>>>>>> 2. We'll still have the join methods available >> >> without a >> >>>>>>>>>>>> Materialized >> >>>>>>>>>>>>>> allowing us to do something different internally if a >> >>>>>>>>>> Materialized >> >>>>>>>>>>>> is >> >>>>>>>>>>>>>> not >> >>>>>>>>>>>>>> provided. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use two >> >> stones >> >>>>>> rather >> >>>>>>>>>>>> than >> >>>>>>>>>>>>>> one >> >>>>>>>>>>>>>>> to kill these two birds, and probably for this KIP we >> >> just >> >>>>>> focus >> >>>>>>>>>> on >> >>>>>>>>>>>> 1) >> >>>>>>>>>>>>>>> above. And for that I'd like to not expose the >> >>>> Materialized >> >>>>>>>>>> either >> >>>>>>>>>>>> for >> >>>>>>>>>>>>>>> rationales that I've listed above. Instead, we just >> >>>> restrict >> >>>>>>>>>> KIP-307 >> >>>>>>>>>>>> to >> >>>>>>>>>>>>>> NOT >> >>>>>>>>>>>>>>> use the Joined.name for state store names and always use >> >>>>>> internal >> >>>>>>>>>>>> names >> >>>>>>>>>>>>>> as >> >>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of not being >> >>>> able >> >>>>>> to >> >>>>>>>>>>>> cover >> >>>>>>>>>>>>>> all >> >>>>>>>>>>>>>>> internal names here, but now I feel this `hole` may >> >>>> better be >> >>>>>>>>>> filled >> >>>>>>>>>>>> by, >> >>>>>>>>>>>>>>> e.g. not creating changelog topics but just use the >> >>>> upstream >> >>>>> to >> >>>>>>>>>>>>>>> re-bootstrap the materialized store, more concretely: >> >> when >> >>>>>>>>>>>> materializing >> >>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic on an >> >>>>> existing >> >>>>>>>>>>>> topic, >> >>>>>>>>>>>>>> e.g. >> >>>>>>>>>>>>>>> a) if the stream is coming directly from some source >> >> topic >> >>>>>>>>>> (including >> >>>>>>>>>>>>>>> repartition topic), make that as changelog topic and if >> >>>> it is >> >>>>>>>>>>>> repartition >> >>>>>>>>>>>>>>> topic change the retention / data purging policy >> >>>> necessarily >> >>>>> as >> >>>>>>>>>>>> well; b) >> >>>>>>>>>>>>>> if >> >>>>>>>>>>>>>>> the stream is coming from some stateless operators, >> >>>> delegate >> >>>>>> that >> >>>>>>>>>>>>>> stateless >> >>>>>>>>>>>>>>> operator to the parent stream similar as a); if the >> >>>> stream is >> >>>>>>>>>> coming >> >>>>>>>>>>>> from >> >>>>>>>>>>>>>> a >> >>>>>>>>>>>>>>> stream-stream join which is the only stateful operator >> >>>> that >> >>>>> can >> >>>>>>>>>>>> result in >> >>>>>>>>>>>>>> a >> >>>>>>>>>>>>>>> stream, consider merging the join into multi-way joins >> >>>> (yes, >> >>>>>>>>>> this is >> >>>>>>>>>>>> a >> >>>>>>>>>>>>>> very >> >>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we do not >> >>>> try >> >>>>> to >> >>>>>>>>>>>> tackle it >> >>>>>>>>>>>>>>> now but leave it for a better solution :). >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> I really like this idea! I agree with you in that this >> >>>>> approach >> >>>>>>>>>> to too >> >>>>>>>>>>>>>> much for adding in this KIP, but we could pick it up >> >> later >> >>>> and >> >>>>>>>>>>>> leverage the >> >>>>>>>>>>>>>> Optimization framework to accomplish this re-use. >> >>>>>>>>>>>>>> Again, while I agree we should break the naming of join >> >>>> state >> >>>>>>>>>> stores >> >>>>>>>>>>>> from >> >>>>>>>>>>>>>> KIP-307, IMHO it's something we should fix now as it will >> >>>> be >> >>>>> the >> >>>>>>>>>> last >> >>>>>>>>>>>> piece >> >>>>>>>>>>>>>> we can provide to give users the ability to completely >> >> make >> >>>>>> their >> >>>>>>>>>>>>>> topologies "upgrade proof" when adding additional >> >>>> operations. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Thanks again to both of you for comments and I look >> >>>> forward to >> >>>>>>>>>> hearing >> >>>>>>>>>>>> back >> >>>>>>>>>>>>>> from you. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Regards, >> >>>>>>>>>>>>>> Bill >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang < >> >>>>>> wangg...@gmail.com> >> >>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Hello Bill, >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Thanks for the KIP. Glad to see that we can likely >> >>>> shooting >> >>>>> two >> >>>>>>>>>> birds >> >>>>>>>>>>>>>> with >> >>>>>>>>>>>>>>> one stone. I have some concerns though about those "two >> >>>>> birds" >> >>>>>>>>>>>>>> themselves: >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> 1. About not breaking compatibility of stream-stream >> >> join >> >>>>>>>>>>>> materialized >> >>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, but >> >> after >> >>>>>>>>>> thinking >> >>>>>>>>>>>> about >> >>>>>>>>>>>>>>> it once more I'm not sure if exposing Materialized would >> >>>> be a >> >>>>>>>>>> good >> >>>>>>>>>>>>>> solution >> >>>>>>>>>>>>>>> here. My rationles: >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of >> >>>> window-store >> >>>>>> is >> >>>>>>>>>> not >> >>>>>>>>>>>>>> ideal, >> >>>>>>>>>>>>>>> and we want to modify it in the near future to be more >> >>>>>>>>>> efficient. Not >> >>>>>>>>>>>>>>> allowing users to override such state store backend >> >> gives >> >>>> us >> >>>>>> such >> >>>>>>>>>>>> freedom >> >>>>>>>>>>>>>>> (which was also considered in the original DSL design), >> >>>>> whereas >> >>>>>>>>>>>> getting a >> >>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out that >> >> freedom >> >>>>> out >> >>>>>>>>>> of the >> >>>>>>>>>>>>>>> window. >> >>>>>>>>>>>>>>> 1.b For strema-stream join, in our original design we >> >>>> intend >> >>>>> to >> >>>>>>>>>>>> "never" >> >>>>>>>>>>>>>>> want users to query the state, since it is just for >> >>>> buffering >> >>>>>> the >> >>>>>>>>>>>>>> upcoming >> >>>>>>>>>>>>>>> records from the stream. Now I know that some users may >> >>>>> indeed >> >>>>>>>>>> want >> >>>>>>>>>>>> to >> >>>>>>>>>>>>>>> query it from the debugging perspective, but still I >> >>>>> concerned >> >>>>>>>>>> about >> >>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes would be >> >> the >> >>>>> right >> >>>>>>>>>>>> solution >> >>>>>>>>>>>>>>> here. And adding Materialized object opens the door to >> >> let >> >>>>>> users >> >>>>>>>>>>>> query >> >>>>>>>>>>>>>>> about it (unless we did something intentionally to still >> >>>>>> forbids >> >>>>>>>>>> it), >> >>>>>>>>>>>>>> which >> >>>>>>>>>>>>>>> also restricts us in the future. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> 2. About the coupling between Materialized.name() and >> >>>>>> queryable: >> >>>>>>>>>>>> again I >> >>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if the >> >>>> current >> >>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at Materialized is the >> >>>> best >> >>>>>>>>>>>> approach. >> >>>>>>>>>>>>>>> Here I think I agree with John, that generally speaking >> >>>> it's >> >>>>>>>>>> better >> >>>>>>>>>>>> be a >> >>>>>>>>>>>>>>> control function on the `KTable` itself, rather than on >> >>>>>>>>>>>> `Materialized`, >> >>>>>>>>>>>>>> so >> >>>>>>>>>>>>>>> fixing it via adding functions through `Materialized` >> >>>> seems >> >>>>>> not a >> >>>>>>>>>>>> natural >> >>>>>>>>>>>>>>> approach either. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use two >> >> stones >> >>>>>> rather >> >>>>>>>>>>>> than >> >>>>>>>>>>>>>> one >> >>>>>>>>>>>>>>> to kill these two birds, and probably for this KIP we >> >> just >> >>>>>> focus >> >>>>>>>>>> on >> >>>>>>>>>>>> 1) >> >>>>>>>>>>>>>>> above. And for that I'd like to not expose the >> >>>> Materialized >> >>>>>>>>>> either >> >>>>>>>>>>>> for >> >>>>>>>>>>>>>>> rationales that I've listed above. Instead, we just >> >>>> restrict >> >>>>>>>>>> KIP-307 >> >>>>>>>>>>>> to >> >>>>>>>>>>>>>> NOT >> >>>>>>>>>>>>>>> use the Joined.name for state store names and always use >> >>>>>> internal >> >>>>>>>>>>>> names >> >>>>>>>>>>>>>> as >> >>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of not being >> >>>> able >> >>>>>> to >> >>>>>>>>>>>> cover >> >>>>>>>>>>>>>> all >> >>>>>>>>>>>>>>> internal names here, but now I feel this `hole` may >> >>>> better be >> >>>>>>>>>> filled >> >>>>>>>>>>>> by, >> >>>>>>>>>>>>>>> e.g. not creating changelog topics but just use the >> >>>> upstream >> >>>>> to >> >>>>>>>>>>>>>>> re-bootstrap the materialized store, more concretely: >> >> when >> >>>>>>>>>>>> materializing >> >>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic on an >> >>>>> existing >> >>>>>>>>>>>> topic, >> >>>>>>>>>>>>>> e.g. >> >>>>>>>>>>>>>>> a) if the stream is coming directly from some source >> >> topic >> >>>>>>>>>> (including >> >>>>>>>>>>>>>>> repartition topic), make that as changelog topic and if >> >>>> it is >> >>>>>>>>>>>> repartition >> >>>>>>>>>>>>>>> topic change the retention / data purging policy >> >>>> necessarily >> >>>>> as >> >>>>>>>>>>>> well; b) >> >>>>>>>>>>>>>> if >> >>>>>>>>>>>>>>> the stream is coming from some stateless operators, >> >>>> delegate >> >>>>>> that >> >>>>>>>>>>>>>> stateless >> >>>>>>>>>>>>>>> operator to the parent stream similar as a); if the >> >>>> stream is >> >>>>>>>>>> coming >> >>>>>>>>>>>>>> from a >> >>>>>>>>>>>>>>> stream-stream join which is the only stateful operator >> >>>> that >> >>>>> can >> >>>>>>>>>>>> result >> >>>>>>>>>>>>>> in a >> >>>>>>>>>>>>>>> stream, consider merging the join into multi-way joins >> >>>> (yes, >> >>>>>>>>>> this is >> >>>>>>>>>>>> a >> >>>>>>>>>>>>>> very >> >>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we do not >> >>>> try >> >>>>> to >> >>>>>>>>>>>> tackle it >> >>>>>>>>>>>>>>> now but leave it for a better solution :). >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Guozhang >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler < >> >>>>>> j...@confluent.io >> >>>>>>>>>>> >> >>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Hi Bill, >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Thanks for the KIP! Awesome job catching this >> >> unexpected >> >>>>>>>>>>>> consequence >> >>>>>>>>>>>>>>>> of the prior KIPs before it was released. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> The proposal looks good to me. On top of just fixing >> >> the >> >>>>>>>>>> problem, >> >>>>>>>>>>>> it >> >>>>>>>>>>>>>>>> seems to address two other pain points: >> >>>>>>>>>>>>>>>> * that naming a state store automatically causes it to >> >>>>> become >> >>>>>>>>>>>>>> queriable. >> >>>>>>>>>>>>>>>> * that there's currently no way to configure the bytes >> >>>> store >> >>>>>>>>>> for >> >>>>>>>>>>>> join >> >>>>>>>>>>>>>>>> windows. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> It's awesome that we can fix this issue and two others >> >>>> with >> >>>>>> one >> >>>>>>>>>>>>>> feature. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> I'm wondering about a missing quadrant from the truth >> >>>> table >> >>>>>>>>>>>> involving >> >>>>>>>>>>>>>>>> whether a Materialized is stored or not and querying is >> >>>>>>>>>>>>>>>> enabled/disabled... What should be the behavior if >> >> there >> >>>> is >> >>>>> no >> >>>>>>>>>>>> store >> >>>>>>>>>>>>>>>> configured (e.g., if Materialized with only serdes) and >> >>>>>>>>>> querying is >> >>>>>>>>>>>>>>>> enabled? >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> It seems we have two choices: >> >>>>>>>>>>>>>>>> 1. we can force creation of a state store in this case, >> >>>> so >> >>>>> the >> >>>>>>>>>>>> store >> >>>>>>>>>>>>>>>> can be used to serve the queries >> >>>>>>>>>>>>>>>> 2. we can provide just a queriable view, basically >> >>>> letting >> >>>>> IQ >> >>>>>>>>>> query >> >>>>>>>>>>>>>>>> into the "KTableValueGetter", which would transparently >> >>>>>>>>>> construct >> >>>>>>>>>>>> the >> >>>>>>>>>>>>>>>> query response by applying the operator logic to the >> >>>>> upstream >> >>>>>>>>>>>> state if >> >>>>>>>>>>>>>>>> the operator state isn't already stored. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Offhand, it seems like the second is actually a pretty >> >>>>> awesome >> >>>>>>>>>>>>>>>> capability. But it might have an awkward interaction >> >> with >> >>>>> the >> >>>>>>>>>>>> current >> >>>>>>>>>>>>>>>> semantics. Presently, if I provide a >> >>>> Materialized.withName, >> >>>>> it >> >>>>>>>>>>>> implies >> >>>>>>>>>>>>>>>> that querying should be enabled AND that the view >> >> should >> >>>>>>>>>> actually >> >>>>>>>>>>>> be >> >>>>>>>>>>>>>>>> stored in a state store. Under option 2 above, this >> >>>> behavior >> >>>>>>>>>> would >> >>>>>>>>>>>>>>>> change to NOT provision a state store and instead just >> >>>>> consult >> >>>>>>>>>> the >> >>>>>>>>>>>>>>>> ValueGetter. To get back to the current behavior, users >> >>>>> would >> >>>>>>>>>> have >> >>>>>>>>>>>> to >> >>>>>>>>>>>>>>>> add a "bytes store supplier" to the Materialized to >> >>>> indicate >> >>>>>>>>>> that, >> >>>>>>>>>>>>>>>> yes, they really want a state store there. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Behavior changes are always kind of scary, but I think >> >> in >> >>>>> this >> >>>>>>>>>>>> case, >> >>>>>>>>>>>>>>>> it might actually be preferable. In the event where >> >> only >> >>>> the >> >>>>>>>>>> name >> >>>>>>>>>>>> is >> >>>>>>>>>>>>>>>> provided, it means that people just wanted to make the >> >>>>>>>>>> operation >> >>>>>>>>>>>>>>>> result queriable. If we automatically convert this to a >> >>>>>>>>>> non-stored >> >>>>>>>>>>>>>>>> view, then simply upgrading results in the same >> >>>> observable >> >>>>>>>>>> behavior >> >>>>>>>>>>>>>>>> and semantics, but a linear reduction in local storage >> >>>>>>>>>> requirements >> >>>>>>>>>>>>>>>> and disk i/o, as well as a corresponding linear >> >>>> reduction in >> >>>>>>>>>> memory >> >>>>>>>>>>>>>>>> usage both on and off heap. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> What do you think? >> >>>>>>>>>>>>>>>> -John >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck < >> >>>>>> bbej...@gmail.com >> >>>>>>>>>>> >> >>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> All, >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> I'd like to start a discussion for adding a >> >> Materialized >> >>>>>>>>>>>>>> configuration >> >>>>>>>>>>>>>>>>> object to KStream.join for naming state stores >> >> involved >> >>>> in >> >>>>>>>>>> joins. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>> >> >>>>>> >> >>>>> >> >>>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Your comments and suggestions are welcome. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Thanks, >> >>>>>>>>>>>>>>>>> Bill >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> -- >> >>>>>>>>>>>>>>> -- Guozhang >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> -- >> >>>>>>>>>>>>> -- Guozhang >> >>>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> -- >> >>>>>>>>>>> -- Guozhang >> >>>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>> >> >>>>>>>> >> >>>>>> >> >>>>> >> >>>> >> >>>> >> >>>> -- >> >>>> -- Guozhang >> >>>> >> >>> >> >> >> > >> > >> >>