Thank you Matthias for the use cases!

Looking at both use cases, I think you need to elaborate on them in
the KIP, Richard.

Emit from plain KTable:
I agree with Matthias that the lower timestamp makes sense because it
marks the start of the validity of the record. Idempotent records with
a higher timestamp can be safely ignored. A corner case that I
discussed with Matthias offline is when we do not materialize a KTable
due to optimization. Then we cannot filter out the idempotent records,
because we do not keep the earlier record (with the lower timestamp)
to compare against.
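To make the plain-KTable case concrete, here is a minimal sketch of emit-on-change for a materialized source table. It assumes the store keeps each value together with the timestamp at which that value became valid. The class and method names are hypothetical and not part of the Kafka Streams API, and a plain `HashMap` stands in for the state store. The comparison needs the previously stored value, which is exactly what is missing when the KTable is not materialized.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch, not Kafka Streams API: a HashMap stands in for the state store.
class SourceTableEmitOnChange {
    // A value together with the timestamp at which it became valid.
    record TimestampedValue(String value, long timestamp) {}

    private final Map<String, TimestampedValue> store = new HashMap<>();

    /** Returns the record to forward downstream, or empty if the update is idempotent. */
    Optional<TimestampedValue> process(String key, String value, long timestamp) {
        TimestampedValue old = store.get(key);
        if (old != null && Objects.equals(old.value(), value)) {
            // Idempotent update: keep the older timestamp (start of validity) and emit nothing.
            return Optional.empty();
        }
        TimestampedValue updated = new TimestampedValue(value, timestamp);
        store.put(key, updated);
        return Optional.of(updated);
    }

    long storedTimestamp(String key) {
        return store.get(key).timestamp();
    }
}
```

Because the store is left untouched for the idempotent record, it stays consistent with what was emitted downstream.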

Emit from KTable with aggregations:
If we specify that an aggregation result should have the highest
timestamp of the records that participated in the aggregation, we
cannot ignore any idempotent records. Admittedly, the result of an
aggregation usually changes, but for some aggregations the result may
not change, such as min and max, or sum when the incoming records have
a value of zero. In those cases, we could benefit from emit-on-change,
but only if we define the semantics of these aggregations so that the
result does not take the highest timestamp of the participating
records. In Kafka Streams, we do not have min, max, and sum as
explicit aggregations, but if we want to go down this path, we need to
provide an API to define which timestamp should be used for the result
of an aggregation.
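The difference between the two timestamp semantics can be sketched with a running max per key. Both variants emit only when the (value, timestamp) pair changes. `HIGHEST_INPUT` gives the result the highest timestamp of all participating records, while `DETERMINING_INPUT` is a hypothetical alternative that keeps the timestamp of the record that set the current max. Neither is an existing Kafka Streams option; the names are illustrative only.

```java
import java.util.Optional;

// Hypothetical sketch of a running max per key; not Kafka Streams API.
class MaxAggregation {
    record Result(long max, long timestamp) {}

    enum TimestampSemantics {
        HIGHEST_INPUT,     // result timestamp = highest timestamp of all inputs
        DETERMINING_INPUT  // result timestamp = timestamp of the input that set the max
    }

    private final TimestampSemantics semantics;
    private Result current; // null until the first record arrives

    MaxAggregation(TimestampSemantics semantics) {
        this.semantics = semantics;
    }

    /** Returns the result to emit, or empty if neither max nor timestamp changed. */
    Optional<Result> add(long value, long timestamp) {
        Result next;
        if (current == null) {
            next = new Result(value, timestamp);
        } else if (semantics == TimestampSemantics.HIGHEST_INPUT) {
            next = new Result(Math.max(current.max(), value),
                              Math.max(current.timestamp(), timestamp));
        } else if (value > current.max()) {
            next = new Result(value, timestamp);
        } else {
            next = current; // this input does not affect the result
        }
        if (next.equals(current)) {
            return Optional.empty();
        }
        current = next;
        return Optional.of(next);
    }
}
```

With `HIGHEST_INPUT`, a later record below the current max leaves the max unchanged but still advances the timestamp, so it cannot be ignored; with the alternative semantics, the same record produces no output.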

None of this blocks this KIP; I just wanted to put these aspects up
for discussion. The KIP can limit itself to emit-on-change for
materialized KTables. However, these limits should be stated
explicitly in the KIP.

Best,
Bruno



On Fri, Jan 24, 2020 at 10:58 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> IMHO, the question about semantics depends on the use case, in
> particular on the origin of a KTable.
>
> If there is a changelog topic that one reads directly into a KTable,
> emit-on-change does actually make sense, because the timestamp indicates
> _when_ the update was _effective_. For this case, it is semantically
> sound to _not_ update the timestamp in the store, because the second
> update is actually idempotent and advancing the timestamp is not ideal
> (one could even consider it to be wrong to advance the timestamp)
> because the "valid time" of the record pair did not change.
>
> This reasoning also applies to KTable-KTable joins.
>
> However, if the KTable is the result of an aggregation, I think
> emit-on-update is more natural, because the timestamp reflects the
> _last_ time (i.e., highest timestamp) of all input records that contributed
> to the result. Hence, updating the timestamp and emitting a new record
> actually sounds correct to me. This applies to windowed and non-windowed
> aggregations IMHO.
>
> However, considering the argument that the timestamp should not be
> updated in the store in the first case to begin with, both cases are
> actually the same, and both can be modeled as emit-on-change: if a
> `table()` operator does not update the timestamp if the value does not
> change, there is _no_ change and thus nothing is emitted. At the same
> time, if an aggregation operator does update the timestamp (even if the
> value does not change) there _is_ a change and we emit.
>
> Note that handling out-of-order data for aggregations would also work
> seamlessly with this approach -- for out-of-order records, the timestamp
> never changes, and thus, we only emit if the result itself changes.
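A minimal sketch of the unified rule described above, assuming each operator stores a (value, timestamp) pair and emits only when that pair changes. `tableUpdate` models `table()` keeping the old timestamp for an unchanged value, and `sumUpdate` models an aggregation whose result carries the highest input timestamp. All names are hypothetical.

```java
// Hypothetical sketch: emit if and only if the stored (value, timestamp) pair changes.
class UnifiedEmitOnChange {
    record Stored(long value, long timestamp) {}

    /** table(): an unchanged value keeps the old timestamp, so nothing changes. */
    static Stored tableUpdate(Stored old, long value, long ts) {
        if (old != null && old.value() == value) {
            return old;
        }
        return new Stored(value, ts);
    }

    /** Sum aggregation: the result carries the highest input timestamp seen so far. */
    static Stored sumUpdate(Stored old, long value, long ts) {
        if (old == null) {
            return new Stored(value, ts);
        }
        return new Stored(old.value() + value, Math.max(old.timestamp(), ts));
    }

    /** Emit only if the new pair differs from the old one (or there was no old one). */
    static boolean shouldEmit(Stored old, Stored next) {
        return !next.equals(old);
    }
}
```

An in-order zero added to a sum advances the timestamp and is emitted, while an out-of-order zero changes neither value nor timestamp and is dropped.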
>
> Therefore, I would argue that we might not even need any config, because
> the emit-on-change behavior is just correct and reduces the downstream
> load, while our current behavior is not ideal (even if it's also correct).
>
> Thoughts?
>
> -Matthias
>
> On 1/24/20 9:37 AM, John Roesler wrote:
> > Hi Bruno,
> >
> > Thanks for that idea. I hadn't considered that
> > option before, and it does seem like that would be
> > the right place to put it if we think it might be
> > semantically important to control on a
> > table-by-table basis.
> >
> > I had been thinking of it less semantically and
> > more practically. In the context of a large
> > topology, or more generally, a large software
> > system that contains many topologies and other
> > event-driven systems, each no-op result becomes an
> > input that is destined to itself become a no-op
> > result, and so on, all the way through the system.
> > Thus, a single pointless processing result becomes
> > amplified into a large number of pointless
> > computations, cache perturbations, and network
> > and disk I/O operations. If you also consider
> > operations with fan-out implications, like
> > branching or foreign-key joins, the wasted
> > resources are amplified not just in proportion to
> > the size of the system, but the size of the system
> > times the average fan-out (to the power of the
> > number of fan-out operations on the path(s)
> > through the system).
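One back-of-the-envelope reading of the amplification described above: a single no-op result that traverses a path of `downstreamOperators` operators, `fanOutOps` of which fan out by `avgFanOut` on average, triggers roughly downstreamOperators × avgFanOut^fanOutOps wasted operations. This is an illustrative interpretation, not a measurement, and the parameter names are hypothetical.

```java
// Illustrative estimate only; parameters are hypothetical inputs, not measured values.
class NoOpAmplification {
    /**
     * Rough count of wasted operations caused by one pointless update.
     *
     * @param downstreamOperators operators the no-op result passes through
     * @param avgFanOut           average fan-out of each fan-out operation
     * @param fanOutOps           number of fan-out operations on the path
     */
    static long wastedOperations(long downstreamOperators, long avgFanOut, int fanOutOps) {
        long factor = 1;
        for (int i = 0; i < fanOutOps; i++) {
            factor *= avgFanOut; // each fan-out multiplies the number of downstream copies
        }
        return downstreamOperators * factor;
    }
}
```

For made-up numbers, `wastedOperations(20, 5, 3)` returns 2500: twenty operators times 5^3.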
> >
> > In my time operating such systems, I've observed
> > these effects to be very real, and actually, the
> > system and use case don't have to be very large
> > before the amplification poses an existential
> > threat to the system as a whole.
> >
> > This is the basis of my advocating for a simple
> > behavior change, rather than an opt-in config of
> > any kind. It seems like Streams should "do the
> > right thing" for the majority use case. My theory
> > (which may be wrong) is that the majority use case
> > is more like "relational queries" than "CEP
> > queries". Even if you were doing some
> > event-sensitive computation, wouldn't you do it
> > as stream operations (where this feature is
> > inapplicable anyway)?
> >
> > In keeping with the "practical" perspective, I
> > suggested the opt-out config only in the (I think
> > unlikely) event that filtering out pointless
> > updates actually harms performance. I'd also be
> > perfectly fine without the opt-out config. I
> > really think that (because of the timestamp
> > semantics work already underway), we're already
> > pre-fetching the prior result most of the time, so
> > there would actually be very little extra I/O
> > involved in implementing emit-on-change.
> >
> > However, we should consider whether my experience
> > is likely to be general. Do you have some use
> > case in mind for which you'd actually want some
> > KTable results to be emit-on-update for semantic
> > reasons?
> >
> > Thanks,
> > -John
> >
> >
> > On Fri, Jan 24, 2020, at 11:02, Bruno Cadonna wrote:
> >> Hi Richard,
> >>
> >> Thank you for the KIP.
> >>
> >> I agree with John that we should focus on the interface and behavior
> >> change in a KIP. We can discuss the implementation later.
> >>
> >> I am also +1 for the survey.
> >>
> >> I had a thought about this. Couldn't we consider emit-on-change to be
> >> one config of suppress (like `untilWindowCloses`)? What you basically
> >> propose is to suppress updates if they do not change the result.
> >> Considering emit on change as a flavour of suppress would be more
> >> flexible because it would specify the behavior locally for a KTable
> >> instead of globally for all KTables. Additionally, specifying the
> >> behavior in one place instead of multiple places feels more intuitive
> >> and consistent to me.
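To illustrate the per-table idea, here is a minimal sketch of emit-on-change as a suppress-like filter applied to one table's stream of updates. `Change` is a hypothetical old/new value pair and `Policy` is a hypothetical per-table setting. `Suppressed.untilWindowCloses` does exist in Kafka Streams, but no emit-on-change variant of `Suppressed` is implied here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch, not Kafka Streams API: a per-table, suppress-like filter.
class PerTableSuppression {
    // Old and new value of a single table update (oldValue is null for an insert).
    record Change<V>(V oldValue, V newValue) {}

    enum Policy { EMIT_ON_UPDATE, EMIT_ON_CHANGE }

    static <V> List<Change<V>> apply(Policy policy, List<Change<V>> updates) {
        List<Change<V>> forwarded = new ArrayList<>();
        for (Change<V> change : updates) {
            boolean noOp = Objects.equals(change.oldValue(), change.newValue());
            if (policy == Policy.EMIT_ON_CHANGE && noOp) {
                continue; // suppress updates that do not change the result
            }
            forwarded.add(change);
        }
        return forwarded;
    }
}
```

Because the policy is an argument per call, two tables in the same topology could use different behaviors, which is the flexibility a suppress-style option would give over a global config.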
> >>
> >> Best,
> >> Bruno
> >>
> >> On Fri, Jan 24, 2020 at 7:49 AM John Roesler <vvcep...@apache.org> wrote:
> >>>
> >>> Hi Richard,
> >>>
> >>> Thanks for picking this up! I know of at least one large community member
> >>> for which this feature is absolutely essential.
> >>>
> >>> If I understand your two options, it seems like the proposal is to
> >>> implement it as a behavior change regardless, and the question is
> >>> whether to provide
> >>> an opt-out config or not.
> >>>
> >>> Given that any implementation of this feature would have some performance
> >>> impact under some workloads, and also that we don't know if anyone really
> >>> depends on emit-on-update time semantics, it seems like we should propose
> >>> to add an opt-out config. Can you update the KIP to mention the exact
> >>> config key and value(s) you'd propose?
> >>>
> >>> Just to move the discussion forward, maybe something like:
> >>>     emit.on := change|update
> >>> with the new default being "change"
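A sketch of how the proposed key could be read, assuming the `emit.on` name and the `change|update` values floated above. Neither is an existing `StreamsConfig` entry, and the parsing helper is hypothetical.

```java
import java.util.Properties;

// Hypothetical: "emit.on" is the key floated in this thread, not an existing StreamsConfig entry.
class EmitOnConfig {
    static final String EMIT_ON_CONFIG = "emit.on";

    enum EmitOn { CHANGE, UPDATE }

    static EmitOn parse(Properties props) {
        String raw = props.getProperty(EMIT_ON_CONFIG, "change"); // proposed new default
        switch (raw) {
            case "change":
                return EmitOn.CHANGE;
            case "update":
                return EmitOn.UPDATE; // opt-out: keep today's emit-on-update behavior
            default:
                throw new IllegalArgumentException(
                    "Invalid value for " + EMIT_ON_CONFIG + ": " + raw);
        }
    }
}
```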
> >>>
> >>> Thanks for pointing out the timestamp issue in particular. I agree that if
> >>> we discard the latter update as a no-op, then we also have to discard its
> >>> timestamp (obviously, we don't forward the timestamp update, as that's
> >>> the whole point, but we also can't update the timestamp in the store, as
> >>> the store must remain consistent with what has been emitted).
> >>>
> >>> I have to confess that I disagree with your implementation proposal, but
> >>> it's also not necessary to discuss implementation in the KIP. Maybe it
> >>> would be less controversial if you just drop that section for now, so
> >>> that the KIP discussion can focus on the behavior change and config.
> >>>
> >>> Just for reference, there is some research into this domain. For example,
> >>> see the "Report" section (3.2.3) of the SECRET paper:
> >>> http://people.csail.mit.edu/tatbul/publications/maxstream_vldb10.pdf
> >>>
> >>> It might help to round out the proposal if you take a brief survey of the
> >>> behaviors of other systems, along with pros and cons if any are reported.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>>
> >>> On Fri, Jan 10, 2020, at 22:27, Richard Yu wrote:
> >>>> Hi everybody!
> >>>>
> >>>> I'd like to propose a change that we probably should have made a long
> >>>> time ago.
> >>>>
> >>>> The key benefit of this KIP would be reduced traffic in Kafka Streams,
> >>>> since a lot of no-op results would no longer be sent downstream.
> >>>> Here is the KIP for reference.
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >>>>
> >>>> Currently, I seek to formalize our approach for this KIP first before we
> >>>> determine concrete API additions / configurations.
> >>>> Some configs might warrant adding, while others are unnecessary since
> >>>> adding them would only increase the complexity of Kafka Streams.
> >>>>
> >>>> Cheers,
> >>>> Richard
> >>>>
> >>
>
