Thanks Adam,

*> Q)
Range scans work with caching enabled, too. Thus, there is no
functional/correctness requirement to disable caching. I cannot remember
why Jan's proposal added this? It might be an implementation detail
though (maybe just remove it from the KIP? -- might be miss leading).


*> (b)
For stream.flatMap()... There is only one input record for the LHS. This
single input record, may produce multiple updates to the KTable --
however, those update records are not written into a topic (before the
update) and thus they don't have their own offsets (if you call
context.offset() for them, you will get the offset of the original input
record, ie, the same offset for all the time). Of course, on write into
the first repartition topic they will get their own offsets. However,
this information seems not to be useful, because this offsets are not
related to the input topic offset and might be from different
partitions. Thus, I don't see how reordering could be resolves using
them? However, I don't think we need offsets anyway to resolve ordering
issues, thus, it does not really matter. (I added this just as a side
remark in my response.)


*c-P1)*
*c-P2: Duplicates)*
I see your point here and agree that it might be difficult to send _all_
result updates if the RHS processors have different load and one might
lag. However, "skipping" some intermediate updates seems to be something
different that "duplicating" an update? However, I also agree that
sending the payload to the RHS might be cost prohibitive. For your
filter example, it seems that calling `KTable.toStream().filter()` would
be incorrect, but that it should be `KTable.filter()`. For the first
scenario I agree that the output record would never make it into the
result, however, for the second case, update semantics are preserved and
thus the result would still be correct (the output record won't be in
the result KTable, either, but this would be correct! -- note that
calling `KTable.toStream()` changes the semantics(!)).

Additionally, as you mentioned ("Monkey Wrench"), there are still other
scenario for which intermediate update might get swallowed. This seems
to be a general "issue" in the current design. Maybe that was the main
criticism from Jan (and it is a quite valid point). However, which
caching enabled, swallowing updates is intended behavior anyway, so I
personally don't think this is a big concern atm. (This might go back to
Jan's proposal to disable caching to avoid swallowing any updates?)

-> would be good to get input from others about the "swallowing" issue

Thus, I agree with your conclusion that sending the payload to RHS does
not resolve this issue. However, I still believe that the current design
ensures that we don't overwrite the correct final result with a stale
intermediate result. The FK check before the result data is emitted
seems to be sufficient for that (I don't think we need timestamp or
offsets to resolve this). The FK check has the only issue, that it may
produce "duplicate" updates. Or did I miss anything?

-> would be good to get input from others about this "duplicate update"
issue



-Matthias




On 3/4/19 8:29 AM, Adam Bellemare wrote:
> Hi Matthias
> 
> Thank you for the feedback! I appreciate your well thought-out questions. I
> have tried to answer and comment on everything that I know below.
> 
> 
> 
> *> Q) For the materialized combined-key store, why do we need to disable>
> caching? And why do we need to flush the store?*
> This is an artifact from Jan's implementation that I have carried along. My
> understanding (though possibly erroneous!) is that RocksDB prefix scan
> doesn't work with the cache, and ignores any data stored within it. I have
> tried to validate this but I have not succeeded, so I believe that this
> will need more investigation and testing. I will dig deeper on this and get
> back to you.
> 
> 
> 
> *> a) Thus, I am wondering why we would need to send the `null` message
> back> (from RHS to LHS) in the first place?*
> 
> We don't need to, if we follow your subsequent tombstone suggestion.
> 
> 
> 
> 
> 
> *> (b) About using "offsets" to resolve ordering issue: I don't think this>
> would work. The join input table would be created as>
> stream.flatMapValues().groupByKey().aggregate()*
> Hmmm... I am a bit fuzzy on this part. Wouldn't the LHS processor be able
> to get the highest offset and propagate that onwards to the RHS processor?
> In my original design I had a wrapper that kept track of the input offset,
> though I suspect it did not work for the above aggregation scenario.
> 
> *c-P1)*
> Regarding the consistency examples, everything you wrote is correct as far
> as I can tell in how the proposed system would behave. Rapid updates to the
> LHS will result in some of the results being discarded (in the case of
> deletes or change of FK) or doubly-produced (discussed below, after the
> following example).
> 
> It does not seem to me to be possible to avoid quashing records that are
> late arriving from the RHS. This could commonly be exhibited by two RHS
> processors that are receiving very different loads. In the example below,
> consider RHS-1 to be heavily loaded while RHS-2 is idle.
> 
> Example:
> 1- RHS-1 is updated to Y|bar
> 2- RHS-2 is updated to Z|foo
> 3- LHS is updated to A|Y
>    -> sends Y|A+ subscription message to RHS-1
> 3- LHS is updated to A|Z
>    -> sends Y|A- unsubscribe message to RHS-1
>    -> sends Z|A+ subscription to RHS-2
> 4- RHS-2 processes Z|A message immediately
>    -> sends A|Z,foo back
> 5- LHS processes A|Z,foo and produces result record A|join(Z,foo)
> 4- RHS-1 processes Y|A message
>    -> sends A|Y,bar back
> 4- RHS-1 processes Y|A- unsubscribe message
>    -> sends A|null message back
> X- LHS processes A|Y,bar, compares it to A|Z, and discards it due to
> staleness.
> X- LHS processes A|null, compares it to A|Z, and discards it due to
> staleness.
> 
> In this case, intermediate messages were discarded due to staleness. From
> the outside, this can be seen as "incorrect" because these intermediate
> results were not shown. However, there is no possible way for RHS-2 to know
> to delay production of its event until RHS-1 has completed its
> propagations. If we wish to produce all intermediate events, in order, we
> must maintain state on the LHS about which events have been sent out, await
> their return, and only publish them in order. Aside from the obvious
> complexity and memory requirements, the intermediate events would
> immediately be stomped by the final output.
> 
> 
> *c-P2: Duplicates)*
> With regards to duplicates (as per the double-sending of `A|Y,2,bar`), one
> option is to ship the entire payload of the LHS over to the RHS, and either
> join there or ship the entire payload back along with the RHS record. We
> would still need to compare the FK on the LHS to ensure that it is still
> valid. To take your example and expand it:
> 
> 1- RHS is updated to Y|bar
> 2- LHS is updated to A|Y,1
>    -> sends Y|(A, (Y,1))+ subscription message to RHS
> 3- LHS is updated to A|Y,2
>    -> sends Y|(A, (Y,1))- unsubscribe message to RHS
>    -> sends Y|(A, (Y,2))+ subscription to RHS
> 4- RHS processes first Y|A+ message
>    -> sends A|(A, (Y,1)),bar back
> 5- LHS processes A|(A, (Y,1)),bar and produces result record A|Y,1,bar
> 6- RHS processes Y|(A, (Y,1))- unsubscribe message (update store only)
> 7- RHS processes second Y|(A, (Y,2))+ subscribe message
>    -> sends A|(A, (Y,2)),bar back
> 8- LHS processes A|(A, (Y,2)),bar and produces result record A|Y,2,bar
> 
> Thus, the first result record is now `A|Y,1,bar`, while the second is
> `A|Y,2,bar`.
> 
> This will add substantially to the data payload size. The question here
> then becomes, "In which scenario is this a necessity"?
> 
> A possible scenario may include:
>     ktable.toStream.filter(filterFunc).foreach( workFunc )
>     //filterFunc true if value == (Y,1), else false
> If the intermediate event (`A|Y,1`) is never produced + filtered, then
> workFunc will not be executed. If I am mistaken on this point, please let
> me know.
> 
> 
> 
> *Monkey Wrench)If you change the foreign key (Say, A|Z,1) while the updates
> in step 2 & 3 above are processing in step 4, the results will all be
> rejected anyways upon returning to the LHS. So even if we send the payload,
> the results will be rejected as stale.*
> 
> *Conclusion:*
> My two cents is that without full, in-order, intermediate record production
> (due to the issues I illustrated in C-P1), I do not believe we should be
> overly concerned about the other subset of intermediate messages. Fixing
> the "duplicate" output records by sending the entire payload over the wire
> is only a partial solution, as changes to theLHS FK can stomp them anyways
> (see Monkey Wrench, immediately above).  If we want all intermediate
> results to be produced, we will need to come up with another solution, and
> still accept that it wont be possible to produce some (again, see C-P1). I
> do not believe this is worth the effort.
> 
> The main issue is "how do we not overwrite final results with stale
> results?" I do not think that we can find a satisfactory intermediate-data
> production mechanism with the current design. However, if we cannot use
> offsets (see b) above) or timestamps to then I do not see any way forward
> to ensuring consistent intermediate event production.
> 
> Thanks
> Adam
> 
> 
> On Thu, Feb 28, 2019 at 9:20 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Adam,
>>
>> I finally had the time to review the KIP and catch up on the mailing
>> list discussion. Thanks a lot for putting this together! Great work! Of
>> course also big thanks to Jan who started the KIP initially.
>>
>> This is a long email, because I re-read the discussion for multiple
>> month and reply to many things... I don't think there is a need to reply
>> to every point I mention. Just want to add my 2 cents to a couple of
>> points that were discussed.
>>
>>
>> (0) Overall the design makes sense to me. The API is intuitive and clean
>> now. The API in the original proposal leaked a lot of implementation
>> details, what was a major concern to me. I also believe that it's
>> important to partition the data of the result KTable correctly (the
>> KScatteredTable does violate this; ie, the "key is useless" as Jan
>> phrased it), thus the last step seems to be mandatory to me. Also adding
>> a KScatteredKTable adds a lot of new public API that is basically just
>> duplicating existing APIs (filter, mapValue, groupBy, toStream, join).
>> Lastly, I am happy that we don't need to "watermark/header" stuff to fix
>> the ordering race condition.
>>
>> (1) About the optimization potential for multiple consecutive join: I
>> think we could tackle this with the optimization framework we have in
>> place now.
>>
>> (2) I was also thinking about left/outer join, and I believe that we
>> could add a left-join easily (as follow up work; even if I think it's
>> not a big addition to the current design). However, an outer-join does
>> not make too much sense because we don't have a key for the result
>> KTable of "right hand side" records that don't join (ie, the
>> right-outer-join part cannot be done).
>>
>> (3) About the "emit on change" vs "emit on update" discussion. I think
>> this is orthogonal to this KIP and I would stick with "emit on update"
>> because this is the current behavior of all existing operators. If we
>> want to change it, we should consider to do this for all operators. I
>> also believe, even if it does not change the API, it should be backed
>> with a KIP, because it is a semantics (ie, major) change.
>>
>>
>>
>> @Jan:
>>
>>> I have a lengthy track record of loosing those kinda arguments within
>> the streams community and I have no clue why
>>
>> Because you are a power user, that has different goals in mind. We tend
>> to optimize the API that it's easy to use for non-power users what is
>> the majority of people. The KScatteredTable is a hard to grog concept...
>>
>>> where simplicity isn't really that as users still need to understand it
>> I argue
>>
>> I disagree here. If we do a good job designing the APIs, user don't need
>> to understand the nitty-gritty details, and it "just works".
>>
>>
>> For the performance discussion, ie, which side is "larger": this does
>> not really matter (number of keys is irrelevant) IHMO. The question is,
>> which side is _updated_ more often and what is "n" (the factor "n" would
>> not be relevant for Jan's proposal though). For every left hand side
>> update, we send 2 messages to the right hand side and get 2 messages
>> back. For every right hand side update we send n messages to the left
>> hand side.
>>
>> I agree with Jan we can't know this though (not the irrelevant "size" of
>> each side, nor the "n", nor the update rate).
>>
>>
>>
>>
>>
>> Finally, couple of questions/comments on the KIP (please reply to this
>> part :)):
>>
>>  - For the materialized combined-key store, why do we need to disable
>> caching? And why do we need to flush the store?
>>
>>  - About resolving order:
>>
>> (a) for each LHS update, we need to send two records to the RHS (one to
>> "unsubscribe" the old FK, and on to "subscribe" to the new FK). The KIP
>> further proposes to send two records back: `null` for the unsubscribe
>> and a new join "result" for the new FK. This can result in ordering
>> issues that we want to resolve with the FK lookup in the final step.
>>
>>> The thing is that in that last join, we have the opportunity to compare
>> the
>>> current FK in the left table with the incoming PK of the right table. If
>>> they don't match, we just drop the event, since it must be outdated.
>>
>> Jan criticized this as "swallowing" updates if they arrive out-of-order
>> and the delete is not reflected in the result KTable (if I understood
>> him correctly). I disagree with Jan, and actually think, we should
>> always avoid the delete on the result KTable to begin with:
>>
>> If both records arrive in the correct order on the LHS, we would still
>> produce two output messages downstream. This is intuitive, because we
>> _know_ that a single update to the LHS table, should result in _one_
>> update to the result KTable. And we also know, that the update is based
>> on (ie, correct for) the new FK.
>>
>> Thus, I am wondering why we would need to send the `null` message back
>> (from RHS to LHS) in the first place?
>>
>> Instead, we could encode if the RHS should send something back or not.
>> This way, an "unsubscribe" message will only update the store for the
>> CominedKey (ie, delete the corresponding entry) and only the new FK will
>> trigger a join lookup in the RHS table to compute a "result" that is
>> sent back. If the LHS input is a tombstone, we send on "unsubscribe" as
>> always, plus a `null` "subscribe" message: this ensures that we still
>> get a join result tombstone back to update (ie, delete the entry from)
>> the result KTable.
>>
>> Example: (we start with two empty tables)
>>
>> 1- RHS is updated to Y|foo
>> 2- RHS is updated to Z|bar
>> 3- LHS is updates to A|Y
>>    -> sends Y|A+ subscribe message to RHS (no unsubscribe necessary)
>> 4- RHS processes Y|A+ subscribe message
>>    -> sends A|Y,foo message back
>> 5- LHS processed A|Y,foo and produces result record A|Y,foo
>> 6- LSH is updates to A|Z
>>    -> sends Y|A- unsubscribe message to RHS
>>    -> sends Z|A+ subscribe message to RHS
>> 7- RHS processes Y|A- unsubscribe message (update store only)
>> 8- RHS processes Z|A+ subscribe message
>>    -> sends A|Z,bar message back
>> 9- LHS processed A|Z,bar and produces result record A|Z,bar
>>
>>
>> delete case (cont example):
>>
>> 10- LSH is updates to A|null (tombstone)
>>     -> sends Z|A* subscribe message to RHS
>>     (* indicates tombstone, we still need to encode A to be able to
>> delete on RHS)
>> 11- RHS processes Z|A* subscribe message (update store, ie, delete)
>>     -> sends A|null message back
>> 13- LHS processed A|null and produces result record A|null
>>
>>
>> Maybe we could even shortcut this further, by sending only the old FK
>> "unsubscribe" message and emit a tombstone to the result KTable
>> directly. If there are "stuck" join results for the same LHS record on
>> the RHS that arrive later, we can detect this case, because there is no
>> LHS record anymore, and thus drop those records. However, I am not 100%
>> sure if this would be correct (cf. point (c) below).
>>
>> delete case (optimized):
>>
>> 10- LSH is updates to A|null (tombstone)
>>     -> sends Z|A- unsubscribe message to RHS
>>     -> produces result record A|null directly
>> 11- RHS processes Z|A- unsubscribe message (update store only)
>>
>>
>> Also note that we still need the logic to resolve un-order, because
>> there might also be un-order to consecutive LHS updates to the same
>> record between subscribe messages, too. My proposal above only gets rid
>> of the race condition within a single LHS update (ie, race between
>> unsubscribe and subscribe).
>>
>>
>> (b) About using "offsets" to resolve ordering issue: I don't think this
>> would work. The join input table would be created as
>>
>>     stream.flatMapValues().groupByKey().aggregate()
>>
>> For this case, multiple KTable updates have the same input record and
>> thus the same offset. Hence, there is no guarantee that offsets are
>> unique and thus we cannot use them to resolve update conflicts.
>>
>>
>> (c) Using the current method to avoid races, may not be correct though
>> (or maybe the scenario below is a case of eventual consistency and not a
>> correctness issue -- I am not sure how to judge it).
>>
>> We start with two empty tables:
>>
>> 1- RHS is updated to Y|bar
>> 2- LHS is updated to A|Y,1
>>    -> sends Y|A+ subscription message to RHS
>> 3- LHS is updated to A|Y,2
>>    -> sends Y|A- unsubscribe message to RHS
>>    -> sends Y|A+ subscription to RHS
>> 4- RHS processes first Y|A+ message
>>    -> sends A|Y,bar back
>> 5- LHS processes A|Y,bar and produces result record A|Y,2,bar
>> 6- RHS processes Y|A- unsubscribe message (update store only)
>> 7- RHS processes second Y|A+ subscribe message
>>    -> sends A|Y,bar back
>> 8- LHS processes A|Y,bar and produces result record A|Y,2,bar
>>
>> Thus, the first result record, that should have been `A|Y,1,bar`, is now
>> `A|Y,2,bar`. Furthermore, we update result table from `A|Y,2,bar` to
>> `A|Y,2,bar`.
>>
>> It's unclear to me, if this should be considered an incorrect
>> (intermediate) result or not? The point being, the result is "eventually
>> correct too early" because we join the second LHS right twice now
>> (instead of joining each LHS record once).
>>
>> From a user point of view, LHS is updated with A|Y,1 and A|Y,2, and
>> first result record does not produce any output, while second result
>> record produces a "duplicate" result.
>>
>> Because all this happens on the same LHS key, I am wondering if this
>> violated correctness (even if we end up with correct final result).
>>
>>
>> Same "issue" for delete optimization as mentioned above:
>>
>> If we shortcut the round trip and only send one unsubscribe message and
>> emit a tombstone directly on the LHS, there might be in-flight updates
>> to the same LHS record "stuck" on the right hand side. If we get a new
>> update (for the same key) to the LHS after the LHS delete, and
>> afterwards process the "stuck" right hand side updates, we would not be
>> able to drop those records (because the LHS table is not empty any
>> longer). Again, we end up with the correct final result, however, I am
>> not sure if those intermediate results should be consider "incorrect" or
>> only "wrong in an eventual consistent way".
>>
>>
>> (I hope I got all examples right... *urgs*)
>>
>>
>> If you made it this far, I am very proud of you!!
>>
>>
>> -Matthias
>>
>>
>>
>>
>>
>>
>> On 1/11/19 12:29 PM, John Roesler wrote:
>>> Hi Jan,
>>>
>>> Thanks for the reply.
>>>
>>> It sounds like your larger point is that if we provide a building block
>>> instead of the whole operation, then it's not too hard for users to
>>> implement the whole operation, and maybe the building block is
>>> independently useful.
>>>
>>> This is a very fair point. In fact, it's not exclusive with the current
>>> plan,
>>> in that we can always add the "building block" version in addition to,
>>> rather than instead of, the full operation. It very well might be a
>> mistake,
>>> but I still prefer to begin by introducing the fully encapsulated
>> operation
>>> and subsequently consider adding the "building block" version if it turns
>>> out that the encapsulated version is insufficient.
>>>
>>> IMHO, one of Streams's strengths over other processing frameworks
>>> is a simple API, so simplicity as a design goal seems to suggest that:
>>>> a.tomanyJoin(B)
>>> is preferable to
>>>> a.map(retain(key and FK)).tomanyJoin(B).groupBy(a.key()).join(A)
>>> at least to start with.
>>>
>>> To answer your question about my latter potential optimization,
>>> no I don't have any code to look at. But, yes, the implementation
>>> would bring B into A's tasks and keep them in a state store for joining.
>>> Thanks for that reference, it does indeed sound similar to what
>>> MapJoin does in Hive.
>>>
>>> Thanks again,
>>> -John
>>>
>>> On Mon, Jan 7, 2019 at 5:06 PM Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On 02.01.2019 23:44, John Roesler wrote:
>>>>> However, you seem to have a strong intuition that the scatter/gather
>>>>> approach is better.
>>>>> Is this informed by your actual applications at work? Perhaps you can
>>>>> provide an example
>>>>> data set and sequence of operations so we can all do the math and agree
>>>>> with you.
>>>>> It seems like we should have a convincing efficiency argument before
>>>>> choosing a more
>>>>> complicated API over a simpler one.
>>>>
>>>> The way I see this is simple. If we only provide the basic
>>>> implementation of 1:n join (repartition by FK, Range scan on Foreign
>>>> table update). Then this is such a fundamental building block.
>>>>
>>>> I do A join B.
>>>>
>>>> a.map(retain(key and FK)).tomanyJoin(B).groupBy(a.key()).join(A). This
>>>> pretty much performs all your "wire saving optimisations". I don't know!
>>>> to be honest if someone did put this ContextAwareMapper() that was
>>>> discussed at some point. Then I could actually do the high watermark
>>>> thing. a.contextMap(reatain(key, fk and offset).
>>>> omanyJoin(B).aggregate(a.key(), oldest offset wins).join(A).
>>>> I don't find the KIP though. I guess it didn't make it.
>>>>
>>>> After the repartition and the range read the abstraction just becomes to
>>>> weak. I just showed that your implementation is my implementation with
>>>> stuff around it.
>>>>
>>>> I don't know if your scatter gather thing is in code somewhere. If the
>>>> join will only be applied after the gather phase I really wonder where
>>>> we get the other record from? do you also persist the foreign table on
>>>> the original side? If that is put into code somewhere already?
>>>>
>>>> This would essentially bring B to each of the A's tasks. Factors for
>>>> this in my case a rather easy and dramatic. Nevertheless an approach I
>>>> would appreciate. In Hive this could be something closely be related to
>>>> the concept of a MapJoin. Something I whish we had in streams. I often
>>>> stated that at some point we need unbounded ammount off offsets per
>>>> topicpartition and group :D Sooooo good.
>>>>
>>>> Long story short. I hope you can follow my line of thought. I hope you
>>>> can clarify my missunderstanding how the join is performed on A side
>>>> without materializing B there.
>>>>
>>>> I would love if streams would get it right. The basic rule I always say
>>>> is do what Hive does. done.
>>>>
>>>>
>>>>>
>>>>> Last thought:
>>>>>> Regarding what will be observed. I consider it a plus that all events
>>>>>> that are in the inputs have an respective output. Whereas your
>> solution
>>>>>> might "swallow" events.
>>>>>
>>>>> I didn't follow this. Following Adam's example, we have two join
>>>> results: a
>>>>> "dead" one and
>>>>> a "live" one. If we get the dead one first, both solutions emit it,
>>>>> followed by the live result.
>>>>
>>>> there might be multiple dead once in flight right? But it doesn't really
>>>> matter, I never did something with the extra benefit i mentioned.
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to