Guozhang,

>> Regarding the last option to catch the "store exists already" exception and
>> fall back to connecting stores, I'm a bit concerned it may be hiding actual
>> user bugs.

I agree with this concern. From my original email:

> The only disadvantage I see might be
> potential bugs about sharing state if two different stores are named the
> same by mistake (this would not be detected).
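
To make that concern concrete, here is a rough sketch using toy stand-in
classes. This is not the Kafka Streams API: `ToyTopology`, `FallbackDemo`,
and `addOrConnect` are made-up names, and the only thing assumed from the
discussion is that adding the same store name twice fails.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a topology; NOT the Kafka Streams API. All names are hypothetical.
class ToyTopology {
    // store name -> names of the processors connected to it
    private final Map<String, List<String>> connections = new HashMap<>();

    // Mirrors the "add once only" rule: adding an existing name fails.
    void addStateStore(String storeName) {
        if (connections.containsKey(storeName)) {
            throw new IllegalStateException("store exists already: " + storeName);
        }
        connections.put(storeName, new ArrayList<>());
    }

    void connect(String processorName, String storeName) {
        List<String> processors = connections.get(storeName);
        if (processors == null) {
            throw new IllegalStateException("unknown store: " + storeName);
        }
        processors.add(processorName);
    }

    List<String> processorsOf(String storeName) {
        return connections.get(storeName);
    }
}

class FallbackDemo {
    // The fallback under discussion: try to add, and on "exists already" just connect.
    static void addOrConnect(ToyTopology topology, String processorName, String storeName) {
        try {
            topology.addStateStore(storeName);
        } catch (IllegalStateException alreadyExists) {
            // Swallowed: intended sharing and an accidental name clash look the same here.
        }
        topology.connect(processorName, storeName);
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();
        // Two unrelated transformers whose authors both picked the name "counts".
        addOrConnect(topology, "word-counter", "counts");
        addOrConnect(topology, "click-counter", "counts");
        // Both are now connected to the same store, and nothing was reported.
        System.out.println(topology.processorsOf("counts"));
    }
}
```

In this sketch the second call cannot distinguish "I want to share this
store" from "I reused a name by accident", which is exactly the case that
would go undetected.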


For your new proposal: I am not sure if it addresses Paul's original
idea -- I hope Paul can clarify. From my understanding, the idea was to
encapsulate a store and its processor. As many stores are not shared,
this seems to be quite useful. Your proposal falls a little short of
supporting encapsulation for non-shared stores.
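
As a rough illustration of the difference, consider the toy sketch below.
The classes are stand-ins and not the real Kafka Streams types;
`SketchTopology`, `transformWithNames`, and `transformWithBuilders` are
hypothetical names, and a store builder is represented by nothing more than
its name.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy stand-ins only; none of these are the real Kafka Streams types.
class SketchTopology {
    final Set<String> stores = new HashSet<>();

    void addStateStore(String storeName) {
        if (!stores.add(storeName)) {
            throw new IllegalStateException("store exists already: " + storeName);
        }
    }

    // Names-only variant: the store must have been added by the caller beforehand.
    void transformWithNames(List<String> storeNames) {
        for (String name : storeNames) {
            if (!stores.contains(name)) {
                throw new IllegalStateException("store not added yet: " + name);
            }
        }
    }

    // Builder variant: the supplier brings its store along, so one call suffices.
    // (A "builder" is modelled here as just the store name.)
    void transformWithBuilders(List<String> storesToBuild) {
        for (String name : storesToBuild) {
            addStateStore(name);
        }
    }
}

class EncapsulationSketch {
    public static void main(String[] args) {
        // Names-only: two separate places must agree on "my-store".
        SketchTopology namesOnly = new SketchTopology();
        namesOnly.addStateStore("my-store");
        namesOnly.transformWithNames(Collections.singletonList("my-store"));

        // Builder-returning: the store is declared only inside the supplier.
        SketchTopology withBuilders = new SketchTopology();
        withBuilders.transformWithBuilders(Collections.singletonList("my-store"));

        System.out.println(namesOnly.stores.equals(withBuilders.stores));
    }
}
```

Both variants end up with the same store registered; the difference is only
whether the store is declared next to the processor that uses it or in a
separate call.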


-Matthias



On 12/15/18 1:40 AM, Guozhang Wang wrote:
> Matthias,
> 
> Thanks for your feedback.
> 
> Regarding the last option to catch the "store exists already" exception and
> fall back to connecting stores, I'm a bit concerned it may be hiding actual
> user bugs.
> 
> Thinking about Paul's proposal and your suggestion again, I'd like to
> propose another alternative somewhere in the middle of your approaches,
> i.e., we still let users create shareable state stores via
> `addStateStore`, and we allow the TransformerSupplier to return a list of
> state stores that it needs, i.e.:
> 
> public interface TransformerSupplier<K, V, R> {
>     Transformer<K, V, R> get();
>     default List<String> stateStoreNames() {
>         return Collections.emptyList();
>     }
> }
> 
> by doing this users can still "consolidate" the references of store names
> in a single place in the transform call, e.g.:
> 
> public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
>     private String storeName;
> 
>     public class MyTransformer implements Transformer<K, V, R> {
> 
>        ....
> 
>        // look up the store by the same name the supplier reports below
>        public void init(ProcessorContext context) {
>           store = context.getStateStore(storeName);
>        }
>     }
> 
>     @Override
>     public List<String> stateStoreNames() {
>         return Collections.singletonList(storeName);
>     }
> }
> 
> Basically, we move the parameters from the caller of `transform` to inside
> the TransformerSupplier. The DSL implementation would not change much: it
> would simply call `connectStateStore` with the list of names obtained from
> the provided function.
> 
> Guozhang
> 
> 
> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Just a meta comment: do we really need to deprecate existing
>> `transform()` etc methods?
>>
>> The last argument is a vararg, and thus, just keeping the existing API
>> for this part seems to work too, allowing users to implement both patterns?
>>
>> Also, instead of adding a default method, we could also add a new
>> interface `StoreBuilderSupplier` with method `List<StoreBuilder>
>> stateStores()` -- users could implement `TransformerSupplier` and
>> `StoreBuilderSupplier` at once; and for this case, we require that users
>> don't provide store names in `transform()`.
>>
>> Similarly, we could add an interface `StoreNameSupplier` with method
>> `List<String> stateStores()`. This would allow us to "auto-wire" a
>> transformer to existing stores (to avoid the issue of adding the same
>> store multiple times).
>>
>> Hence, for shared stores, there would be one "main" transformer that
>> implements `StoreBuilderSupplier` and that must be added first to the
>> topology. The other transformers would implement `StoreNameSupplier` and
>> just connect to those stores.
>>
>> Another possibility to avoid the issue of adding the same stores
>> multiple times would be, that the DSL always calls `addStateStore()` but
>> catches a potential "store exists already" exception and falls back to
>> `connectProcessorAndStateStore()` for this case. Thus, we would not need
>> the `StoreNameSupplier` interface and the order in which transformers
>> are added would not matter either. The only disadvantage I see might be
>> potential bugs about sharing state if two different stores are named the
>> same by mistake (this would not be detected).
>>
>>
>>
>> Just some ideas I wanted to share. What do you think?
>>
>>
>>
>> -Matthias
>>
>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>>> Ah yes of course, this was an oversight, I completely ignored the multiple
>>> processors sharing the same state store when writing up the KIP.  Which is
>>> funny, because I've actually done this (different processors sharing state
>>> stores) a fair amount myself, and I've settled on a pattern where I group
>>> the Processors in an enclosing class, and that enclosing class handles as
>>> much as possible.  Here's a gist showing the rough structure, just for
>>> context:
>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
>>> Note how it adds the stores to the topology, as well as providing a
>>> public method with the store names.
>>>
>>> I don't think my proposal completely conflicts with the multiple processors
>>> sharing state stores use case, since you can create a supplier that
>>> provides the store name you want, somewhat independently of your actual
>>> Processor logic.  The issue I do see, though, is that
>>> topology.addStateStore() can only be called once for a given store.  So for
>>> your example, if there was a single TransformerSupplier that was passed
>>> into both transform() calls, "store1" would be added (under the hood) to
>>> the topology twice, which is no good.
>>>
>>> Perhaps this suggests that one of my alternatives on the KIP might be
>>> desirable: either not having the suppliers return StoreBuilders (just store
>>> names), or not deprecating the old methods that take "String...
>>> stateStoreNames". I'll have to think about it a bit.
>>>
>>> Paul
>>>
>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>
>>>> Hello Paul,
>>>>
>>>> Thanks for the great writeup (very detailed and crystal-clear motivation
>>>> sections!).
>>>>
>>>> This is quite an interesting idea and I do like the API cleanness you
>>>> proposed. The original motivation of letting StreamsTopology add state
>>>> stores, though, is to allow different processors to share the state store.
>>>> For example:
>>>>
>>>> builder.addStore("store1");
>>>>
>>>> // a path of stream transformations that leads to KStream stream1.
>>>> stream1.transform(..., "store1");
>>>>
>>>> // another path that generates a KStream stream2.
>>>> stream2.transform(..., "store1");
>>>>
>>>> Behind the scenes, Streams will make sure stream1 / stream2 transformations
>>>> will always be grouped together as a single group of tasks, each of which
>>>> will be executed by a single thread, and hence there are no concurrency
>>>> issues on accessing the store from different operators within the same
>>>> task. I'm not sure how common this use case is, but I'd like to hear if you
>>>> have any thoughts on maintaining this, since the current proposal seems to
>>>> exclude this possibility.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
>>>>> think could greatly increase the usability of the low-level processor API.
>>>>> I have some code written but will wait to see if there is buy in before
>>>>> going all out and creating a pull request.  It seems like most of the work
>>>>> would be in updating documentation and tests.
>>>>>
>>>>>
>>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>
>>>>> Thanks!
>>>>> Paul
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
> 
