Just to clarify I'll update `as(StoreSupplier, StoreSupplier)` to
`with(..., ...)` and change the class name from `StreamJoined` to
`StreamJoin`

-Bill

On Tue, Sep 17, 2019 at 2:09 PM Bill Bejeck <bbej...@gmail.com> wrote:

> Thanks for the comments, Matthias.
>
> I like both suggestions regarding the names and I'll update the KIP
> accordingly.
>
> On Tue, Sep 17, 2019 at 11:22 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Thanks Bill!
>>
>> Using a new configuration object was suggested by John in the original
>> DISCUSS thread already. We rejected it because we wanted to avoid a new
>> configuration class. However, given your analysis, it seems it's
>> actually the right choice to introduce a new configuration class.
>>
>> Hence, overall I am +1 on the proposal.
>>
>>
>> Some nits about names (as always :))
>>
>> - `StreamJoined.as()` does not sound right for the `StoreSupplier`
>> overload. Maybe it's better to call that static method `with` (not 100%
>> sure)
>>
>> - Should we use plural instead of singular -> `Stream{s}Joined`
>>   -> or keep singular but call it `StreamJoin`
>>   `Joined` seems to refer to the input stream(s) while `Join` would
>> refer to the join-operator
>>
>> Thoughts?
>>
>>
>> You suggest to deprecate existing overload what I support -- can you
>> list deprecated method in the "Public Interface" section?
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 9/17/19 9:39 AM, Guozhang Wang wrote:
>> > Hello Bill,
>> >
>> > Thanks for the updated KIP. I made a pass on the StreamJoined section.
>> Just
>> > a quick question from user's perspective: if a user wants to provide a
>> > customized store-supplier, she is forced to also provide a name via the
>> > store-supplier. So there's no way to say "I want to provide my own store
>> > engine but let the library decide its name", is that right?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck <bbej...@gmail.com> wrote:
>> >
>> >> Bumping this discussion as we need to re-vote before the KIP deadline.
>> >>
>> >> On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbej...@gmail.com>
>> wrote:
>> >>
>> >>> Hi All,
>> >>>
>> >>> While working on the implementation of KIP-479, some issues came to
>> light
>> >>> that the KIP as written won't work.  I have updated the KIP with a
>> >> solution
>> >>> I believe will solve the original problem as well as address the
>> >>> impediment to the initial approach.
>> >>>
>> >>> This update is a significant change, so please review the updated KIP
>> >>>
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
>> >> and
>> >>> provide feedback.  After we conclude the discussion, there will be a
>> >>> re-vote.
>> >>>
>> >>> Thanks!
>> >>> Bill
>> >>>
>> >>> On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang <wangg...@gmail.com>
>> >> wrote:
>> >>>
>> >>>> Hi Bill, thanks for your explanations. I'm on board with your
>> decision
>> >>>> too.
>> >>>>
>> >>>>
>> >>>> Guozhang
>> >>>>
>> >>>> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbej...@gmail.com>
>> wrote:
>> >>>>
>> >>>>> Thanks for the response, John.
>> >>>>>
>> >>>>>> If I can offer my thoughts, it seems better to just document on the
>> >>>>>> Stream join javadoc for the Materialized parameter that it will not
>> >>>>>> make the join result queriable. I'm not opposed to the queriable
>> >> flag
>> >>>>>> in general, but introducing it is a much larger consideration that
>> >> has
>> >>>>>> previously derailed this KIP discussion. In the interest of just
>> >>>>>> closing the gap and keeping the API change small, it seems better
>> to
>> >>>>>> just go with documentation for now.
>> >>>>>
>> >>>>> I agree with your statement here.  IMHO the most important goal of
>> >> this
>> >>>> KIP
>> >>>>> is to not breaking existing users and gain some consistency of the
>> >> API.
>> >>>>>
>> >>>>> I'll update the KIP accordingly.
>> >>>>>
>> >>>>> -Bill
>> >>>>>
>> >>>>> On Tue, Jul 16, 2019 at 11:55 AM John Roesler <j...@confluent.io>
>> >>>> wrote:
>> >>>>>
>> >>>>>> Hi Bill,
>> >>>>>>
>> >>>>>> Thanks for driving this KIP toward a conclusion. I'm on board with
>> >>>>>> your decision.
>> >>>>>>
>> >>>>>> You didn't mention whether you're still proposing to add the
>> >>>>>> "queriable" flag to the Materialized config object, or just
>> document
>> >>>>>> that a Stream join is never queriable. Both options have come up
>> >>>>>> earlier in the discussion, so it would be good to pin this down.
>> >>>>>>
>> >>>>>> If I can offer my thoughts, it seems better to just document on the
>> >>>>>> Stream join javadoc for the Materialized parameter that it will not
>> >>>>>> make the join result queriable. I'm not opposed to the queriable
>> >> flag
>> >>>>>> in general, but introducing it is a much larger consideration that
>> >> has
>> >>>>>> previously derailed this KIP discussion. In the interest of just
>> >>>>>> closing the gap and keeping the API change small, it seems better
>> to
>> >>>>>> just go with documentation for now.
>> >>>>>>
>> >>>>>> Thanks again,
>> >>>>>> -John
>> >>>>>>
>> >>>>>> On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck <bbej...@gmail.com>
>> >>>> wrote:
>> >>>>>>>
>> >>>>>>> Thanks all for the great discussion so far.
>> >>>>>>>
>> >>>>>>> Everyone has made excellent points, and I appreciate the detail
>> >>>>> everyone
>> >>>>>>> has put into their arguments.
>> >>>>>>>
>> >>>>>>> However, after carefully evaluating all the points made so far,
>> >>>>> creating
>> >>>>>> an
>> >>>>>>> overload with Materialized is still my #1 option.
>> >>>>>>> My reasoning for saying so is two-fold:
>> >>>>>>>
>> >>>>>>>    1. It's a small change, and IMHO since it's consistent with our
>> >>>>>> current
>> >>>>>>>    API concerning state store usage, the cognitive load on users
>> >>>> will
>> >>>>> be
>> >>>>>>>    minimal.
>> >>>>>>>    2. It achieves the most important goal of this KIP, namely to
>> >>>> close
>> >>>>>> the
>> >>>>>>>    gap of naming state stores independently of the join operator
>> >>>> name.
>> >>>>>>>
>> >>>>>>> Additionally, I agree with the points made by Matthias earlier (I
>> >>>>> realize
>> >>>>>>> there is some overlap here).
>> >>>>>>>
>> >>>>>>>>  - the main purpose of this KIP is to close the naming gap what
>> >> we
>> >>>>>> achieve
>> >>>>>>>>  - we can allow people to use the new in-memory store
>> >>>>>>>>  - we allow people to enable/disable caching
>> >>>>>>>>  - we unify the API
>> >>>>>>>>  - we decouple querying from naming
>> >>>>>>>>  - it's a small API change
>> >>>>>>>
>> >>>>>>> Although it's not a perfect solution,  IMHO the positives of using
>> >>>>>>> Materialize far outweigh the negatives, and from what we've
>> >>>> discussed
>> >>>>> so
>> >>>>>>> far, anything we implement seems to involve an additional change
>> >>>> down
>> >>>>> the
>> >>>>>>> road.
>> >>>>>>>
>> >>>>>>> If others are still strongly opposed to using Materialized, my
>> >> other
>> >>>>>>> preferences would be
>> >>>>>>>
>> >>>>>>>    1. Add a "withStoreName" to Joined.  Although I agree with
>> >>>> Guozhang
>> >>>>>> that
>> >>>>>>>    having a parameter that only applies to one use-case would be
>> >>>>> clumsy.
>> >>>>>>>    2. Add a String overload for naming the store, but this would
>> >> be
>> >>>> my
>> >>>>>>>    least favorite option as IMHO this seems to be a step backward
>> >>>> from
>> >>>>>> why we
>> >>>>>>>    introduced configuration objects in the first place.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Bill
>> >>>>>>>
>> >>>>>>> On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax <
>> >>>> matth...@confluent.io
>> >>>>>>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the KIP Bill!
>> >>>>>>>>
>> >>>>>>>> Great discussion to far.
>> >>>>>>>>
>> >>>>>>>> About John's idea about querying upstream stores and don't
>> >>>>> materialize
>> >>>>>> a
>> >>>>>>>> store: I agree with Bill that this seems to be an orthogonal
>> >>>>> question,
>> >>>>>>>> and it might be better to treat it as an independent
>> >> optimization
>> >>>> and
>> >>>>>>>> exclude from this KIP.
>> >>>>>>>>
>> >>>>>>>>> What should be the behavior if there is no store
>> >>>>>>>>> configured (e.g., if Materialized with only serdes) and
>> >>>> querying is
>> >>>>>>>>> enabled?
>> >>>>>>>>
>> >>>>>>>> IMHO, this could be an error case. If one wants to query a
>> >> store,
>> >>>>> they
>> >>>>>>>> need to provide a name -- if you don't know the name, how would
>> >>>> you
>> >>>>>>>> actually query the store (even if it would be possible to get
>> >> the
>> >>>>> name
>> >>>>>>>> from the `TopologyDescription`, it seems clumsy).
>> >>>>>>>>
>> >>>>>>>> If we don't want to throw an error, materializing seems to be
>> >> the
>> >>>>> right
>> >>>>>>>> option, to exclude "query optimization" from this KIP. I would
>> >> be
>> >>>> ok
>> >>>>>>>> with this option, even if it's clumsy to get the name from
>> >>>>>>>> `TopologyDescription`; hence, I would prefer to treat it as an
>> >>>> error.
>> >>>>>>>>
>> >>>>>>>>> To get back to the current behavior, users would have to
>> >>>>>>>>> add a "bytes store supplier" to the Materialized to indicate
>> >>>> that,
>> >>>>>>>>> yes, they really want a state store there.
>> >>>>>>>>
>> >>>>>>>> This sound like a quite subtle semantic difference on how to use
>> >>>> the
>> >>>>>>>> API. Might be hard to explain to users. I would prefer to not
>> >>>>>> introduce it.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> About Guozhang's points:
>> >>>>>>>>
>> >>>>>>>> 1a) That is actually a good point. However, I believe we cannot
>> >>>> get
>> >>>>>>>> around this issue easily, and it seems ok to me, to expose the
>> >>>> actual
>> >>>>>>>> store type we are using. (More thoughts later.)
>> >>>>>>>>
>> >>>>>>>> 1b) I don't see an issue with allowing users to query all
>> >> stores?
>> >>>>> What
>> >>>>>>>> is the rational behind it? What do we gain by not allowing it?
>> >>>>>>>>
>> >>>>>>>> 2) While I understand what you are saying, we also want/need to
>> >>>> have
>> >>>>> a
>> >>>>>>>> way in the PAPI to allow users adding "internal/private"
>> >>>>> non-queryable
>> >>>>>>>> stores to a topology. That's possible via
>> >>>>>>>> `Materialized#withQueryingDisabled()`. We could also update
>> >>>>>>>> `Topology#addStateStore(StoreBuilder, boolean isQueryable,
>> >>>>> String...)`
>> >>>>>>>> to address this. Again, I agree with Bill that the current API
>> >> is
>> >>>>> built
>> >>>>>>>> in a certain way, and if we want to change it, it should be a
>> >>>>> separate
>> >>>>>>>> KIP, as it seems to be an orthogonal concern.
>> >>>>>>>>
>> >>>>>>>>> Instead, we just restrict KIP-307 to NOT
>> >>>>>>>>> use the Joined.name for state store names and always use
>> >>>> internal
>> >>>>>> names
>> >>>>>>>> as
>> >>>>>>>>> well, which admittedly indeed leaves a hole of not being able
>> >> to
>> >>>>>> cover
>> >>>>>>>> all
>> >>>>>>>>> internal names here
>> >>>>>>>>
>> >>>>>>>> I think it's important to close this gap. Naming entities seems
>> >>>> to a
>> >>>>>>>> binary feature: if there is a gap, the feature is more or less
>> >>>>> useless,
>> >>>>>>>> rendering KIP-307 void.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> I like John's detailed list of required features and what
>> >>>>>>>> Materialized/WindowByteStoreSuppliers offers. My take is, that
>> >>>> adding
>> >>>>>>>> Materialized including the required run-time checks is the best
>> >>>>> option
>> >>>>>>>> we have, for the following reasons:
>> >>>>>>>>
>> >>>>>>>>  - the main purpose of this KIP is to close the naming gap what
>> >> we
>> >>>>>> achieve
>> >>>>>>>>  - we can allow people to use the new in-memory store
>> >>>>>>>>  - we allow people to enable/disable caching
>> >>>>>>>>  - we unify the API
>> >>>>>>>>  - we decouple querying from naming
>> >>>>>>>>  - it's a small API change
>> >>>>>>>>
>> >>>>>>>> Adding an overload and only passing in a name, would address the
>> >>>> main
>> >>>>>>>> purpose of the KIP. However, it falls short on all the other
>> >>>>> "goodies".
>> >>>>>>>> As you mentioned, passing in `Materialized` might not be perfect
>> >>>> and
>> >>>>>>>> maybe we need to deprecate is at some point; but this is also
>> >> true
>> >>>>> for
>> >>>>>>>> passing in just a name.
>> >>>>>>>>
>> >>>>>>>> I am also not convinced, that a `StreamJoinStore` would resolve
>> >>>> all
>> >>>>> the
>> >>>>>>>> issues. In the end, as long as we are using a `WindowedStore`
>> >>>>>>>> internally, we need to expose this "implemenation detail" to
>> >>>> users to
>> >>>>>>>> allow them to plug in a custom store. Adding `Materialized` seem
>> >>>> to
>> >>>>> be
>> >>>>>>>> the best short-term fix from my point of view.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 6/27/19 9:56 AM, Guozhang Wang wrote:
>> >>>>>>>>> Hi John,
>> >>>>>>>>>
>> >>>>>>>>> I actually feels better about a new interface but I'm not sure
>> >>>> if
>> >>>>> we
>> >>>>>>>> would
>> >>>>>>>>> need the full configuration of store / log / cache, now or in
>> >>>> the
>> >>>>>> future
>> >>>>>>>>> ever for stream-stream join.
>> >>>>>>>>>
>> >>>>>>>>> Right now I feel that 1) we want to improve our implementation
>> >>>> of
>> >>>>>>>>> stream-stream join, and potentially also allow users to
>> >>>> customize
>> >>>>>> this
>> >>>>>>>>> implementation but with a more suitable interface than the
>> >>>> current
>> >>>>>>>>> WindowStore interface, how to do that is less clear and
>> >>>>>> execution-wise
>> >>>>>>>> it's
>> >>>>>>>>> (arguably..) not urgent; 2) we want to close the last gap
>> >>>>>> (Stream-stream
>> >>>>>>>>> join) of allowing users to specify all internal names to help
>> >> on
>> >>>>>> backward
>> >>>>>>>>> compatibility, which is urgent.
>> >>>>>>>>>
>> >>>>>>>>> Therefore if we want to unblock 2) from 1) in the near term, I
>> >>>> feel
>> >>>>>>>>> slightly inclined to just add overload functions that takes
>> >> in a
>> >>>>>> store
>> >>>>>>>> name
>> >>>>>>>>> for stream-stream joins only -- and admittedly, in the future
>> >>>> this
>> >>>>>>>> function
>> >>>>>>>>> maybe deprecated -- i.e. if we have to do something that we
>> >> "may
>> >>>>>> regret"
>> >>>>>>>> in
>> >>>>>>>>> the future, I'd like to pick the least intrusive option.
>> >>>>>>>>>
>> >>>>>>>>> About `Joined#withStoreName`: since the Joined class itself is
>> >>>> also
>> >>>>>> used
>> >>>>>>>> in
>> >>>>>>>>> other join types, I feel less comfortable to have a
>> >>>>>>>> `Joined#withStoreName`
>> >>>>>>>>> which is only going to be used by stream-stream join. Or
>> >> maybe I
>> >>>>> miss
>> >>>>>>>>> something here about the "latter" case that you are referring
>> >>>> to?
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Guozhang
>> >>>>>>>>>
>> >>>>>>>>> On Mon, Jun 24, 2019 at 12:16 PM John Roesler <
>> >>>> j...@confluent.io>
>> >>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Thanks Guozhang,
>> >>>>>>>>>>
>> >>>>>>>>>> Yep. Maybe we can consider just exactly what the join needs:
>> >>>>>>>>>>
>> >>>>>>>>>>> the WindowStore<Bytes, byte[]> itself is fine, if overly
>> >>>> broad,
>> >>>>>>>>>>> since the only two methods we need are `window.put(key,
>> >> value,
>> >>>>>>>>>>> context().timestamp())` and `WindowStoreIterator<V2> iter =
>> >>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`.
>> >>>>>>>>>>
>> >>>>>>>>>> One "middle ground" would be to extract _this_ into a new
>> >> store
>> >>>>>>>>>> interface, which only supports these API calls, like
>> >>>>>>>>>> StreamJoinStore<K, V>. This would give us the latitude we
>> >> need
>> >>>> to
>> >>>>>>>>>> efficiently support the exact operation without concerning
>> >>>>> ourselves
>> >>>>>>>>>> with all the other things a WindowStore can do (which are
>> >>>>>> unreachable
>> >>>>>>>>>> for the join use case). It would also let us drop "store
>> >>>>> duplicates"
>> >>>>>>>>>> from the main WindowStore interface, since it only exists to
>> >>>>> support
>> >>>>>>>>>> the join use case.
>> >>>>>>>>>>
>> >>>>>>>>>> If we were to add a new StreamJoinStore interface, then it'd
>> >> be
>> >>>>>>>>>> straightforward how we could add also
>> >>>>>>>>>> `Materialized.as(StreamJoinBytesStoreSupplier)` and use
>> >>>>>> Materialized,
>> >>>>>>>>>> or alternatively add the ability to set the bytes store on
>> >>>> Joined.
>> >>>>>>>>>>
>> >>>>>>>>>> Personally, I'm kind of leaning toward the latter (and also
>> >>>> doing
>> >>>>>>>>>> `Joined#withStoreName`), since adding the new interface to
>> >>>>>>>>>> Materialized then also pollutes the interface for its
>> >> _actual_
>> >>>> use
>> >>>>>>>>>> case of materializing a table view. Of course, to solve the
>> >>>>>> immediate
>> >>>>>>>>>> problem, all we need is the store name, but we might feel
>> >>>> better
>> >>>>>> about
>> >>>>>>>>>> adding the store name to Joined if we _also_ feel like in the
>> >>>>>> future,
>> >>>>>>>>>> we would add store/log/cache configuration to Joined as well.
>> >>>>>>>>>>
>> >>>>>>>>>> -John
>> >>>>>>>>>>
>> >>>>>>>>>> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang <
>> >>>>> wangg...@gmail.com>
>> >>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Hello John,
>> >>>>>>>>>>>
>> >>>>>>>>>>> My main concern is exactly the first point at the bottom of
>> >>>> your
>> >>>>>>>> analysis
>> >>>>>>>>>>> here: "* configure the bytes store". I'm not sure if using a
>> >>>>> window
>> >>>>>>>> bytes
>> >>>>>>>>>>> store would be ideal for stream-stream windowed join; e.g.
>> >> we
>> >>>>> could
>> >>>>>>>>>>> consider two dimensional list sorted by timestamps and then
>> >> by
>> >>>>>> keys to
>> >>>>>>>> do
>> >>>>>>>>>>> the join, whereas a windowed bytes store is basically sorted
>> >>>> by
>> >>>>> key
>> >>>>>>>>>> first,
>> >>>>>>>>>>> then by timestamp. If we expose the Materialized to let user
>> >>>> pass
>> >>>>>> in a
>> >>>>>>>>>>> windowed bytes store, then we would need to change that if
>> >> we
>> >>>>> want
>> >>>>>> to
>> >>>>>>>>>>> replace it with a different implementation interface.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Guozhang
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler <
>> >>>> j...@confluent.io>
>> >>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hey Guozhang and Bill,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> For what it's worth, I agree with you both!
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I think it might help the discussion to look concretely at
>> >>>> what
>> >>>>>>>>>>>> Materialized does:
>> >>>>>>>>>>>> * set a WindowBytesStoreSupplier
>> >>>>>>>>>>>> * set a name
>> >>>>>>>>>>>> * set the key/value serdes
>> >>>>>>>>>>>> * disable/enable/configure change-logging
>> >>>>>>>>>>>> * disable/enable caching
>> >>>>>>>>>>>> * configure retention
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Further, looking into the WindowBytesStoreSupplier, the
>> >>>>> interface
>> >>>>>> lets
>> >>>>>>>>>> you:
>> >>>>>>>>>>>> * get the segment interval
>> >>>>>>>>>>>> * get the window size
>> >>>>>>>>>>>> * get whether "duplicates" are enabled
>> >>>>>>>>>>>> * get the retention period
>> >>>>>>>>>>>> * (obviously) get a WindowStore<Bytes, byte[]>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> We know that Materialized isn't exactly what we need for
>> >>>> stream
>> >>>>>> joins,
>> >>>>>>>>>>>> but we can see how close Materialized is to what we need.
>> >> If
>> >>>> it
>> >>>>> is
>> >>>>>>>>>>>> close, maybe we can use it and document the gaps, and if it
>> >>>> is
>> >>>>> not
>> >>>>>>>>>>>> close, then maybe we should just add what we need to
>> >> Joined.
>> >>>>>>>>>>>> Stream Join's requirements for its stores:
>> >>>>>>>>>>>> * a multimap store (i.e., it keeps duplicates) for storing
>> >>>>> general
>> >>>>>>>>>>>> (not windowed) keyed records associated with their
>> >> insertion
>> >>>>>> time, and
>> >>>>>>>>>>>> allows efficient time-bounded lookups and also efficient
>> >>>> purges
>> >>>>>> of old
>> >>>>>>>>>>>> data.
>> >>>>>>>>>>>> ** Note, a properly configured WindowBytesStoreSupplier
>> >>>>> satisfies
>> >>>>>> this
>> >>>>>>>>>>>> requirement, and the interface supports the queries we need
>> >>>> to
>> >>>>>> verify
>> >>>>>>>>>>>> the configuration at run-time
>> >>>>>>>>>>>> * set a name for the store
>> >>>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined)
>> >>>>>>>>>>>> * logging could be configurable (set to enabled now)
>> >>>>>>>>>>>> * caching could be configurable (set to enabled now)
>> >>>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> So, out of six capabilities for Materialized, there are two
>> >>>> we
>> >>>>>> don't
>> >>>>>>>>>>>> want (serdes and retention). These would become run-time
>> >>>> checks
>> >>>>>> if we
>> >>>>>>>>>>>> use it.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> A third questionable capability is to provide a
>> >>>>>>>>>>>> WindowBytesStoreSupplier. Looking at whether the
>> >>>>>>>>>>>> WindowBytesStoreSupplier is the right interface for Stream
>> >>>> Join:
>> >>>>>>>>>>>> * configuring segment interval is fine
>> >>>>>>>>>>>> * should _not_ configure window size (it's determined by
>> >>>>>> JoinWindows)
>> >>>>>>>>>>>> * duplicates _must_ be enabled
>> >>>>>>>>>>>> * retention should be _at least_ windowSize + gracePeriod,
>> >>>> but
>> >>>>>> note
>> >>>>>>>>>>>> that (unlike for Table window stores) there is no utility
>> >> in
>> >>>>>> having a
>> >>>>>>>>>>>> longer retention time.
>> >>>>>>>>>>>> * the WindowStore<Bytes, byte[]> itself is fine, if overly
>> >>>>> broad,
>> >>>>>>>>>>>> since the only two methods we need are `window.put(key,
>> >>>> value,
>> >>>>>>>>>>>> context().timestamp())` and `WindowStoreIterator<V2> iter =
>> >>>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thus, flattening out the overlap for
>> >> WindowBytesStoreSupplier
>> >>>>>> onto the
>> >>>>>>>>>>>> overlap for Materialized, we have 9 capabilities total
>> >> (note
>> >>>>>> retention
>> >>>>>>>>>>>> is duplicated), we have 4 that we don't want:
>> >>>>>>>>>>>> * do _not_ set the serdes (they are already set in Joined)
>> >>>>>>>>>>>> * do _not_ configure retention (determined by JoinWindows)
>> >>>>>>>>>>>> * should _not_ configure window size (it's determined by
>> >>>>>> JoinWindows)
>> >>>>>>>>>>>> * duplicates _must_ be enabled
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> These gaps would have to be covered with run-time checks if
>> >>>> we
>> >>>>>> re-use
>> >>>>>>>>>>>> Materialized and WindowStoreBytesStoreSupplier both. Maybe
>> >>>> this
>> >>>>>> sounds
>> >>>>>>>>>>>> bad, but consider the other side, that we get 5 new
>> >>>> capabilities
>> >>>>>> we
>> >>>>>>>>>>>> don't require, but are still pretty nice:
>> >>>>>>>>>>>> * configure the bytes store
>> >>>>>>>>>>>> * set a name for the store
>> >>>>>>>>>>>> * configure caching
>> >>>>>>>>>>>> * configure logging
>> >>>>>>>>>>>> * configure segment interval
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Not sure where this nets us out, but it's food for thought.
>> >>>>>>>>>>>> -John
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <
>> >>>>> wangg...@gmail.com
>> >>>>>>>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi Bill,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I think by giving a Materialized param into stream-stream
>> >>>> join,
>> >>>>>> it's
>> >>>>>>>>>> okay
>> >>>>>>>>>>>>> (though still ideal) to say "we still would not expose the
>> >>>>> store
>> >>>>>> for
>> >>>>>>>>>>>>> queries", but it would sound a bit awkward to say "we
>> >> would
>> >>>>> also
>> >>>>>>>>>> ignore
>> >>>>>>>>>>>>> whatever the passed in store supplier but just use our
>> >>>> default
>> >>>>>> ones"
>> >>>>>>>>>> --
>> >>>>>>>>>>>>> again the concern is that, if in the future we'd want to
>> >>>> change
>> >>>>>> the
>> >>>>>>>>>>>> default
>> >>>>>>>>>>>>> implementation of join algorithm which no longer rely on a
>> >>>>> window
>> >>>>>>>>>> store
>> >>>>>>>>>>>>> with deduping enabled, then we need to change this API
>> >>>> again by
>> >>>>>>>>>> changing
>> >>>>>>>>>>>>> the store supplier type.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> If we do want to fill this hole for stream-stream join, I
>> >>>> feel
>> >>>>>> just
>> >>>>>>>>>>>> adding
>> >>>>>>>>>>>>> a String typed store-name would even be less
>> >>>> future-intrusive
>> >>>>> if
>> >>>>>> we
>> >>>>>>>>>>>> expect
>> >>>>>>>>>>>>> this parameter to be modified later.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Does that makes sense?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Guozhang
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <
>> >>>>> bbej...@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Thanks for the comments John and Guozhang, I'll address
>> >>>> each
>> >>>>>> one of
>> >>>>>>>>>>>> your
>> >>>>>>>>>>>>>> comments in turn.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> John,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> I'm wondering about a missing quadrant from the truth
>> >>>> table
>> >>>>>>>>>> involving
>> >>>>>>>>>>>>>>> whether a Materialized is stored or not and querying is
>> >>>>>>>>>>>>>>> enabled/disabled... What should be the behavior if there
>> >>>> is
>> >>>>> no
>> >>>>>>>>>> store
>> >>>>>>>>>>>>>>> configured (e.g., if Materialized with only serdes) and
>> >>>>>> querying
>> >>>>>>>>>> is
>> >>>>>>>>>>>>>> enabled?
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> It seems we have two choices:
>> >>>>>>>>>>>>>>> 1. we can force creation of a state store in this case,
>> >> so
>> >>>>> the
>> >>>>>>>>>> store
>> >>>>>>>>>>>>>>> can be used to serve the queries
>> >>>>>>>>>>>>>>> 2. we can provide just a queriable view, basically
>> >>>> letting IQ
>> >>>>>>>>>> query
>> >>>>>>>>>>>>>>> into the "KTableValueGetter", which would transparently
>> >>>>>>>>>> construct the
>> >>>>>>>>>>>>>>> query response by applying the operator logic to the
>> >>>> upstream
>> >>>>>>>>>> state
>> >>>>>>>>>>>> if
>> >>>>>>>>>>>>>>> the operator state isn't already stored.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I agree with your assertion about a missing quadrant from
>> >>>> the
>> >>>>>> truth
>> >>>>>>>>>>>> table.
>> >>>>>>>>>>>>>> Additionally, I too like the concept of a queriable view.
>> >>>>> But I
>> >>>>>>>>>> think
>> >>>>>>>>>>>> that
>> >>>>>>>>>>>>>> goes a bit beyond the scope of this KIP and would like to
>> >>>>> pursue
>> >>>>>>>>>> that
>> >>>>>>>>>>>>>> feature as follow-on work.  Also thinking about this KIP
>> >>>> some
>> >>>>>>>>>> more, I'm
>> >>>>>>>>>>>>>> thinking of the changes to Materialized might be a reach
>> >> as
>> >>>>>> well.
>> >>>>>>>>>>>>>> Separating the naming from a store and its queryable
>> >> state
>> >>>>> seems
>> >>>>>>>>>> like a
>> >>>>>>>>>>>>>> complex issue in and of itself and should be treated
>> >>>>>> accordingly.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> So here's what I'm thinking now.  We add Materialzied to
>> >>>> Join,
>> >>>>>> but
>> >>>>>>>>>> for
>> >>>>>>>>>>>> now,
>> >>>>>>>>>>>>>> we internally disable querying.  I know this breaks our
>> >>>>> current
>> >>>>>>>>>>>> semantic
>> >>>>>>>>>>>>>> approach, but I think it's crucial that we do two things
>> >> in
>> >>>>> this
>> >>>>>>>>>> KIP
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>    1. Break the naming of the state stores from Joined to
>> >>>>>>>>>>>> Materialized, so
>> >>>>>>>>>>>>>>    the naming of state stores follows our current pattern
>> >>>> and
>> >>>>>>>>>> enables
>> >>>>>>>>>>>>>> upgrades
>> >>>>>>>>>>>>>>    from 2.3 to 2.4
>> >>>>>>>>>>>>>>    2. Offer the ability to configure the state stores of
>> >>>> the
>> >>>>>> join,
>> >>>>>>>>>> even
>> >>>>>>>>>>>>>>    providing a different implementation (i.e. in-memory)
>> >> if
>> >>>>>>>>>> desired.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> With that in mind I'm considering changing the KIP to
>> >>>> remove
>> >>>>> the
>> >>>>>>>>>>>> changes to
>> >>>>>>>>>>>>>> Materialized, and we document very clearly that by
>> >>>> providing a
>> >>>>>>>>>>>> Materialized
>> >>>>>>>>>>>>>> object with a name is only for naming the state store,
>> >>>> hence
>> >>>>> the
>> >>>>>>>>>>>> changelog
>> >>>>>>>>>>>>>> topics and any possible configurations of the store, but
>> >>>> this
>> >>>>>> store
>> >>>>>>>>>>>> *will
>> >>>>>>>>>>>>>> not be available for IQ.*
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> WDYT?
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Guozhang,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 1. About not breaking compatibility of stream-stream
>> >> join
>> >>>>>>>>>>>> materialized
>> >>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, but
>> >> after
>> >>>>>>>>>> thinking
>> >>>>>>>>>>>> about
>> >>>>>>>>>>>>>>> it once more I'm not sure if exposing Materialized would
>> >>>> be a
>> >>>>>>>>>> good
>> >>>>>>>>>>>>>> solution
>> >>>>>>>>>>>>>>> here. My rationles:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of
>> >>>> window-store
>> >>>>>> is
>> >>>>>>>>>> not
>> >>>>>>>>>>>>>> ideal,
>> >>>>>>>>>>>>>>> and we want to modify it in the near future to be more
>> >>>>>>>>>> efficient. Not
>> >>>>>>>>>>>>>>> allowing users to override such state store backend
>> >> gives
>> >>>> us
>> >>>>>> such
>> >>>>>>>>>>>> freedom
>> >>>>>>>>>>>>>>> (which was also considered in the original DSL design),
>> >>>>> whereas
>> >>>>>>>>>>>> getting a
>> >>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out that
>> >> freedom
>> >>>>> out
>> >>>>>>>>>> of the
>> >>>>>>>>>>>>>>> window.
>> >>>>>>>>>>>>>>> 1.b For strema-stream join, in our original design we
>> >>>> intend
>> >>>>> to
>> >>>>>>>>>>>> "never"
>> >>>>>>>>>>>>>>> want users to query the state, since it is just for
>> >>>> buffering
>> >>>>>> the
>> >>>>>>>>>>>>>> upcoming
>> >>>>>>>>>>>>>>> records from the stream. Now I know that some users may
>> >>>>> indeed
>> >>>>>>>>>> want
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>>>> query it from the debugging perspective, but still I
>> >>>>> concerned
>> >>>>>>>>>> about
>> >>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes would be
>> >> the
>> >>>>> right
>> >>>>>>>>>>>> solution
>> >>>>>>>>>>>>>>> here. And adding Materialized object opens the door to
>> >> let
>> >>>>>> users
>> >>>>>>>>>>>> query
>> >>>>>>>>>>>>>>> about it (unless we did something intentionally to still
>> >>>>>> forbids
>> >>>>>>>>>> it),
>> >>>>>>>>>>>>>> which
>> >>>>>>>>>>>>>>> also restricts us in the future.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 2. About the coupling between Materialized.name() and
>> >>>>>> queryable:
>> >>>>>>>>>>>> again I
>> >>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if the
>> >>>> current
>> >>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at Materialized is the
>> >>>> best
>> >>>>>>>>>>>> approach.
>> >>>>>>>>>>>>>>> Here I think I agree with John, that generally speaking
>> >>>> it's
>> >>>>>>>>>> better
>> >>>>>>>>>>>> be a
>> >>>>>>>>>>>>>>> control function on the `KTable` itself, rather than on
>> >>>>>>>>>>>> `Materialized`,
>> >>>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>> fixing it via adding functions through `Materialized`
>> >>>> seems
>> >>>>>> not a
>> >>>>>>>>>>>> natural
>> >>>>>>>>>>>>>> approach either.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I understand your thoughts here, and up to a point, I
>> >> agree
>> >>>>> with
>> >>>>>>>>>> you.
>> >>>>>>>>>>>>>> But concerning not providing Materialized as it may
>> >>>> restrict
>> >>>>> us
>> >>>>>> in
>> >>>>>>>>>> the
>> >>>>>>>>>>>>>> future for delivering different implementations, I'm
>> >>>> wondering
>> >>>>>> if
>> >>>>>>>>>> we
>> >>>>>>>>>>>> are
>> >>>>>>>>>>>>>> doing some premature optimization here.
>> >>>>>>>>>>>>>> My rationale for saying so
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>    1. I think the cost of not allowing the naming of
>> >> state
>> >>>>>> stores
>> >>>>>>>>>> for
>> >>>>>>>>>>>> joins
>> >>>>>>>>>>>>>>    is too big of a gap to leave.   IMHO for joins to
>> >> follow
>> >>>>> the
>> >>>>>>>>>> current
>> >>>>>>>>>>>>>>    pattern of using Materialized for naming state stores
>> >>>> would
>> >>>>>> be
>> >>>>>>>>>> what
>> >>>>>>>>>>>> most
>> >>>>>>>>>>>>>>    users would expect to use.  As I said in my comments
>> >>>>> above, I
>> >>>>>>>>>> think
>> >>>>>>>>>>>> we
>> >>>>>>>>>>>>>>    should *not include* the changes to Materialized and
>> >>>>> enforce
>> >>>>>>>>>> named
>> >>>>>>>>>>>>>>    stores for joins as unavailable for IQ.
>> >>>>>>>>>>>>>>    2. We'll still have the join methods available
>> >> without a
>> >>>>>>>>>>>> Materialized
>> >>>>>>>>>>>>>>    allowing us to do something different internally if a
>> >>>>>>>>>> Materialized
>> >>>>>>>>>>>> is
>> >>>>>>>>>>>>>> not
>> >>>>>>>>>>>>>>    provided.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use two
>> >> stones
>> >>>>>> rather
>> >>>>>>>>>>>> than
>> >>>>>>>>>>>>>> one
>> >>>>>>>>>>>>>>> to kill these two birds, and probably for this KIP we
>> >> just
>> >>>>>> focus
>> >>>>>>>>>> on
>> >>>>>>>>>>>> 1)
>> >>>>>>>>>>>>>>> above. And for that I'd like to not expose the
>> >>>> Materialized
>> >>>>>>>>>> either
>> >>>>>>>>>>>> for
>> >>>>>>>>>>>>>>> rationales that I've listed above. Instead, we just
>> >>>> restrict
>> >>>>>>>>>> KIP-307
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>>> NOT
>> >>>>>>>>>>>>>>> use the Joined.name for state store names and always use
>> >>>>>> internal
>> >>>>>>>>>>>> names
>> >>>>>>>>>>>>>> as
>> >>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of not being
>> >>>> able
>> >>>>>> to
>> >>>>>>>>>>>> cover
>> >>>>>>>>>>>>>> all
>> >>>>>>>>>>>>>>> internal names here, but now I feel this `hole` may
>> >>>> better be
>> >>>>>>>>>> filled
>> >>>>>>>>>>>> by,
>> >>>>>>>>>>>>>>> e.g. not creating changelog topics but just use the
>> >>>> upstream
>> >>>>> to
>> >>>>>>>>>>>>>>> re-bootstrap the materialized store, more concretely:
>> >> when
>> >>>>>>>>>>>> materializing
>> >>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic on an
>> >>>>> existing
>> >>>>>>>>>>>> topic,
>> >>>>>>>>>>>>>> e.g.
>> >>>>>>>>>>>>>>> a) if the stream is coming directly from some source
>> >> topic
>> >>>>>>>>>> (including
>> >>>>>>>>>>>>>>> repartition topic), make that as changelog topic and if
>> >>>> it is
>> >>>>>>>>>>>> repartition
>> >>>>>>>>>>>>>>> topic change the retention / data purging policy
>> >>>> necessarily
>> >>>>> as
>> >>>>>>>>>>>> well; b)
>> >>>>>>>>>>>>>> if
>> >>>>>>>>>>>>>>> the stream is coming from some stateless operators,
>> >>>> delegate
>> >>>>>> that
>> >>>>>>>>>>>>>> stateless
>> >>>>>>>>>>>>>>> operator to the parent stream similar as a); if the
>> >>>> stream is
>> >>>>>>>>>> coming
>> >>>>>>>>>>>> from
>> >>>>>>>>>>>>>> a
>> >>>>>>>>>>>>>>> stream-stream join which is the only stateful operator
>> >>>> that
>> >>>>> can
>> >>>>>>>>>>>> result in
>> >>>>>>>>>>>>>> a
>> >>>>>>>>>>>>>>> stream, consider merging the join into multi-way joins
>> >>>> (yes,
>> >>>>>>>>>> this is
>> >>>>>>>>>>>> a
>> >>>>>>>>>>>>>> very
>> >>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we do not
>> >>>> try
>> >>>>> to
>> >>>>>>>>>>>> tackle it
>> >>>>>>>>>>>>>>> now but leave it for a better solution :).
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I really like this idea!  I agree with you in that this
>> >>>>> approach
>> >>>>>>>>>> to too
>> >>>>>>>>>>>>>> much for adding in this KIP, but we could pick it up
>> >> later
>> >>>> and
>> >>>>>>>>>>>> leverage the
>> >>>>>>>>>>>>>> Optimization framework to accomplish this re-use.
>> >>>>>>>>>>>>>> Again, while I agree we should break the naming of join
>> >>>> state
>> >>>>>>>>>> stores
>> >>>>>>>>>>>> from
>> >>>>>>>>>>>>>> KIP-307, IMHO it's something we should fix now as it will
>> >>>> be
>> >>>>> the
>> >>>>>>>>>> last
>> >>>>>>>>>>>> piece
>> >>>>>>>>>>>>>> we can provide to give users the ability to completely
>> >> make
>> >>>>>> their
>> >>>>>>>>>>>>>> topologies "upgrade proof" when adding additional
>> >>>> operations.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Thanks again to both of you for comments and I look
>> >>>> forward to
>> >>>>>>>>>> hearing
>> >>>>>>>>>>>> back
>> >>>>>>>>>>>>>> from you.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>> Bill
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <
>> >>>>>> wangg...@gmail.com>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Hello Bill,
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Thanks for the KIP. Glad to see that we can likely
>> >>>> shooting
>> >>>>> two
>> >>>>>>>>>> birds
>> >>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>> one stone. I have some concerns though about those "two
>> >>>>> birds"
>> >>>>>>>>>>>>>> themselves:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 1. About not breaking compatibility of stream-stream
>> >> join
>> >>>>>>>>>>>> materialized
>> >>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, but
>> >> after
>> >>>>>>>>>> thinking
>> >>>>>>>>>>>> about
>> >>>>>>>>>>>>>>> it once more I'm not sure if exposing Materialized would
>> >>>> be a
>> >>>>>>>>>> good
>> >>>>>>>>>>>>>> solution
>> >>>>>>>>>>>>>>> here. My rationles:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of
>> >>>> window-store
>> >>>>>> is
>> >>>>>>>>>> not
>> >>>>>>>>>>>>>> ideal,
>> >>>>>>>>>>>>>>> and we want to modify it in the near future to be more
>> >>>>>>>>>> efficient. Not
>> >>>>>>>>>>>>>>> allowing users to override such state store backend
>> >> gives
>> >>>> us
>> >>>>>> such
>> >>>>>>>>>>>> freedom
>> >>>>>>>>>>>>>>> (which was also considered in the original DSL design),
>> >>>>> whereas
>> >>>>>>>>>>>> getting a
>> >>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out that
>> >> freedom
>> >>>>> out
>> >>>>>>>>>> of the
>> >>>>>>>>>>>>>>> window.
>> >>>>>>>>>>>>>>> 1.b For strema-stream join, in our original design we
>> >>>> intend
>> >>>>> to
>> >>>>>>>>>>>> "never"
>> >>>>>>>>>>>>>>> want users to query the state, since it is just for
>> >>>> buffering
>> >>>>>> the
>> >>>>>>>>>>>>>> upcoming
>> >>>>>>>>>>>>>>> records from the stream. Now I know that some users may
>> >>>>> indeed
>> >>>>>>>>>> want
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>>>> query it from the debugging perspective, but still I
>> >>>>> concerned
>> >>>>>>>>>> about
>> >>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes would be
>> >> the
>> >>>>> right
>> >>>>>>>>>>>> solution
>> >>>>>>>>>>>>>>> here. And adding Materialized object opens the door to
>> >> let
>> >>>>>> users
>> >>>>>>>>>>>> query
>> >>>>>>>>>>>>>>> about it (unless we did something intentionally to still
>> >>>>>> forbids
>> >>>>>>>>>> it),
>> >>>>>>>>>>>>>> which
>> >>>>>>>>>>>>>>> also restricts us in the future.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 2. About the coupling between Materialized.name() and
>> >>>>>> queryable:
>> >>>>>>>>>>>> again I
>> >>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if the
>> >>>> current
>> >>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at Materialized is the
>> >>>> best
>> >>>>>>>>>>>> approach.
>> >>>>>>>>>>>>>>> Here I think I agree with John, that generally speaking
>> >>>> it's
>> >>>>>>>>>> better
>> >>>>>>>>>>>> be a
>> >>>>>>>>>>>>>>> control function on the `KTable` itself, rather than on
>> >>>>>>>>>>>> `Materialized`,
>> >>>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>> fixing it via adding functions through `Materialized`
>> >>>> seems
>> >>>>>> not a
>> >>>>>>>>>>>> natural
>> >>>>>>>>>>>>>>> approach either.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use two
>> >> stones
>> >>>>>> rather
>> >>>>>>>>>>>> than
>> >>>>>>>>>>>>>> one
>> >>>>>>>>>>>>>>> to kill these two birds, and probably for this KIP we
>> >> just
>> >>>>>> focus
>> >>>>>>>>>> on
>> >>>>>>>>>>>> 1)
>> >>>>>>>>>>>>>>> above. And for that I'd like to not expose the
>> >>>> Materialized
>> >>>>>>>>>> either
>> >>>>>>>>>>>> for
>> >>>>>>>>>>>>>>> rationales that I've listed above. Instead, we just
>> >>>> restrict
>> >>>>>>>>>> KIP-307
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>>> NOT
>> >>>>>>>>>>>>>>> use the Joined.name for state store names and always use
>> >>>>>> internal
>> >>>>>>>>>>>> names
>> >>>>>>>>>>>>>> as
>> >>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of not being
>> >>>> able
>> >>>>>> to
>> >>>>>>>>>>>> cover
>> >>>>>>>>>>>>>> all
>> >>>>>>>>>>>>>>> internal names here, but now I feel this `hole` may
>> >>>> better be
>> >>>>>>>>>> filled
>> >>>>>>>>>>>> by,
>> >>>>>>>>>>>>>>> e.g. not creating changelog topics but just use the
>> >>>> upstream
>> >>>>> to
>> >>>>>>>>>>>>>>> re-bootstrap the materialized store, more concretely:
>> >> when
>> >>>>>>>>>>>> materializing
>> >>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic on an
>> >>>>> existing
>> >>>>>>>>>>>> topic,
>> >>>>>>>>>>>>>> e.g.
>> >>>>>>>>>>>>>>> a) if the stream is coming directly from some source
>> >> topic
>> >>>>>>>>>> (including
>> >>>>>>>>>>>>>>> repartition topic), make that as changelog topic and if
>> >>>> it is
>> >>>>>>>>>>>> repartition
>> >>>>>>>>>>>>>>> topic change the retention / data purging policy
>> >>>> necessarily
>> >>>>> as
>> >>>>>>>>>>>> well; b)
>> >>>>>>>>>>>>>> if
>> >>>>>>>>>>>>>>> the stream is coming from some stateless operators,
>> >>>> delegate
>> >>>>>> that
>> >>>>>>>>>>>>>> stateless
>> >>>>>>>>>>>>>>> operator to the parent stream similar as a); if the
>> >>>> stream is
>> >>>>>>>>>> coming
>> >>>>>>>>>>>>>> from a
>> >>>>>>>>>>>>>>> stream-stream join which is the only stateful operator
>> >>>> that
>> >>>>> can
>> >>>>>>>>>>>> result
>> >>>>>>>>>>>>>> in a
>> >>>>>>>>>>>>>>> stream, consider merging the join into multi-way joins
>> >>>> (yes,
>> >>>>>>>>>> this is
>> >>>>>>>>>>>> a
>> >>>>>>>>>>>>>> very
>> >>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we do not
>> >>>> try
>> >>>>> to
>> >>>>>>>>>>>> tackle it
>> >>>>>>>>>>>>>>> now but leave it for a better solution :).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Guozhang
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler <
>> >>>>>> j...@confluent.io
>> >>>>>>>>>>>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Hi Bill,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Thanks for the KIP! Awesome job catching this
>> >> unexpected
>> >>>>>>>>>>>> consequence
>> >>>>>>>>>>>>>>>> of the prior KIPs before it was released.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> The proposal looks good to me. On top of just fixing
>> >> the
>> >>>>>>>>>> problem,
>> >>>>>>>>>>>> it
>> >>>>>>>>>>>>>>>> seems to address two other pain points:
>> >>>>>>>>>>>>>>>> * that naming a state store automatically causes it to
>> >>>>> become
>> >>>>>>>>>>>>>> queriable.
>> >>>>>>>>>>>>>>>> * that there's currently no way to configure the bytes
>> >>>> store
>> >>>>>>>>>> for
>> >>>>>>>>>>>> join
>> >>>>>>>>>>>>>>>> windows.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> It's awesome that we can fix this issue and two others
>> >>>> with
>> >>>>>> one
>> >>>>>>>>>>>>>> feature.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> I'm wondering about a missing quadrant from the truth
>> >>>> table
>> >>>>>>>>>>>> involving
>> >>>>>>>>>>>>>>>> whether a Materialized is stored or not and querying is
>> >>>>>>>>>>>>>>>> enabled/disabled... What should be the behavior if
>> >> there
>> >>>> is
>> >>>>> no
>> >>>>>>>>>>>> store
>> >>>>>>>>>>>>>>>> configured (e.g., if Materialized with only serdes) and
>> >>>>>>>>>> querying is
>> >>>>>>>>>>>>>>>> enabled?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> It seems we have two choices:
>> >>>>>>>>>>>>>>>> 1. we can force creation of a state store in this case,
>> >>>> so
>> >>>>> the
>> >>>>>>>>>>>> store
>> >>>>>>>>>>>>>>>> can be used to serve the queries
>> >>>>>>>>>>>>>>>> 2. we can provide just a queriable view, basically
>> >>>> letting
>> >>>>> IQ
>> >>>>>>>>>> query
>> >>>>>>>>>>>>>>>> into the "KTableValueGetter", which would transparently
>> >>>>>>>>>> construct
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>> query response by applying the operator logic to the
>> >>>>> upstream
>> >>>>>>>>>>>> state if
>> >>>>>>>>>>>>>>>> the operator state isn't already stored.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Offhand, it seems like the second is actually a pretty
>> >>>>> awesome
>> >>>>>>>>>>>>>>>> capability. But it might have an awkward interaction
>> >> with
>> >>>>> the
>> >>>>>>>>>>>> current
>> >>>>>>>>>>>>>>>> semantics. Presently, if I provide a
>> >>>> Materialized.withName,
>> >>>>> it
>> >>>>>>>>>>>> implies
>> >>>>>>>>>>>>>>>> that querying should be enabled AND that the view
>> >> should
>> >>>>>>>>>> actually
>> >>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>> stored in a state store. Under option 2 above, this
>> >>>> behavior
>> >>>>>>>>>> would
>> >>>>>>>>>>>>>>>> change to NOT provision a state store and instead just
>> >>>>> consult
>> >>>>>>>>>> the
>> >>>>>>>>>>>>>>>> ValueGetter. To get back to the current behavior, users
>> >>>>> would
>> >>>>>>>>>> have
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>> add a "bytes store supplier" to the Materialized to
>> >>>> indicate
>> >>>>>>>>>> that,
>> >>>>>>>>>>>>>>>> yes, they really want a state store there.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Behavior changes are always kind of scary, but I think
>> >> in
>> >>>>> this
>> >>>>>>>>>>>> case,
>> >>>>>>>>>>>>>>>> it might actually be preferable. In the event where
>> >> only
>> >>>> the
>> >>>>>>>>>> name
>> >>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>> provided, it means that people just wanted to make the
>> >>>>>>>>>> operation
>> >>>>>>>>>>>>>>>> result queriable. If we automatically convert this to a
>> >>>>>>>>>> non-stored
>> >>>>>>>>>>>>>>>> view, then simply upgrading results in the same
>> >>>> observable
>> >>>>>>>>>> behavior
>> >>>>>>>>>>>>>>>> and semantics, but a linear reduction in local storage
>> >>>>>>>>>> requirements
>> >>>>>>>>>>>>>>>> and disk i/o, as well as a corresponding linear
>> >>>> reduction in
>> >>>>>>>>>> memory
>> >>>>>>>>>>>>>>>> usage both on and off heap.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> What do you think?
>> >>>>>>>>>>>>>>>> -John
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <
>> >>>>>> bbej...@gmail.com
>> >>>>>>>>>>>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> All,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I'd like to start a discussion for adding a
>> >> Materialized
>> >>>>>>>>>>>>>> configuration
>> >>>>>>>>>>>>>>>>> object to KStream.join for naming state stores
>> >> involved
>> >>>> in
>> >>>>>>>>>> joins.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Your comments and suggestions are welcome.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>>>>>>> Bill
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>> -- Guozhang
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> --
>> >>>>>>>>>>>>> -- Guozhang
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> --
>> >>>>>>>>>>> -- Guozhang
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> -- Guozhang
>> >>>>
>> >>>
>> >>
>> >
>> >
>>
>>

Reply via email to