Hi Matthias

I have been mulling over the unsubscribe / delete optimization, and I have
one main concern. I believe that the RHS can only determine whether to
propagate the tombstone or not based on the value passed over from the LHS.
This value would need to be non-null, and so wouldn't the internal
repartition topics end up containing many non-null "tombstone" values?

i.e.:
Normal tombstone (propagate):     (key=123, value=null)
Don't-propagate tombstone:        (key=123, value=("don't propagate me, but please delete state"))
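
For concreteness, here is roughly the kind of wrapper value I am picturing on
the repartition topic. The class and field names below are made up purely for
illustration, not something the KIP already defines:

    // Illustrative sketch only -- hypothetical names, not actual KIP classes.
    // The point: a "delete RHS state, but don't propagate" signal still has
    // to travel as a non-null value on the internal repartition topic.
    public class SubscriptionWrapper {

        public enum Instruction {
            PROPAGATE,    // normal update or tombstone; forward the join result
            DELETE_ONLY   // clean up RHS state, but emit nothing downstream
        }

        private final Instruction instruction;
        private final byte[] serializedForeignKey;

        public SubscriptionWrapper(final Instruction instruction,
                                   final byte[] serializedForeignKey) {
            this.instruction = instruction;
            this.serializedForeignKey = serializedForeignKey;
        }

        public Instruction instruction() {
            return instruction;
        }

        public byte[] serializedForeignKey() {
            return serializedForeignKey;
        }
    }

So the "don't propagate" delete would travel as something like
(key=123, value=new SubscriptionWrapper(Instruction.DELETE_ONLY, fkBytes))
rather than as a plain null.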

Would this pose a verbosity problem in the internal topics, especially if we
have to rebuild state from them?
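
Separately, so that I capture it correctly when updating the KIP: my reading
of Guozhang's hashing suggestion (quoted below) is that the LHS sends a hash
of its value instead of the full payload, and drops any returning join result
whose hash no longer matches the current LHS value. A minimal sketch of how I
picture it, with the hash choice and all names purely illustrative:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    // Illustrative sketch only -- not actual KIP-213 code.
    public final class ValueHashing {

        // Hash the serialized LHS value instead of shipping the payload to
        // the RHS. MD5 is just the example Guozhang used; any digest works.
        public static byte[] hash(final byte[] serializedValue) {
            if (serializedValue == null) {
                return null;  // tombstones carry no payload to hash
            }
            try {
                return MessageDigest.getInstance("MD5").digest(serializedValue);
            } catch (final NoSuchAlgorithmException e) {
                throw new IllegalStateException(e);
            }
        }

        // On the return path, forward the join result only if the hash that
        // travelled with the subscription still matches the current LHS value.
        public static boolean shouldForward(final byte[] returnedHash,
                                            final byte[] currentLhsValue) {
            return Arrays.equals(returnedHash, hash(currentLhsValue));
        }
    }

If the LHS value has changed in the meantime, the stale join result is simply
discarded, so only the up-to-date (k1, (f-k1, v2-v3)) record goes downstream.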

Thanks

Adam

On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> SGTM.
>
> I also had the impression that those duplicates are rather an error than
> a case of eventual consistency. Using hashing to avoid sending the
> payload is a good idea IMHO.
>
> @Adam: can you update the KIP accordingly?
>
>  - add the optimization to not send a reply from RHS to LHS on
> unsubscribe (if not a tombstone)
>  - explain why using offsets to avoid duplicates does not work
>  - add hashing to avoid duplicates
>
> Beside this, I don't have any further comments. Excited to finally get
> this in!
>
> Let us know when you have updated the KIP so we can move forward with
> the VOTE. Thanks a lot for your patience! This was a very loooong shot!
>
>
> -Matthias
>
> On 3/8/19 8:47 AM, John Roesler wrote:
> > Hi all,
> >
> > This proposal sounds good to me, especially since we observe that people
> > are already confused when they see duplicate results coming out of 1:1
> > joins (which is a bug). I take this as "evidence" that we're better off
> > eliminating those duplicates from the start. Guozhang's proposal seems
> > like a lightweight solution to the problem, so FWIW, I'm in favor.
> >
> > Thanks,
> > -John
> >
> > On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> >> Hi Guozhang
> >>
> >> That would certainly work for eliminating those duplicate values. As it
> >> stands right now, this would be consistent with swallowing changes due
> >> to out-of-order processing with multiple threads, and seems like a very
> >> reasonable way forward. Thank you for the suggestion!
> >>
> >> I have been trying to think if there are any other scenarios where we
> >> can end up with duplicates, though I have not been able to identify any
> >> others at the moment. I will think on it a bit more, but if anyone else
> >> has any ideas, please chime in.
> >>
> >> Thanks,
> >> Adam
> >>
> >>
> >>
> >> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> One more thought regarding *c-P2: Duplicates)*: first I want to
> >>> separate this issue from the more general issue that today (not only
> >>> foreign-key, but also co-partitioned primary-key) table-table joins
> >>> still do not strictly respect the timestamp ordering, since the two
> >>> changelog streams may be fetched and hence processed out-of-order, and
> >>> we do not yet allow a record to be joined with the other table at any
> >>> given time snapshot. So ideally, when there are two changelog records
> >>> (k1, (f-k1, v1)), (k1, (f-k1, v2)) coming at the left hand table and
> >>> one record (f-k1, v3) at the right hand table, depending on the
> >>> processing ordering we may get:
> >>>
> >>> (k1, (f-k1, v2-v3))
> >>>
> >>> or
> >>>
> >>> (k1, (f-k1, v1-v3))
> >>> (k1, (f-k1, v2-v3))
> >>>
> >>> And this is not to be addressed by this KIP.
> >>>
> >>> What I would advocate is to fix only the issue that is introduced in
> >>> this KIP, that is, we may have
> >>>
> >>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
> >>> (k1, (f-k1, v2-v3))
> >>>
> >>> I admit that it does not have a correctness issue from the semantics
> >>> alone, compared with "discarding the first result", but it may be
> >>> confusing for users who do not expect to see the seeming duplicates.
> >>> On the other hand, I think there's a lightweight solution to avoid it:
> >>> we can still optimize away sending the full payload of "v1" from the
> >>> left hand side to the right hand side, but instead of just trimming off
> >>> the whole bytes, we can send, e.g., an MD5 hash of the bytes (I'm using
> >>> MD5 here just as an example, we can definitely replace it with other
> >>> functions). By doing so, we can discard the join result if the hash
> >>> value sent back from the right hand side no longer matches the left
> >>> hand side, i.e. we will only send:
> >>>
> >>> (k1, (f-k1, v2-v3))
> >>>
> >>> downstream once.
> >>>
> >>> WDYT?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
> >>>
> >>>> Ah yes, I recall it all now. That answers that question as to why I
> >>>> had caching disabled. I can certainly re-enable it since I believe the
> >>>> main concern was simply about reconciling those two iterators. A lack
> >>>> of knowledge there on my part.
> >>>>
> >>>>
> >>>> Thank you John for weighing in - we certainly both do appreciate it. I
> >>>> think that John hits it on the head though with his comment of "If it
> >>>> turns out we're wrong about this, then it should be possible to fix
> >>>> the semantics in place, without messing with the API."
> >>>>
> >>>> If anyone else would like to weigh in, your thoughts would be greatly
> >>>> appreciated.
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>
> >>>>>>> I don't know how to range scan over a caching store, probably one
> >>>>>>> had to open 2 iterators and merge them.
> >>>>>
> >>>>> That happens automatically. If you query a cached KTable, it ranges
> >>>>> over the cache and the underlying RocksDB and performs the merging
> >>>>> under the hood.
> >>>>>
> >>>>>>> Other than that, I still think even the regular join is broken
> >>>>>>> with caching enabled, right?
> >>>>>
> >>>>> Why? To me, using the word "broken" implies it is conceptually
> >>>>> incorrect; I don't see this.
> >>>>>
> >>>>>>> I once filed a ticket, because with caching enabled it would
> >>>>>>> return values that haven't been published downstream yet.
> >>>>>
> >>>>> For the bug report, I found
> >>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still need to
> >>>>> fix this, but it is a regular bug like any other, and we should not
> >>>>> change a design because of a bug.
> >>>>>
> >>>>> That range() returns values that have not been published downstream
> >>>>> if caching is enabled is how caching works and is intended behavior.
> >>>>> Not sure why you say it's incorrect?
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
> >>>>>>
> >>>>>>
> >>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
> >>>>>>> Thanks Adam,
> >>>>>>>
> >>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there is
> >>>>>>> no functional/correctness requirement to disable caching. I cannot
> >>>>>>> remember why Jan's proposal added this? It might be an
> >>>>>>> implementation detail though (maybe just remove it from the KIP?
> >>>>>>> -- might be misleading).
> >>>>>>
> >>>>>> I don't know how to range scan over a caching store, probably one
> >>>>>> had to open 2 iterators and merge them.
> >>>>>>
> >>>>>> Other than that, I still think even the regular join is broken with
> >>>>>> caching enabled, right? I once filed a ticket, because with caching
> >>>>>> enabled it would return values that haven't been published
> >>>>>> downstream yet.
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>
>
