On Fri, 30 Jun 2017 at 18:40 Matthias J. Sax <matth...@confluent.io> wrote:

> > Hmmm, i don't agree. Until is a property of the window. It is going to be
> > potentially different for every window operation you do in a streams app.
>
> I am not sure.
>
> (1) for me, it is definitely a config for the operator for how long
> windows are maintained. It's not part of the window definition (as per
> definition there is nothing like a retention time -- that's just a
> practical necessity as we don't have infinite memory).
>

You could also argue that the size of the window is config.
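To make that concrete: in the current DSL both the size and the retention ("until") sit side by side on the window definition, so the line between "definition" and "config" is blurry. A minimal self-contained sketch of that shape (a hypothetical WindowSpec stand-in, not the real TimeWindows class):

```java
// Hypothetical stand-in for TimeWindows: both the size and the
// retention ("until") live on the window definition itself.
public final class WindowSpec {
    public final long sizeMs;
    public final long retentionMs;

    private WindowSpec(long sizeMs, long retentionMs) {
        this.sizeMs = sizeMs;
        this.retentionMs = retentionMs;
    }

    // The "definition": how wide each window is.
    public static WindowSpec of(long sizeMs) {
        // Default retention: no shorter than the window itself.
        return new WindowSpec(sizeMs, sizeMs);
    }

    // The "config": how long old windows are kept around for late data.
    public WindowSpec until(long retentionMs) {
        return new WindowSpec(sizeMs, retentionMs);
    }

    public static void main(String[] args) {
        WindowSpec w = WindowSpec.of(10L).until(30L);
        System.out.println(w.sizeMs + " " + w.retentionMs);
    }
}
```

If until() is "just config", the same argument applies to of() one line above it.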


> (2) Maybe you want different values for different windows, but than we
> need operator level configs instead of global configs. But it's not a
> reason to add config methods to the DSL.
>

I don't think we should go down this path. It is pretty simple as it is, and
config-driven development is not something I'd like to strive for.


>
> (3) I am actually not too sure if local configs for window retention are
> too useful.
> (a) If you have consecutive windows, there is some dependency anyway, as
> a downstream window cannot exploit a larger retention time than an
> upstream window.
> (b) Shouldn't retention time be more or less applied to the "input
> topics" -- the point is, how long do you want to accept late data? That
> sounds like a global question for the application with regard to the
> input -- not necessarily an operator/window question.
>
>
Sure, within a sub-topology it makes sense for the window retention to be
the same. However, different sub-topologies may have different constraints.
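For illustration (hypothetical retention values, and a sketch class rather than the real Kafka API): two sub-topologies in the same application can have very different tolerance for late data, which is the case a single global retention config can't express.

```java
// Hypothetical sketch: per-operator retention, standing in for two
// sub-topologies with different late-data constraints.
public final class PerOperatorRetention {
    static final class WindowSpec {
        final long sizeMs;
        final long retentionMs;
        WindowSpec(long sizeMs, long retentionMs) {
            this.sizeMs = sizeMs;
            this.retentionMs = retentionMs;
        }
        static WindowSpec of(long sizeMs) {
            return new WindowSpec(sizeMs, sizeMs);
        }
        WindowSpec until(long retentionMs) {
            return new WindowSpec(sizeMs, retentionMs);
        }
    }

    public static void main(String[] args) {
        // A dashboard sub-topology: late data is useless after a minute.
        WindowSpec dashboard = WindowSpec.of(10_000L).until(60_000L);
        // A billing sub-topology: must accept a full day of late data.
        WindowSpec billing = WindowSpec.of(10_000L).until(86_400_000L);
        System.out.println(dashboard.retentionMs + " " + billing.retentionMs);
    }
}
```

A single application-level retention setting would force both operators to the larger (and more expensive) value.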


>
> -Matthias
>
> On 6/30/17 12:31 AM, Damian Guy wrote:
> > Thanks Matthias
> >
> > On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> I am just catching up on this thread, so sorry for the long email in
> >> advance... Also, it's to some extent a dump of thoughts and not always a
> >> clear proposal. Still need to think about this in more detail. But maybe
> >> it helps others to get new ideas :)
> >>
> >>
> >>>> However, I don't understand your argument about putting aggregate()
> >>>> after the withXX() -- all the calls to withXX() set optional
> parameters
> >>>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
> >>>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
> >>>> be quite confusing for developers.
> >>>>
> >>>>
> >>> I see what you are saying, but the grouped stream is effectively a
> no-op
> >>> until you call one of the aggregate/count/reduce etc functions. So the
> >>> optional params are ones that are applicable to any of the operations
> you
> >>> can perform on this grouped stream. Then the final
> >>> count()/reduce()/aggregate() call has any of the params that are
> >>> required/specific to that function.
> >>>
> >>
> >> I understand your argument, but I don't share the conclusion. If we
> >> need a "final/terminal" call, the better way might be
> >>
> >> .groupBy().count().withXX().build()
> >>
> >> (with a better name for build() though)
> >>
> >>
> > The point is that all the other calls, i.e., withBlah, windowed, etc.
> > apply to all the aggregate functions. The terminal call being the actual type
> of
> > aggregation you want to do. I personally find this more natural than
> > groupBy().count().withBlah().build()
> >
> >
> >>> groupedStream.count(/** non windowed count**/)
> >>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
> >>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
> >>
> >>
> >> I like this. However, I don't see a reason to have windowed() and
> >> sessionWindowed(). We should have one top-level `Windows` interface that
> >> both `TimeWindows` and `SessionWindows` implement and just have a single
> >> windowed() method that accepts all `Windows`. (I did not like the
> >> separation of `SessionWindows` in the first place, and this seems to be
> >> an opportunity to clean this up. It was hard to change when we
> >> introduced session windows)
> >>
> >
> > Yes - true we should look into that.
> >
> >
> >>
> >> Btw: we do use the imperative groupBy() and groupByKey(), and thus we
> >> might also want to use windowBy() (instead of windowed()). Not sure how
> >> important this is, but it seems to be inconsistent otherwise.
> >>
> >>
> > Makes sense
> >
> >
> >>
> >> About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I think,
> >> defining an inner/left/outer join is not an optional argument but a
> >> first class concept and should have a proper representation in the API
> >> (like the current methods join(), leftJoin, outerJoin()).
> >>
> >>
> > Yep, i did originally have it as a required param and maybe that is what
> we
> > go with. It could have a default, but maybe that is confusing.
> >
> >
> >
> >> About the two join API proposals, the second one has too much
> >> boilerplate code for my taste. Also, the actual join() operator has only
> >> one argument, which is weird to me: in my thinking process, the main
> >> operator call should have one parameter per mandatory argument, but your
> >> proposal puts the mandatory arguments into the Joins.streamStreamJoin() call.
> >> This is far from intuitive IMHO.
> >>
> >>
> > This is the builder pattern, you only need one param as the builder has
> > captured all of the required and optional arguments.
> >
> >
> >> The first join proposal also seems to align better with the pattern
> >> suggested for aggregations and having the same pattern for all operators
> >> is important (as you stated already).
> >>
> >>
> > This is why I offered the two alternatives I started out with: one is the
> > builder pattern, the other is the more fluent pattern.
> >
> >
> >>
> >>
> >> Coming back to the config vs optional parameter. What about having a
> >> method withConfig[s](...) that allows putting in the configuration?
> >>
> >>
> > Sure, it is currently called withLogConfig() as that is the only thing
> that
> > is really config.
> >
> >
> >> This also raises the question if until() is a windows property?
> >> Actually, until() seems to be a configuration parameter and thus should
> >> not have its own method.
> >>
> >>
> > Hmmm, i don't agree. Until is a property of the window. It is going to be
> > potentially different for every window operation you do in a streams app.
> >
> >
> >>
> >>
> >> Browsing through your example DSL branch, I also saw this one:
> >>
> >>> final KTable<Windowed<String>, Long> windowed =
> >>>     groupedStream.counting()
> >>>                  .windowed(TimeWindows.of(10L).until(10))
> >>>                  .table();
> >>
> >> This is an interesting idea, and it reminds me of some feedback about "I
> >> wanted to count a stream, but there was no count() method -- I first
> >> needed to figure out, that I need to group the stream first to be able
> >> to count it. It does make sense in hindsight but was not obvious in the
> >> beginning". Thus, carrying out this thought, we could also do the
> >> following:
> >>
> >> stream.count().groupedBy().windowedBy().table();
> >>
> >> -> Note, I use "grouped" and "windowed" instead of imperative here, as
> >> it comes after the count()
> >>
> >> This would be more consistent than your proposal (that has grouping
> >> before but windowing after count()). It might even allow us to enrich
> >> the API with some syntactic sugar like `stream.count().table()` to get
> >> the overall count of all records (this would obviously not scale, but we
> >> could support it -- if not now, maybe later).
> >>
> >>
> > I guess i'd prefer
> > stream.groupBy().windowBy().count()
> > stream.groupBy().windowBy().reduce()
> > stream.groupBy().count()
> >
> > As i said above, everything that happens before the final aggregate call
> > can be applied to any of them. So it makes sense to me to do those things
> > ahead of the final aggregate call.
> >
> >
> >> Last about builder pattern. I am convinced that we need some "terminal"
> >> operator/method that tells us when to add the processor to the topology.
> >> But I don't see the need for a plain builder pattern that feels alien to
> >> me (see my argument about the second join proposal). Using .stream() /
> >> .table() as used in many examples might work. But maybe a more generic
> >> name that we can use in all places like build() or apply() might also be
> >> an option.
> >>
> >>
> > Sure, a generic name might be ok.
> >
> >
> >
> >
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/29/17 7:37 AM, Damian Guy wrote:
> >>> Thanks Kyle.
> >>>
> >>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.k...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Damian,
> >>>>
> >>>>>>>> When trying to program in the fluent API that has been discussed
> >> most
> >>>> it
> >>>>>>>> feels difficult to know when you will actually get an object you
> can
> >>>> reuse.
> >>>>>>>> What if I make one KGroupedStream that I want to reuse, is it
> legal
> >> to
> >>>>>>>> reuse it or does this approach expect you to call grouped each
> time?
> >>>>
> >>>>>> I'd anticipate that once you have a KGroupedStream you can re-use it
> >> as
> >>>> you
> >>>>>> can today.
> >>>>
> >>>> You said it yourself in another post that the grouped stream is
> >>>> effectively a no-op until a count, reduce, or aggregate. The way I see
> >> it
> >>>> you wouldn’t be able to reuse anything except KStreams and KTables,
> >> because
> >>>> most of this fluent api would continue returning this (this being the
> >>>> builder object currently being manipulated).
> >>>
> >>> So, if you ever store a reference to anything but KStreams and KTables
> >> and
> >>>> you use it in two different ways then it's possible you make
> conflicting
> >>>> withXXX() calls on the same builder.
> >>>>
> >>>>
> >>> Not necessarily true. It could return a new instance of the builder,
> i.e.,
> >>> the builders being immutable. So if you held a reference to the builder
> >> it
> >>> would always be the same as it was when it was created.
> >>>
> >>>
> >>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
> >>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
> >>>> groupedStreamsWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
> >>>>
> >>>> I’ll admit that this shouldn’t happen but some user is going to do it
> >>>> eventually…
> >>>> Depending on implementation uses of groupedStreamWithDefaultSerdes
> would
> >>>> most likely be equivalent to the version withDeclaredSerdes. One
> >>>> workaround would be to always make copies of the config objects you are
> >>>> building, but this approach has its own problem because now we have to
> >>>> identify which configs are equivalent so we don’t create repeated
> >>>> processors.
> >>>>
> >>>> The point of this long winded example is that we always have to be
> >>>> thinking about all of the possible ways it could be misused by a user
> >>>> (causing them to see hard to diagnose problems).
> >>>>
> >>>
> >>> Exactly! That is the point of the discussion really.
> >>>
> >>>
> >>>>
> >>>> In my attempt at a couple methods with builders I feel that I could
> >>>> confidently say the user couldn’t really mess it up.
> >>>>> // Count
> >>>>> KTable<String, Long> count =
> >>>>>     kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
> >>>> The kGroupedStream is reusable and if they attempted to reuse the
> Count
> >>>> for some reason it would throw an error message saying that a store
> >> named
> >>>> “my-store” already exists.
> >>>>
> >>>>
> >>> Yes i agree and i think using builders is my preferred pattern.
> >>>
> >>> Cheers,
> >>> Damian
> >>>
> >>>
> >>>> Thanks,
> >>>> Kyle
> >>>>
> >>>> From: Damian Guy
> >>>> Sent: Thursday, June 29, 2017 3:59 AM
> >>>> To: dev@kafka.apache.org
> >>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
> >>>>
> >>>> Hi Kyle,
> >>>>
> >>>> Thanks for your input. Really appreciated.
> >>>>
> >>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.k...@gmail.com
> >
> >>>> wrote:
> >>>>
> >>>>> I like more of a builder pattern even though others have voiced
> against
> >>>>> it. The reason I like it is because it makes it clear to the user
> that
> >> a
> >>>>> call to KGroupedStream#count will return a KTable not some
> intermediate
> >>>>> class that I need to understand.
> >>>>>
> >>>>
> >>>> Yes, that makes sense.
> >>>>
> >>>>
> >>>>> When trying to program in the fluent API that has been discussed most
> >> it
> >>>>> feels difficult to know when you will actually get an object you can
> >>>> reuse.
> >>>>> What if I make one KGroupedStream that I want to reuse, is it legal
> to
> >>>>> reuse it or does this approach expect you to call grouped each time?
> >>>>
> >>>>
> >>>> I'd anticipate that once you have a KGroupedStream you can re-use it
> as
> >> you
> >>>> can today.
> >>>>
> >>>>
> >>>>> This question doesn’t pop into my head at all in the builder pattern;
> >>>>> I assume I can reuse everything.
> >>>>> Finally, I like .groupByKey and .groupBy(KeyValueMapper); not a big
> >>>>> fan of the grouped().
> >>>>>
> >>>>> Yes, grouped() was more for demonstration and because groupBy() and
> >>>> groupByKey() were taken! So i'd imagine the api would actually want to
> >> be
> >>>> groupByKey(/** no required args***/).withOptionalArg() and
> >>>> groupBy(KeyValueMapper m).withOptionalArg(...)  of course this all
> >> depends
> >>>> on maintaining backward compatibility.
> >>>>
> >>>>
> >>>>> Unfortunately, the below approach would require at least 2 (probably 3)
> >>>>> overloads (one for returning a KTable and one for returning a KTable
> >> with
> >>>>> Windowed Key, probably would want to split windowed and
> sessionwindowed
> >>>> for
> >>>>> ease of implementation) of each count, reduce, and aggregate.
> >>>>> Obviously not exhaustive but enough for you to get the picture.
> Count,
> >>>>> Reduce, and Aggregate supply 3 static methods to initialize the
> >> builder:
> >>>>> // Count
> >>>>> KTable<String, Long> count =
> >>>>>     groupedStream.count(Count.count().withQueryableStoreName("my-store"));
> >>>>>
> >>>>> // Windowed Count
> >>>>> KTable<Windowed<String>, Long> windowedCount =
> >>>>>     groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10))
> >>>>>         .withQueryableStoreName("my-windowed-store"));
> >>>>>
> >>>>> // Session Count
> >>>>> KTable<Windowed<String>, Long> sessionCount =
> >>>>>     groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L))
> >>>>>         .withQueryableStoreName("my-session-windowed-store"));
> >>>>>
> >>>>>
> >>>> Above and below, i think i'd prefer it to be:
> >>>> groupedStream.count(/** non windowed count**/)
> >>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
> >>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>> // Reduce
> >>>>> Reducer<Long> reducer;
> >>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer,
> >>>>> Reduce.reduce().withQueryableStoreName("my-store"));
> >>>>>
> >>>>> // Aggregate Windowed with Custom Store
> >>>>> Initializer<String> initializer;
> >>>>> Aggregator<String, Long, String> aggregator;
> >>>>> KTable<Windowed<String>, String> aggregate =
> >>>>>     groupedStream.aggregate(initializer, aggregator,
> >>>>>         Aggregate.windowed(TimeWindows.of(10L).until(10))
> >>>>>             .withStateStoreSupplier(stateStoreSupplier));
> >>>>>
> >>>>> // Cogroup SessionWindowed
> >>>>> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
> >>>>>         .cogroup(groupedStream2, aggregator2)
> >>>>>         .aggregate(initializer, aggregator,
> >>>>> Aggregate.sessionWindowed(SessionWindows.with(10L),
> >>>>> sessionMerger).withQueryableStoreName("my-store"));
> >>>>>
> >>>>>
> >>>>>
> >>>>> public class Count {
> >>>>>
> >>>>>     public static class Windowed extends Count {
> >>>>>         private Windows windows;
> >>>>>     }
> >>>>>     public static class SessionWindowed extends Count {
> >>>>>         private SessionWindows sessionWindows;
> >>>>>     }
> >>>>>
> >>>>>     public static Count count();
> >>>>>     public static Windowed windowed(Windows windows);
> >>>>>     public static SessionWindowed sessionWindowed(SessionWindows
> >>>>> sessionWindows);
> >>>>>
> >>>>>     // All withXXX(...) methods.
> >>>>> }
> >>>>>
> >>>>> public class KGroupedStream {
> >>>>>     public KTable<K, Long> count(Count count);
> >>>>>     public KTable<Windowed<K>, Long> count(Count.Windowed count);
> >>>>>     public KTable<Windowed<K>, Long> count(Count.SessionWindowed
> >> count);
> >>>>> …
> >>>>> }
> >>>>>
> >>>>>
> >>>>> Thanks,
> >>>>> Kyle
> >>>>>
> >>>>> From: Guozhang Wang
> >>>>> Sent: Wednesday, June 28, 2017 7:45 PM
> >>>>> To: dev@kafka.apache.org
> >>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
> >>>>>
> >>>>> I played with the current proposal a bit at
> >>>>> https://github.com/dguy/kafka/tree/dsl-experiment,
> >>>>> and here are my observations:
> >>>>>
> >>>>> 1. Personally I prefer
> >>>>>
> >>>>>     "stream.group(mapper) / stream.groupByKey()"
> >>>>>
> >>>>> than
> >>>>>
> >>>>>     "stream.group().withKeyMapper(mapper) / stream.group()"
> >>>>>
> >>>>> Since 1) withKeyMapper is not enforced programmatically though it is
> >> not
> >>>>> "really" optional like others, 2) syntax-wise it reads more natural.
> >>>>>
> >>>>> I think it is okay to add the APIs in (
> >>>>> https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java
> >>>>> ) in KGroupedStream.
> >>>>>
> >>>>>
> >>>>> 2. For the "withStateStoreSupplier" API, are the user supposed to
> pass
> >> in
> >>>>> the most-inner state store supplier (e.g. then one whose get() return
> >>>>> RocksDBStore), or it is supposed to return the most-outer supplier
> with
> >>>>> logging / metrics / etc? I think it would be more useful to only
> >> require
> >>>>> users pass in the inner state store supplier while specifying
> caching /
> >>>>> logging through other APIs.
> >>>>>
> >>>>> In addition, the "GroupedWithCustomStore" is a bit suspicious to me:
> we
> >>>> are
> >>>>> allowing users to call other APIs like "withQueryableName" multiple
> >> time,
> >>>>> but only call "withStateStoreSupplier" only once in the end. Why is
> >> that?
> >>>>>
> >>>>>
> >>>>> 3. The current DSL seems to be only for aggregations, what about
> joins?
> >>>>>
> >>>>>
> >>>>> 4. I think it is okay to keep the "withLogConfig": for the
> >>>>> StateStoreSupplier it will still be user code specifying the topology
> >> so
> >>>> I
> >>>>> do not see there is a big difference.
> >>>>>
> >>>>>
> >>>>> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take the
> >>>>> windowed state store supplier to enforce typing?
> >>>>>
> >>>>>
> >>>>> Below are minor ones:
> >>>>>
> >>>>> 6. "withQueryableName": maybe better "withQueryableStateName"?
> >>>>>
> >>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
> >>>>>
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> I see your point about "when to add the processor to the topology".
> >>>> That
> >>>>>> is indeed an issue. Not sure if we could allow "updates" to the
> >>>>> topology...
> >>>>>>
> >>>>>> I don't see any problem with having all the withXX() in KTable
> >>>> interface
> >>>>>> -- but this might be subjective.
> >>>>>>
> >>>>>>
> >>>>>> However, I don't understand your argument about putting aggregate()
> >>>>>> after the withXX() -- all the calls to withXX() set optional
> >> parameters
> >>>>>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
> >>>>>> indicates that the withXX() belongs to the groupBy(). IMHO, this
> might
> >>>>>> be quite confusing for developers.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/28/17 2:55 AM, Damian Guy wrote:
> >>>>>>>> I also think that mixing optional parameters with configs is a bad
> >>>>> idea.
> >>>>>>>> Have no proposal for this atm, but just wanted to mention it. Hope
> >>>> to
> >>>>>>>> find some time to come up with something.
> >>>>>>>>
> >>>>>>>>
> >>>>>>> Yes, i don't like the mix of config either. But the only real
> config
> >>>>> here
> >>>>>>> is the logging config - which we don't really need as it can
> already
> >>>> be
> >>>>>>> done via a custom StateStoreSupplier.
> >>>>>>>
> >>>>>>>
> >>>>>>>> What I don't like in the current proposal is the
> >>>>>>>> .grouped().withKeyMapper() -- the current solution with
> >>>> .groupBy(...)
> >>>>>>>> and .groupByKey() seems better. For clarity, we could rename to
> >>>>>>>> .groupByNewKey(...) and .groupByCurrentKey() (even if we should
> find
> >>>>>>>> some better names).
> >>>>>>>>
> >>>>>>>>
> >>>>>>> it could be groupByKey(), groupBy() or something different, but...
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>> The proposed pattern "chains" grouping and aggregation too close
> >>>>>>>> together. I would rather separate both more than less, i.e., go in
> >>>>>>>> the opposite direction.
> >>>>>>>>
> >>>>>>>> I am also wondering, if we could so something more "fluent". The
> >>>>> initial
> >>>>>>>> proposal was like:
> >>>>>>>>
> >>>>>>>>>> groupedStream.count()
> >>>>>>>>>>    .withStoreName("name")
> >>>>>>>>>>    .withCachingEnabled(false)
> >>>>>>>>>>    .withLoggingEnabled(config)
> >>>>>>>>>>    .table()
> >>>>>>>>
> >>>>>>>> The .table() statement in the end was kinda alien.
> >>>>>>>>
> >>>>>>>
> >>>>>>> I agree, but then all of the withXXX methods need to be on KTable
> >>>> which
> >>>>>> is
> >>>>>>> worse in my opinion. You also need something that is going to
> "build"
> >>>>> the
> >>>>>>> internal processors and add them to the topology.
> >>>>>>>
> >>>>>>>
> >>>>>>>> The current proposal puts the count() at the end -- i.e., the
> >>>>>>>> optional parameters for count() have to be specified on the
> >>>>>>>> .grouped() call -- this does not seem to be the best way either.
> >>>>>>>>
> >>>>>>>>
> >>>>>>> I actually prefer this method as you are building a grouped stream
> >>>> that
> >>>>>> you
> >>>>>>> will aggregate. So
> >>>> table.grouped(...).withOptionalStuff().aggregate(..)
> >>>>>> etc
> >>>>>>> seems natural to me.
> >>>>>>>
> >>>>>>>
> >>>>>>>> I did not think this through in detail, but can't we just do the
> >>>>> initial
> >>>>>>>> proposal with the .table() ?
> >>>>>>>>
> >>>>>>>> groupedStream.count().withStoreName("name").mapValues(...)
> >>>>>>>>
> >>>>>>>> Each .withXXX(...) returns the current KTable, and all the .withXXX()
> >>>>>>>> are just added to the KTable interface. Or do I miss anything -- why
> >>>>>>>> wouldn't this work, or is there any obvious disadvantage?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>> See above.
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote:
> >>>>>>>>> Thanks everyone. My latest attempt is below. It builds on the
> >>>> fluent
> >>>>>>>>> approach, but i think it is slightly nicer.
> >>>>>>>>> I agree with some of what Eno said about mixing configy stuff in
> >>>> the
> >>>>>> DSL,
> >>>>>>>>> but i think that enabling caching and enabling logging are things
> >>>>> that
> >>>>>>>>> aren't actually config. I'd probably not add withLogConfig(...)
> >>>> (even
> >>>>>>>>> though it is below) as this is actually config and we already
> have
> >>>> a
> >>>>>> way
> >>>>>>>> of
> >>>>>>>>> doing that, via the StateStoreSupplier. Arguably we could use the
> >>>>>>>>> StateStoreSupplier for disabling caching etc, but as it stands
> that
> >>>>> is
> >>>>>> a
> >>>>>>>>> bit of a tedious process for someone that just wants to use the
> >>>>> default
> >>>>>>>>> storage engine, but not have caching enabled.
> >>>>>>>>>
> >>>>>>>>> There is also an orthogonal concern that Guozhang alluded to....
> If
> >>>>> you
> >>>>>>>>> want to plug in a custom storage engine and you want it to be
> >>>> logged
> >>>>>> etc,
> >>>>>>>>> you would currently need to implement that yourself. Ideally we
> can
> >>>>>>>> provide
> >>>>>>>>> a way where we will wrap the custom store with logging, metrics,
> >>>>> etc. I
> >>>>>>>>> need to think about where this fits, it is probably more
> >>>> appropriate
> >>>>> on
> >>>>>>>> the
> >>>>>>>>> Stores API.
> >>>>>>>>>
> >>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper = null;
> >>>>>>>>> // count with mapped key
> >>>>>>>>> final KTable<Long, Long> count = stream.grouped()
> >>>>>>>>>         .withKeyMapper(keyMapper)
> >>>>>>>>>         .withKeySerde(Serdes.Long())
> >>>>>>>>>         .withValueSerde(Serdes.String())
> >>>>>>>>>         .withQueryableName("my-store")
> >>>>>>>>>         .count();
> >>>>>>>>>
> >>>>>>>>> // windowed count
> >>>>>>>>> final KTable<Windowed<String>, Long> windowedCount =
> >>>> stream.grouped()
> >>>>>>>>>         .withQueryableName("my-window-store")
> >>>>>>>>>         .windowed(TimeWindows.of(10L).until(10))
> >>>>>>>>>         .count();
> >>>>>>>>>
> >>>>>>>>> // windowed reduce
> >>>>>>>>> final Reducer<String> windowedReducer = null;
> >>>>>>>>> final KTable<Windowed<String>, String> windowedReduce =
> >>>>>> stream.grouped()
> >>>>>>>>>         .withQueryableName("my-window-store")
> >>>>>>>>>         .windowed(TimeWindows.of(10L).until(10))
> >>>>>>>>>         .reduce(windowedReducer);
> >>>>>>>>>
> >>>>>>>>> final Aggregator<String, String, Long> aggregator = null;
> >>>>>>>>> final Initializer<Long> init = null;
> >>>>>>>>>
> >>>>>>>>> // aggregate
> >>>>>>>>> final KTable<String, Long> aggregate = stream.grouped()
> >>>>>>>>>         .withQueryableName("my-aggregate-store")
> >>>>>>>>>         .aggregate(aggregator, init, Serdes.Long());
> >>>>>>>>>
> >>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>>
> >>>>>> stateStoreSupplier
> >>>>>>>> = null;
> >>>>>>>>>
> >>>>>>>>> // aggregate with custom store
> >>>>>>>>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
> >>>>>>>>>         .withStateStoreSupplier(stateStoreSupplier)
> >>>>>>>>>         .aggregate(aggregator, init);
> >>>>>>>>>
> >>>>>>>>> // disable caching
> >>>>>>>>> stream.grouped()
> >>>>>>>>>         .withQueryableName("name")
> >>>>>>>>>         .withCachingEnabled(false)
> >>>>>>>>>         .count();
> >>>>>>>>>
> >>>>>>>>> // disable logging
> >>>>>>>>> stream.grouped()
> >>>>>>>>>         .withQueryableName("q")
> >>>>>>>>>         .withLoggingEnabled(false)
> >>>>>>>>>         .count();
> >>>>>>>>>
> >>>>>>>>> // override log config
> >>>>>>>>> final Reducer<String> reducer = null;
> >>>>>>>>> stream.grouped()
> >>>>>>>>>         .withLogConfig(Collections.singletonMap("segment.size",
> >>>>> "10"))
> >>>>>>>>>         .reduce(reducer);
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> If anyone wants to play around with this you can find the code
> >>>> here:
> >>>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment
> >>>>>>>>>
> >>>>>>>>> Note: It won't actually work as most of the methods just return
> >>>> null.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Damian
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk>
> >>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks Damian. I think both options have pros and cons. And both
> >>>> are
> >>>>>>>> better
> >>>>>>>>>> than overload abuse.
> >>>>>>>>>>
> >>>>>>>>>> The fluent API approach reads better, no mention of builder or
> >>>> build
> >>>>>>>>>> anywhere. The main downside is that the method signatures are a
> >>>>> little
> >>>>>>>> less
> >>>>>>>>>> clear. By reading the method signature, one doesn't necessarily
> >>>>>>>>>> know what
> >>>>>>>>>> it returns. Also, one needs to figure out the special method
> >>>>>> (`table()`
> >>>>>>>> in
> >>>>>>>>>> this case) that gives you what you actually care about (`KTable`
> >>>> in
> >>>>>> this
> >>>>>>>>>> case). Not major issues, but worth mentioning while doing the
> >>>>>>>> comparison.
> >>>>>>>>>>
> >>>>>>>>>> The builder approach avoids the issues mentioned above, but it
> >>>>> doesn't
> >>>>>>>> read
> >>>>>>>>>> as well.
> >>>>>>>>>>
> >>>>>>>>>> Ismael
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <
> damian....@gmail.com
> >>>>>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I'd like to get a discussion going around some of the API
> choices
> >>>>>> we've
> >>>>>>>>>>> made in the DSL. In particular those that relate to stateful
> >>>>>> operations
> >>>>>>>>>>> (though this could expand).
> >>>>>>>>>>> As it stands we lean heavily on overloaded methods in the API,
> >>>> i.e,
> >>>>>>>> there
> >>>>>>>>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming
> >>>> noisy
> >>>>>> and
> >>>>>>>> i
> >>>>>>>>>>> feel it is only going to get worse as we add more optional
> >>>> params.
> >>>>> In
> >>>>>>>>>>> particular we've had some requests to be able to turn caching
> >>>> off,
> >>>>> or
> >>>>>>>>>>> change log configs,  on a per operator basis (note this can be
> >>>> done
> >>>>>> now
> >>>>>>>>>> if
> >>>>>>>>>>> you pass in a StateStoreSupplier, but this can be a bit
> >>>>> cumbersome).
> >>>>>>>>>>>
> >>>>>>>>>>> So this is a bit of an open question. How can we change the DSL
> >>>>>>>> overloads
> >>>>>>>>>>> so that it flows, is simple to use and understand, and is
> easily
> >>>>>>>> extended
> >>>>>>>>>>> in the future?
> >>>>>>>>>>>
> >>>>>>>>>>> One option would be to use a fluent API approach for providing
> >>>> the
> >>>>>>>>>> optional
> >>>>>>>>>>> params, so something like this:
> >>>>>>>>>>>
> >>>>>>>>>>> groupedStream.count()
> >>>>>>>>>>>    .withStoreName("name")
> >>>>>>>>>>>    .withCachingEnabled(false)
> >>>>>>>>>>>    .withLoggingEnabled(config)
> >>>>>>>>>>>    .table()
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Another option would be to provide a Builder to the count
> method,
> >>>>> so
> >>>>>> it
> >>>>>>>>>>> would look something like this:
> >>>>>>>>>>> groupedStream.count(new
> >>>>>>>>>>> CountBuilder("storeName").withCachingEnabled(false).build())
> >>>>>>>>>>>
> >>>>>>>>>>> Another option is to say: Hey we don't need this, what are you
> on
> >>>>>>>> about!
> >>>>>>>>>>>
> >>>>>>>>>>> The above has focussed on state store related overloads, but
> the
> >>>>> same
> >>>>>>>>>> ideas
> >>>>>>>>>>> could  be applied to joins etc, where we presently have many
> join
> >>>>>>>> methods
> >>>>>>>>>>> and many overloads.
> >>>>>>>>>>>
> >>>>>>>>>>> Anyway, i look forward to hearing your opinions.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Damian
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
