Hi Guozhang,

That sounds good to me. I'll include that in the KIP.

Thanks,
-John

On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Let me clarify a bit on what I meant about moving `retentionPeriod` to
> WindowStoreBuilder:
>
> In another discussion we had around KIP-319 / 330, we concluded that the
> "retention period" should not really be a window spec, but only a window
> store spec, as it only affects how long each window is retained and
> queryable, along with the storage cost.
>
> More specifically, today the "maintainMs" returned from Windows is used in
> three places:
>
> 1) for windowed aggregations, it is passed directly into
> `Stores.persistentWindowStore()` as the retention period parameter. For this
> use case we should just let the WindowStoreBuilder specify this value
> itself.
>
> NOTE: It is also used in the KStreamWindowAggregate processor, to
> determine if a received record should be dropped due to its lateness. We
> may need to think of another way to get this value inside the processor.
>
> 2) for windowed stream-stream join, it is used as the join range parameter
> but only to check that "windowSizeMs <= retentionPeriodMs". We can do this
> check at the store builder level instead of at the processor level.
>
>
> If we can remove its usage in both 1) and 2), then we should be able to
> safely remove this from the `Windows` spec.
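
A minimal sketch of what the check in 2) could look like at the store builder level. `WindowStoreSpec` and its fields are hypothetical stand-ins invented for illustration, not the real Streams store builder; the only thing taken from the thread is the "windowSizeMs <= retentionPeriodMs" rule.

```java
import java.time.Duration;

// Hypothetical stand-in for a window store builder; not the real Kafka Streams API.
final class WindowStoreSpec {
    final String name;
    final Duration windowSize;
    final Duration retentionPeriod;

    private WindowStoreSpec(String name, Duration windowSize, Duration retentionPeriod) {
        this.name = name;
        this.windowSize = windowSize;
        this.retentionPeriod = retentionPeriod;
    }

    // Validate once when the store is specified, so that processors
    // no longer need to read the retention period from the window spec.
    static WindowStoreSpec of(String name, Duration windowSize, Duration retentionPeriod) {
        if (windowSize.compareTo(retentionPeriod) > 0) {
            throw new IllegalArgumentException(
                "retention period " + retentionPeriod
                    + " must be no smaller than window size " + windowSize);
        }
        return new WindowStoreSpec(name, windowSize, retentionPeriod);
    }
}
```

With this shape, a join or aggregation would only hand its window size to the store spec, and a too-small retention would fail when the topology is built rather than inside the processor.
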
>
>
> Guozhang
>
>
> On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io> wrote:
>
> > Thanks for the reply, Guozhang,
> >
> > Good! I agree, that is also a good reason, and I actually made use of that
> > in my tests. I'll update the KIP.
> >
> > By the way, I chose "allowedLateness" as I was trying to pick a better name
> > than "close", but I think it's actually the wrong name. We don't want to
> > bound the lateness of events in general, only with respect to the end of
> > their window.
> >
> > If we have a window [0,10), with "allowedLateness" of 5, then if we get an
> > event with timestamp 3 at time 9, the name implies we'd reject it, which
> > seems silly. Really, we'd only want to start rejecting that event at stream
> > time 15.
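
The arithmetic in that example can be sketched as follows. `WindowClose` and `accepts` are hypothetical names used only for illustration; the rule itself (the window stays open until stream time reaches window end plus the allowed time) is the one described above.

```java
// Hypothetical helper illustrating "close" measured from the window end; not a Streams API.
final class WindowClose {
    // A window [start, end) keeps accepting events while streamTime < end + graceAfterEnd.
    static boolean accepts(long windowEnd, long graceAfterEnd, long streamTime) {
        return streamTime < windowEnd + graceAfterEnd;
    }

    public static void main(String[] args) {
        // Window [0,10) with 5 time units allowed after the window end.
        System.out.println(accepts(10, 5, 9));   // event with timestamp 3 arriving at stream time 9
        System.out.println(accepts(10, 5, 15));  // the same event arriving at stream time 15
    }
}
```

The event's own timestamp only selects the window; whether it is still accepted depends solely on stream time relative to the window end.
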
> >
> > What I meant was more like "allowedLatenessAfterWindowEnd", but that's too
> > verbose. I think that "close" + some documentation about what it means will
> > be better.
> >
> > 1: "Close" would be measured from the end of the window, so a reasonable
> > default would be "0". Recall that "close" really only needs to be specified
> > for final results, and a default of 0 would produce the most intuitive
> > results. If folks later discover that they are missing some late events,
> > they can adjust the parameter accordingly. IMHO, any other value would just
> > be a guess on our part.
> >
> > 2a:
> > I think you're saying to re-use "until" instead of adding "close" to the
> > window.
> >
> > The downside here would be that the semantic change could be more confusing
> > than deprecating "until" and introducing window "close" and a
> > "retentionTime" on the store builder. The deprecation is a good, controlled
> > way for us to make sure people are getting the semantics they think they're
> > getting, as well as giving us an opportunity to link people to the API they
> > should use instead.
> >
> > I didn't fully understand the second part, but it sounds like you're
> > suggesting to add a new "retentionTime" setter to Windows to bridge the gap
> > until we add it to the store builder? That seems kind of roundabout to me,
> > if that's what you meant. We could just immediately add it to the store
> > builders in the same PR.
> >
> > 2b: Sounds good to me!
> >
> > Thanks again,
> > -John
> >
> >
> > On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > John,
> > >
> > > Thanks for your replies. As for the two options of the API, I think I'm
> > > slightly inclined to the first option as well. My motivation is a bit
> > > different, as I think the first one may be more flexible, for example:
> > >
> > > KTable<Windowed<..>> table = ... count();
> > >
> > > // want to peek at the changelog stream, do not care about final results.
> > > table.toStream().peek(..);
> > >
> > > // sending to a topic, want to only send the final results.
> > > table.suppress().toStream().to("topic");
> > >
> > > --------------
> > >
> > > Besides that, I have a few more minor questions:
> > >
> > > 1. For "allowedLateness", what should be the default value? I.e. if users
> > > do not specify "allowedLateness" in TimeWindows, what value should we set?
> > >
> > > 2. For API names, some personal suggestions here:
> > >
> > > 2.a) "allowedLateness"  -> "until" (semantics changed, and also value is
> > > defined as delta on top of window length), where "until" ->
> > > "retentionPeriod", and the latter will be moved from `Windows` to
> > > `WindowStoreBuilder` in the future.
> > >
> > > 2.b) "BufferConfig" -> "Buffered" ?
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:
> > >
> > > > Hey Matthias and Guozhang,
> > > >
> > > > Sorry for the slow reply. I was mulling over your feedback and weighing
> > > > some ideas in a sketchbook PR (https://github.com/apache/kafka/pull/5337).
> > > >
> > > > Your thought about keeping suppression independent of business logic is a
> > > > very good one. I agree that it would make more sense to add some kind of
> > > > "window close" concept to the window definition.
> > > >
> > > > In fact, doing that immediately solves the inconsistency problem Guozhang
> > > > brought up. There's no need to add a "final results" or "emission" option
> > > > to the windowed aggregation.
> > > >
> > > > What do you think about an API more like this:
> > > >
> > > > final StreamsBuilder builder = new StreamsBuilder();
> > > >
> > > > builder
> > > >   .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
> > > >   .groupBy(
> > > >     (String k1, String v1) -> k1,
> > > >     Serialized.with(STRING_SERDE, STRING_SERDE)
> > > >   )
> > > >   .windowedBy(TimeWindows
> > > >     .of(scaledTime(2L))
> > > >     .until(scaledTime(3L))
> > > >     .allowedLateness(scaledTime(1L))
> > > >   )
> > > >   .count(Materialized.as("counts"))
> > > >   .suppress(
> > > >     emitFinalResultsOnly(
> > > >       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
> > > >     )
> > > >   )
> > > >   .toStream()
> > > >   .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));
> > > >
> > > > Note that:
> > > >  * "emitFinalResultsOnly" is available *only* on windowed tables (enforced
> > > > by the type system at compile time), and it determines the time to wait by
> > > > looking at "allowedLateness" on the TimeWindows config.
> > > >  * querying "counts" will produce results (eventually) consistent with
> > > > what's observable in "output-suppressed".
> > > >  * in all cases, "suppress" has no effect on business logic, just on event
> > > > suppression.
> > > >
> > > > Is this API straightforward? Or do you still prefer the version that you
> > > > both proposed:
> > > >
> > > >   ...
> > > >   .windowedBy(TimeWindows
> > > >     .of(scaledTime(2L))
> > > >     .until(scaledTime(3L))
> > > >     .allowedLateness(scaledTime(1L))
> > > >   )
> > > >   .count(
> > > >     Materialized.as("counts"),
> > > >     emitFinalResultsOnly(
> > > >       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
> > > >     )
> > > >   )
> > > >   ...
> > > >
> > > > To me, these two are practically identical, and I still vaguely prefer the
> > > > first one.
> > > >
> > > > The prototype has made it clearer to me that users of "final results for
> > > > windows" and users of "suppression for table events" both need to configure
> > > > the suppression buffer.
> > > >
> > > > This buffer configuration consists of:
> > > > 1. how many keys or bytes to keep in memory
> > > > 2. what to do if memory runs out (shut down, start using disk, ...)
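
To make the two knobs concrete, here is a small sketch of a buffer bounded by key count with a configurable "buffer full" strategy. Everything in it is hypothetical: the class name, the `EMIT_EARLY` option (standing in for "something other than shutting down"), and the eviction order are assumptions for illustration, not the KIP's `BufferConfig`.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a suppression buffer bounded by key count; not the KIP's BufferConfig.
final class BoundedSuppressionBuffer<K, V> {
    // SHUT_DOWN is named in this thread; EMIT_EARLY is an assumed alternative for illustration.
    enum BufferFullStrategy { SHUT_DOWN, EMIT_EARLY }

    private final long maxKeys;
    private final BufferFullStrategy strategy;
    private final Map<K, V> buffer = new LinkedHashMap<>();

    BoundedSuppressionBuffer(long maxKeys, BufferFullStrategy strategy) {
        this.maxKeys = maxKeys;
        this.strategy = strategy;
    }

    // Keeps only the latest value per key. Returns the evicted (oldest-inserted) entry
    // when the buffer overflows under EMIT_EARLY, or null when nothing was evicted.
    Map.Entry<K, V> put(K key, V value) {
        buffer.put(key, value);
        if (buffer.size() <= maxKeys) {
            return null;
        }
        if (strategy == BufferFullStrategy.SHUT_DOWN) {
            throw new IllegalStateException("suppression buffer exceeded " + maxKeys + " keys");
        }
        Iterator<Map.Entry<K, V>> it = buffer.entrySet().iterator();
        // Copy the entry before removing it, since the live entry is invalidated by remove().
        Map.Entry<K, V> oldest = new AbstractMap.SimpleImmutableEntry<>(it.next());
        it.remove();
        return oldest;
    }

    int size() {
        return buffer.size();
    }
}
```

Either way the caller has to pick both a bound and a strategy, which is why a bare "final results" flag would not be enough.
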
> > > >
> > > > So it's not as simple as setting a "final results" flag. We'll either have
> > > > an "Emit" config object on the windowed aggregators that takes the same
> > > > BufferConfig as the "Suppress" config on the suppression operator, or we
> > > > just use the suppression operator for both.
> > > >
> > > > Perhaps it would sweeten the deal a little to point out that we have 2
> > > > overloads already for each windowed aggregator (with and without
> > > > Materialized). Adding "Emitted" or something would mean that we'd add a new
> > > > overload for each one, taking us up to 4 overloads each for "count",
> > > > "aggregate" and "reduce". Using "suppress" means that we don't add any new
> > > > overloads.
> > > >
> > > > Thanks again for helping to hash this out,
> > > > -John
> > > >
> > > > On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > > >
> > > > > > I think I agree with Matthias on having dedicated APIs for the windowed
> > > > > > operation final output scenario, PLUS separating the window close, which
> > > > > > the "final output" would rely on, from the window retention time itself
> > > > > > (admittedly it would make this KIP effort larger, but if we believe we
> > > > > > need to do this separation anyways we could just do it now).
> > > > > >
> > > > > > And then we can have `KTable#suppress()` for intermediate-suppression
> > > > > > only, not for late-record-suppression, until we've seen that it becomes
> > > > > > a common feature request, because our current design can still be
> > > > > > extended for that purpose.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the discussion. I am just catching up.
> > > > > >
> > > > > > > In general, I think we have different use cases, and non-windowed and
> > > > > > > windowed are quite different. For the non-windowed case, suppress() has
> > > > > > > no (useful) close or retention time, no final semantics, and also no
> > > > > > > business logic impact.
> > > > > >
> > > > > > > On the other hand, for windowed aggregations, close time and final
> > > > > > > result do have a meaning. IMHO, `close()` is part of business logic
> > > > > > > while retention time is not. Also, suppression of intermediate results
> > > > > > > is not a business rule, and there might be use cases for which either
> > > > > > > only "early intermediate" results (before window end time) are
> > > > > > > suppressed, or all intermediates are suppressed (maybe also something in
> > > > > > > the middle, i.e., just reducing the load of intermediate updates). Thus,
> > > > > > > window-suppression is much richer.
> > > > > >
> > > > > > > IMHO, a generic `suppress()` operator that can be inserted into the data
> > > > > > > flow at any point is useful. Maybe we should keep it as generic as
> > > > > > > possible. However, it might be difficult to use with regard to
> > > > > > > windowing, as the mental effort to use it is high.
> > > > > >
> > > > > > With regard to Guozhang's comment:
> > > > > >
> > > > > > > we will actually
> > > > > > > process data as old as 30 days as well, while most of the late
> > > > updates
> > > > > > > beyond 5 minutes would be discarded anyways.
> > > > > >
> > > > > > > If we use `suppress()` as a standalone operator, this is correct and
> > > > > > > intended IMHO. To address the issue if the behavior is unwanted, I would
> > > > > > > suggest adding a "suppress option" directly to the
> > > > > > > `count()/reduce()/aggregate()` window operators, similar to
> > > > > > > `Materialized`. This would be an "embedded suppress" and avoid the
> > > > > > > issue. It would also address the issue about mental effort for the
> > > > > > > "single final window result" use case.
> > > > > >
> > > > > > > I also think that a shorter close-time than retention time is useful for
> > > > > > > window aggregation. If we add close() to the window definition and
> > > > > > > until() to `Materialized`, we can separate both correctly IMHO.
> > > > > > >
> > > > > > > About setting `close = min(close,retention)` I am not sure. We might
> > > > > > > rather throw an exception than reduce the close time automatically.
> > > > > > > Otherwise, I foresee many user questions like "I set close to X but it
> > > > > > > does not get updated for some data that has a delay of X".
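
The difference between the two behaviors can be sketched like this. The class and method names are hypothetical; "close" and "retention" just mirror the terms used in this thread.

```java
import java.time.Duration;

// Hypothetical validation helpers; not part of the Streams API.
final class CloseVsRetention {
    // Silent variant: close = min(close, retention). The caller never learns the value changed.
    static Duration clamp(Duration close, Duration retention) {
        return close.compareTo(retention) <= 0 ? close : retention;
    }

    // Strict variant: reject a close time that the store could not honor.
    static Duration requireWithinRetention(Duration close, Duration retention) {
        if (close.compareTo(retention) > 0) {
            throw new IllegalArgumentException(
                "close time " + close + " exceeds retention " + retention);
        }
        return close;
    }
}
```

With the clamping variant, a user who configures a close time of 10 minutes against a 5-minute retention silently gets 5 minutes, which is exactly the kind of surprise the exception avoids.
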
> > > > > >
> > > > > > > The tricky question might be how to design the API in a backward
> > > > > > > compatible way though.
> > > > > >
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 7/3/18 5:38 AM, John Roesler wrote:
> > > > > > > Hi Guozhang,
> > > > > > >
> > > > > > > > I see. It seems like if we want to decouple 1) and 2), we need to alter
> > > > > > > > the definition of the window. Do you think it would close the gap if we
> > > > > > > > added a "window close" time to the window definition?
> > > > > > >
> > > > > > > Such as:
> > > > > > >
> > > > > > > builder.stream("input")
> > > > > > > .groupByKey()
> > > > > > > .windowedBy(
> > > > > > >   TimeWindows
> > > > > > >     .of(60_000)
> > > > > > >     .closeAfter(10 * 60)
> > > > > > >     .until(30L * 24 * 60 * 60 * 1000)
> > > > > > > )
> > > > > > > .count()
> > > > > > > .suppress(Suppression.finalResultsOnly());
> > > > > > >
> > > > > > > Possibly called "finalResultsAtWindowClose" or something?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > > On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Hey John,
> > > > > > >>
> > > > > > > >> Obviously I'm too lazy on email replying diligence compared with you :)
> > > > > > > >> Will try to reply them separately:
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> -----------------------------------------------------------------------------
> > > > > > >>
> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 8:23 AM":
> > > > > > >>
> > > > > > > >> I'm aware of this use case, but again, the concern is that, in this
> > > > > > > >> setting, in order to let the window be queryable for 30 days, we will
> > > > > > > >> actually process data as old as 30 days as well, while most of the late
> > > > > > > >> updates beyond 5 minutes would be discarded anyways. Personally I think
> > > > > > > >> for the final update scenario, the ideal situation users would want is
> > > > > > > >> "do not process any data that is more than 5 minutes late, and of course
> > > > > > > >> send no update records downstream later than 5 minutes either; but
> > > > > > > >> retain the window to be queryable for 30 days". By doing that, the final
> > > > > > > >> window snapshot would also be aligned with the update stream. In other
> > > > > > > >> words, among these three periods:
> > > > > > >>
> > > > > > >> 1) the retention length of the window / table.
> > > > > > >> 2) the late records acceptance for updating the window.
> > > > > > >> 3) the late records update to be sent downstream.
> > > > > > >>
> > > > > > > >> Final update use cases would naturally want 2) = 3), while 1) may be
> > > > > > > >> different and larger, while what we provide now is that 1) = 2), which
> > > > > > > >> could be different and in practice larger than 3), hence not the most
> > > > > > > >> intuitive for their needs.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> -----------------------------------------------------------------------------
> > > > > > >>
> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 10:27 AM":
> > > > > > >>
> > > > > > > >> I like option 2) better than option 1) as well from a programming pov.
> > > > > > > >> But I'm wondering whether option 2) would provide the above semantics,
> > > > > > > >> or whether it is still coupling 1) with 2) as well?
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> Guozhang
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <
> j...@confluent.io
> > >
> > > > > wrote:
> > > > > > >>
> > > > > > > >>> In fact, to push the idea further (which IIRC is what Matthias originally
> > > > > > > >>> proposed), if we can accept "Suppression#finalResultsOnly" in my last
> > > > > > > >>> email, then we could also consider whether to eliminate
> > > > > > > >>> "suppressLateEvents" entirely.
> > > > > > > >>>
> > > > > > > >>> We could always add it later, but you've both expressed doubt that there
> > > > > > > >>> are practical use cases for it outside of final-results.
> > > > > > >>>
> > > > > > >>> -John
> > > > > > >>>
> > > > > > >>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <
> > j...@confluent.io>
> > > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hi again, Guozhang ;) Here's the second part of my
> response...
> > > > > > >>>>
> > > > > > > >>>> It seems like your main concern is: "if I'm a user who wants final update
> > > > > > > >>>> semantics, how complicated is it for me to get it?"
> > > > > > > >>>>
> > > > > > > >>>> I think we have to assume that people don't always have time to become
> > > > > > > >>>> deeply familiar with all the nuances of a programming environment before
> > > > > > > >>>> they use it. Especially if they're evaluating several frameworks for their
> > > > > > > >>>> use case, it's very valuable to make it as obvious as possible how to
> > > > > > > >>>> accomplish various computations with Streams.
> > > > > > > >>>>
> > > > > > > >>>> To me the biggest question is whether with a fresh perspective, people
> > > > > > > >>>> would say "oh, I get it, I have to bound my lateness and suppress
> > > > > > > >>>> intermediate updates, and of course I'll get only the final result!", or
> > > > > > > >>>> if it's more like "wtf? all I want is the final result, what are all these
> > > > > > > >>>> parameters?".
> > > > > > > >>>>
> > > > > > > >>>> I was talking with Matthias a while back, and he had an idea that I think
> > > > > > > >>>> can help, which is to essentially set up a final-result recipe in addition
> > > > > > > >>>> to the raw parameters. I previously thought that it wouldn't be possible to
> > > > > > > >>>> restrict its usage to Windowed KTables, but thinking about it again this
> > > > > > > >>>> weekend, I have a couple of ideas:
> > > > > > >>>>
> > > > > > >>>> ================
> > > > > > >>>> = 1. Static Wrapper =
> > > > > > >>>> ================
> > > > > > >>>> We can define an extra static function that "wraps" a KTable
> > > with
> > > > > > >>>> final-result semantics.
> > > > > > >>>>
> > > > > > > >>>> public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
> > > > > > > >>>>   final KTable<K, V> windowedKTable,
> > > > > > > >>>>   final Duration maxAllowedLateness,
> > > > > > > >>>>   final Suppression.BufferFullStrategy bufferFullStrategy) {
> > > > > > > >>>>     return windowedKTable.suppress(
> > > > > > > >>>>         Suppression.suppressLateEvents(maxAllowedLateness)
> > > > > > > >>>>                    .suppressIntermediateEvents(
> > > > > > > >>>>                      IntermediateSuppression
> > > > > > > >>>>                        .emitAfter(maxAllowedLateness)
> > > > > > > >>>>                        .bufferFullStrategy(bufferFullStrategy)
> > > > > > > >>>>                    )
> > > > > > > >>>>     );
> > > > > > > >>>> }
> > > > > > >>>>
> > > > > > > >>>> Because windowedKTable is a parameter, the static function can easily
> > > > > > > >>>> impose an extra bound on the key type, that it extends Windowed. This
> > > > > > > >>>> would make "final results only" only available on windowed ktables.
> > > > > > >>>>
> > > > > > >>>> Here's how it would look to use:
> > > > > > >>>>
> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > > > > > >>>>   finalResultsOnly(
> > > > > > >>>>     windowCounts,
> > > > > > >>>>     Duration.ofMinutes(10),
> > > > > > >>>>     Suppression.BufferFullStrategy.SHUT_DOWN
> > > > > > >>>>   );
> > > > > > >>>>
> > > > > > >>>> Trying to use it on a non-windowed KTable yields:
> > > > > > >>>>
> > > > > > > >>>>> Error:(129, 35) java: method finalResultsOnly in class
> > > > > > > >>>>> org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be
> > > > > > > >>>>> applied to given types;
> > > > > > > >>>>>   required:
> > > > > > > >>>>> org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > > >>>>>   found:
> > > > > > > >>>>> org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > > >>>>>   reason: inference variable K has incompatible bounds
> > > > > > > >>>>>     equality constraints: java.lang.String
> > > > > > > >>>>>     upper bounds: org.apache.kafka.streams.kstream.Windowed
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> =================================================
> > > > > > >>>> = 2. Add <K,V> parameters and recipe method to Suppression =
> > > > > > >>>> =================================================
> > > > > > >>>>
> > > > > > > >>>> By adding K,V parameters to Suppression, we can provide a similarly
> > > > > > > >>>> bounded config method directly on the Suppression class:
> > > > > > > >>>>
> > > > > > > >>>> public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
> > > > > > > >>>>     final Duration maxAllowedLateness,
> > > > > > > >>>>     final BufferFullStrategy bufferFullStrategy) {
> > > > > > > >>>>     return Suppression
> > > > > > > >>>>         .<K, V>suppressLateEvents(maxAllowedLateness)
> > > > > > > >>>>         .suppressIntermediateEvents(IntermediateSuppression
> > > > > > > >>>>             .emitAfter(maxAllowedLateness)
> > > > > > > >>>>             .bufferFullStrategy(bufferFullStrategy)
> > > > > > > >>>>         );
> > > > > > > >>>> }
> > > > > > >>>>
> > > > > > >>>> Then, here's how it would look to use it:
> > > > > > >>>>
> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > > > > > >>>>   windowCounts.suppress(
> > > > > > >>>>     Suppression.finalResultsOnly(
> > > > > > > >>>>       Duration.ofMinutes(10),
> > > > > > >>>>       Suppression.BufferFullStrategy.SHUT_DOWN
> > > > > > >>>>     )
> > > > > > >>>>   );
> > > > > > >>>>
> > > > > > >>>> Trying to use it on a non-windowed ktable yields:
> > > > > > >>>>
> > > > > > > >>>>> Error:(127, 35) java: method finalResultsOnly in class
> > > > > > > >>>>> org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to
> > > > > > > >>>>> given types;
> > > > > > > >>>>>   required:
> > > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > > >>>>>   found:
> > > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > > >>>>>   reason: explicit type argument java.lang.String does not conform to
> > > > > > > >>>>> declared bound(s) org.apache.kafka.streams.kstream.Windowed
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> ============
> > > > > > >>>> = Downsides =
> > > > > > >>>> ============
> > > > > > >>>>
> > > > > > >>>> Of course, there's a downside either way:
> > > > > > > >>>> * for 1: this "wrapper" interaction would be the first in the DSL. Is it
> > > > > > > >>>> too strange, and how discoverable would it be?
> > > > > > > >>>> * for 2: adding those type parameters to Suppression will force all
> > > > > > > >>>> callers to provide them in the event of a chained construction because
> > > > > > > >>>> Java doesn't do RHS recursive type inference. This is already visible in
> > > > > > > >>>> other parts of the Streams DSL. For example, often calls to Materialized
> > > > > > > >>>> builders have to provide seemingly obvious type bounds.
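
The inference limitation mentioned for option 2 can be reproduced with a tiny generic builder. `Config` below is a hypothetical class that only mimics the shape of a chained config object; it is not `Suppression` or `Materialized`.

```java
// Hypothetical generic config; it only mimics the shape of a chained builder.
final class ChainedInference {
    static final class Config<K, V> {
        private String name = "";

        static <K, V> Config<K, V> create() {
            return new Config<>();
        }

        Config<K, V> withName(String name) {
            this.name = name;
            return this;
        }

        String name() {
            return name;
        }
    }

    public static void main(String[] args) {
        // Unchained: the assignment target drives inference of K and V.
        Config<String, Long> a = Config.create();

        // Chained: the target type does not reach create(), so K and V are inferred as Object.
        // Config<String, Long> b = Config.create().withName("counts");  // does not compile

        // An explicit type witness restores the intended types.
        Config<String, Long> c = Config.<String, Long>create().withName("counts");
        System.out.println(c.name());
    }
}
```

This is the same reason callers sometimes have to spell out type arguments on chained builder calls elsewhere in the DSL.
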
> > > > > > >>>>
> > > > > > >>>> ============
> > > > > > >>>> = Conclusion =
> > > > > > >>>> ============
> > > > > > >>>>
> > > > > > > >>>> I think option 2 is more "normal" and discoverable. It does have a
> > > > > > > >>>> downside, but it's one that's pre-existing elsewhere in the DSL.
> > > > > > > >>>>
> > > > > > > >>>> WDYT? Would the addition of this "recipe" method to Suppression resolve
> > > > > > > >>>> your concern?
> > > > > > >>>>
> > > > > > >>>> Thanks again,
> > > > > > >>>> -John
> > > > > > >>>>
> > > > > > >>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <
> > > wangg...@gmail.com
> > > > >
> > > > > > >>> wrote:
> > > > > > >>>>
> > > > > > >>>>> Hi John,
> > > > > > >>>>>
> > > > > > > >>>>> Regarding the metrics: yeah I think I'm with you that the records dropped
> > > > > > > >>>>> due to window retention or emit suppression policies should be recorded
> > > > > > > >>>>> differently, and using this KIP's proposed metric would be fine. If you
> > > > > > > >>>>> also think we can use this KIP's proposed metrics to cover the records
> > > > > > > >>>>> skipped because of window retention, then we can include the changes in
> > > > > > > >>>>> this KIP as well.
> > > > > > >>>>>
> > > > > > > >>>>> Regarding the current proposal, I'm actually not too worried about the
> > > > > > > >>>>> inconsistency between query semantics and downstream emit semantics. For
> > > > > > > >>>>> queries, we will always return the current running results of the windows,
> > > > > > > >>>>> be they partial or final results depending on the window retention time
> > > > > > > >>>>> anyways, which has nothing to do with whether the emitted stream should be
> > > > > > > >>>>> one final output per key or not. I also agree that having a unified
> > > > > > > >>>>> operation is generally better, since users can focus on leveraging that
> > > > > > > >>>>> one only rather than learning about two sets of operations. The only
> > > > > > > >>>>> question I had is, for final updates of window stores, whether it is a bit
> > > > > > > >>>>> awkward to understand the configuration combo. Thinking about this more, I
> > > > > > > >>>>> think my root worry is the "suppressLateEvents" call for windowed tables,
> > > > > > > >>>>> since from a user perspective: if my retention time is X, which means "pay
> > > > > > > >>>>> the cost to allow late records up to X to still be applied updating the
> > > > > > > >>>>> tables", why would I ever want to suppressLateEvents by Y ( < X), to say
> > > > > > > >>>>> "do not send the updates up to Y, which means the downstream operator or
> > > > > > > >>>>> sink topic for this stream would actually see a truncated update stream
> > > > > > > >>>>> while I've paid a larger cost for that"; and of course, Y > X would not
> > > > > > > >>>>> make sense either, as you would not see any updates later than X anyways.
> > > > > > > >>>>> So in all, my feeling is that "suppressLateEvents" on a windowed table
> > > > > > > >>>>> makes less sense with a parameter that is not equal to the window
> > > > > > > >>>>> retention, and opening the door in the current proposal may confuse people
> > > > > > > >>>>> with that.
> > > > > > >>>>>
> > > > > > > >>>>> Again, the above is just a subjective opinion, and probably we can also
> > > > > > > >>>>> bring up some scenarios where users do want to set X != Y. But personally
> > > > > > > >>>>> I feel that even if the semantics for this scenario are intuitive for
> > > > > > > >>>>> users to understand, does that really make sense, and should we really
> > > > > > > >>>>> open the door for it? So I think the benefits of separating the final
> > > > > > > >>>>> update into a separate API may outweigh the advantage of having one
> > > > > > > >>>>> uniform definition. And for my alternative proposal, the rationale came
> > > > > > > >>>>> from both my concern about "suppressLateEvents" for windowed stores and
> > > > > > > >>>>> Matthias' question about "suppressLateEvents" for non-windowed stores: if
> > > > > > > >>>>> it is less meaningful for both, we can consider removing it completely and
> > > > > > > >>>>> only do "IntermediateSuppression" in Suppress instead.
> > > > > > >>>>>
> > > > > > >>>>> So I'd summarize my thoughts in the following questions:
> > > > > > >>>>>
> > > > > > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X (window retention time)
> > > > > > > >>>>> for windowed stores make sense in practice?
> > > > > > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for non-windowed stores
> > > > > > > >>>>> make sense in practice?
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> Guozhang
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <
> > > bbej...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>>>>
> > > > > > > >>>>>> Thanks for the explanation, that does make sense.  I have some questions
> > > > > > > >>>>>> on operations, but I'll just wait for the PR and tests.
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks,
> > > > > > >>>>>> Bill
> > > > > > >>>>>>
> > > > > > >>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <
> > > j...@confluent.io
> > > > >
> > > > > > >>> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hi Bill,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks for the review!
> > > > > > >>>>>>>
> > > > > > > >>>>>>> Your question is very much applicable to the KIP and not at all an
> > > > > > > >>>>>>> implementation detail. Thanks for bringing it up.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> I'm proposing not to change the existing caches and configurations at all
> > > > > > > >>>>>>> (for now).
> > > > > > >>>>>>>
> > > > > > >>>>>>> Imagine you have a topology like this:
> > > > > > >>>>>>> commit.interval.ms = 100
> > > > > > >>>>>>>
> > > > > > >>>>>>> (ktable1 (cached)) -> (suppress emitAfter 200)
> > > > > > >>>>>>>
> > > > > > >>>>>>> The first ktable (ktable1) will respect the commit
> interval
> > > and
> > > > > > >>> buffer
> > > > > > >>>>>>> events for 100ms before logging, storing, or forwarding
> > them
> > > > > > >> (IIRC).
> > > > > > > >>>>>>> Therefore, the second ktable (suppress) will only see the events at a
> > > > > > > >>>>>>> rate of once per 100ms. It will apply its own buffering, and emit once
> > > > > > > >>>>>>> per 200ms. This case is pretty trivial because the suppress time is a
> > > > > > > >>>>>>> multiple of the commit interval.
> > > > > > >>>>>>>
> > > > > > >>>>>>> When it's not an integer multiple, you'll get behavior
> like
> > > in
> > > > > > >> this
> > > > > > >>>>>> marble
> > > > > > >>>>>>> diagram:
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> > > > > > >>>>>>>
> > > > > > >>>>>>> [ KTable caching with commit interval = 2 ]
> > > > > > >>>>>>>
> > > > > > >>>>>>> <--------(k:2)---------(k:4)---------(k:6)->
> > > > > > >>>>>>>
> > > > > > >>>>>>>       [ suppress with emitAfter = 3 ]
> > > > > > >>>>>>>
> > > > > > >>>>>>> <---------------(k:2)----------------(k:6)->
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > > >>>>>>> If this behavior isn't desired (for example, if you wanted to emit (k:3)
> > > > > > > >>>>>>> at time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or
> > > > > > > >>>>>>> modifying the topology to disable caching. Then, the behavior is
> > > > > > > >>>>>>> determined solely by the suppress operator.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Does that seem right to you?
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> Regarding the changelogs, because the suppression
> operator
> > > > hangs
> > > > > > >>> onto
> > > > > > >>>>>>> events for a while, it will need its own changelog. The
> > > > changelog
> > > > > > >>>>>>> should represent the current state of the buffer at all
> > > times.
> > > > So
> > > > > > >>> when
> > > > > > >>>>>> the
> > > > > > >>>>>>> suppress operator sees (k:2), for example, it will log
> > (k:2).
> > > > > When
> > > > > > >>> it
> > > > > > >>>>>>> later gets to time 3, it's time to emit (k:2) downstream.
> > > > Because
> > > > > > >> k
> > > > > > >>>>> is no
> > > > > > >>>>>>> longer buffered, the suppress operator will log (k:null).
> > > Thus,
> > > > > > >> when
> > > > > > >>>>>>> recovering,
> > > > > > >>>>>>> it can rebuild the buffer by reading its changelog.
> > > > > > >>>>>>>
> > > > > > >>>>>>> What do you think about this?
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks,
> > > > > > >>>>>>> -John
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <
> > > bbej...@gmail.com
> > > > >
> > > > > > >>>>> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hi John,  thanks for the KIP.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Early on in the KIP, you mention the current approaches
> > for
> > > > > > >>>>> controlling
> > > > > > >>>>>>> the
> > > > > > >>>>>>>> rate of downstream records from a KTable, cache size
> > > > > > >> configuration
> > > > > > >>>>> and
> > > > > > >>>>>>>> commit time.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Will these configuration parameters still be in effect
> for
> > > > > > >> tables
> > > > > > >>>>> that
> > > > > > >>>>>>>> don't use suppression?  For tables taking advantage of
> > > > > > >>> suppression,
> > > > > > >>>>>> will
> > > > > > >>>>>>>> these configurations have no impact?
> > > > > > > >>>>>>>> This last question may be too implementation-specific, but if the
> > > > > > > >>>>>>>> requested suppression time is longer than the specified commit time,
> > > > > > > >>>>>>>> will the latest record in the suppression buffer get stored in a
> > > > > > > >>>>>>>> changelog?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thanks,
> > > > > > >>>>>>>> Bill
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <
> > > > j...@confluent.io
> > > > > > >>>
> > > > > > >>>>>> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Thanks for the feedback, Matthias,
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> It seems like in straightforward relational processing
> > > cases,
> > > > > > >> it
> > > > > > >>>>>> would
> > > > > > >>>>>>>> not
> > > > > > >>>>>>>>> make sense to bound the lateness of KTables. In
> general,
> > it
> > > > > > >>> seems
> > > > > > >>>>>>> better
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>> have "guard rails" in place that make it easier to
> write
> > > > > > >>> sensible
> > > > > > >>>>>>>> programs
> > > > > > >>>>>>>>> than insensible ones.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> But I'm still going to argue in favor of keeping it for
> > all
> > > > > > >>>>> KTables
> > > > > > >>>>>> ;)
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 1. I believe it is simpler to understand the operator
> if
> > it
> > > > > > >> has
> > > > > > >>>>> one
> > > > > > >>>>>>>> uniform
> > > > > > >>>>>>>>> definition, regardless of context. It's well defined
> and
> > > > > > >>> intuitive
> > > > > > >>>>>> what
> > > > > > >>>>>>>>> will happen when you use late-event suppression on a
> > > KTable,
> > > > > > >> so
> > > > > > >>> I
> > > > > > >>>>>> think
> > > > > > >>>>>>>>> nothing surprising or dangerous will happen in that
> case.
> > > > From
> > > > > > >>> my
> > > > > > >>>>>>>>> perspective, having two sets of allowed operations is
> > > > actually
> > > > > > >>> an
> > > > > > >>>>>>>> increase
> > > > > > >>>>>>>>> in cognitive complexity.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 2. To me, it's not crazy to use the operator this way.
> > For
> > > > > > >>>>> example,
> > > > > > >>>>>> in
> > > > > > >>>>>>>> lieu
> > > > > > >>>>>>>>> of full-featured timestamp semantics, I can implement
> > MVCC
> > > > > > >>>>> behavior
> > > > > > >>>>>>> when
> > > > > > >>>>>>>>> building a KTable by
> "suppressLateEvents(Duration.ZERO)".
> > I
> > > > > > >>>>> suspect
> > > > > > >>>>>>> that
> > > > > > >>>>>>>>> there are other, non-obvious applications of
> suppressing
> > > late
> > > > > > >>>>> events
> > > > > > >>>>>> on
> > > > > > >>>>>>>>> KTables.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 3. Not to get too much into implementation details in a
> > KIP
> > > > > > >>>>>> discussion,
> > > > > > >>>>>>>> but
> > > > > > >>>>>>>>> if we did want to make late-event suppression available
> > > only
> > > > > > >> on
> > > > > > >>>>>>> windowed
> > > > > > >>>>>>>>> KTables, we have two enforcement options:
> > > > > > >>>>>>>>>   a. check when we build the topology - this would be
> > > simple
> > > > > > >> to
> > > > > > >>>>>>>> implement,
> > > > > > >>>>>>>>> but would be a runtime check. Hopefully, people write
> > tests
> > > > > > >> for
> > > > > > >>>>> their
> > > > > > >>>>>>>>> topology before deploying them, so the feedback loop
> > isn't
> > > > > > >>>>>>> instantaneous,
> > > > > > >>>>>>>>> but it's not too long either.
> > > > > > >>>>>>>>>   b. add a new WindowedKTable type - this would be a
> > > compile
> > > > > > >>> time
> > > > > > >>>>>>> check,
> > > > > > > >>>>>>>>> but would also be a substantial increase of both interface and code
> > > > > > > >>>>>>>>> complexity.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> We should definitely strive to have guard rails
> > protecting
> > > > > > >>> against
> > > > > > >>>>>>>>> surprising or dangerous behavior. Protecting against
> > > programs
> > > > > > >>>>> that we
> > > > > > >>>>>>>> don't
> > > > > > >>>>>>>>> currently predict is a lesser benefit, and I think we
> can
> > > put
> > > > > > >> up
> > > > > > >>>>>> guard
> > > > > > > >>>>>>>>> rails on a case-by-case basis for that. The increase in cognitive (and
> > > > > > > >>>>>>>>> potentially code and interface) complexity makes me think we should
> > > > > > > >>>>>>>>> skip this case.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> What do you think?
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks,
> > > > > > >>>>>>>>> -John
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <
> > > > > > >>>>>>> matth...@confluent.io>
> > > > > > >>>>>>>>> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Thanks for the KIP John.
> > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> One initial comment about the last example "Bounded lateness": For a
> > > > > > > >>>>>>>>>> non-windowed KTable bounding the lateness does not really make sense,
> > > > > > > >>>>>>>>>> does it?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Thus, I am wondering if we should allow
> > > > > > >> `suppressLateEvents()`
> > > > > > >>>>> for
> > > > > > >>>>>>> this
> > > > > > >>>>>>>>>> case? It seems to be better to only allow it for
> > > > > > >>>>> windowed-KTables.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> -Matthias
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
> > > > > > >>>>>>>>>>> I noticed this (lack of primary parameter) as well.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> What you gave as new example is semantically the same
> > as
> > > > > > >>> what
> > > > > > >>>>> I
> > > > > > >>>>>>>>>> suggested.
> > > > > > >>>>>>>>>>> So it is good by me.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Thanks
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <
> > > > > > >>>>> j...@confluent.io
> > > > > > >>>>>>>
> > > > > > >>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> Thanks for taking a look, Ted,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> I agree this is a departure from the conventions of
> > > > > > >> Streams
> > > > > > >>>>> DSL.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Most of our config objects have one or two
> "required"
> > > > > > >>>>>> parameters,
> > > > > > >>>>>>>>> which
> > > > > > >>>>>>>>>> fit
> > > > > > >>>>>>>>>>>> naturally with the static factory method approach.
> > > > > > >>>>> TimeWindow,
> > > > > > >>>>>> for
> > > > > > >>>>>>>>>> example,
> > > > > > >>>>>>>>>>>> requires a size parameter, so we can naturally say
> > > > > > >>>>>>>>> TimeWindows.of(size).
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> I think in the case of a suppression, there's really
> > no
> > > > > > >>>>> "core"
> > > > > > >>>>>>>>>> parameter,
> > > > > > >>>>>>>>>>>> and "Suppression.of()" seems sillier than "new
> > > > > > >>>>> Suppression()". I
> > > > > > >>>>>>>> think
> > > > > > >>>>>>>>>> that
> > > > > > >>>>>>>>>>>> Suppression.of(duration) would be ambiguous, since
> > there
> > > > > > >>> are
> > > > > > >>>>>> many
> > > > > > >>>>>>>>>> durations
> > > > > > >>>>>>>>>>>> that we can configure.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> However, thinking about it again, I suppose that I
> can
> > > > > > >> give
> > > > > > >>>>> each
> > > > > > >>>>>>>>>>>> configuration method a static version, which would
> let
> > > > > > >> you
> > > > > > >>>>>> replace
> > > > > > >>>>>>>>> "new
> > > > > > >>>>>>>>>>>> Suppression()." with "Suppression." in all the
> > examples.
> > > > > > >>>>>>> Basically,
> > > > > > >>>>>>>>>> instead
> > > > > > >>>>>>>>>>>> of "of()", we'd support any of the methods I listed.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> For example:
> > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> windowCounts
> > > > > > > >>>>>>>>>>>>     .suppress(
> > > > > > > >>>>>>>>>>>>         Suppression
> > > > > > > >>>>>>>>>>>>             .suppressLateEvents(Duration.ofMinutes(10))
> > > > > > > >>>>>>>>>>>>             .suppressIntermediateEvents(
> > > > > > > >>>>>>>>>>>>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > > > > > >>>>>>>>>>>>             )
> > > > > > > >>>>>>>>>>>>     );
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Does that seem better?
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Thanks,
> > > > > > >>>>>>>>>>>> -John
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <
> > > > > > >>> yuzhih...@gmail.com
> > > > > > >>>>>>
> > > > > > >>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> I started to read this KIP, which contains a lot of material.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> One suggestion:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>     .suppress(
> > > > > > >>>>>>>>>>>>>         new Suppression()
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Do you think it would be more consistent with the
> > rest
> > > > > > >> of
> > > > > > >>>>>> Streams
> > > > > > >>>>>>>>> data
> > > > > > >>>>>>>>>>>>> structures by supporting `of` ?
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Suppression.of(Duration.ofMinutes(10))
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Cheers
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <
> > > > > > >>>>>> j...@confluent.io
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>>> wrote:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Hello devs and users,
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Please take some time to consider this proposal
> for
> > > > > > >> Kafka
> > > > > > >>>>>>> Streams:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> KIP-328: Ability to suppress updates for KTables
> > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> The basic idea is to provide:
> > > > > > >>>>>>>>>>>>>> * more usable control over update rate (vs the
> > current
> > > > > > >>>>> state
> > > > > > >>>>>>> store
> > > > > > >>>>>>>>>>>>> caches)
> > > > > > >>>>>>>>>>>>>> * the final-result-for-windowed-computations
> > feature
> > > > > > >>> which
> > > > > > >>>>>>> several
> > > > > > >>>>>>>>>>>> people
> > > > > > >>>>>>>>>>>>>> have requested
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I look forward to your feedback!
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Thanks,
> > > > > > >>>>>>>>>>>>>> -John
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> --
> > > > > > >>>>> -- Guozhang
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> --
> > > > > > >> -- Guozhang
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
