Hi Matthias,

Oh, I see. Next time, I will take that into account. It looked like at the time there wasn't much contention over the major points of the proposal, so I thought I could pass it.

I will also make some last modifications to the KIP. Thanks for your vote!

Best,
Richard

On Sat, Mar 7, 2020 at 1:00 PM Matthias J. Sax <mj...@apache.org> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA512
>
> Richard,
>
> you cannot close a KIP as accepted with 2 binding votes. (cf https://cwiki.apache.org/confluence/display/KAFKA/Bylaws)
>
> You could only discard the KIP as long as it's not accepted :D
>
> However, I am +1 (binding) and thus you can close the VOTE as accepted.
>
>
> Just three minor follow-up comments:
>
> (1) In "Reporting Strategies" you mention in point (2) "Emit on update / non-empty content" -- I am not sure what "empty content" would be. This is a little bit confusing. Maybe just remove it?
>
>
> (2) "Design Reasoning"
>
> > we have decided that we will forward aggregation results if and only if the timestamp and the value had not changed
>
> This sounds incorrect. If both value and timestamp have not changed, we would skip the update, from my understanding?
>
> Ie, to phrase it differently: for a table-operation we only consider the value to make a comparison, and if the value does not change, we don't emit anything (even if the timestamp changed).
>
> For windowed aggregations however, even if the value does not change, but the timestamp advances, we emit, ie, a changing timestamp is not considered idempotent for this case. (Note that the timestamp can never go backward for this case, because it's computed as the maximum over all input records for the window.)
>
>
> (3) The discussion about stream time is very interesting. I agree that it's an orthogonal concern to this KIP.
>
>
>
> - -Matthias
>
>
> On 3/6/20 1:52 PM, Richard Yu wrote:
> > Hi all,
> >
> > I have decided to pass this KIP with 2 binding votes and 3 non-binding votes (including mine). I will update KIP status shortly after this.
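Matthias's point (2) distinguishes two comparison rules. The plain-Java sketch below illustrates them under one assumption: a result is just a value plus a record timestamp. The class and method names (`EmitOnChangeRules`, `Versioned`, `shouldEmitTableUpdate`, `shouldEmitWindowedUpdate`) are hypothetical and are not part of the Kafka Streams API.

```java
import java.util.Objects;

/** Hypothetical sketch of the two emit-on-change rules; not Kafka Streams code. */
final class EmitOnChangeRules {

    /** A result value together with the record timestamp it carries. */
    static final class Versioned<V> {
        final V value;
        final long timestamp;

        Versioned(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Table operations: only the value is compared; a timestamp change alone is a no-op. */
    static <V> boolean shouldEmitTableUpdate(Versioned<V> previous, Versioned<V> next) {
        return previous == null || !Objects.equals(previous.value, next.value);
    }

    /** Windowed aggregations: an advancing result timestamp counts as a real change. */
    static <V> boolean shouldEmitWindowedUpdate(Versioned<V> previous, Versioned<V> next) {
        if (previous == null) {
            return true; // first result for this window
        }
        boolean valueChanged = !Objects.equals(previous.value, next.value);
        // The window's timestamp is the max over its input records, so it only moves forward.
        boolean timestampAdvanced = next.timestamp > previous.timestamp;
        return valueChanged || timestampAdvanced;
    }

    public static void main(String[] args) {
        Versioned<Long> before = new Versioned<>(5L, 100L);
        Versioned<Long> sameValueLater = new Versioned<>(5L, 200L);
        boolean tableEmits = shouldEmitTableUpdate(before, sameValueLater);
        boolean windowEmits = shouldEmitWindowedUpdate(before, sameValueLater);
        System.out.println(tableEmits);  // false
        System.out.println(windowEmits); // true
    }
}
```

The same pair of updates (value unchanged, timestamp advanced) is dropped by the table rule but emitted by the windowed rule. That is exactly the difference point (2) asks the KIP text to state.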
> >
> > Best,
> > Richard
> >
> > On Thu, Mar 5, 2020 at 3:45 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> >
> >> Hi all,
> >>
> >> Just polling for some last changes on the name. I think that since there doesn't seem to be much objection to any major changes in the KIP, I will pass it this Friday.
> >>
> >> If you feel that we still need some more discussion, please let me know. :)
> >>
> >> Best,
> >> Richard
> >>
> >> P.S. Will start working on a PR for this one soon.
> >>
> >> On Wed, Mar 4, 2020 at 1:30 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Regarding the metric name, I was actually trying to be consistent with the node-level `suppression-emit`, as I feel this one's characteristics are closer to that. If other folks feel it is better to align with the task-level "dropped-records", I think I can be convinced too.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Wed, Mar 4, 2020 at 12:09 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> may I make a non-binding proposal for the metric name? I would prefer "skipped-idempotent-updates" to be consistent with the "dropped-records".
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On Tue, Mar 3, 2020 at 11:57 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>> Thanks for the discussion!
> >>>>>
> >>>>> @Guozhang, I will make the corresponding changes to the KIP (i.e. renaming the sensor and adding some notes). With the current state of things, we are very close. We just need that one last binding vote.
> >>>>>
> >>>>> @Matthias J. Sax <matth...@confluent.io> It would be ideal if we could also get your last two cents on this. Other than that, we are good.
> >>>>>
> >>>>> Best,
> >>>>> Richard
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 3, 2020 at 10:46 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Bruno, John:
> >>>>>>
> >>>>>> 1) That makes sense. If we consider them to be node-specific metrics that only apply to a subset of built-in processor nodes and are not relevant for alerting (just like suppression-emit (rate | total)), they'd better be per-node instead of per-task, and we would not associate such events with a warning. With that in mind, I'd suggest we consider renaming the metric without the `dropped` keyword to distinguish it from the per-task level sensor. How about "idempotent-update-skip (rate | total)"?
> >>>>>>
> >>>>>> Also a minor suggestion: we should clarify in the KIP / javadocs which built-in processor nodes would have this metric and which don't.
> >>>>>>
> >>>>>> 2) About stream time tracking, there are multiple known issues that we should close to improve our consistency semantics:
> >>>>>>
> >>>>>> a. preserve stream time of active tasks across rebalances where they may be migrated. This is what KAFKA-9368 <https://issues.apache.org/jira/browse/KAFKA-9368> is meant for.
> >>>>>> b. preserve stream time of standby tasks to be aligned with the active tasks, via the changelog topics.
> >>>>>>
> >>>>>> What I'm more concerned about is b) here. For example: let's say we have a topology of `source -> A -> repartition -> B` where both A and B have states along with changelogs, and both of them have standbys. If a record is piped from the source and completely traversed through the topology, we need to make sure that the stream time inferred across:
> >>>>>>
> >>>>>> * active task A (inferred from the source record),
> >>>>>> * active task B (inferred from the derived record from the repartition topic),
> >>>>>> * standby task A (inferred from the changelog topic of A's store),
> >>>>>> * standby task B (inferred from the changelog topic of B's store)
> >>>>>>
> >>>>>> is consistent (note I'm not saying they should be "exactly the same", but consistent, meaning that they may have different values, but as long as that does not impact the time-based queries, it is fine). The main motivation is that on IQ, where both active and standby tasks could be accessed, we can eventually improve our consistency guarantee to have 1) read-your-write, 2) consistency across stores, etc.
> >>>>>>
> >>>>>> I agree with John's assessment in the previous email, and just want to clarify more concretely what I'm thinking.
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Mar 3, 2020 at 9:03 AM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>
> >>>>>>> Thanks, Guozhang and Bruno!
> >>>>>>>
> >>>>>>> 2) I had a similar thought to both of you about the metrics, but I ultimately came out with a conclusion like Bruno's. These aren't dropped invalid records; they're intentionally dropped, valid, but unnecessary, updates. A "warning" for this case definitely seems wrong, and I'd also not recommend counting these events along with "dropped-records", because those are all dropped invalid records, e.g., late or null-keyed or couldn't be deserialized.
> >>>>>>>
> >>>>>>> Like Bruno pointed out, an operator should be concerned to see non-zero "dropped-records", and would then consult the logs for warnings. But that same person should be happy to see "dropped-idempotent-updates" increasing, since it means they're saving time and money. Maybe the name of the metric could be different, but I couldn't think of a better one. OTOH, maybe it just stands out to us because we recently discussed those other metrics in KIP-444?
> >>>>>>>
> >>>>>>> 1) Maybe we should discuss this point more. It seems like we should maintain an invariant that the following three objects always have exactly the same state (modulo flush boundaries):
> >>>>>>> 1. The internal state store
> >>>>>>> 2. The changelog
> >>>>>>> 3. The operation's result view
> >>>>>>>
> >>>>>>> That is, if I have a materialized Filter, then it seems like I _must_ store exactly the same record in the store and the changelog, and also forward the exact same record, including the timestamp, to the downstream operations.
> >>>>>>>
> >>>>>>> If we store something different in the internal state store than in the changelog, we can get a situation where the state is actually different after restoration than it is during processing, and queries against standbys would return different results than queries against the active tasks.
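John's store/changelog/result invariant can be made concrete with a small model. The sketch below is a hypothetical in-memory materialized filter written in plain Java, not Kafka Streams internals. `MaterializedFilterSketch`, `Update`, and `restoreFromChangelog` are illustrative names. It assumes the store is a single key-value map and the changelog is a list. Each non-idempotent update is written identically to the store, the changelog, and the downstream output, and an idempotent update touches none of them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

/** Hypothetical model of a materialized filter that keeps store == changelog == output. */
final class MaterializedFilterSketch {

    /** A keyed table update; a null value is a tombstone (delete). */
    static final class Update {
        final String key;
        final String value;
        final long timestamp;

        Update(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Predicate<String> predicate;
    final Map<String, String> store = new HashMap<>();
    final List<Update> changelog = new ArrayList<>();
    final List<Update> forwarded = new ArrayList<>();

    MaterializedFilterSketch(Predicate<String> predicate) {
        this.predicate = predicate;
    }

    void process(Update in) {
        // As with a table filter, a value that fails the predicate becomes a tombstone.
        String result = (in.value != null && predicate.test(in.value)) ? in.value : null;
        if (Objects.equals(store.get(in.key), result)) {
            return; // idempotent: store, changelog and downstream all stay untouched
        }
        if (result == null) {
            store.remove(in.key);
        } else {
            store.put(in.key, result);
        }
        Update out = new Update(in.key, result, in.timestamp);
        changelog.add(out); // exactly the record written to the store...
        forwarded.add(out); // ...is also the record forwarded downstream
    }

    /** Rebuilds state the way a restoring or standby task would: by replaying the changelog. */
    Map<String, String> restoreFromChangelog() {
        Map<String, String> restored = new HashMap<>();
        for (Update u : changelog) {
            if (u.value == null) {
                restored.remove(u.key);
            } else {
                restored.put(u.key, u.value);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        MaterializedFilterSketch f1 = new MaterializedFilterSketch(v -> v.startsWith("a"));
        MaterializedFilterSketch f2 = new MaterializedFilterSketch(v -> true); // pass-through filter
        Update[] input = {
            new Update("k", "apple", 1L),
            new Update("k", "apple", 2L),  // idempotent in f1: dropped
            new Update("k", "banana", 3L)  // filtered out: tombstone
        };
        for (Update u : input) {
            int before = f1.forwarded.size();
            f1.process(u);
            for (Update out : f1.forwarded.subList(before, f1.forwarded.size())) {
                f2.process(out);
            }
        }
        System.out.println(f1.store.equals(f2.store));                  // true
        System.out.println(f1.store.equals(f1.restoreFromChangelog())); // true
        System.out.println(f1.changelog.size());                        // 2
    }
}
```

Chaining a second instance with an always-true predicate mimics the two-filter topology discussed in this thread. Because `f2` only ever sees what `f1` stored, querying either store gives the same answer.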
> >>>>>>>
> >>>>>>> Regarding storing something different in the store+changelog than we forward downstream, consider the following topology:
> >>>>>>>
> >>>>>>> sourceTable
> >>>>>>>     .filter(someFilter, Materialized.as("f1"))
> >>>>>>>     .filter((key, value) -> true, Materialized.as("f2"))
> >>>>>>>
> >>>>>>> If we didn't forward exactly the same data we store, then querying f2 would return different results than querying f1, which is clearly not correct, given the topology.
> >>>>>>>
> >>>>>>> It seems like maybe what you have in mind is the preservation of stream time across restart/rebalance? This bug is still open, actually: https://issues.apache.org/jira/browse/KAFKA-9368 It seems like solving that bug would be independent of KIP-557. I.e., KIP-557 makes that bug neither worse nor better.
> >>>>>>>
> >>>>>>> One other thought I had is maybe you were thinking that operators would update their internally tracked stream time, but still discard records? I think that _would_ be a bug. That is, if a record gets discarded as idempotent, it should have no effect at all on the state of the application. Reflecting on my prior analysis of stream time, most of the cases where we track stream time are in Stream aggregations, and in those cases, if an incoming record's timestamp is higher than the previous stream time, it would already not be considered idempotent. So we would store, log, and forward the result with the new timestamp. The only other case is Suppress. With respect to idempotence, Suppress is equivalent to a stateless no-op transformation. All it does is collect and delay updates. It has no memory of what it previously emitted, so it wouldn't be possible for it to check for idempotence anyway.
> >>>>>>>
> >>>>>>> Was that what you were thinking?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> -John
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Mar 3, 2020, at 02:34, Bruno Cadonna wrote:
> >>>>>>>> Hi Guozhang,
> >>>>>>>>
> >>>>>>>> I also had the same thought about using the existing "dropped-records" metrics. However, I think in this case it would be better to use a new metric, because dropped idempotent updates are an optimization; they do not represent missed records. The dropped idempotent updates in general do not change the result and so do not need a warn log message. Whereas dropped records due to expired windows, serialization errors, or lateness might be something concerning that needs a warn log message.
> >>>>>>>>
> >>>>>>>> Looking at the metrics, you would be happy to see "dropped-idempotent-updates" increase, because that means Streams gets rid of no-ops downstream, but you would be concerned if "dropped-records" increased, because that means your records or the configuration of your app has issues. The "dropped-idempotent-updates" metric could also be an indication that you could further optimize your setup by getting rid of idempotent updates further upstream.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On Tue, Mar 3, 2020 at 7:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hello Richard,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP. I once reviewed it and was concerned about its effects on stream time advancing. After reading the updated KIP, I think it has answered a lot of them already.
> >>>>>>>>>
> >>>>>>>>> I have a couple of minor comments still, otherwise I'm +1:
> >>>>>>>>>
> >>>>>>>>> 1) I want to clarify that for operations resulting in KTables (not only aggregations, but consider KTable#filter that may also result in a new KTable), even if we drop emissions to the downstream topics, we would still append to the corresponding changelog if the timestamp has changed. This is because the timestamps on the changelog are read by the standby tasks, which rely on them to infer their own stream time advancing.
> >>>>>>>>>
> >>>>>>>>> 2) About the metrics, in KIP-444 we are consolidating all types of scenarios that can cause dropped records into the same metric:
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >>>>>>>>>
> >>>>>>>>> late-records-drop: INFO at processor node level, replaced by INFO task-level "dropped-records".
> >>>>>>>>>
> >>>>>>>>> skipped-records: INFO at thread and processor node level, replaced by INFO task-level "dropped-records".
> >>>>>>>>>
> >>>>>>>>> expired-window-record-drop: DEBUG at state store level, replaced by INFO task-level "dropped-records".
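John argues that a discarded idempotent update cannot move stream time. Guozhang notes that standbys infer stream time from changelog timestamps. The two points can be checked together with a small model. This is a sketch under stated assumptions, not Kafka Streams code:

- The operator is a tumbling-window max aggregation with an assumed 100 ms window.
- The result timestamp is the maximum input timestamp in the window, as Matthias describes for windowed aggregations.
- The standby sees only the changelog.

`StreamTimeSketch`, `Result`, and `standbyStreamTime` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: dropping idempotent windowed updates keeps active and standby stream time aligned. */
final class StreamTimeSketch {

    static final class Result {
        final long value;     // aggregate value (here: max of the input values)
        final long timestamp; // max input timestamp seen in the window

        Result(long value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static final long WINDOW_SIZE_MS = 100L; // assumed tumbling window size

    final Map<Long, Result> store = new HashMap<>(); // window start -> current result
    final List<Result> changelog = new ArrayList<>();
    long activeStreamTime = Long.MIN_VALUE;

    void process(long value, long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        Result old = store.get(windowStart);
        Result updated = (old == null)
            ? new Result(value, timestamp)
            : new Result(Math.max(old.value, value), Math.max(old.timestamp, timestamp));
        if (old != null && old.value == updated.value && old.timestamp == updated.timestamp) {
            // Idempotent: no store write, no changelog append, no stream-time update.
            // Here timestamp <= old.timestamp <= activeStreamTime, so nothing is lost.
            return;
        }
        store.put(windowStart, updated);
        changelog.add(updated);
        activeStreamTime = Math.max(activeStreamTime, timestamp);
    }

    /** A standby infers stream time only from the changelog records it replays. */
    long standbyStreamTime() {
        long t = Long.MIN_VALUE;
        for (Result r : changelog) {
            t = Math.max(t, r.timestamp);
        }
        return t;
    }

    public static void main(String[] args) {
        StreamTimeSketch s = new StreamTimeSketch();
        s.process(5L, 10L);  // new window [0, 100): logged
        s.process(3L, 20L);  // same value, later timestamp: logged
        s.process(7L, 150L); // new window [100, 200): logged
        s.process(2L, 120L); // no change to (7, 150): dropped
        System.out.println(s.changelog.size());                           // 3
        System.out.println(s.activeStreamTime == s.standbyStreamTime());  // true
    }
}
```

An update is dropped only when the window's result timestamp did not move. In that case the dropped record's timestamp is at most the window's existing timestamp, and the active stream time already covers it. So the active and standby views of stream time stay equal in this model, without logging the dropped update.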
> >>>>>>>>>
> >>>>>>>>> The main idea is that instead of using different metrics to indicate different types of scenarios, we use a single metric and users just alert on it. When the alert triggers, they can look into the log4j logs for its causes (we made sure that all sensor recordings of this metric would be associated with a warning log4j entry).
> >>>>>>>>>
> >>>>>>>>> So I'd suggest that instead of introducing a new per-node "dropped-idempotent-updates", we just piggy-back on the existing task-level metric; unless we think that idempotent drops are more frequent than others and also not worth a warning log, in which case we can consider breaking this metric down with different tags, for example.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 2, 2020 at 1:59 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the votes so far! @Matthias or @Guozhang Wang <guozh...@confluent.io> it would be great to also get your input on this KIP.
> >>>>>>>>>>
> >>>>>>>>>> It looks to be pretty close to completion, so the finishing touches are all we need. :)
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Richard
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 2, 2020 at 11:45 AM Ghassan Yammine <ghassan.yamm...@bazaarvoice.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hello all,
> >>>>>>>>>>>
> >>>>>>>>>>> +1 (non-binding)
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Ghassan
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/2/20, 12:43 PM, "Bruno Cadonna" <br...@confluent.io> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> EXTERNAL: This email originated from outside of Bazaarvoice. Do not click any links or open any attachments unless you trust the sender and know the content is safe.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Richard,
> >>>>>>>>>>>
> >>>>>>>>>>> +1 (non-binding)
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Mar 2, 2020 at 4:33 PM John Roesler <vvcep...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Richard,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP!
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm +1 (binding)
> >>>>>>>>>>>>
> >>>>>>>>>>>> -john
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Feb 27, 2020, at 14:40, Richard Yu wrote:
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I am proposing a new optimization to Kafka Streams which would greatly reduce the number of idempotent updates (or no-ops) in the Kafka Streams DAG. A number of users have been interested in this feature, so it would be nice to pass this one in.
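The saving the proposal targets can be illustrated with a hypothetical single-key comparison of the two reporting strategies named in this thread ("emit on update" vs. "emit on change"). The sketch assumes the operator can see its own previous result, for example because it is materialized. The enum, class, and method names are illustrative assumptions, not Kafka Streams API. `mapValues` here is a plain-Java stand-in for a value mapping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical comparison of "emit on update" and "emit on change" for one key. */
final class ReportingStrategySketch {

    enum Strategy { EMIT_ON_UPDATE, EMIT_ON_CHANGE }

    /** Maps successive updates of one key and returns what would be forwarded downstream. */
    static <A, B> List<B> mapValues(List<A> updates, Function<A, B> f, Strategy strategy) {
        List<B> forwarded = new ArrayList<>();
        B last = null;
        boolean hasLast = false;
        for (A update : updates) {
            B result = f.apply(update);
            boolean idempotent = hasLast && Objects.equals(last, result);
            if (strategy == Strategy.EMIT_ON_UPDATE || !idempotent) {
                forwarded.add(result);
            }
            last = result; // the operator remembers its previous result
            hasLast = true;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 3, 5, 6, 8);
        // Parity of each value is 1, 1, 1, 0, 0.
        List<Integer> onUpdate = mapValues(input, v -> v % 2, Strategy.EMIT_ON_UPDATE);
        List<Integer> onChange = mapValues(input, v -> v % 2, Strategy.EMIT_ON_CHANGE);
        System.out.println(onUpdate); // [1, 1, 1, 0, 0]
        System.out.println(onChange); // [1, 0]
    }
}
```

Under emit-on-change, consecutive identical results collapse into one record. Any downstream processor therefore handles two updates instead of five.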
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For information, the KIP is described below:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We aim to make Kafka Streams more efficient by adopting the "emit on change" reporting strategy.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Please cast your vote!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Richard
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>
> >>> --
> >>> -- Guozhang
>
> -----BEGIN PGP SIGNATURE-----
>
> iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kC1kACgkQO4miYXKq
> /OhHKA/+OewqjX248vjk6GO6Ex/f2kOJuIIDGb4/c0NlTIS/Iyat1+S8N9P58KNP
> pg133xwdWHagU7wajYMktoFiPamQ+Cv+PPhr7qz38JdfVAvzpNb8tcsI/wr5apOQ
> XNlBsPhQBLtO/JQUve72OqY/TC9unbpBfhA4tvdA/qkLNvDaX542SrZdlwXuqTKH
> EBpgEPBrwaqJ5S65KTMs6Fppc5c2V3dWOAC7Ssql30OneUd/RS88oQ07oNkwZwss
> tADw+tzXtw8a0C0PGtMoXhLrs9wipEsuGOP8N6uvuQCM7YoIvTyeBf3Cu7jG8NFB
> r2caoWY4TZkqCRsrKe37nNbR8KpjkNQBxCZ7nvIJ9B3KCdB0JOFQXwYj1+23z6aX
> T1otQ+0ZIg5lzpIFiHCzwzO5mo2VUEYryRvanw/f2S/LaaBIcg83Dz5TJIv8dFcd
> mU7Vu1KXtpWTgpg48JkWd9qSwPqBaR+nvbdP/DnStwf9/9n5SSGgcdS83jw/w6RV
> N1bX6YlDCFYeIIT14lrsbWiHSZpiFARZ0fn+VBm8DAF0g+mWlX5Hg30yHKujDj+h
> qMDZkI2K2eoYRJaUFcS3yvr2RqCtgXMCEr+jrAVGHDaq+Lt4mbEJRZdon3MiF0Ht
> WmEiNaQa7Tu5h+8P5Rb05kPAB6ODa7/sC0BxC54uRXLdPnNxQCs=
> =nuG1
> -----END PGP SIGNATURE-----