> I am more inclined to go with keeping it consistent and
>> separated into a normal repartition topic and a normal changelog topic
>> otherwise.

SGTM.


-Matthias

On 3/11/19 11:18 AM, Adam Bellemare wrote:
> Hi John
> 
> Thanks for the explanation. I wasn't sure how KTable repartition topics
> were handled with regards to cleanup but I just wanted to double check to
> see if it could cause an issue.
> 
> @Matthias
> My inclination is to keep the DSL topologies consistent with one another. I
> am a bit concerned about scope creep into the header domain, and I am not
> sure how much performance would be improved vs. additional complexity. I
> think if we go down this approach we should consider a new type of internal
> topic so that it's not confused with existing repartition and changelog
> topic types. I am more inclined to go with keeping it consistent and
> separated into a normal repartition topic and a normal changelog topic
> otherwise.
> 
> Thanks
> Adam
> 
> 
> 
> 
> 
> 
> On Mon, Mar 11, 2019 at 1:24 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I guess Adam suggests, to use compaction for the repartition topic and
>> don't purge data. Doing this, would allow us to avoid a store changelog
>> topic for the "subscription store" on the RHS. This would be a nice
>> optimization.
>>
>> But the concern about breaking compaction is correct. However, I see it
>> as an optimization only and thus, if we keep the topic as plain
>> repartition topic and use a separate store changelog topic the issue
>> resolves itself.
>>
>> Maybe we could use headers thought to get this optimization. Do you
>> think it's worth to do this optimization or just stick with the simple
>> design and two topics (repartition plus changelog)?
>>
>>
>>
>> @Adam: thanks for updating the Wiki page. LGTM.
>>
>>
>> -Matthias
>>
>>
>> On 3/11/19 9:24 AM, John Roesler wrote:
>>> Hey Adam,
>>>
>>> That's a good observation, but it wouldn't be a problem for repartition
>>> topics because Streams aggressively deletes messages from the reparation
>>> topics once it knows they are handled. Thus, we don't need to try and
>> cater
>>> to the log compactor.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Mon, Mar 11, 2019 at 9:10 AM Adam Bellemare <adam.bellem...@gmail.com
>>>
>>> wrote:
>>>
>>>> For the sake of expediency, I updated the KIP with what I believe we
>> have
>>>> discussed.
>>>>
>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-Tombstones&ForeignKeyChanges
>>>>
>>>>
>>>>
>>>> On Mon, Mar 11, 2019 at 8:20 AM Adam Bellemare <
>> adam.bellem...@gmail.com>
>>>> wrote:
>>>>
>>>>> My only concern was around compaction of records in the repartition
>>>> topic.
>>>>> This would simply mean that these records would stick around as their
>>>> value
>>>>> isn't truly null. Though I know about the usage of compaction on
>>>> changelog
>>>>> topics, I am a bit fuzzy on where we use compaction in other internal
>>>>> topics. So long as this doesn't cause concern I can certainly finish
>> off
>>>>> the KIP today.
>>>>>
>>>>> Thanks
>>>>>
>>>>> Adam
>>>>>
>>>>> On Sun, Mar 10, 2019 at 11:38 PM Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I agree that the LHS side must encode this information and tell the
>> RHS
>>>>>> if a tombstone requires a reply or not.
>>>>>>
>>>>>>>> Would this pose some sort of verbosity problem in the internal
>>>> topics,
>>>>>>>> especially if we have to rebuild state off of them?
>>>>>>
>>>>>> I don't see an issue atm. Can you elaborate how this relates to
>> rebuild
>>>>>> state?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 3/10/19 12:25 PM, Adam Bellemare wrote:
>>>>>>> Hi Matthias
>>>>>>>
>>>>>>> I have been mulling over the unsubscribe / delete optimization, and I
>>>>>> have
>>>>>>> one main concern. I believe that the RHS can only determine whether
>> to
>>>>>>> propagate the tombstone or not based on the value passed over from
>> the
>>>>>> LHS.
>>>>>>> This value would need to be non-null, and so wouldn't the internal
>>>>>>> repartition topics end up containing many non-null "tombstone"
>> values?
>>>>>>>
>>>>>>> ie:
>>>>>>> Normal tombstone (propagate):     (key=123, value=null)
>>>>>>> Don't-propagate-tombstone:          (key=123, value=("don't propagate
>>>>>> me,
>>>>>>> but please delete state"))
>>>>>>>
>>>>>>> Would this pose some sort of verbosity problem in the internal
>> topics,
>>>>>>> especially if we have to rebuild state off of them?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Adam
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 8, 2019 at 10:14 PM Matthias J. Sax <
>>>> matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> SGTM.
>>>>>>>>
>>>>>>>> I also had the impression that those duplicates are rather an error
>>>>>> than
>>>>>>>> an case of eventual consistency. Using hashing to avoid sending the
>>>>>>>> payload is a good idea IMHO.
>>>>>>>>
>>>>>>>> @Adam: can you update the KIP accordingly?
>>>>>>>>
>>>>>>>>  - add the optimization to not send a reply from RHS to LHS on
>>>>>>>> unsubscribe (if not a tombstone)
>>>>>>>>  - explain why using offsets to avoid duplicates does not work
>>>>>>>>  - add hashing to avoid duplicates
>>>>>>>>
>>>>>>>> Beside this, I don't have any further comments. Excited to finally
>>>> get
>>>>>>>> this in!
>>>>>>>>
>>>>>>>> Let us know when you have updated the KIP so we can move forward
>> with
>>>>>>>> the VOTE. Thanks a lot for your patience! This was a very loooong
>>>> shot!
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 3/8/19 8:47 AM, John Roesler wrote:
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> This proposal sounds good to me, especially since we observe that
>>>>>> people
>>>>>>>>> are already confused when the see duplicate results coming out of
>>>> 1:1
>>>>>>>> joins
>>>>>>>>> (which is a bug). I take this as "evidence" that we're better off
>>>>>>>>> eliminating those duplicates from the start. Guozhang's proposal
>>>> seems
>>>>>>>> like
>>>>>>>>> a lightweight solution to the problem, so FWIW, I'm in favor.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Fri, Mar 8, 2019 at 7:59 AM Adam Bellemare <
>>>>>> adam.bellem...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Guozhang
>>>>>>>>>>
>>>>>>>>>> That would certainly work for eliminating those duplicate values.
>>>> As
>>>>>> it
>>>>>>>>>> stands right now, this would be consistent with swallowing changes
>>>>>> due
>>>>>>>> to
>>>>>>>>>> out-of-order processing with multiple threads, and seems like a
>>>> very
>>>>>>>>>> reasonable way forward. Thank you for the suggestion!
>>>>>>>>>>
>>>>>>>>>> I have been trying to think if there are any other scenarios where
>>>> we
>>>>>>>> can
>>>>>>>>>> end up with duplicates, though I have not been able to identify
>> any
>>>>>>>> others
>>>>>>>>>> at the moment. I will think on it a bit more, but if anyone else
>>>> has
>>>>>> any
>>>>>>>>>> ideas, please chime in.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Adam
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 7, 2019 at 8:19 PM Guozhang Wang <wangg...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> One more thought regarding *c-P2: Duplicates)*: first I want to
>>>>>>>> separate
>>>>>>>>>>> this issue with the more general issue that today (not only
>>>>>>>> foreign-key,
>>>>>>>>>>> but also co-partition primary-key) table-table joins is still not
>>>>>>>>>> strictly
>>>>>>>>>>> respecting the timestamp ordering since the two changelog streams
>>>>>> may
>>>>>>>> be
>>>>>>>>>>> fetched and hence processed out-of-order and we do not allow a
>>>>>> record
>>>>>>>> to
>>>>>>>>>> be
>>>>>>>>>>> joined with the other table at any given time snapshot yet. So
>>>>>> ideally
>>>>>>>>>> when
>>>>>>>>>>> there are two changelog records (k1, (f-k1, v1)), (k1, (f-k1,
>> v2))
>>>>>>>> coming
>>>>>>>>>>> at the left hand table and one record (f-k1, v3) at the right
>> hand
>>>>>>>> table,
>>>>>>>>>>> depending on the processing ordering we may get:
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> or
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v1-v3))
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> And this is not to be addressed by this KIP.
>>>>>>>>>>>
>>>>>>>>>>> What I would advocate is to fix the issue that is introduced in
>>>> this
>>>>>>>> KIP
>>>>>>>>>>> alone, that is we may have
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v2-v3))   // this should actually be v1-v3
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> I admit that it does not have correctness issue from the
>> semantics
>>>>>>>> along,
>>>>>>>>>>> comparing it with "discarding the first result", but it may be
>>>>>>>> confusing
>>>>>>>>>>> from user's observation who do not expect to see the seemingly
>>>>>>>>>> duplicates.
>>>>>>>>>>> On the other hand, I think there's a light solution to avoid it,
>>>>>> which
>>>>>>>> is
>>>>>>>>>>> that we can still optimize away to not send the full payload of
>>>> "v1"
>>>>>>>> from
>>>>>>>>>>> left hand side to right hand side, but instead of just trimming
>>>> off
>>>>>> the
>>>>>>>>>>> whole bytes, we can send, e.g., an MD5 hash of the bytes (I'm
>>>> using
>>>>>> MD5
>>>>>>>>>>> here just as an example, we can definitely replace it with other
>>>>>>>>>>> functions), by doing which we can discard the join operation if
>>>> the
>>>>>>>> hash
>>>>>>>>>>> value sent back from the right hand side does not match with the
>>>>>> left
>>>>>>>>>> hand
>>>>>>>>>>> side any more, i.e. we will only send:
>>>>>>>>>>>
>>>>>>>>>>> (k1, (f-k1, v2-v3))
>>>>>>>>>>>
>>>>>>>>>>> to down streams once.
>>>>>>>>>>>
>>>>>>>>>>> WDYT?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 6, 2019 at 7:58 AM Adam Bellemare <
>>>>>>>> adam.bellem...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah yes, I recall it all now. That answers that question as to
>>>> why I
>>>>>>>> had
>>>>>>>>>>>> caching disabled. I can certainly re-enable it since I believe
>>>> the
>>>>>>>> main
>>>>>>>>>>>> concern was simply about reconciling those two iterators. A lack
>>>> of
>>>>>>>>>>>> knowledge there on my part.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you John for weighing in - we certainly both do appreciate
>>>>>> it. I
>>>>>>>>>>>> think that John hits it on the head though with his comment of
>>>> "If
>>>>>> it
>>>>>>>>>>> turns
>>>>>>>>>>>> out we're wrong about this, then it should be possible to fix
>> the
>>>>>>>>>>> semantics
>>>>>>>>>>>> in place, without messing with the API."
>>>>>>>>>>>>
>>>>>>>>>>>> If anyone else would like to weigh in, your thoughts would be
>>>>>> greatly
>>>>>>>>>>>> appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 5, 2019 at 6:05 PM Matthias J. Sax <
>>>>>> matth...@confluent.io
>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>>> I dont know how to range scan over a caching store, probably
>>>> one
>>>>>>>>>> had
>>>>>>>>>>>>>>> to open 2 iterators and merge them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That happens automatically. If you query a cached KTable, it
>>>>>> ranges
>>>>>>>>>>> over
>>>>>>>>>>>>> the cache and the underlying RocksDB and performs the merging
>>>>>> under
>>>>>>>>>> the
>>>>>>>>>>>>> hood.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Other than that, I still think even the regualr join is
>> broken
>>>>>>>>>> with
>>>>>>>>>>>>>>> caching enabled right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Why? To me, if you use the word "broker", it implies
>>>> conceptually
>>>>>>>>>>>>> incorrect; I don't see this.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I once files a ticket, because with caching
>>>>>>>>>>>>>>>> enabled it would return values that havent been published
>>>>>>>>>>> downstream
>>>>>>>>>>>>> yet.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the bug report, if found
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6599. We still
>> need
>>>>>> to
>>>>>>>>>> fix
>>>>>>>>>>>>> this, but it is a regular bug as any other, and we should not
>>>>>> change
>>>>>>>>>> a
>>>>>>>>>>>>> design because of a bug.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That range() returns values that have not been published
>>>>>> downstream
>>>>>>>>>> if
>>>>>>>>>>>>> caching is enabled is how caching works and is intended
>>>> behavior.
>>>>>> Not
>>>>>>>>>>>>> sure why say it's incorrect?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 3/5/19 1:49 AM, Jan Filipiak wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 04.03.2019 19:14, Matthias J. Sax wrote:
>>>>>>>>>>>>>>> Thanks Adam,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *> Q) Range scans work with caching enabled, too. Thus, there
>>>> is
>>>>>>>>>> no
>>>>>>>>>>>>>>> functional/correctness requirement to disable caching. I
>>>> cannot
>>>>>>>>>>>>>>> remember why Jan's proposal added this? It might be an
>>>>>>>>>>>>>>> implementation detail though (maybe just remove it from the
>>>> KIP?
>>>>>>>>>>>>>>> -- might be miss leading).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I dont know how to range scan over a caching store, probably
>>>> one
>>>>>>>>>> had
>>>>>>>>>>>>>> to open 2 iterators and merge them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Other than that, I still think even the regualr join is broken
>>>>>> with
>>>>>>>>>>>>>> caching enabled right? I once files a ticket, because with
>>>>>> caching
>>>>>>>>>>>>>> enabled it would return values that havent been published
>>>>>>>>>> downstream
>>>>>>>>>>>> yet.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to