Hi Bill,

Thanks for the review!

Your question is very much applicable to the KIP and not at all an
implementation detail. Thanks for bringing it up.

I'm proposing not to change the existing caches and configurations at all
(for now).

Imagine you have a topology like this:
commit.interval.ms = 100

(ktable1 (cached)) -> (suppress emitAfter 200)

The first ktable (ktable1) will respect the commit interval and buffer
events for 100ms before logging, storing, or forwarding them (IIRC).
Therefore, the second ktable (suppress) will only see the events at a rate
of once per 100ms. It will apply its own buffering, and emit once per 200ms.
This case is pretty trivial because the suppress time is a multiple of the
commit interval.

When it's not an integer multiple, you'll get behavior like in this marble
diagram:


<-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->

[ KTable caching with commit interval = 2 ]

<--------(k:2)---------(k:4)---------(k:6)->

      [ suppress with emitAfter = 3 ]

<---------------(k:2)----------------(k:6)->


If this behavior isn't desired (for example, if you wanted to emit (k:3) at
time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or
modifying the topology to disable caching. Then, the behavior is
determined solely by the suppress operator.
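
To make the diagram concrete, here is a minimal sketch in plain Java (no
Kafka dependencies). It models both stages the same way: keep the latest
pending value and emit it at every tick that is a multiple of the period.
The class and method names (MarbleSketch, emitEvery) are made up for
illustration, and the model is a simplification that mirrors the diagram,
not how Streams actually implements caching or suppression.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of the marble diagram; hypothetical names, not Kafka Streams code.
class MarbleSketch {

    // Keeps only the latest value that has arrived and emits it at every tick
    // that is a multiple of `period`. Map keys are ticks, values are payloads.
    static TreeMap<Integer, Integer> emitEvery(int period,
                                               Map<Integer, Integer> input,
                                               int lastTick) {
        TreeMap<Integer, Integer> output = new TreeMap<>();
        Integer pending = null;
        for (int tick = 1; tick <= lastTick; tick++) {
            if (input.containsKey(tick)) {
                pending = input.get(tick); // a newer value replaces the older one
            }
            if (tick % period == 0 && pending != null) {
                output.put(tick, pending);
                pending = null;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Source stream: (k:1) at tick 1, (k:2) at tick 2, ..., (k:6) at tick 6.
        TreeMap<Integer, Integer> source = new TreeMap<>();
        for (int t = 1; t <= 6; t++) {
            source.put(t, t);
        }

        // Caching with commit interval = 2, then suppress with emitAfter = 3.
        TreeMap<Integer, Integer> cached = emitEvery(2, source, 6);
        System.out.println(cached);                   // {2=2, 4=4, 6=6}
        System.out.println(emitEvery(3, cached, 6));  // {3=2, 6=6}

        // Caching disabled: suppress sees every update directly.
        System.out.println(emitEvery(3, source, 6));  // {3=3, 6=6}
    }
}
```

The last line is the "caching disabled" case: with the cache out of the way,
the suppress stage alone decides what is emitted, so (k:3) comes out at
time 3.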

Does that seem right to you?


Regarding the changelogs, because the suppression operator hangs onto
events for a while, it will need its own changelog. The changelog
should represent the current state of the buffer at all times. So when the
suppress operator sees (k:2), for example, it will log (k:2). When it
later gets to time 3, it's time to emit (k:2) downstream. Because k is no
longer buffered, the suppress operator will log (k:null). Thus, when
recovering, it can rebuild the buffer by reading its changelog.
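
As a rough illustration of that logging scheme, here is a small plain-Java
sketch. The names (SuppressBufferSketch, buffer, emit, restore) are
hypothetical, and the in-memory list only stands in for a real changelog
topic. It assumes every buffered update is logged as (key:value) and every
emit is logged as a (key:null) tombstone.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the suppress buffer and its changelog.
class SuppressBufferSketch {
    final Map<String, Integer> buffer = new HashMap<>();
    final List<SimpleEntry<String, Integer>> changelog = new ArrayList<>();

    // Buffer an update and log it, so the changelog mirrors the buffer.
    void buffer(String key, int value) {
        buffer.put(key, value);
        changelog.add(new SimpleEntry<String, Integer>(key, value));
    }

    // Emit downstream: drop the key from the buffer and log a tombstone.
    Integer emit(String key) {
        Integer value = buffer.remove(key);
        changelog.add(new SimpleEntry<String, Integer>(key, null));
        return value;
    }

    // Rebuild the buffer by replaying the changelog from the beginning.
    static Map<String, Integer> restore(List<SimpleEntry<String, Integer>> log) {
        Map<String, Integer> rebuilt = new HashMap<>();
        for (SimpleEntry<String, Integer> entry : log) {
            if (entry.getValue() == null) {
                rebuilt.remove(entry.getKey());   // tombstone: no longer buffered
            } else {
                rebuilt.put(entry.getKey(), entry.getValue());
            }
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        SuppressBufferSketch suppress = new SuppressBufferSketch();
        suppress.buffer("k", 2);   // sees (k:2), logs (k:2)
        suppress.emit("k");        // time 3: emits (k:2), logs (k:null)
        suppress.buffer("k", 4);   // sees (k:4), logs (k:4)

        System.out.println(suppress.changelog);           // [k=2, k=null, k=4]
        System.out.println(restore(suppress.changelog));  // {k=4}
    }
}
```

Replaying the log after a crash yields exactly the records that were still
buffered, which is all the operator needs in order to resume.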

What do you think about this?

Thanks,
-John



On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:

> Hi John,  thanks for the KIP.
>
> Early on in the KIP, you mention the current approaches for controlling the
> rate of downstream records from a KTable, cache size configuration and
> commit time.
>
> Will these configuration parameters still be in effect for tables that
> don't use suppression?  For tables taking advantage of suppression, will
> these configurations have no impact?
> This last question may be too implementation-specific, but if the requested
> suppression time is longer than the specified commit time, will the latest
> record in the suppression buffer get stored in a changelog?
>
> Thanks,
> Bill
>
> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
>
> > Thanks for the feedback, Matthias,
> >
> > It seems like in straightforward relational processing cases, it would
> > not make sense to bound the lateness of KTables. In general, it seems
> > better to have "guard rails" in place that make it easier to write
> > sensible programs than insensible ones.
> >
> > But I'm still going to argue in favor of keeping it for all KTables ;)
> >
> > 1. I believe it is simpler to understand the operator if it has one
> > uniform definition, regardless of context. It's well defined and
> > intuitive what will happen when you use late-event suppression on a
> > KTable, so I think nothing surprising or dangerous will happen in that
> > case. From my perspective, having two sets of allowed operations is
> > actually an increase in cognitive complexity.
> >
> > 2. To me, it's not crazy to use the operator this way. For example, in
> > lieu of full-featured timestamp semantics, I can implement MVCC behavior
> > when building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect
> > that there are other, non-obvious applications of suppressing late events
> > on KTables.
> >
> > 3. Not to get too much into implementation details in a KIP discussion,
> > but if we did want to make late-event suppression available only on
> > windowed KTables, we have two enforcement options:
> >   a. check when we build the topology - this would be simple to
> > implement, but would be a runtime check. Hopefully, people write tests
> > for their topologies before deploying them, so the feedback loop isn't
> > instantaneous, but it's not too long either.
> >   b. add a new WindowedKTable type - this would be a compile-time check,
> > but would also be a substantial increase in both interface and code
> > complexity.
> >
> > We should definitely strive to have guard rails protecting against
> > surprising or dangerous behavior. Protecting against programs that we
> > don't currently predict is a lesser benefit, and I think we can put up
> > guard rails on a case-by-case basis for that. The increase in cognitive
> > (and potentially code and interface) complexity makes me think we
> > should skip this case.
> >
> > What do you think?
> >
> > Thanks,
> > -John
> >
> > On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the KIP John.
> > >
> > > One initial comment about the last example "Bounded lateness": For a
> > > non-windowed KTable bounding the lateness does not really make sense,
> > > does it?
> > >
> > > Thus, I am wondering if we should allow `suppressLateEvents()` for this
> > > case? It seems to be better to only allow it for windowed-KTables.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/27/18 8:53 AM, Ted Yu wrote:
> > > > I noticed this (lack of primary parameter) as well.
> > > >
> > > > What you gave as new example is semantically the same as what I
> > > > suggested. So it is good by me.
> > > >
> > > > Thanks
> > > >
> > > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io>
> > > > wrote:
> > > >
> > > >> Thanks for taking a look, Ted,
> > > >>
> > > >> I agree this is a departure from the conventions of Streams DSL.
> > > >>
> > > >> Most of our config objects have one or two "required" parameters,
> > > >> which fit naturally with the static factory method approach.
> > > >> TimeWindows, for example, requires a size parameter, so we can
> > > >> naturally say TimeWindows.of(size).
> > > >>
> > > >> I think in the case of a suppression, there's really no "core"
> > > >> parameter, and "Suppression.of()" seems sillier than
> > > >> "new Suppression()". I think that Suppression.of(duration) would be
> > > >> ambiguous, since there are many durations that we can configure.
> > > >>
> > > >> However, thinking about it again, I suppose that I can give each
> > > >> configuration method a static version, which would let you replace
> > > >> "new Suppression()." with "Suppression." in all the examples.
> > > >> Basically, instead of "of()", we'd support any of the methods I
> > > >> listed.
> > > >>
> > > >> For example:
> > > >>
> > > >> windowCounts
> > > >>     .suppress(
> > > >>         Suppression
> > > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > > >>             .suppressIntermediateEvents(
> > > >>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > >>             )
> > > >>     );
> > > >>
> > > >>
> > > >> Does that seem better?
> > > >>
> > > >> Thanks,
> > > >> -John
> > > >>
> > > >>
> > > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> I started to read this KIP, which contains a lot of material.
> > > >>>
> > > >>> One suggestion:
> > > >>>
> > > >>>     .suppress(
> > > >>>         new Suppression()
> > > >>>
> > > >>>
> > > >>> Do you think it would be more consistent with the rest of Streams
> > > >>> data structures by supporting `of` ?
> > > >>>
> > > >>> Suppression.of(Duration.ofMinutes(10))
> > > >>>
> > > >>>
> > > >>> Cheers
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io>
> > > >>> wrote:
> > > >>>
> > > >>>> Hello devs and users,
> > > >>>>
> > > >>>> Please take some time to consider this proposal for Kafka Streams:
> > > >>>>
> > > >>>> KIP-328: Ability to suppress updates for KTables
> > > >>>>
> > > >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > >>>>
> > > >>>> The basic idea is to provide:
> > > >>>> * more usable control over update rate (vs the current state store
> > > >>>>   caches)
> > > >>>> * the final-result-for-windowed-computations feature which several
> > > >>>>   people have requested
> > > >>>>
> > > >>>> I look forward to your feedback!
> > > >>>>
> > > >>>> Thanks,
> > > >>>> -John
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>
