Guozhang,

>> Regarding the last option to catch the "store exists already" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
I agree with this concern. From my original email:

> The only disadvantage I see might be potential bugs caused by sharing state if two different stores are named the same by mistake (this would not be detected).

As for your new proposal, I am not sure it addresses Paul's original idea -- I hope Paul can clarify. From my understanding, the idea was to encapsulate a store and its processor. As many stores are not shared, this seems quite useful. Your proposal falls a little short of supporting encapsulation for non-shared stores.

-Matthias

On 12/15/18 1:40 AM, Guozhang Wang wrote:
> Matthias,
>
> Thanks for your feedback.
>
> Regarding the last option to catch the "store exists already" exception and fall back to connecting stores, I'm a bit concerned it may be hiding actual user bugs.
>
> Thinking about Paul's proposal and your suggestion again, I'd like to propose another alternative somewhere in the middle of your approaches, i.e. we still let users create sharable state stores via `addStateStore`, and we allow the TransformerSupplier to return a list of the state stores that it needs, i.e.:
>
>     public interface TransformerSupplier<K, V, R> {
>         Transformer<K, V, R> get();
>
>         default List<String> stateStoreNames() {
>             return Collections.emptyList();
>         }
>     }
>
> By doing this, users can still "consolidate" the references to store names in a single place in the transform call, e.g.:
>
>     public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
>         private String storeName;
>
>         public class MyTransformer implements Transformer<K, V, R> {
>             ....
>             public void init(ProcessorContext context) {
>                 store = context.getStateStore(storeName);
>             }
>         }
>
>         @Override
>         public List<String> stateStoreNames() {
>             return Collections.singletonList(storeName);
>         }
>     }
>
> Basically, we move the parameters from the caller of `transform` to inside the TransformerSupplier.
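A minimal, self-contained sketch of the "supplier declares its own store names" idea, assuming simplified stand-ins rather than the real Kafka Streams types: `Transformer`, `TransformerSupplier`, `RunningSumSupplier` and `MiniTopology` here are hypothetical, and every "store" is just a `Map<String, Long>`. The store name lives only in the supplier, and the DSL side connects whatever `stateStoreNames()` returns:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Kafka Streams types discussed in
// this thread; they are NOT the real org.apache.kafka.streams interfaces.
// Every "state store" here is just a Map<String, Long>.
interface Transformer<K, V, R> {
    // Stand-in for init(ProcessorContext): receives only the connected stores.
    void init(Map<String, Map<String, Long>> connectedStores);

    R transform(K key, V value);
}

interface TransformerSupplier<K, V, R> {
    Transformer<K, V, R> get();

    // The proposed addition: by default a supplier needs no stores.
    default List<String> stateStoreNames() {
        return Collections.emptyList();
    }
}

// The store name lives in exactly one place: the supplier.
class RunningSumSupplier implements TransformerSupplier<String, Long, Long> {
    private final String storeName;

    RunningSumSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Transformer<String, Long, Long> get() {
        return new Transformer<String, Long, Long>() {
            private Map<String, Long> store;

            @Override
            public void init(Map<String, Map<String, Long>> connectedStores) {
                store = connectedStores.get(storeName);
            }

            @Override
            public Long transform(String key, Long value) {
                // Adds value to the running total for key and returns the new total.
                return store.merge(key, value, Long::sum);
            }
        };
    }

    @Override
    public List<String> stateStoreNames() {
        return Collections.singletonList(storeName);
    }
}

// Hypothetical "DSL" side: transform() no longer takes store names as varargs;
// it asks the supplier and connects each declared store to the new processor.
class MiniTopology {
    private final Map<String, Map<String, Long>> stores = new HashMap<>();

    void addStateStore(String name) {
        if (stores.putIfAbsent(name, new HashMap<>()) != null) {
            throw new IllegalStateException("Store " + name + " is already added.");
        }
    }

    <K, V, R> Transformer<K, V, R> transform(TransformerSupplier<K, V, R> supplier) {
        Map<String, Map<String, Long>> connected = new HashMap<>();
        for (String name : supplier.stateStoreNames()) {
            Map<String, Long> store = stores.get(name);
            if (store == null) {
                throw new IllegalStateException("Store " + name + " has not been added.");
            }
            connected.put(name, store);
        }
        Transformer<K, V, R> transformer = supplier.get();
        transformer.init(connected);
        return transformer;
    }
}
```

In this sketch, two suppliers that name the same store end up sharing it, so the shared-store case keeps working, while a name that was never added with `addStateStore` fails fast instead of being created implicitly.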
> DSL implementations would not change much: they would simply call `connectStateStore` with the list of names obtained from the supplier's `stateStoreNames()`.
>
> Guozhang
>
>
> On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Just a meta comment: do we really need to deprecate the existing `transform()` etc. methods?
>>
>> The last argument is a vararg, and thus, just keeping the existing API for this part seems to work too, allowing both patterns to be implemented?
>>
>> Also, instead of adding a default method, we could add a new interface `StoreBuilderSupplier` with method `List<StoreBuilder> stateStores()` -- users could implement `TransformerSupplier` and `StoreBuilderSupplier` at once; and for this case, we require that users don't provide store names in `transform()`.
>>
>> Similarly, we could add an interface `StoreNameSupplier` with method `List<String> stateStores()`. This allows us to "auto-wire" a transformer to existing stores (to avoid the issue of adding the same store multiple times).
>>
>> Hence, for shared stores, there would be one "main" transformer that implements `StoreBuilderSupplier` and that must be added to the topology first. The other transformers would implement `StoreNameSupplier` and just connect to those stores.
>>
>> Another possibility to avoid the issue of adding the same stores multiple times would be for the DSL to always call `addStateStore()` but catch a potential "store exists already" exception and fall back to `connectProcessorAndStateStore()` in this case. Thus, we would not need the `StoreNameSupplier` interface, and the order in which transformers are added would not matter either. The only disadvantage I see might be potential bugs caused by sharing state if two different stores are named the same by mistake (this would not be detected).
>>
>>
>>
>> Just some ideas I wanted to share. What do you think?
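To make the trade-off of the "catch and fall back" option concrete, here is a minimal sketch built on a toy topology. `FallbackTopology`, `StoreDef` and `StoreExistsException` are hypothetical names, not Kafka Streams API, and a store definition is reduced to a name plus one setting. Call order stops mattering, but a second, different store that reuses a name is silently connected to the first one instead of being rejected, which is exactly the undetected-bug concern above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy topology; NOT the real Kafka Streams Topology API.
// A store definition is reduced to a name plus one setting, so that two
// different stores that accidentally share a name can be told apart.
final class StoreDef {
    final String name;
    final String setting;

    StoreDef(String name, String setting) {
        this.name = name;
        this.setting = setting;
    }
}

class StoreExistsException extends RuntimeException {
    StoreExistsException(String name) {
        super("Store " + name + " is already added.");
    }
}

class FallbackTopology {
    final Map<String, StoreDef> stores = new HashMap<>();
    final Map<String, List<String>> connections = new HashMap<>();

    // Registers the store and connects it to the processor; fails on a duplicate name.
    void addStateStore(StoreDef def, String processor) {
        if (stores.containsKey(def.name)) {
            throw new StoreExistsException(def.name);
        }
        stores.put(def.name, def);
        connect(processor, def.name);
    }

    void connect(String processor, String storeName) {
        connections.computeIfAbsent(processor, p -> new ArrayList<>()).add(storeName);
    }

    // The suggested DSL behaviour: always try to add, fall back to connecting.
    void addOrConnect(StoreDef def, String processor) {
        try {
            addStateStore(def, processor);
        } catch (StoreExistsException e) {
            // Works regardless of call order, but 'def' itself is discarded here,
            // so a different store that reuses the name is never detected.
            connect(processor, def.name);
        }
    }
}
```

Comparing the discarded definition with the registered one inside the `catch` block would surface some mistakes, but only to the extent that two store definitions can be compared meaningfully.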
>>
>> -Matthias
>>
>> On 12/11/18 3:46 AM, Paul Whalen wrote:
>>> Ah yes, of course. This was an oversight: I completely ignored the case of multiple processors sharing the same state store when writing up the KIP. Which is funny, because I've actually done this (different processors sharing state stores) a fair amount myself, and I've settled on a pattern where I group the Processors in an enclosing class, and that enclosing class handles as much as possible. Here's a gist showing the rough structure, just for context:
>>> https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
>>> Note how it adds the stores to the topology, as well as providing a public method with the store names.
>>>
>>> I don't think my proposal completely conflicts with the use case of multiple processors sharing state stores, since you can create a supplier that provides the store name you want, somewhat independently of your actual Processor logic. The issue I do see, though, is that topology.addStateStore() can only be called once for a given store. So in your example, if there was a single TransformerSupplier that was passed into both transform() calls, "store1" would be added (under the hood) to the topology twice, which is no good.
>>>
>>> Perhaps this suggests that one of my alternatives on the KIP might be desirable: either not having the suppliers return StoreBuilders (just store names), or not deprecating the old methods that take "String... stateStoreNames". I'll have to think about it a bit.
>>>
>>> Paul
>>>
>>> On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hello Paul,
>>>>
>>>> Thanks for the great writeup (very detailed and crystal-clear motivation sections!).
>>>>
>>>> This is quite an interesting idea and I do like the API cleanness you proposed.
>>>> The original motivation for letting StreamsTopology add state stores, though, is to allow different processors to share a state store. For example:
>>>>
>>>>     builder.addStateStore(storeBuilder); // a StoreBuilder for a store named "store1"
>>>>
>>>>     // a path of stream transformations that leads to KStream stream1.
>>>>     stream1.transform(..., "store1");
>>>>
>>>>     // another path that generates a KStream stream2.
>>>>     stream2.transform(..., "store1");
>>>>
>>>> Behind the scenes, Streams makes sure the stream1 / stream2 transformations are always grouped together into a single group of tasks, each of which is executed by a single thread, and hence there are no concurrency issues when accessing the store from different operators within the same task. I'm not sure how common this use case is, but I'd like to hear your thoughts on maintaining it, since the current proposal seems to exclude this possibility.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>
>>>>> Here's KIP-401 for discussion, a minor Kafka Streams API change that I think could greatly increase the usability of the low-level processor API. I have some code written but will wait to see if there is buy-in before going all out and creating a pull request. It seems like most of the work would be in updating documentation and tests.
>>>>>
>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>>>>>
>>>>> Thanks!
>>>>> Paul
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
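For comparison, Guozhang's `store1` example wired under Matthias's `StoreBuilderSupplier` / `StoreNameSupplier` idea might look like this. It is a sketch on toy types only: `SimpleStoreBuilder`, `TransformerSupplierStub`, `WiringTopology`, `OwnerSupplier` and `SharerSupplier` are hypothetical, and the two interfaces are modelled on the thread, not on a released Kafka Streams API. The owning transformer must be added first, and adding it a second time reproduces the "store added twice" problem Paul describes:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy types modelled on the interfaces proposed in this thread;
// none of them are part of Kafka Streams.
final class SimpleStoreBuilder {
    final String name; // a real StoreBuilder would also carry serdes, logging config, etc.

    SimpleStoreBuilder(String name) {
        this.name = name;
    }
}

// Stand-in for TransformerSupplier; get() is omitted to keep the sketch short.
interface TransformerSupplierStub {}

interface StoreBuilderSupplier {
    List<SimpleStoreBuilder> stateStores(); // stores this transformer owns and adds
}

interface StoreNameSupplier {
    List<String> stateStores(); // existing stores this transformer only connects to
}

// The "main" transformer for the shared store.
class OwnerSupplier implements TransformerSupplierStub, StoreBuilderSupplier {
    @Override
    public List<SimpleStoreBuilder> stateStores() {
        return Collections.singletonList(new SimpleStoreBuilder("store1"));
    }
}

// Any other transformer that reads or writes the same store.
class SharerSupplier implements TransformerSupplierStub, StoreNameSupplier {
    @Override
    public List<String> stateStores() {
        return Collections.singletonList("store1");
    }
}

class WiringTopology {
    final Map<String, SimpleStoreBuilder> stores = new HashMap<>();
    final Map<String, List<String>> connections = new HashMap<>();

    // What a DSL transform(supplier) call might do with either interface.
    void transform(String processor, TransformerSupplierStub supplier) {
        List<String> names = new ArrayList<>();
        if (supplier instanceof StoreBuilderSupplier) {
            for (SimpleStoreBuilder b : ((StoreBuilderSupplier) supplier).stateStores()) {
                if (stores.putIfAbsent(b.name, b) != null) {
                    throw new IllegalStateException("Store " + b.name + " is already added.");
                }
                names.add(b.name);
            }
        } else if (supplier instanceof StoreNameSupplier) {
            for (String name : ((StoreNameSupplier) supplier).stateStores()) {
                if (!stores.containsKey(name)) {
                    throw new IllegalStateException("Store " + name + " must be added by its owner first.");
                }
                names.add(name);
            }
        }
        connections.put(processor, names);
    }
}
```

Because both interfaces declare `stateStores()` with different return types, a single Java class cannot implement both, which matches the intended split between one owner and any number of sharers.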