Hi Kyle,

Thanks for your input. Really appreciated.
On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <[email protected]> wrote:

> I like more of a builder pattern even though others have voiced against
> it. The reason I like it is because it makes it clear to the user that a
> call to KGroupedStream#count will return a KTable, not some intermediate
> class that I need to understand.

Yes, that makes sense.

> When trying to program in the fluent API that has been discussed most, it
> feels difficult to know when you will actually get an object you can
> reuse. What if I make one KGroupedStream that I want to reuse: is it
> legal to reuse it, or does this approach expect you to call grouped each
> time?

I'd anticipate that once you have a KGroupedStream you can re-use it as
you can today.

> This question doesn't pop into my head at all with the builder pattern; I
> assume I can reuse everything.
>
> Finally, I like .groupByKey and .groupBy(KeyValueMapper); not a big fan
> of the grouped.

Yes, grouped() was more for demonstration and because groupBy() and
groupByKey() were taken! So I'd imagine the API would actually want to be
groupByKey(/** no required args **/).withOptionalArg() and
groupBy(KeyValueMapper m).withOptionalArg(...) - of course this all
depends on maintaining backward compatibility.

> Unfortunately, the below approach would require at least 2 (probably 3)
> overloads of each count, reduce, and aggregate (one returning a KTable
> and one returning a KTable with a Windowed key; we would probably want to
> split windowed and session-windowed for ease of implementation).
> Obviously not exhaustive, but enough for you to get the picture.
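To make the trade-off concrete, here is a toy model of the builder style (all names are hypothetical, not the actual proposal): the options object carries only the optional arguments, so count(...) returns the final result type directly, with no intermediate class for the caller to understand.

```java
import java.util.Optional;

// Toy options object for the builder style; all names are hypothetical.
final class CountOptions {
    final Optional<String> queryableStoreName;

    private CountOptions(Optional<String> name) {
        this.queryableStoreName = name;
    }

    public static CountOptions count() {
        return new CountOptions(Optional.empty());
    }

    public CountOptions withQueryableStoreName(String name) {
        return new CountOptions(Optional.of(name));
    }
}

// Stand-in for KGroupedStream: count(...) returns the final result directly
// (here just the effective store name), never an intermediate class.
final class ToyGroupedStream {
    public String count(CountOptions opts) {
        return opts.queryableStoreName.orElse("default-store");
    }
}
```

Usage would be e.g. `stream.count(CountOptions.count().withQueryableStoreName("my-store"))`, which mirrors the shape of Kyle's proposal below.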
> Count, Reduce, and Aggregate supply 3 static methods to initialize the
> builder:
>
> // Count
> KTable<String, Long> count =
>     groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>
> // Windowed Count
> KTable<Windowed<String>, Long> windowedCount =
>     groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10))
>         .withQueryableStoreName("my-windowed-store"));
>
> // Session Count
> KTable<Windowed<String>, Long> sessionCount =
>     groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L))
>         .withQueryableStoreName("my-session-windowed-store"));

Above and below, I think I'd prefer it to be:

groupedStream.count(/** non-windowed count **/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.with(10L)).count(...)

> // Reduce
> Reducer<Long> reducer;
> KTable<String, Long> reduce = groupedStream.reduce(reducer,
>     Reduce.reduce().withQueryableStoreName("my-store"));
>
> // Aggregate Windowed with Custom Store
> Initializer<String> initializer;
> Aggregator<String, Long, String> aggregator;
> KTable<Windowed<String>, String> aggregate =
>     groupedStream.aggregate(initializer, aggregator,
>         Aggregate.windowed(TimeWindows.of(10L).until(10))
>             .withStateStoreSupplier(stateStoreSupplier));
>
> // Cogroup SessionWindowed
> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>     .cogroup(groupedStream2, aggregator2)
>     .aggregate(initializer, aggregator,
>         Aggregate.sessionWindowed(SessionWindows.with(10L),
>             sessionMerger).withQueryableStoreName("my-store"));
>
> public class Count {
>
>     public static class Windowed extends Count {
>         private Windows windows;
>     }
>     public static class SessionWindowed extends Count {
>         private SessionWindows sessionWindows;
>     }
>
>     public static Count count();
>     public static Windowed windowed(Windows windows);
>     public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);
>
>     // All withXXX(...) methods.
> }
>
> public class KGroupedStream {
>     public KTable<K, Long> count(Count count);
>     public KTable<Windowed<K>, Long> count(Count.Windowed count);
>     public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
>     ...
> }
>
> Thanks,
> Kyle
>
> From: Guozhang Wang
> Sent: Wednesday, June 28, 2017 7:45 PM
> To: [email protected]
> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>
> I played with the current proposal a bit at
> https://github.com/dguy/kafka/tree/dsl-experiment, and here are my
> observations:
>
> 1. Personally I prefer
>
>     "stream.group(mapper) / stream.groupByKey()"
>
> to
>
>     "stream.group().withKeyMapper(mapper) / stream.group()"
>
> since 1) withKeyMapper is not enforced programmatically even though it is
> not "really" optional like the others, and 2) syntax-wise it reads more
> naturally.
>
> I think it is okay to add the APIs in
> https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> to KGroupedStream.
>
> 2. For the "withStateStoreSupplier" API, are users supposed to pass in
> the inner-most state store supplier (e.g. the one whose get() returns a
> RocksDBStore), or is it supposed to be the outer-most supplier with
> logging / metrics / etc.? I think it would be more useful to only require
> users to pass in the inner state store supplier while specifying caching
> / logging through other APIs.
>
> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we
> are allowing users to call other APIs like "withQueryableName" multiple
> times, but to call "withStateStoreSupplier" only once at the end. Why is
> that?
>
> 3. The current DSL seems to be only for aggregations; what about joins?
>
> 4. I think it is okay to keep "withLogConfig": for the
> StateStoreSupplier it will still be user code specifying the topology, so
> I do not see a big difference.
>
> 5.
> Should "WindowedGroupedStream"'s withStateStoreSupplier take the windowed
> state store supplier, to enforce typing?
>
> Below are minor ones:
>
> 6. "withQueryableName": maybe better as "withQueryableStateName"?
>
> 7. "withLogConfig": maybe better as "withLoggingTopicConfig()"?
>
> Guozhang
>
> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <[email protected]>
> wrote:
>
> > I see your point about "when to add the processor to the topology".
> > That is indeed an issue. Not sure if we could allow "updates" to the
> > topology...
> >
> > I don't see any problem with having all the withXX() in the KTable
> > interface -- but this might be subjective.
> >
> > However, I don't understand your argument about putting aggregate()
> > after the withXX() -- all the calls to withXX() set optional parameters
> > for aggregate() and not for groupBy() -- but a groupBy().withXX()
> > indicates that the withXX() belongs to the groupBy(). IMHO, this might
> > be quite confusing for developers.
> >
> > -Matthias
> >
> > On 6/28/17 2:55 AM, Damian Guy wrote:
> > >> I also think that mixing optional parameters with configs is a bad
> > >> idea. I have no proposal for this atm but just wanted to mention it.
> > >> Hope to find some time to come up with something.
> > >
> > > Yes, I don't like the mix of config either. But the only real config
> > > here is the logging config - which we don't really need as it can
> > > already be done via a custom StateStoreSupplier.
> > >
> > >> What I don't like in the current proposal is the
> > >> .grouped().withKeyMapper() -- the current solution with
> > >> .groupBy(...) and .groupByKey() seems better. For clarity, we could
> > >> rename to .groupByNewKey(...) and .groupByCurrentKey() (even if we
> > >> should find some better names).
> > >
> > > It could be groupByKey(), groupBy() or something different, but...
> > >
> > >> The proposed pattern "chains" grouping and aggregation too close
> > >> together. I would rather separate both more than less, i.e., go in
> > >> the opposite direction.
> > >>
> > >> I am also wondering if we could do something more "fluent". The
> > >> initial proposal was like:
> > >>
> > >>>> groupedStream.count()
> > >>>>     .withStoreName("name")
> > >>>>     .withCachingEnabled(false)
> > >>>>     .withLoggingEnabled(config)
> > >>>>     .table()
> > >>
> > >> The .table() statement at the end was kinda alien.
> > >
> > > I agree, but then all of the withXXX methods need to be on KTable,
> > > which is worse in my opinion. You also need something that is going
> > > to "build" the internal processors and add them to the topology.
> > >
> > >> The current proposal puts the count() at the end -- i.e., the
> > >> optional parameters for count() have to be specified on the
> > >> .grouped() call -- this does not seem to be the best way either.
> > >
> > > I actually prefer this method, as you are building a grouped stream
> > > that you will aggregate. So
> > > table.grouped(...).withOptionalStuff().aggregate(..) etc. seems
> > > natural to me.
> > >
> > >> I did not think this through in detail, but can't we just do the
> > >> initial proposal with the .table()?
> > >>
> > >> groupedStream.count().withStoreName("name").mapValues(...)
> > >>
> > >> Each .withXXX(...) returns the current KTable and all the .withXXX()
> > >> are just added to the KTable interface. Or do I miss anything why
> > >> this won't work, or any obvious disadvantage?
> > >
> > > See above.
> > >
> > >> -Matthias
> > >>
> > >> On 6/22/17 4:06 AM, Damian Guy wrote:
> > >>> Thanks everyone. My latest attempt is below. It builds on the
> > >>> fluent approach, but I think it is slightly nicer.
> > >>> I agree with some of what Eno said about mixing config-y stuff in
> > >>> the DSL, but I think that enabling caching and enabling logging are
> > >>> things that aren't actually config. I'd probably not add
> > >>> withLogConfig(...) (even though it is below) as this is actually
> > >>> config and we already have a way of doing that, via the
> > >>> StateStoreSupplier. Arguably we could use the StateStoreSupplier
> > >>> for disabling caching etc., but as it stands that is a bit of a
> > >>> tedious process for someone that just wants to use the default
> > >>> storage engine, but not have caching enabled.
> > >>>
> > >>> There is also an orthogonal concern that Guozhang alluded to... If
> > >>> you want to plug in a custom storage engine and you want it to be
> > >>> logged etc., you would currently need to implement that yourself.
> > >>> Ideally we can provide a way where we will wrap the custom store
> > >>> with logging, metrics, etc. I need to think about where this fits;
> > >>> it is probably more appropriate on the Stores API.
> > >>> final KeyValueMapper<String, String, Long> keyMapper = null;
> > >>> // count with mapped key
> > >>> final KTable<Long, Long> count = stream.grouped()
> > >>>         .withKeyMapper(keyMapper)
> > >>>         .withKeySerde(Serdes.Long())
> > >>>         .withValueSerde(Serdes.String())
> > >>>         .withQueryableName("my-store")
> > >>>         .count();
> > >>>
> > >>> // windowed count
> > >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
> > >>>         .withQueryableName("my-window-store")
> > >>>         .windowed(TimeWindows.of(10L).until(10))
> > >>>         .count();
> > >>>
> > >>> // windowed reduce
> > >>> final Reducer<String> windowedReducer = null;
> > >>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
> > >>>         .withQueryableName("my-window-store")
> > >>>         .windowed(TimeWindows.of(10L).until(10))
> > >>>         .reduce(windowedReducer);
> > >>>
> > >>> final Aggregator<String, String, Long> aggregator = null;
> > >>> final Initializer<Long> init = null;
> > >>>
> > >>> // aggregate
> > >>> final KTable<String, Long> aggregate = stream.grouped()
> > >>>         .withQueryableName("my-aggregate-store")
> > >>>         .aggregate(aggregator, init, Serdes.Long());
> > >>>
> > >>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
> > >>>
> > >>> // aggregate with custom store
> > >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> > >>>         .withStateStoreSupplier(stateStoreSupplier)
> > >>>         .aggregate(aggregator, init);
> > >>>
> > >>> // disable caching
> > >>> stream.grouped()
> > >>>         .withQueryableName("name")
> > >>>         .withCachingEnabled(false)
> > >>>         .count();
> > >>>
> > >>> // disable logging
> > >>> stream.grouped()
> > >>>         .withQueryableName("q")
> > >>>         .withLoggingEnabled(false)
> > >>>         .count();
> > >>>
> > >>> // override log config
> > >>> final Reducer<String> reducer = null;
> > >>> stream.grouped()
> > >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
> > >>>         .reduce(reducer);
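On the orthogonal concern Damian raises about wrapping a custom store with logging, metrics, etc.: this is essentially a decorator question. Below is a minimal sketch of the "user supplies only the inner store, the framework wraps it" idea; all names here are illustrative, not the Kafka Streams Stores API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Minimal store abstraction standing in for a state store.
interface Store {
    void put(String key, String value);
}

// The user supplies only the inner-most store (e.g. their custom engine)...
class InMemoryStore implements Store {
    final Map<String, String> data = new HashMap<>();
    public void put(String key, String value) {
        data.put(key, value);
    }
}

// ...and the framework wraps it with logging (metrics, caching, etc. would
// be further decorators around the same interface).
class LoggingStore implements Store {
    private final Store inner;
    final List<String> changelog = new ArrayList<>();

    LoggingStore(Store inner) {
        this.inner = inner;
    }

    public void put(String key, String value) {
        changelog.add(key + "=" + value); // record the update to the "changelog"
        inner.put(key, value);
    }
}

final class ToyStores {
    // Wrapping happens in one place, so a custom store gets logging for free.
    static Store materialize(Supplier<Store> innerSupplier, boolean loggingEnabled) {
        Store inner = innerSupplier.get();
        return loggingEnabled ? new LoggingStore(inner) : inner;
    }
}
```

With this shape, users would pass the inner supplier and toggle logging / caching through separate API calls, which is what Guozhang suggests above.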
> > >>>
> > >>> If anyone wants to play around with this you can find the code
> > >>> here: https://github.com/dguy/kafka/tree/dsl-experiment
> > >>>
> > >>> Note: it won't actually work, as most of the methods just return
> > >>> null.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <[email protected]> wrote:
> > >>>
> > >>>> Thanks Damian. I think both options have pros and cons, and both
> > >>>> are better than overload abuse.
> > >>>>
> > >>>> The fluent API approach reads better, with no mention of builder
> > >>>> or build anywhere. The main downside is that the method signatures
> > >>>> are a little less clear. By reading the method signature, one
> > >>>> doesn't necessarily know what it returns. Also, one needs to
> > >>>> figure out the special method (`table()` in this case) that gives
> > >>>> you what you actually care about (`KTable` in this case). Not
> > >>>> major issues, but worth mentioning while doing the comparison.
> > >>>>
> > >>>> The builder approach avoids the issues mentioned above, but it
> > >>>> doesn't read as well.
> > >>>>
> > >>>> Ismael
> > >>>>
> > >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <[email protected]>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> I'd like to get a discussion going around some of the API choices
> > >>>>> we've made in the DSL, in particular those that relate to
> > >>>>> stateful operations (though this could expand).
> > >>>>> As it stands we lean heavily on overloaded methods in the API,
> > >>>>> i.e., there are 9 overloads for KGroupedStream.count(..)! It is
> > >>>>> becoming noisy, and I feel it is only going to get worse as we
> > >>>>> add more optional params.
> > >>>>> In particular, we've had some requests to be able to turn
> > >>>>> caching off, or change log configs, on a per-operator basis
> > >>>>> (note this can be done now if you pass in a StateStoreSupplier,
> > >>>>> but this can be a bit cumbersome).
> > >>>>>
> > >>>>> So this is a bit of an open question: how can we change the DSL
> > >>>>> overloads so that it flows, is simple to use and understand, and
> > >>>>> is easily extended in the future?
> > >>>>>
> > >>>>> One option would be to use a fluent API approach for providing
> > >>>>> the optional params, so something like this:
> > >>>>>
> > >>>>> groupedStream.count()
> > >>>>>     .withStoreName("name")
> > >>>>>     .withCachingEnabled(false)
> > >>>>>     .withLoggingEnabled(config)
> > >>>>>     .table()
> > >>>>>
> > >>>>> Another option would be to provide a Builder to the count
> > >>>>> method, so it would look something like this:
> > >>>>>
> > >>>>> groupedStream.count(new
> > >>>>>     CountBuilder("storeName").withCachingEnabled(false).build())
> > >>>>>
> > >>>>> Another option is to say: hey, we don't need this, what are you
> > >>>>> on about!
> > >>>>>
> > >>>>> The above has focussed on state-store-related overloads, but the
> > >>>>> same ideas could be applied to joins etc., where we presently
> > >>>>> have many join methods and many overloads.
> > >>>>>
> > >>>>> Anyway, I look forward to hearing your opinions.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Damian
>
> --
> -- Guozhang
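For what it's worth, the reason the fluent variant in the thread needs some terminal call like .table() is eager vs. deferred construction: the withXXX calls only accumulate settings, and something must eventually build the processor and add it to the topology. A toy sketch of the deferred variant, with hypothetical names (a plain List stands in for the real topology):

```java
import java.util.ArrayList;
import java.util.List;

// Deferred construction: withXXX calls only record settings; the terminal
// table() call is what actually "adds the processor to the topology".
final class CountChain {
    private final List<String> topology; // stands in for the real topology
    private String storeName = "default";
    private boolean caching = true;

    CountChain(List<String> topology) {
        this.topology = topology;
    }

    public CountChain withStoreName(String name) {
        this.storeName = name;
        return this;
    }

    public CountChain withCachingEnabled(boolean enabled) {
        this.caching = enabled;
        return this;
    }

    // Terminal operation: only now is anything added to the topology.
    public String table() {
        String node = "count(store=" + storeName + ", caching=" + caching + ")";
        topology.add(node);
        return node;
    }
}
```

Until table() is called, the topology is untouched, which is why dropping the terminal call (as Matthias suggests) forces each withXXX to either build eagerly or mutate an already-built node.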
