Thanks Paul!

On 8/15/19 7:28 PM, Paul Whalen wrote:
> I updated the KIP (and PR) to relax the restriction on connecting state
> stores via either means; it definitely makes sense to me at this point.
> I'd love to hear if there are any other concerns or broad objections to the
> KIP.
> 
> Paul
> 
> On Thu, Aug 8, 2019 at 10:12 PM Paul Whalen <pgwha...@gmail.com> wrote:
> 
>> Matthias,
>>
>> You did summarize my thinking correctly, thanks for writing it out.  I
>> think the disconnect on opinion is due to a couple things influenced by my
>> habits while writing streams code:
>>
>> 1) I don't see state stores that are "individually owned" versus "shared"
>> as that much different at all, at least from the perspective of the
>> business logic for the Processor. So it is actually a negative to separate
>> the connecting of stores, because it appears in the topology wiring that
>> fewer stores are being used by the Processor than actually are.  A reader
>> might assume that the Processor doesn't need other state to do its job
>> which could cause confusion.
>> 2) In practice, my addProcessor() and addStateStore() (or
>> builder.addStateStore() and stream.process() ) calls are very near each
>> other anyway, so the shared dependency on StoreBuilder is not a burden;
>> passing the same object could even bring clarity to the idea that the store
>> is shared and not individually owned.
>>
>> Hearing your thoughts though, I think I have imposed a bit too much of my
>> own style and assumptions on the API, especially with the shared dependency
>> on a single StoreBuilder and thoughts about store ownership/sharing.  I'm
>> going to update the KIP since the one +1 vote comes from John, who is in favor
>> of relaxing the restriction anyway.
>>
>> Paul
>>
>> On Wed, Aug 7, 2019 at 11:11 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> I am not sure if I fully understand; hence, let me try to rephrase:
>>>
>>>> I can't think of an example that would require both ways, or would
>>>> even be more readable using both ways.
>>>
>>> Example:
>>>
>>> There are two processors A and B, one store S that both need to
>>> access, and one store S_b that only B needs to access:
>>>
>>> If we don't allow mixing both approaches, one would have to write
>>> the following code:
>>>
>>>   Topology t = new Topology();
>>>   t.addProcessor("A", ...); // does not add any store
>>>   t.addProcessor("B", ...); // does not add any store
>>>   t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
>>>   t.addStateStore(..., "B"); // adds S_b and connects it to B
>>>
>>> // DSL example:
>>>
>>>   StreamsBuilder b = new StreamsBuilder();
>>>   b.addStateStore() // adds S
>>>   b.addStateStore() // adds S_b
>>>   stream1.process(..., "S") // add A and connect S
>>>   stream2.process(..., "S", "S_b") // add B and connect S and S_b
>>>
>>>
>>> If we allow mixing both approaches, the code could be simplified to:
>>>
>>>   Topology t = new Topology();
>>>   t.addProcessor("A", ...); // does not add any store
>>>   t.addProcessor("B", ...); // adds/connects S_b implicitly
>>>   t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
>>>
>>> // DSL example
>>>
>>>   StreamsBuilder b = new StreamsBuilder();
>>>   b.addStateStore() // adds S
>>>   stream1.process(..., "S") // add A and connect S
>>>   stream2.process(..., "S") // add B and connect S; adds/connects S_b implicitly
>>>
>>> The fact that B has a "private store" could be encapsulated and I don't
>>> see why this would be bad?
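
To make the mixed case concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams classes: `MixedTopology` and its methods are hypothetical stand-ins that only track store names, under the assumption that a processor's own stores get added and connected at the moment the processor is added.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, NOT org.apache.kafka.streams.Topology.
final class MixedTopology {
    private final Set<String> stores = new LinkedHashSet<>();
    private final Map<String, Set<String>> connections = new LinkedHashMap<>();

    // Adds a processor; any stores it owns are added and connected implicitly.
    MixedTopology addProcessor(String name, String... ownedStores) {
        Set<String> connected = connections.computeIfAbsent(name, k -> new LinkedHashSet<>());
        for (String store : ownedStores) {
            stores.add(store);
            connected.add(store);
        }
        return this;
    }

    // Adds a shared store and connects it to processors that were added earlier.
    MixedTopology addStateStore(String store, String... processors) {
        stores.add(store);
        for (String processor : processors) {
            Set<String> connected = connections.get(processor);
            if (connected == null) {
                throw new IllegalArgumentException("unknown processor " + processor);
            }
            connected.add(store);
        }
        return this;
    }

    // Returns the names of all stores connected to the given processor.
    Set<String> storesOf(String processor) {
        return connections.getOrDefault(processor, Collections.emptySet());
    }
}
```

With this model, adding A without stores, B with its private S_b, and then the shared S for both leaves A connected to S only and B connected to S and S_b, which is the outcome of the second version above.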
>>>
>>>> If you can
>>>> do both ways, the actual full set of state stores being connected could
>>> be
>>>> in wildly different places in the code, which could create confusion.
>>>
>>> Ie, I don't see why the second version would be confusing, or why the
>>> first version would be more readable (I don't argue it's less readable
>>> either though; I think both are equally readable)?
>>>
>>>
>>>
>>> Or do you argue that we should allow the following:
>>>
>>>> Shared stores can be passed from
>>>> the outside in an anonymous ProcessorSupplier if desired, making it
>>>> effectively the same as passing the stateStoreNames var args
>>>
>>>   Topology t = new Topology();
>>>   t.addProcessor("A", ...); // adds/connects S implicitly
>>>   t.addProcessor("B", ...); // adds/connects S and S_b implicitly
>>>
>>> // DSL example
>>>
>>>   StreamsBuilder b = new StreamsBuilder();
>>>   stream1.process(...) // add A and add/connect S implicitly
>>>   stream2.process(...) // add B and add/connect S and S_b implicitly
>>>
>>> For this case, the second implicit adding of S would require returning
>>> the same `StoreBuilder` instance to make it idempotent, which seems hard
>>> to achieve, because both `ProcessorSuppliers` now have a cross
>>> dependency to use the same object.
>>>
>>> Hence, I don't think this would be a good approach.
>>>
>>>
>>> Also, because we require that a given store name is always passed with the
>>> same `StoreBuilder` instance, we actually have good protection against a
>>> user bug that adds two stores with the same name but different
>>> builders.
>>>
>>>
>>> I also do not feel super strongly about it, but I see some advantages in
>>> allowing the mixed approach, and don't see disadvantages. Would be good to
>>> get input from others, too.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 8/7/19 7:29 PM, Paul Whalen wrote:
>>>> My thinking is that restricting the API to enforce only one way of connecting
>>>> stores would make it simpler to use and lead to more readable
>>>> code.  I can't think of an example that would require both ways, or
>>> would
>>>> even be more readable using both ways.  Shared stores can be passed from
>>>> the outside in an anonymous ProcessorSupplier if desired, making it
>>>> effectively the same as passing the stateStoreNames var args.  If you
>>> can
>>>> do both ways, the actual full set of state stores being connected could
>>> be
>>>> in wildly different places in the code, which could create confusion.  I
>>>> personally can't imagine a case in which that would be useful.
>>>>
>>>> All that being said, I don't feel terribly strongly about it.  I'm just
>>>> trying to make the API as straightforward as possible.  Admittedly a
>>>> runtime check doesn't make for a great API, but I see it more as an
>>>> opportunity to educate the user to make it clear that "connecting" a
>>> state
>>>> store is a thing that can be done in two different ways, but there is no
>>>> reason to mix both.  If it seems like there's a compelling reason to mix
>>>> them then I would abandon the idea in a heartbeat.
>>>>
>>>> Paul
>>>>
>>>> On Wed, Aug 7, 2019 at 5:48 PM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> Sorry for the long silence on this KIP Paul! I guess the 2.3 release
>>>>> distracted us somewhat.
>>>>>
>>>>> Overall, I am +1.
>>>>>
>>>>> With regard to John's point about owned vs shared state stores, I think
>>>>> it describes a valid use case, and throwing an exception if people want
>>>>> to mix both features might be too restrictive?
>>>>>
>>>>> We could of course later relax the restriction, but atm I am not sure
>>>>> what the main argument for adding the restriction is?
>>>>>
>>>>> (a) In the current API, one could connect the same store multiple times
>>>>> to the same processor without getting an exception, because the
>>>>> operation is idempotent.
>>>>>
>>>>> (b) The KIP also suggests relaxing the current restriction on adding the
>>>>> same store twice, as long as store name and `StoreBuilder` instance are
>>>>> the same, because it's an idempotent (hence, safe) operation too.
>>>>>
>>>>> Because we have already (a) and (b) and consider both as safe, it seems
>>>>> we could also treat the case of mixing both patterns as idempotent and
>>>>> hence safe. And if we do this, we enable mixing both patterns for
>>>>> different stores implicitly.
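
A minimal sketch of the idempotency argument in (a) and (b), using a toy registry rather than the real Kafka Streams classes. `MiniTopology`, `MiniStoreBuilder`, and their methods are hypothetical names invented for illustration; the actual check in Streams may be implemented differently.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a StoreBuilder: only the store name matters here.
final class MiniStoreBuilder {
    final String name;

    MiniStoreBuilder(String name) {
        this.name = name;
    }
}

// Hypothetical toy topology, NOT the Kafka Streams Topology class.
final class MiniTopology {
    private final Map<String, MiniStoreBuilder> stores = new HashMap<>();
    private final Map<String, Set<String>> connections = new HashMap<>();

    // (b) Re-adding the same builder instance is a no-op; a different
    // builder under an already-used name is rejected.
    void addStateStore(MiniStoreBuilder builder, String... processors) {
        MiniStoreBuilder existing = stores.putIfAbsent(builder.name, builder);
        if (existing != null && existing != builder) {
            throw new IllegalStateException(
                "store " + builder.name + " was already added with a different builder");
        }
        for (String processor : processors) {
            connect(processor, builder.name);
        }
    }

    // (a) Connections are kept in a set, so connecting twice changes nothing.
    void connect(String processor, String storeName) {
        if (!stores.containsKey(storeName)) {
            throw new IllegalStateException("store " + storeName + " has not been added");
        }
        connections.computeIfAbsent(processor, k -> new LinkedHashSet<>()).add(storeName);
    }

    Set<String> storesOf(String processor) {
        return connections.getOrDefault(processor, Collections.emptySet());
    }

    int storeCount() {
        return stores.size();
    }
}
```

Since both operations are no-ops on repetition, it does not matter in this model whether a store reaches a processor through the external call, the implicit path, or both.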
>>>>>
>>>>>
>>>>> Thoughts?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 6/17/19 2:31 PM, John Roesler wrote:
>>>>>> Hey, all,
>>>>>>
>>>>>> Sorry I'm late to the party. I meant to read into this KIP before, but
>>>>>> didn't get around to it. I was just reminded when Paul mentioned it in
>>>>>> a different thread. Please feel free to bump a discussion any time it
>>>>>> stalls!
>>>>>>
>>>>>> I've just read through the whole discussion so far, and, to echo the
>>>>>> earlier sentiments, the motivation seems very clear. I remember how
>>>>>> hard it was to figure out how to actually wire up a stateful processor
>>>>>> properly the first couple of times. Not a very good user experience.
>>>>>>
>>>>>> I looked over the whole conversation to date, as well as the KIP and
>>>>>> the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, the
>>>>>> current approach looks good to me. I was concerned about the "cheat
>>>>>> codes"-style mixin interface. Discoverability would have been a
>>>>>> problem, and it's also not a very normal pattern for Java APIs. It
>>>>>> actually looks a little more like something you'd do with an
>>>>>> annotation.
>>>>>>
>>>>>> So the current approach seems good:
>>>>>> * The new interface with a default to return `null` is effectively
>>>>>> shipping the feature flagged "off" (which is nice and safe)
>>>>>> * Shared stores are "supported" the same way they always have been, by
>>>>>> connecting them externally. This makes sense, since those stores
>>>>>> aren't "owned" by any of the connected processors.
>>>>>> * Processors that do own their stores can configure them in the same
>>>>>> file they use them, which decreases the probability of cast exceptions
>>>>>> when they get the stores from the context.
>>>>>> * Stateful processors that own their stores are available for one-shot
>>>>>> definition of the stores and the processor all in the same file (this
>>>>>> is the main point of the KIP)
>>>>>>
>>>>>> The runtime check that stores can't be both defined in the processor
>>>>>> and referenced by name might be a little restrictive (since we already
>>>>>> have the restriction that same-name stores can't be registered), but
>>>>>> it would also be easy to remove it later. I'm just thinking that if I
>>>>>> have a processor that owns one store and shares another, it would be
>>>>>> pretty obvious how to hook it up in the proposed API, except for that
>>>>>> check.
>>>>>>
>>>>>> One last thought, regarding the all-important interface name: If you
>>>>>> wanted to indicate more that the stores are available for Streams to
>>>>>> connect, rather than that they are already connected, you could call
>>>>>> it ConnectableStoreProvider (similar to AutoCloseable).
>>>>>>
>>>>>> I just thought I'd summarize the current state, since it's been a
>>>>>> while and no one has voted yet. I'll go ahead and vote now on the
>>>>>> voting thread, since I'm +1 on the current proposal.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Mon, May 27, 2019 at 1:59 PM Paul Whalen <pgwha...@gmail.com>
>>> wrote:
>>>>>>>
>>>>>>> It wasn't much of a lift changing option B to work for option C, so I
>>>>>>> closed that PR and made a new one, which should be identical to the
>>> KIP
>>>>>>> right now: https://github.com/apache/kafka/pull/6824.  There are a
>>> few
>>>>>>> todos still which I will hold off until the KIP is accepted.
>>>>>>>
>>>>>>> I created a voting thread about a month ago, so I'll bump that now
>>> that
>>>>>>> we're nearly there.
>>>>>>>
>>>>>>> Paul
>>>>>>>
>>>>>>> On Sun, May 26, 2019 at 2:21 PM Paul Whalen <pgwha...@gmail.com>
>>> wrote:
>>>>>>>
>>>>>>>> Per Matthias's suggestion from a while ago, I actually implemented a
>>>>> good
>>>>>>>> amount of option B to get a sense of the user experience and
>>>>> documentation
>>>>>>>> requirements.  For a few reasons mentioned below, I think it's not
>>> my
>>>>>>>> favorite option, and I prefer option C.  But since I did the work
>>> and
>>>>> it
>>>>>>>> can help discussion, I may as well share:
>>>>>>>> https://github.com/apache/kafka/pull/6821.
>>>>>>>>
>>>>>>>> Things I learned along the way implementing Option B:
>>>>>>>>  - For the name of the interface, I like ConnectedStoreProvider.  It
>>>>> isn't
>>>>>>>> perfect but it seems to capture the general gist without being
>>> overly
>>>>>>>> verbose.  I get that from a strict standpoint it's not "providing
>>>>> connected
>>>>>>>> stores" but is instead "providing stores to be connected," but I
>>> think
>>>>> that
>>>>>>>> in context and with documentation, the risk of someone being
>>> confused
>>>>> by
>>>>>>>> that is low.
>>>>>>>>  - I definitely felt the discoverability issue while trying to write
>>>>> clear
>>>>>>>> documentation; you really have to make sure to connect the dots for
>>> the
>>>>>>>> user when the interface isn't connected to anything.
>>>>>>>>  - Another problem with a separate interface found while writing
>>>>>>>> tests/examples: defining a ProcessorSupplier that also implements
>>>>>>>> ConnectedStoreProvider cannot be done anonymously, since you can't
>>>>> define
>>>>>>>> an anonymous class in Java that implements multiple interfaces.  I
>>>>> actually
>>>>>>>> consider this a fairly major usability issue - it means a user
>>> always
>>>>> has
>>>>>>>> to have a custom class rather than doing it inline.  We could
>>> provide
>>>>> an
>>>>>>>> abstract class that implements the two, but at that point, we're not
>>>>> that
>>>>>>>> far from option A or C anyway.
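
The anonymous-class limitation can be sketched in a few lines. The interfaces below are hypothetical toy stand-ins (stores are plain strings) and not the real `ProcessorSupplier` or `ConnectedStoreProvider`; the sketch only assumes the shapes described for options B and C.

```java
import java.util.Collections;
import java.util.Set;

// Option B shape: two unrelated interfaces (toy stand-ins).
interface ToySupplier {
    String get();
}

interface ToyStoreProvider {
    Set<String> stores();
}

// Under option B a named class is needed, because an anonymous class
// can implement only a single interface.
final class NamedSupplier implements ToySupplier, ToyStoreProvider {
    @Override
    public String get() {
        return "processor";
    }

    @Override
    public Set<String> stores() {
        return Collections.singleton("S_b");
    }
}

// Option C shape: the supplier extends the provider, which carries a default.
interface ToyProviderWithDefault {
    default Set<String> stores() {
        return null; // null means the feature is not used
    }
}

interface ToySupplierC extends ToyProviderWithDefault {
    String get();
}

final class AnonymousDemo {
    // Under option C the supplier and its stores can be defined inline.
    static ToySupplierC inline() {
        return new ToySupplierC() {
            @Override
            public String get() {
                return "processor";
            }

            @Override
            public Set<String> stores() {
                return Collections.singleton("S_b");
            }
        };
    }
}
```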
>>>>>>>>
>>>>>>>> I updated the KIP with my current thinking, which as mentioned is
>>>>>>>> Matthias's option C.  Once again for clarity, that *is not* what is
>>> in
>>>>> the
>>>>>>>> linked pull request.  The current KIP is my proposal.
>>>>>>>>
>>>>>>>> Thanks everyone for the input!
>>>>>>>>
>>>>>>>> P.S.  What do folks use to edit the HTML documentation, e.g.
>>>>>>>> processor-api.html?  I looked at doing it by hand but it kind of
>>>>> looked
>>>>>>>> like agony with all the small tags required for formatting code, so
>>> I'm
>>>>>>>> sort of assuming there's tooling for it.
>>>>>>>>
>>>>>>>> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax <
>>>>> matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think the discussion mixed approaches a little bit, hence, let me
>>>>>>>>> rephrase my understanding:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> A) add new method with default implementation to
>>> `ProcessorSupplier`:
>>>>>>>>>
>>>>>>>>> For this case, we don't add a new interface, but only add a new
>>> method
>>>>>>>>> to `ProcessorSupplier` -- to keep backward compatibility, we need
>>> to
>>>>> add
>>>>>>>>> a default implementation. Users opt into the new feature by
>>>>> overwriting
>>>>>>>>> the default implementation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> B) We add a new interface with new method:
>>>>>>>>>
>>>>>>>>> For this case, `ProcessorSupplier` interface is not changed and it
>>>>> does
>>>>>>>>> also _not_ extend the new interface. Because `ProcessorSupplier` is
>>>>> not
>>>>>>>>> changed, it's naturally backward compatible. Users opt into the new
>>>>>>>>> feature, by adding the new interface to their ProcessorSupplier
>>>>>>>>> implementation and they need to implement the new method because
>>> there
>>>>>>>>> is no default implementation. Kafka Streams can use `instanceof` to
>>>>>>>>> detect if the new interface is used or not and thus, do the right
>>>>> thing.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What was also discussed is a mix of both:
>>>>>>>>>
>>>>>>>>> C) We add a new interface with new method and let
>>> `ProcessorSupplier`
>>>>>>>>> extend the new interface:
>>>>>>>>>
>>>>>>>>> Here, we need to add a default implementation to preserve backward
>>>>>>>>> compatibility. Similar to (A), users opt into the feature by
>>>>> overwriting
>>>>>>>>> the default implementation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Option (C) is the same as (A) from a user point of view because a
>>> user
>>>>>>>>> won't care about the new interface. It only makes a difference for
>>> our
>>>>>>>>> code base, as we can share the default implementation of the new method.
>>>>>>>>> This is only a small gain, as the implementation is trivial, but also a
>>>>>>>>> small drawback, as we add a new public interface that is useless to
>>> the
>>>>>>>>> user because the user would never implement the interface directly.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> For (A/C), it might be simpler for users to detect the feature. For
>>>>> (B),
>>>>>>>>> we have the advantage that users must implement the method if they
>>> use
>>>>>>>>> the new interface.
>>>>>>>>>
>>>>>>>>> Overall, it seems that (A) might be the best choice because it makes the
>>>>>>>>> feature easier to discover and does not add a "useless"
>>> interface. If
>>>>>>>>> you want to go with (C) to share the default implementation code,
>>>>> that's
>>>>>>>>> also fine with me. I am convinced now (even if I brought it up),
>>> that
>>>>>>>>> (B) might not be optimal because feature discoverability seems to
>>> be
>>>>>>>>> important.
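
The difference between (B) and (A/C) on the library side can be sketched as follows. All names here (`SketchStoreBuilder`, `StoreProviderB`, `SupplierB`, `StoreProviderC`, `SupplierC`, `OptionSketch`) are hypothetical placeholders, not the Kafka Streams interfaces, and the `null` default is only one of the two choices being discussed.

```java
import java.util.Set;

// Hypothetical placeholder; the real StoreBuilder is generic and far richer.
interface SketchStoreBuilder {
    String name();
}

// Option B: standalone interface without a default; the supplier is unchanged.
interface StoreProviderB {
    Set<SketchStoreBuilder> stores();
}

interface SupplierB {
    Object get();
}

// Option C: the supplier extends the new interface; the default method
// keeps existing implementations compiling.
interface StoreProviderC {
    default Set<SketchStoreBuilder> stores() {
        return null; // null means the user did not opt in
    }
}

interface SupplierC extends StoreProviderC {
    Object get();
}

final class OptionSketch {
    // Option B: the library has to probe for the extra interface.
    static Set<SketchStoreBuilder> storesForB(SupplierB supplier) {
        if (supplier instanceof StoreProviderB) {
            return ((StoreProviderB) supplier).stores();
        }
        return null;
    }

    // Option C (and A): the method is always there, so no probing is needed.
    static Set<SketchStoreBuilder> storesForC(SupplierC supplier) {
        return supplier.stores();
    }
}
```

In both variants an existing supplier that knows nothing about the feature yields `null`; the difference is whether the user finds the method on the supplier type itself or has to learn about a separate interface.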
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> About `null` vs `emptyList`: I still tend to like `null` better but
>>>>> it's
>>>>>>>>> really a detail and not too important. Note, that the question only
>>>>>>>>> arises for (A/C), but not for (B) because for (B) we don't need a
>>>>>>>>> default implementation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> @Paul: It's unclear to me atm what your final proposal is because
>>> you
>>>>>>>>> mentioned that you might want to rename `StateStoreConnector`? It's
>>>>> also
>>>>>>>>> unclear to me atm, if you prefer (A), (B), or (C).
>>>>>>>>>
>>>>>>>>> Maybe you can update the KIP if necessary and clearly state what your
>>>>>>>>> final proposal is. Besides this, it seems we can move to a VOTE?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 5/2/19 3:01 PM, Bruno Cadonna wrote:
>>>>>>>>>> Hi Paul,
>>>>>>>>>>
>>>>>>>>>> I will try to express myself a bit clearer.
>>>>>>>>>>
>>>>>>>>>> Ad 1)
>>>>>>>>>> My assumption is that if `StateStoreConnector#stateStores()`
>>> returns
>>>>>>>>> `null`
>>>>>>>>>> Kafka Streams will throw an NPE because on purpose no null check
>>> is
>>>>>>>>>> performed before the loop that calls
>>>>> `StreamsBuilder#addStateStore()`.
>>>>>>>>> When
>>>>>>>>>> the user finally understands the cause of the NPE, she knows that
>>> she
>>>>>>>>> has
>>>>>>>>>> to override `StateStoreConnector#stateStores()` in her
>>>>> implementation.
>>>>>>>>> My
>>>>>>>>>> question was, why let the user discover that she has to overwrite
>>> the
>>>>>>>>>> method at runtime if you could not provide a default
>>> implementation
>>>>> for
>>>>>>>>>> `StateStoreConnector#stateStores()` and let the compiler tell the
>>>>> user
>>>>>>>>> the
>>>>>>>>>> need to overwrite the method. Not providing a default
>>> implementation
>>>>>>>>>> without separating the interfaces implies not being
>>>>> backward-compatible.
>>>>>>>>>> That means, if we choose to not provide a default implementation
>>> and
>>>>> let
>>>>>>>>>> the compiler signal the necessity to override the method, we have
>>> to
>>>>>>>>>> separate the interfaces in any case.
>>>>>>>>>>
>>>>>>>>>> Ad 2)
>>>>>>>>>> If you check for `null` or empty list in `process` and do not call
>>>>>>>>>> `addStateStores` in those cases, the advantage of returning `null`
>>>>> to be
>>>>>>>>>> safer to detect bugs as mentioned by Matthias would be lost. But
>>>>> maybe
>>>>>>>>> I am
>>>>>>>>>> missing something here.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, May 1, 2019 at 6:27 AM Paul Whalen <pgwha...@gmail.com>
>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I definitely don't mind anyone jumping in, Bruno, thanks for the
>>>>> comments!
>>>>>>>>>>>
>>>>>>>>>>> 1) I'm not totally sure I'm clear on your point, but I think
>>> we're
>>>>> on
>>>>>>>>> the
>>>>>>>>>>> same page - if we're adding a method to the XSupplier interfaces
>>> (by
>>>>>>>>> making
>>>>>>>>>>> them inherit from a super interface StateStoreConnector) then we
>>>>>>>>> definitely
>>>>>>>>>>> need a default implementation to maintain compatibility.  Whether
>>>>> the
>>>>>>>>>>> default implementation returns null or an empty list is somewhat
>>> of
>>>>> a
>>>>>>>>>>> detail.
>>>>>>>>>>>
>>>>>>>>>>> 2) If stream.process() sees that
>>> StateStoreConnector#stateStores()
>>>>>>>>> returns
>>>>>>>>>>> either null or an empty list, it would handle that case
>>> specifically
>>>>>>>>> and
>>>>>>>>>>> not try to call addStateStore at all.  Or is this not what you're
>>>>>>>>> asking?
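
The two behaviors discussed in 1) and 2) can be contrasted in a small sketch. `StoreRegistration` and both methods are hypothetical, and stores are reduced to their names; this only illustrates a strict loop without a null check versus a caller that treats `null` and empty alike, not what the PR actually does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams.
final class StoreRegistration {
    // Strict variant: no null check, so a null return value fails loudly.
    static List<String> registerStrict(Set<String> storeNames) {
        List<String> added = new ArrayList<>();
        for (String name : storeNames) { // NullPointerException if storeNames is null
            added.add(name);
        }
        return added;
    }

    // Lenient variant: null and empty both mean "nothing to add".
    static List<String> registerLenient(Set<String> storeNames) {
        if (storeNames == null || storeNames.isEmpty()) {
            return new ArrayList<>();
        }
        return registerStrict(storeNames);
    }
}
```

In the lenient variant a `null` default and an empty default are indistinguishable to the user, which is the trade-off raised in point 2.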
>>>>>>>>>>>
>>>>>>>>>>> Separately, I'm still hacking away at the details of the PR and
>>> will
>>>>>>>>>>> continue to get something into a discussable state, but I'll
>>> share
>>>>> some
>>>>>>>>>>> thoughts I've run into.
>>>>>>>>>>>
>>>>>>>>>>> A) I'm tentatively going the separate interface route (Matthias's
>>>>>>>>>>> suggestion) and naming it ConnectedStoreProvider.  Still don't
>>> love
>>>>> the
>>>>>>>>>>> name, but there's something nice about the name indicating *why*
>>>>> this
>>>>>>>>> thing
>>>>>>>>>>> is providing the store, not just that it is providing it.
>>>>>>>>>>>
>>>>>>>>>>> B) It has occurred to me that topology.addProcessor() could also
>>>>>>>>> recognize
>>>>>>>>>>> if ProcessorSupplier implements ConnectedStoreProvider and add
>>> and
>>>>>>>>> connect
>>>>>>>>>>> stores appropriately.  This isn't in the KIP and I think the
>>>>> value-add
>>>>>>>>> is
>>>>>>>>>>> lower (if you're reaching that low level, surely the "auto
>>>>> add/connect
>>>>>>>>>>> store" isn't too important to you), but I think it would be
>>>>>>>>> confusing if
>>>>>>>>>>> it didn't, and I don't see any real downside.
>>>>>>>>>>>
>>>>>>>>>>> Paul
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna <
>>> br...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> @Paul: Thank you for the KIP!
>>>>>>>>>>>>
>>>>>>>>>>>> I hope you do not mind that I jump in.
>>>>>>>>>>>>
>>>>>>>>>>>> I have the following comments:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) `null` vs empty list in the default implementation
>>>>>>>>>>>> IIUC, returning `null` in the default implementation should
>>>>> basically
>>>>>>>>>>>> signal that the method `stateStores` was not overridden. Why
>>> then
>>>>>>>>>>> provide a
>>>>>>>>>>>> default implementation in the first place? Without default
>>>>>>>>> implementation
>>>>>>>>>>>> you would discover the missing implementation already at
>>>>> compile-time
>>>>>>>>> and
>>>>>>>>>>>> not only at runtime. If you decide not to provide a default
>>>>>>>>>>> implementation,
>>>>>>>>>>>> `XSupplier extends StateStoreConnector` would break existing
>>> code
>>>>> as
>>>>>>>>>>>> Matthias has already pointed out.
>>>>>>>>>>>>
>>>>>>>>>>>> 2) `process` method adding the StoreBuilders to the topology
>>>>>>>>>>>> If the default implementation returned `null` and `XSupplier
>>>>> extends
>>>>>>>>>>>> StateStoreConnector`, then existing code would break, because
>>>>>>>>>>>> `StreamsBuilder#addStateStore()` would throw a NPE.
>>>>>>>>>>>>
>>>>>>>>>>>> +1 for opening a WIP PR
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Bruno
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax <
>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Paul!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree with all of that. If we think that the general design
>>> is
>>>>>>>>> good,
>>>>>>>>>>>>> refactoring a PR if we want to pick a different name should
>>> not be
>>>>>>>>> too
>>>>>>>>>>>>> much additional work (hopefully). Thus, if you want to open a
>>> WIP
>>>>> PR
>>>>>>>>>>> and
>>>>>>>>>>>>> we use it to nail the open details, it might help to find a
>>> good
>>>>>>>>>>>>> conclusion.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2) Default method vs new interface:
>>>>>>>>>>>>>
>>>>>>>>>>>>> This seems to be the hardest tradeoff. I see the point about
>>>>>>>>>>>>> discoverability... Might be good to get input from others, which
>>>>>>>>> version
>>>>>>>>>>>>> they would prefer.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just to make clear, my suggestion from the last email was, that
>>>>>>>>>>>>> `Transformer` etc does not extend the new interface. Instead, a
>>>>> user
>>>>>>>>>>>>> that wants to use this feature would need to implement both
>>>>>>>>> interfaces.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If `Transformer extends StoreProvider` (just picking a name
>>> here)
>>>>>>>>>>>>> without default implementation existing code would break and thus it is
>>>>>>>>>>>>> not an option because it breaks backward compatibility.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 4/28/19 8:37 PM, Paul Whalen wrote:
>>>>>>>>>>>>>> Great thoughts Matthias, thanks! I think we're all agreed that
>>>>>>>>> naming
>>>>>>>>>>>> and
>>>>>>>>>>>>>> documentation/education are the biggest hurdles for this KIP,
>>>>> and in
>>>>>>>>>>>>> light
>>>>>>>>>>>>>> of that, I think it makes sense for me to just take a stab at
>>> a
>>>>> full
>>>>>>>>>>>>>> fledged PR with documentation to convince us that it's
>>> possible
>>>>> to
>>>>>>>>> do
>>>>>>>>>>>> it
>>>>>>>>>>>>>> with enough clarity.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In response to your specific thoughts:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) StateStoreConnector as a name: Really good point about
>>>>> defining
>>>>>>>>>>> the
>>>>>>>>>>>>>> difference between "adding" and "connecting."  Guozhang
>>> suggested
>>>>>>>>>>>>>> StateStoreConnector which was definitely an improvement over
>>> my
>>>>>>>>>>>>>> StateStoresSupplier, but I think you're right that we need to
>>> be
>>>>>>>>>>>> careful
>>>>>>>>>>>>> to
>>>>>>>>>>>>>> make it clear that it's really accomplishing both.  Thinking
>>>>> about
>>>>>>>>> it
>>>>>>>>>>>>> now,
>>>>>>>>>>>>>> one problem with Connector is that the implementer of the
>>>>> interface
>>>>>>>>>>> is
>>>>>>>>>>>>> not
>>>>>>>>>>>>>> really doing any connecting, it's providing/supplying the
>>> store
>>>>> that
>>>>>>>>>>>> will
>>>>>>>>>>>>>> be both added and connected.  StoreProvider seems reasonable
>>> to
>>>>> me
>>>>>>>>>>> and
>>>>>>>>>>>>>> probably the best candidate at the moment, but it would be
>>> nice
>>>>> if
>>>>>>>>>>> the
>>>>>>>>>>>>> name
>>>>>>>>>>>>>> could convey that it's providing the store specifically so the
>>>>>>>>> caller
>>>>>>>>>>>> can
>>>>>>>>>>>>>> add it to the topology and connect it to the associated
>>>>> transformer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In general I think that really calling out what "adding"
>>> versus
>>>>>>>>>>>>>> "connecting" is in the documentation will help make the entire
>>>>>>>>>>> purpose
>>>>>>>>>>>> of
>>>>>>>>>>>>>> this feature more clear to the user.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2) Default method vs new interface: The choice of a default
>>>>> method
>>>>>>>>>>> was
>>>>>>>>>>>>>> influenced by Guozhang's fear about API
>>> bloat/discoverability.  I
>>>>>>>>> can
>>>>>>>>>>>>>> definitely see it both ways.  Would the separate interface be
>>> a
>>>>>>>>>>>>>> sub-interface of Processor/TransformerSupplier or
>>> standalone?  It
>>>>>>>>>>> seems
>>>>>>>>>>>>>> like you're suggesting standalone and I think that's what I
>>>>> favor.
>>>>>>>>>>> My
>>>>>>>>>>>>> only
>>>>>>>>>>>>>> concern there is that the interface wouldn't actually be a
>>> type
>>>>> to
>>>>>>>>>>> any
>>>>>>>>>>>>>> public API which sort of hurts discoverability.  You would
>>> have
>>>>> to
>>>>>>>>>>> read
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> javadocs for stream.process/transform() to discover that
>>>>>>>>> implementing
>>>>>>>>>>>> the
>>>>>>>>>>>>>> interface in addition to Processor/TransformerSupplier would
>>> add
>>>>> and
>>>>>>>>>>>>>> connect the store for you.  But that added burden actually
>>>>> probably
>>>>>>>>>>>> helps
>>>>>>>>>>>>>> us in terms of making sure people don't mix and match, like
>>> you
>>>>>>>>> said.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3) Returning null instead of empty: Seems fair to me.  I
>>> always
>>>>>>>>> worry
>>>>>>>>>>>>> about
>>>>>>>>>>>>>> returning null when an empty collection can be used instead,
>>> but
>>>>>>>>>>> given
>>>>>>>>>>>>> that
>>>>>>>>>>>>>> the library is the caller rather than the client I think your
>>>>> point
>>>>>>>>>>>> makes
>>>>>>>>>>>>>> sense here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 4) Returning Set instead of Collection: Agreed, don't see why
>>>>> not to
>>>>>>>>>>>> make
>>>>>>>>>>>>>> it more specific.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax <
>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, sorry for the long pause. Just trying to catch up here.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it is safe to allow `addStateStore()` to be idempotent
>>> for
>>>>> the
>>>>>>>>>>>> same
>>>>>>>>>>>>>>> `StoreBuilder` object. In fact, the `name` is "hard coded"
>>> and
>>>>> thus
>>>>>>>>>>>> it's
>>>>>>>>>>>>>>> not really possible to use the same `StoreBuilder` object to
>>>>> create
>>>>>>>>>>>>>>> different stores.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I also agree with the concern, that only allowing a single
>>> store
>>>>>>>>> (as
>>>>>>>>>>>>>>> proposed by Ivan) might be too restrictive.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Overall, the current KIP version LGTM. I don't have major
>>>>> concerns
>>>>>>>>>>>> about
>>>>>>>>>>>>>>> user education for this case, but I agree that we need to
>>>>> document
>>>>>>>>>>>> this
>>>>>>>>>>>>>>> clearly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Some further comments:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (1) I am not sure if `StateStoreConnector` is the best name
>>> for
>>>>> the
>>>>>>>>>>>> new
>>>>>>>>>>>>>>> interface. Note, that there are two concepts about stores:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  - adding a store: this makes the store available in the
>>>>> topology
>>>>>>>>> in
>>>>>>>>>>>>>>> general (however, the store is still "dangling", and not
>>> used)
>>>>>>>>>>>>>>>  - connecting a store: this allows a processor etc to use a
>>>>> store
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The new interface does both, but its name only indicates the second
>>>>>>>>>>>>>>> part, which might be confusing. It might be especially
>>> confusing
>>>>>>>>>>> because
>>>>>>>>>>>>>>> we want to disallow mixing the existing "manually add and
>>>>> connect"
>>>>>>>>>>>>>>> pattern, with a new pattern to "auto add+connect". If the new
>>>>>>>>>>>> interface
>>>>>>>>>>>>>>> name indicates the connect part only, users might think they
>>>>> need to
>>>>>>>>>>>> add
>>>>>>>>>>>>>>> stores manually and can connect automatically.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Unfortunately, I don't have a much better suggestion for a
>>> name
>>>>>>>>>>>> either.
>>>>>>>>>>>>>>> The only idea that came to my mind was `StoreProvider`: to
>>> me, a
>>>>>>>>>>>>>>> provider is a "service" interface that does work for us, ie,
>>> it
>>>>>>>>> adds
>>>>>>>>>>>> and
>>>>>>>>>>>>>>> connects a store. Not sure if this is too subtle, if we
>>> consider
>>>>>>>>>>> that
>>>>>>>>>>>>>>> there is already the `StoreSupplier` interface?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But maybe somebody else might still have a good idea on how to improve
>>>>>>>>>>>>>>> the name.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In any case, I would suggest to shorten the name to
>>>>>>>>> `StoreConnector`
>>>>>>>>>>>>>>> instead of `StateStoreConnector`, because we also have
>>>>>>>>>>> `StoreSupplier`
>>>>>>>>>>>>>>> and `StoreBuilder`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (2) The KIP proposes to add the new interface to
>>>>>>>>> `ProcessorSupplier`
>>>>>>>>>>>> etc
>>>>>>>>>>>>>>> and to add a default implementation for the new method.
>>> Hence,
>>>>> user
>>>>>>>>>>>>>>> would need to override this default implementation to opt in to the
>>>>>>>>>>>>>>> feature. I am wondering if it might be better to not add the new
>>>>>>>>>>>> interface
>>>>>>>>>>>>>>> to `ProcessorSupplier` etc and to just provide a new
>>> interface
>>>>> with
>>>>>>>>>>> no
>>>>>>>>>>>>>>> default implementation. Users would opt-in by adding the
>>>>> interface
>>>>>>>>>>>>>>> explicitly to their existing `ProcessorSupplier`
>>> implementation.
>>>>>>>>>>>>>>> Overwriting a default method and getting different behavior
>>>>> seems
>>>>>>>>> to
>>>>>>>>>>>> be
>>>>>>>>>>>>>>> a little subtle to me, especially, because we don't want to
>>>>> allow
>>>>>>>>> to
>>>>>>>>>>>>>>> mix-and-match the old and new approaches. Think: I only
>>>>> overwrite a
>>>>>>>>>>>>>>> default method and my code breaks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (3) If we keep the current default implementation for the new
>>>>>>>>>>>>>>> method, I am wondering if it should return `null` instead of an
>>>>>>>>>>>>>>> empty collection? This might make it safer to detect bugs in
>>>>>>>>>>>>>>> user code that returns an empty collection by accident.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (4) Should the new method return a `Set` instead of a
>>>>> `Collection`
>>>>>>>>>>> to
>>>>>>>>>>>>>>> indicate the semantics clearly (ie, returning the same
>>>>>>>>>>> `StoreBuilder`
>>>>>>>>>>>>>>> multiple times is idempotent and one cannot add+connect to it
>>>>>>>>>>> twice).
>>>>>>>>>>>>>>>
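
A minimal sketch of the point in (4), assuming a hypothetical
`NamedStoreBuilder` stand-in rather than the real `StoreBuilder` interface.
It only illustrates that a `Set` collapses a builder that is returned twice,
while a list-backed `Collection` keeps both entries and leaves the meaning
of the repeat to the caller. None of these names come from the KIP.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a store builder; equality is object identity.
final class NamedStoreBuilder {
    final String name;

    NamedStoreBuilder(String name) {
        this.name = name;
    }
}

final class SetVersusCollectionSketch {
    // A list-backed Collection keeps the same builder twice.
    static Collection<NamedStoreBuilder> asCollection(NamedStoreBuilder builder) {
        List<NamedStoreBuilder> out = new ArrayList<>();
        out.add(builder);
        out.add(builder);
        return out;
    }

    // A Set silently drops the repeated builder, so add+connect happens once.
    static Set<NamedStoreBuilder> asSet(NamedStoreBuilder builder) {
        Set<NamedStoreBuilder> out = new LinkedHashSet<>();
        out.add(builder);
        out.add(builder);
        return out;
    }

    public static void main(String[] args) {
        NamedStoreBuilder counts = new NamedStoreBuilder("counts");
        System.out.println("collection size: " + asCollection(counts).size());
        System.out.println("set size: " + asSet(counts).size());
    }
}
```
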
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 4/6/19 12:27 PM, Paul Whalen wrote:
>>>>>>>>>>>>>>>> Ivan and Guozhang,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the thoughts!  Ivan's use case is definitely
>>>>>>>>>>> interesting.
>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>> way I see it, if we can achieve the main goal of the KIP
>>>>> (allowing
>>>>>>>>>>>>>>>> Processor/TransformerSuppliers to encapsulate their usage of
>>>>> state
>>>>>>>>>>>>>>> stores),
>>>>>>>>>>>>>>>> we will enable this kind of thing in "user space" very
>>> easily.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I will say that I'm not totally sure that most use cases of
>>>>>>>>>>>> transform()
>>>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>> just one state store.  It's hard to know since I haven't
>>> seen
>>>>> many
>>>>>>>>>>>>>>> examples
>>>>>>>>>>>>>>>> in public, but my team's usages almost exclusively require
>>>>>>>>> multiple
>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>> stores.  We only reach for the low level processor API when
>>> we
>>>>>>>>> need
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> complexity, and it's somewhat hard to imagine many use cases
>>>>> that
>>>>>>>>>>>> only
>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> one state store, since the high level DSL can usually
>>>>> accomplish
>>>>>>>>>>>> those
>>>>>>>>>>>>>>>> tasks.  The example Ivan presented for instance looks like a
>>>>>>>>>>>>>>>> stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious
>>>>> what
>>>>>>>>>>>> sort
>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>> other usages you're imagining.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That being said, perhaps the Processor API should really
>>> just
>>>>> be
>>>>>>>>>>>>>>> considered
>>>>>>>>>>>>>>>> a separate paradigm in Streams, not just a lower level that
>>> we
>>>>>>>>>>> reach
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> when necessary.  In which case it would be beneficial to
>>> make
>>>>> the
>>>>>>>>>>>>> simple
>>>>>>>>>>>>>>>> use cases easier.  I've definitely talked about this with my
>>>>> own
>>>>>>>>>>>> team -
>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>> you're less familiar with the kind of functional style that
>>> the
>>>>>>>>>>> high
>>>>>>>>>>>>>>> level
>>>>>>>>>>>>>>>> DSL offers, it might be easier to "see" your state and
>>> interact
>>>>>>>>>>> with
>>>>>>>>>>>> it
>>>>>>>>>>>>>>>> directly.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Anyway, I've updated the KIP to reflect my current PR with
>>>>>>>>>>> Guozhang's
>>>>>>>>>>>>>>>> suggestions.  It seems like there is at least some interest
>>> in
>>>>>>>>> that
>>>>>>>>>>>> on
>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>> own and not a ton of pushback, so I think I will try to
>>> start a
>>>>>>>>>>> vote.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Mar 30, 2019 at 10:03 AM Ivan Ponomarev <
>>>>>>>>>>> iponoma...@mail.ru>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi all!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I was about to write another KIP, but found out that
>>> KIP-401
>>>>>>>>>>>> addresses
>>>>>>>>>>>>>>>>> exactly the problem I faced. So let me jump into your
>>>>> discussion
>>>>>>>>>>> and
>>>>>>>>>>>>> ask
>>>>>>>>>>>>>>>>> you to assess another idea.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I fully agree with KIP-401's motivation part. E.g., in my project
>>>>>>>>>>>>>>>>> I had to invent a wrapper class that hides the details of
>>>>>>>>>>>>>>>>> KeyValueStore management from business logic. Of course this
>>>>>>>>>>>>>>>>> should be done better in the KStreams API.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> But I was about to look at this problem from another side and
>>>>>>>>>>>>>>>>> propose a simple alternative in the high-level DSL that will not
>>>>>>>>>>>>>>>>> fit all the cases, but most of them. Hence my idea does not
>>>>>>>>>>>>>>>>> exclude Paul's proposal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What if we restrict ourselves to *only one* KeyValueStore and
>>>>>>>>>>>>>>>>> propose a method that resembles the `aggregate` and `reduce`
>>>>>>>>>>>>>>>>> methods, like this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>>    .map(...)
>>>>>>>>>>>>>>>>>    .filter(...)
>>>>>>>>>>>>>>>>>    .transform ((k, v, s)->{....}, Transformed.with(....))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>>> * k, v -- input key & value
>>>>>>>>>>>>>>>>> * s -- a KeyValueStore provided as an argument
>>>>>>>>>>>>>>>>> * return value of the lambda should be KeyValue.pair(...)
>>>>>>>>>>>>>>>>> * Transformed.with... is a builder, used in order to define the
>>>>>>>>>>>>>>>>>   Transformer and KeyValueStore building parameters. Some of these
>>>>>>>>>>>>>>>>>   parameters should be:
>>>>>>>>>>>>>>>>> ** store's KeySerde,
>>>>>>>>>>>>>>>>> ** store's ValueSerde,
>>>>>>>>>>>>>>>>> ** whether the store is persistent or in-memory,
>>>>>>>>>>>>>>>>> ** store's name -- optional parameter, the system should be able
>>>>>>>>>>>>>>>>>    to devise the name of the store transparently for the user, if
>>>>>>>>>>>>>>>>>    we don't want to devise it ourselves/share the store between
>>>>>>>>>>>>>>>>>    processors.
>>>>>>>>>>>>>>>>> ** scheduled punctuation.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Imagine we have a KStream<String, Integer>, and we need to
>>>>>>>>>>>>>>>>> calculate a `derivative` stream, that is, a stream of 'deltas' of
>>>>>>>>>>>>>>>>> the provided integer values.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This could be achieved as simply as
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> stream.transform((key, value, stateStore) -> {
>>>>>>>>>>>>>>>>>         int previousValue =
>>>>>>>>>>>>>>>>>             Optional.ofNullable(stateStore.get(key)).orElse(0);
>>>>>>>>>>>>>>>>>         stateStore.put(key, value);
>>>>>>>>>>>>>>>>>         return KeyValue.pair(key, value - previousValue);
>>>>>>>>>>>>>>>>>         },
>>>>>>>>>>>>>>>>>         // we do not need to bother with store name, punctuation etc.
>>>>>>>>>>>>>>>>>         // maybe even the Serde part can be omitted, since we can
>>>>>>>>>>>>>>>>>         // inherit the serdes from the stream by default
>>>>>>>>>>>>>>>>>         Transformed.with(Serdes.String(), Serdes.Integer())
>>>>>>>>>>>>>>>>> );
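
A self-contained sketch of the delta logic in the lambda above, assuming a
plain `HashMap` as a stand-in for the `KeyValueStore` that the proposed
`transform` overload would hand to the lambda. `DeltaSketch` and `delta` are
illustrative names only; nothing here uses the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

final class DeltaSketch {
    // Stand-in for the per-key state store (assumption: in-memory, not fault tolerant).
    private final Map<String, Integer> store = new HashMap<>();

    // Returns the difference between this value and the previous value for the key.
    // A key that has not been seen yet is treated as having a previous value of 0.
    int delta(String key, int value) {
        int previousValue = store.getOrDefault(key, 0);
        store.put(key, value);
        return value - previousValue;
    }

    public static void main(String[] args) {
        DeltaSketch sketch = new DeltaSketch();
        int[] readings = {5, 8, 6};
        for (int reading : readings) {
            // prints 5, then 3, then -2
            System.out.println(sketch.delta("sensor", reading));
        }
    }
}
```
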
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The hard part of it is that the new `transform` method definition
>>>>>>>>>>>>>>>>> should be parameterized by six type parameters:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * input/output/KeyValueStore key type,
>>>>>>>>>>>>>>>>> * input/output/KeyValueStore value type.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, it seems that all these types can be inferred from the
>>>>>>>>>>>>>>>>> provided lambda and Transformed.with instances.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you think about this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 27.03.2019 20:45, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the uploaded PR and the detailed description!
>>> I've
>>>>>>>>>>> made a
>>>>>>>>>>>>>>> pass
>>>>>>>>>>>>>>>>> on it and left some comments.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Overall I think I agree with you that passing in the StoreBuilder
>>>>>>>>>>>>>>>>> directly rather than the store name is more convenient, as it
>>>>>>>>>>>>>>>>> does not require another `addStore` call, but we need to spend
>>>>>>>>>>>>>>>>> some more documentation effort on educating users about the two
>>>>>>>>>>>>>>>>> ways of connecting their stores. I'm slightly concerned about
>>>>>>>>>>>>>>>>> this learning curve, but I can be convinced if most people feel
>>>>>>>>>>>>>>>>> it is worth it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'd like to resurrect this discussion with a cursory,
>>>>>>>>>>>>>>>>> proof-of-concept implementation of the KIP which combines many of
>>>>>>>>>>>>>>>>> our ideas: https://github.com/apache/kafka/pull/6496.  I tried to
>>>>>>>>>>>>>>>>> keep the diff as small as possible for now, just using it to
>>>>>>>>>>>>>>>>> convey the main ideas.  But I'll separately address some of our
>>>>>>>>>>>>>>>>> earlier discussion:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    - Will there be a new, separate interface for users to
>>>>>>>>>>>>>>>>>    implement for the new functionality? No, to hopefully keep
>>>>>>>>>>>>>>>>>    things simple, all of the Processor/TransformerSupplier
>>>>>>>>>>>>>>>>>    interfaces will just extend StateStoresSupplier, allowing users
>>>>>>>>>>>>>>>>>    to opt in to this functionality by overriding the default
>>>>>>>>>>>>>>>>>    implementation that gives an empty list.
>>>>>>>>>>>>>>>>>    - Will the interface allow users to specify the store name, or
>>>>>>>>>>>>>>>>>    the entire StoreBuilder? The entire StoreBuilder, so the
>>>>>>>>>>>>>>>>>    Processor/TransformerSupplier can completely encapsulate name
>>>>>>>>>>>>>>>>>    and implementation of a state store if desired.
>>>>>>>>>>>>>>>>>    - Will the old way of specifying store names alongside the
>>>>>>>>>>>>>>>>>    supplier when calling stream.process/transform() be deprecated?
>>>>>>>>>>>>>>>>>    No, this is still a legitimate way to wire up
>>>>>>>>>>>>>>>>>    Processors/Transformers and their stores. But I would recommend
>>>>>>>>>>>>>>>>>    not allowing stream.process/transform() calls that use both
>>>>>>>>>>>>>>>>>    store declaration mechanisms (this restriction is not in the
>>>>>>>>>>>>>>>>>    proof of concept).
>>>>>>>>>>>>>>>>>    - How will we handle adding the same state store to the
>>>>>>>>>>>>>>>>>    topology multiple times because different
>>>>>>>>>>>>>>>>>    Processor/TransformerSuppliers declare it?
>>>>>>>>>>>>>>>>>    topology.addStateStore() will be slightly relaxed for
>>>>>>>>>>>>>>>>>    convenience, and will allow adding the same StoreBuilder
>>>>>>>>>>>>>>>>>    multiple times as long as the exact same StoreBuilder instance
>>>>>>>>>>>>>>>>>    is being added for the same store name. This seems to prevent
>>>>>>>>>>>>>>>>>    in practice the issue of accidentally making two state stores
>>>>>>>>>>>>>>>>>    one by adding with the same name.  For additional safety, if we
>>>>>>>>>>>>>>>>>    wanted to (not in the proof of concept), we could allow for
>>>>>>>>>>>>>>>>>    this relaxation only for internal callers of
>>>>>>>>>>>>>>>>>    topology.addStateStore().
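
A rough sketch of the relaxed `addStateStore()` rule described in the last
bullet, assuming hypothetical `SketchStoreBuilder` and `SketchTopology`
classes instead of the real Kafka Streams types. Re-adding the exact same
builder instance under the same name is a no-op, while a different instance
with the same name still fails. The exception type and message are
placeholders, not what the proof of concept throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store builder; only the store name is modelled.
final class SketchStoreBuilder {
    final String name;

    SketchStoreBuilder(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the topology's store registry.
final class SketchTopology {
    private final Map<String, SketchStoreBuilder> stores = new HashMap<>();

    // Returns true if the store was newly registered, false if the exact same
    // builder instance had already been registered under that name.
    boolean addStateStore(SketchStoreBuilder builder) {
        SketchStoreBuilder existing = stores.get(builder.name);
        if (existing == null) {
            stores.put(builder.name, builder);
            return true;
        }
        if (existing == builder) {
            return false; // same instance: idempotent re-add
        }
        // Same name but a different builder: most likely two stores named alike by mistake.
        throw new IllegalStateException(
            "Store " + builder.name + " was already added with a different builder");
    }

    int storeCount() {
        return stores.size();
    }

    public static void main(String[] args) {
        SketchTopology topology = new SketchTopology();
        SketchStoreBuilder shared = new SketchStoreBuilder("shared");
        System.out.println(topology.addStateStore(shared)); // first add registers the store
        System.out.println(topology.addStateStore(shared)); // second add is a no-op
        System.out.println(topology.storeCount());
    }
}
```
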
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So, in summary, the use cases look like:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    - 1 transformer/processor that owns its store: Using the new
>>>>>>>>>>>>>>>>>    StateStoresSupplier interface method to supply its
>>>>>>>>>>>>>>>>>    StoreBuilders that will be added to the topology automatically.
>>>>>>>>>>>>>>>>>    - Multiple transformer/processors that share the same store:
>>>>>>>>>>>>>>>>>    Either
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>    1. The old way: the StoreBuilder is defined "far away" from the
>>>>>>>>>>>>>>>>>    Transformer/Processor implementations, and is added to the
>>>>>>>>>>>>>>>>>    topology manually by the user
>>>>>>>>>>>>>>>>>    2. The new way: the StoreBuilder is defined closer to the
>>>>>>>>>>>>>>>>>    Transformer/Processor implementations, and the same instance is
>>>>>>>>>>>>>>>>>    returned by all Transformer/ProcessorSuppliers that need it
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This makes the KIP wiki a bit stale; I'll update if we want to
>>>>>>>>>>>>>>>>> bring this design to a vote.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang
>>>>>>>>>>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Matthias / Paul,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The concern I had about introducing `StoreBuilderSupplier` is
>>>>>>>>>>>>>>>>> simply that it adds another XXSupplier to the public API, so I'd
>>>>>>>>>>>>>>>>> like to ask if we really have to add it :)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The difference between encapsulating the store name and
>>>>>>>>>>>>>>>>> encapsulating the full state store builder is that, in the
>>>>>>>>>>>>>>>>> former:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> String storeName = "store1";
>>>>>>>>>>>>>>>>> builder.addStore(new MyStoreBuilder(storeName));
>>>>>>>>>>>>>>>>> stream1.transform(new MyTransformerSupplier(storeName));
>>>>>>>>>>>>>>>>> // following my proposal, the store name can be passed in and
>>>>>>>>>>>>>>>>> // used for both `listStores` and `Transformer#init`, so the
>>>>>>>>>>>>>>>>> // Transformer function does not need to get the constant string
>>>>>>>>>>>>>>>>> // name again.
>>>>>>>>>>>>>>>>> //
>>>>>>>>>>>>>>>>> // one caveat to admit is that the MyTransformerSupplier logic may
>>>>>>>>>>>>>>>>> // be unique to `store1`, so it cannot be reused with a different
>>>>>>>>>>>>>>>>> // store name anyway.
>>>>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> While in the latter:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> stream1.transform(new MyTransformerSupplierForStore1());
>>>>>>>>>>>>>>>>> // the name is just indicating that we may have one such supplier
>>>>>>>>>>>>>>>>> // for each store.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -----------
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I understand the latter introduces more convenience in the API,
>>>>>>>>>>>>>>>>> but the cost is that we still cannot completely remove
>>>>>>>>>>>>>>>>> `builder.addStore`; we only reduce its semantic scope to shared
>>>>>>>>>>>>>>>>> state stores. Hence users need to learn two ways of creating
>>>>>>>>>>>>>>>>> state stores for those two patterns.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My argument is that more public APIs require a longer learning
>>>>>>>>>>>>>>>>> curve for users, and introduce more usage patterns that may
>>>>>>>>>>>>>>>>> confuse users (the proposal I had tries to replace one with
>>>>>>>>>>>>>>>>> another completely).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the great thoughts Matthias and Guozhang!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If I'm not mistaken, Guozhang's suggestion is what my second
>>>>>>>>>>>>>>>>> alternative on the KIP is ("Have the added method on the Supplier
>>>>>>>>>>>>>>>>> interfaces only return store names, not builders").  I do think
>>>>>>>>>>>>>>>>> it would be a worthwhile usability improvement on its own, but to
>>>>>>>>>>>>>>>>> Matthias's point, it doesn't achieve the full goal of completely
>>>>>>>>>>>>>>>>> encapsulating a state store and its processor - it encapsulates
>>>>>>>>>>>>>>>>> the name, but not the StateStoreBuilder.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm really intrigued by Matthias's idea that forgoes the default
>>>>>>>>>>>>>>>>> interface method I proposed.  Having smaller, separate interfaces
>>>>>>>>>>>>>>>>> is a powerful idea and I think a cleaner API than what I
>>>>>>>>>>>>>>>>> proposed.  The non-shared store use case is handled well here,
>>>>>>>>>>>>>>>>> and the shared store use case is possible, though maybe still not
>>>>>>>>>>>>>>>>> as graceful as we would like (having to add the
>>>>>>>>>>>>>>>>> StoreBuilderSupplier before the StoreNameSupplier seems maybe too
>>>>>>>>>>>>>>>>> subtle to me).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We're all agreed that one of the big problems with the shared
>>>>>>>>>>>>>>>>> store use case is how to deal with adding the same store to the
>>>>>>>>>>>>>>>>> topology multiple times.  Catching the "store already added"
>>>>>>>>>>>>>>>>> exception is risky.  Here's a maybe radical idea: change
>>>>>>>>>>>>>>>>> `topology.addStateStore()` to be idempotent for adding a given
>>>>>>>>>>>>>>>>> state store name and `StoreBuilder`.  In other words,
>>>>>>>>>>>>>>>>> `addStateStore` would not throw the "store already added"
>>>>>>>>>>>>>>>>> exception if the `StoreBuilder` being added for a given name has
>>>>>>>>>>>>>>>>> the same identity as the one that has already been added.  Does
>>>>>>>>>>>>>>>>> this eliminate all the bugs we're worried about?  Thinking about
>>>>>>>>>>>>>>>>> it for a few minutes, it seems to eliminate most at least (would
>>>>>>>>>>>>>>>>> a user really use the exact same StoreBuilder when they intend
>>>>>>>>>>>>>>>>> there to be two stores?).  It might make the API slightly harder
>>>>>>>>>>>>>>>>> to use if a user isn't immediately aware of that subtlety, but a
>>>>>>>>>>>>>>>>> good error message should ease the pain, and it would happen
>>>>>>>>>>>>>>>>> immediately during development.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> And with regards to Matthias's comment about whether we need to
>>>>>>>>>>>>>>>>> deprecate existing varargs transform methods - I don't think we
>>>>>>>>>>>>>>>>> need to, but it might be nice for there only to be one way to do
>>>>>>>>>>>>>>>>> things, assuming whatever we come up with supports all existing
>>>>>>>>>>>>>>>>> use cases.  I don't feel strongly about this, but if we don't
>>>>>>>>>>>>>>>>> deprecate, I do think it's important to add checks that prevent
>>>>>>>>>>>>>>>>> users from trying to do the same thing in two different ways, as
>>>>>>>>>>>>>>>>> we've discussed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax
>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding the last option to catch "store exist already"
>>>>>>>>>>>>>>>>> exception and fallback to connect stores, I'm a bit concerned it
>>>>>>>>>>>>>>>>> may be hiding actual user bugs.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree with this concern. From my original email:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The only disadvantage I see, might be potential bugs about
>>>>>>>>>>>>>>>>> sharing state if two different stores are named the same by
>>>>>>>>>>>>>>>>> mistake (this would not be detected).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For your new proposal: I am not sure if it addresses Paul's
>>>>>>>>>>>>>>>>> original idea -- I hope Paul can clarify. From my understanding,
>>>>>>>>>>>>>>>>> the idea was to encapsulate a store and its processor. As many
>>>>>>>>>>>>>>>>> stores are not shared, this seems to be quite useful. Your
>>>>>>>>>>>>>>>>> proposal falls a little short of supporting encapsulation for
>>>>>>>>>>>>>>>>> non-shared stores.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 12/15/18 1:40 AM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding the last option, to catch the "store exists already"
>>>>>>>>>>>>>>>>> exception and fall back to connecting stores, I'm a bit concerned
>>>>>>>>>>>>>>>>> it may be hiding actual user bugs.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thinking about Paul's proposal and your suggestion again, I'd
>>>>>>>>>>>>>>>>> like to propose another alternative somewhere in the middle of
>>>>>>>>>>>>>>>>> your approaches, i.e. we still let users create sharable state
>>>>>>>>>>>>>>>>> stores via `addStateStore`, and we allow the TransformerSupplier
>>>>>>>>>>>>>>>>> to return a list of the state stores that it needs, i.e.:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public interface TransformerSupplier<K, V, R> {
>>>>>>>>>>>>>>>>>     Transformer<K, V, R> get();
>>>>>>>>>>>>>>>>>     default List<String> stateStoreNames() {
>>>>>>>>>>>>>>>>>         return Collections.emptyList();
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> by doing this users can still "consolidate" the references of
>>>>>>>>>>>>>>>>> store names in a single place in the transform call, e.g.:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> public class MyTransformerSupplier<K, V, R>
>>>>>>>>>>>>>>>>>         implements TransformerSupplier<K, V, R> {
>>>>>>>>>>>>>>>>>     private String storeName;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     public class MyTransformer<K, V, R> {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>        ....
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>        public void init(ProcessorContext context) {
>>>>>>>>>>>>>>>>>           // look the store up by the name held by the supplier
>>>>>>>>>>>>>>>>>           store = context.getStateStore(storeName);
>>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     // overrides the proposed default method on the interface
>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>     public List<String> stateStoreNames() {
>>>>>>>>>>>>>>>>>         return Collections.singletonList(storeName);
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>> }
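
A compact sketch of the store-name variant above, assuming simplified,
hypothetical types (`NamedStoreSupplier`, `CountingSupplier`,
`WiringSketch`) in place of `TransformerSupplier` and the DSL internals. It
shows a default method that returns no names, one supplier that overrides
it, and a `transform` stand-in that connects whatever names the supplier
reports.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for a supplier interface with the proposed default method.
interface NamedStoreSupplier {
    default List<String> stateStoreNames() {
        return Collections.emptyList();
    }
}

// A supplier that opts in by overriding the default and naming one store.
final class CountingSupplier implements NamedStoreSupplier {
    private final String storeName;

    CountingSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public List<String> stateStoreNames() {
        return Collections.singletonList(storeName);
    }
}

// Stand-in for the DSL: records one "processor->store" connection per reported name.
final class WiringSketch {
    final List<String> connections = new ArrayList<>();

    void transform(String processorName, NamedStoreSupplier supplier) {
        for (String store : supplier.stateStoreNames()) {
            connections.add(processorName + "->" + store);
        }
    }

    public static void main(String[] args) {
        WiringSketch dsl = new WiringSketch();
        dsl.transform("counter", new CountingSupplier("store1"));
        dsl.transform("stateless", new NamedStoreSupplier() {});
        System.out.println(dsl.connections);
    }
}
```
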
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Basically, we move the parameters from the caller of `transform`
>>>>>>>>>>>>>>>>> to inside the TransformerSuppliers. DSL implementations would not
>>>>>>>>>>>>>>>>> change much, simply calling `connectStateStore` by getting the
>>>>>>>>>>>>>>>>> list of names from the provided function.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax
>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Just a meta comment: do we really need to deprecate
>>> existing
>>>>>>>>>>>>>>>>> `transform()` etc methods?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The last argument is a vararg, and thus, just keeping the
>>>>>>>>>>>>>>>>> existing API for this part seems to work too, allowing to
>>>>>>>>>>>>>>>>> implement both patterns?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also, instead of adding a default method, we could also add a new
>>>>>>>>>>>>>>>>> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
>>>>>>>>>>>>>>>>> stateStores()` -- users could implement `TransformerSupplier` and
>>>>>>>>>>>>>>>>> `StoreBuilderSupplier` at once; and for this case, we require
>>>>>>>>>>>>>>>>> that users don't provide store name in `transform()`.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Similarly, we could add an interface `StoreNameSupplier` with
>>>>>>>>>>>>>>>>> method `List<String> stateStores()`. This allows to "auto-wire" a
>>>>>>>>>>>>>>>>> transformer to existing stores (to avoid the issue of adding the
>>>>>>>>>>>>>>>>> same store multiple times).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hence, for shared stores, there would be one "main" transformer
>>>>>>>>>>>>>>>>> that implements `StoreBuilderSupplier` and that must be added
>>>>>>>>>>>>>>>>> first to the topology. The other transformers would implement
>>>>>>>>>>>>>>>>> `StoreNameSupplier` and just connect to those stores.
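
A small sketch of the ordering constraint described above, assuming
hypothetical, simplified interfaces (`StoreBuilderSupplierSketch`,
`StoreNameSupplierSketch`) in which stores are represented by name only.
The method names differ from the `stateStores()` used in the text so that
the two interfaces stay distinguishable in this sketch. The "main"
transformer adds the store; the others can only connect to a store that is
already present.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical: implemented by the one "main" transformer that owns the stores.
interface StoreBuilderSupplierSketch {
    List<String> storesToAdd();
}

// Hypothetical: implemented by transformers that only connect to existing stores.
interface StoreNameSupplierSketch {
    List<String> storesToConnect();
}

final class SharedStoreOrderSketch {
    private final Set<String> added = new HashSet<>();
    final List<String> connections = new ArrayList<>();

    // Adds each store once and connects the owning processor to it.
    void addOwner(String processor, StoreBuilderSupplierSketch supplier) {
        for (String store : supplier.storesToAdd()) {
            if (!added.add(store)) {
                throw new IllegalStateException("store already added: " + store);
            }
            connections.add(processor + "->" + store);
        }
    }

    // Connects to stores by name; fails if the owner has not been added yet.
    void addSharer(String processor, StoreNameSupplierSketch supplier) {
        for (String store : supplier.storesToConnect()) {
            if (!added.contains(store)) {
                throw new IllegalStateException("store not added yet: " + store);
            }
            connections.add(processor + "->" + store);
        }
    }

    public static void main(String[] args) {
        SharedStoreOrderSketch topology = new SharedStoreOrderSketch();
        topology.addOwner("main", () -> Collections.singletonList("shared"));
        topology.addSharer("other", () -> Collections.singletonList("shared"));
        System.out.println(topology.connections);
    }
}
```

Reversing the two calls in `main` makes `addSharer` throw, which is the
subtlety raised about having to add the "main" transformer first.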
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Another possibility to avoid the issue of adding the same stores
>>>>>>>>>>>>>>>>> multiple times would be, that the DSL always calls
>>>>>>>>>>>>>>>>> `addStateStore()` but catches a potential "store exists already"
>>>>>>>>>>>>>>>>> exception and falls back to `connectProcessorAndStateStore()` for
>>>>>>>>>>>>>>>>> this case. Thus, we would not need the `StoreNameSupplier`
>>>>>>>>>>>>>>>>> interface and the order in which transformers are added would not
>>>>>>>>>>>>>>>>> matter either. The only disadvantage I see, might be potential
>>>>>>>>>>>>>>>>> bugs about sharing state if two different stores are named the
>>>>>>>>>>>>>>>>> same by mistake (this would not be detected).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Just some ideas I wanted to share. What do you think?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ah yes of course, this was an oversight, I completely ignored the
>>>>>>>>>>>>>>>>> multiple processors sharing the same state store when writing up
>>>>>>>>>>>>>>>>> the KIP.  Which is funny, because I've actually done this
>>>>>>>>>>>>>>>>> (different processors sharing state stores) a fair amount myself,
>>>>>>>>>>>>>>>>> and I've settled on a pattern where I group the Processors in an
>>>>>>>>>>>>>>>>> enclosing class, and that enclosing class handles as much as
>>>>>>>>>>>>>>>>> possible.  Here's a gist showing the rough structure, just for
>>>>>>>>>>>>>>>>> context:
>>>>>>>>>>>>>>>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
>>>>>>>>>>>>>>>>> . Note how it adds the stores to the topology, as well as
>>>>>>>>>>>>>>>>> providing a public method with the store names.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't think my proposal completely conflicts with the
>>>>>>>>>>>>>>>>> multiple processors sharing state stores use case, since
>>>>>>>>>>>>>>>>> you can create a supplier that provides the store name you
>>>>>>>>>>>>>>>>> want, somewhat independently of your actual Processor
>>>>>>>>>>>>>>>>> logic.  The issue I do see, though, is that
>>>>>>>>>>>>>>>>> topology.addStateStore() can only be called once for a
>>>>>>>>>>>>>>>>> given store. So for your example, if there was a single
>>>>>>>>>>>>>>>>> TransformerSupplier that was passed into both transform()
>>>>>>>>>>>>>>>>> calls, "store1" would be added (under the hood) to the
>>>>>>>>>>>>>>>>> topology twice, which is no good.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Perhaps this suggests that one of my alternatives on the
>>>>>>>>>>>>>>>>> KIP might be desirable: either not having the suppliers
>>>>>>>>>>>>>>>>> return StoreBuilders (just store names), or not deprecating
>>>>>>>>>>>>>>>>> the old methods that take "String... stateStoreNames". I'll
>>>>>>>>>>>>>>>>> have to think about it a bit.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang
>>>>>>>>>>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the great writeup (very detailed and
>>>>>>>>>>>>>>>>> crystal-clear motivation sections!).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is quite an interesting idea and I do like the API
>>>>>>>>>>>>>>>>> cleanness you proposed. The original motivation of letting
>>>>>>>>>>>>>>>>> StreamsTopology add state stores, though, is to allow
>>>>>>>>>>>>>>>>> different processors to share the state store.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> builder.addStore("store1");
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> // a path of stream transformations that leads to KStream stream1.
>>>>>>>>>>>>>>>>> stream1.transform(..., "store1");
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> // another path that generates a KStream stream2.
>>>>>>>>>>>>>>>>> stream2.transform(..., "store1");
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Behind the scenes, Streams will make sure stream1 / stream2
>>>>>>>>>>>>>>>>> transformations will always be grouped together as a single
>>>>>>>>>>>>>>>>> group of tasks, each of which will be executed by a single
>>>>>>>>>>>>>>>>> thread, and hence there are no concurrency issues on
>>>>>>>>>>>>>>>>> accessing the store from different operators within the
>>>>>>>>>>>>>>>>> same task. I'm not sure how common this use case is, but
>>>>>>>>>>>>>>>>> I'd like to hear if you have any thoughts on maintaining
>>>>>>>>>>>>>>>>> this, since the current proposal seems to exclude this
>>>>>>>>>>>>>>>>> possibility.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen
>>>>>>>>>>>>>>>>> <pgwha...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Here's KIP-401 for discussion, a minor Kafka Streams API
>>>>>>>>>>>>>>>>> change that I think could greatly increase the usability of
>>>>>>>>>>>>>>>>> the low-level processor API. I have some code written but
>>>>>>>>>>>>>>>>> will wait to see if there is buy-in before going all out
>>>>>>>>>>>>>>>>> and creating a pull request.  It seems like most of the
>>>>>>>>>>>>>>>>> work would be in updating documentation and tests.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> -- Guozhang
> 

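For anyone skimming the quoted thread, the constraint Paul raises there
(a store can be added to the topology only once, while several processors
may connect to it by name) can be sketched roughly as follows. This is a
toy model, not Kafka Streams code: MiniTopology, connect() and
processorsFor() are hypothetical names made up for illustration, and the
exception type is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a topology; this is NOT the Kafka Streams API.
final class MiniTopology {
    // store name -> names of the processors connected to that store
    private final Map<String, List<String>> connections = new HashMap<>();

    // Registers a store. A second registration under the same name is
    // rejected, mirroring the "can only be called once" rule from the thread.
    public void addStateStore(String storeName) {
        if (connections.containsKey(storeName)) {
            throw new IllegalStateException("store already added: " + storeName);
        }
        connections.put(storeName, new ArrayList<>());
    }

    // Connects a processor, by name, to a store that was registered earlier.
    public void connect(String processorName, String storeName) {
        List<String> processors = connections.get(storeName);
        if (processors == null) {
            throw new IllegalStateException("unknown store: " + storeName);
        }
        processors.add(processorName);
    }

    // Returns a copy of the processors currently connected to the store.
    public List<String> processorsFor(String storeName) {
        List<String> processors = connections.get(storeName);
        return processors == null ? new ArrayList<>() : new ArrayList<>(processors);
    }

    public static void main(String[] args) {
        MiniTopology topology = new MiniTopology();
        topology.addStateStore("store1");          // added exactly once
        topology.connect("transform-1", "store1"); // first processor shares it
        topology.connect("transform-2", "store1"); // second processor shares it
        System.out.println(topology.processorsFor("store1"));
        try {
            // What would happen if a supplier added the store a second time.
            topology.addStateStore("store1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, sharing works as long as the store is registered once and
each processor only refers to it by name, which is the situation the
"String... stateStoreNames" overloads cover; a supplier that re-registers
the store for every transform() call runs into the duplicate check.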
Attachment: signature.asc
Description: OpenPGP digital signature