vcrfxia commented on code in PR #13564: URL: https://github.com/apache/kafka/pull/13564#discussion_r1171929315
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -139,7 +139,7 @@ public void process(final Record<K, Change<V1>> record) {
                 oldValue = joiner.apply(record.value().oldValue, valueRight);
             }
 
-            context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
+            context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)).withTimestamp(resultTimestamp));

Review Comment:
   Thought of an interesting edge case earlier that has to do with nulls. It doesn't apply to inner joins, but suppose we have a left join:
   ```
   B: (k, b, ts=3)
   A: (k, a1, ts=1) --> emit join result (a1, b, ts=3)
   B: (k, null, ts=4) --> emit join result (a1, null, ts=4)
   A: (k, a2, ts=2) --> emit join result (a2, null, ts=2)
   ```
   The last join result is emitted with timestamp 2 instead of 4 because when the null is looked up as the latest value from the B store, there is no timestamp associated with it, and therefore the A record timestamp is used.

   This example is interesting because the final join result `(a2, null)` is the "most recent" join result in the sense that it is the join of the latest record from the A side (`a2`) with the latest record from the B side (`null`), but it does not have the latest timestamp among the join result records (the previous join result has timestamp 4).

   So, which join result should be considered the latest? It should probably be the last one, right? In that case it would be extra wrong to materialize the join result as a versioned store, since a versioned store would think that the second-to-last join result is the latest.
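   To make the edge case concrete, here is a minimal, self-contained simulation of the timestamp rule as described above. It is only a sketch, not the Kafka Streams implementation: `LeftJoinTimestampSketch`, `Stamped`, `JoinResult`, and `latestByTimestamp` are hypothetical names, the stores are plain maps, and it assumes the result timestamp is the max of both sides when the B value is present and the A record timestamp when the B lookup returns null. Tombstones on the A side are not modeled.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical simulation; none of these types are Kafka Streams classes.
   final class LeftJoinTimestampSketch {

       /** A stored value plus the timestamp of the record that wrote it. */
       static final class Stamped {
           final String value;
           final long timestamp;
           Stamped(String value, long timestamp) {
               this.value = value;
               this.timestamp = timestamp;
           }
       }

       /** One emitted join result. */
       static final class JoinResult {
           final String left;
           final String right;
           final long timestamp;
           JoinResult(String left, String right, long timestamp) {
               this.left = left;
               this.right = right;
               this.timestamp = timestamp;
           }
           @Override
           public String toString() {
               return "(" + left + ", " + right + ", ts=" + timestamp + ")";
           }
       }

       private final Map<String, Stamped> leftStore = new HashMap<>();
       private final Map<String, Stamped> rightStore = new HashMap<>();
       final List<JoinResult> emitted = new ArrayList<>();

       /** Record on the A (left) side; assumes a non-null value. */
       void processLeft(String key, String value, long ts) {
           leftStore.put(key, new Stamped(value, ts));
           Stamped right = rightStore.get(key);
           // A deleted B entry carries no timestamp, so only the A timestamp is available.
           long resultTs = (right == null) ? ts : Math.max(ts, right.timestamp);
           emitted.add(new JoinResult(value, right == null ? null : right.value, resultTs));
       }

       /** Record on the B (right) side; a null value is a tombstone. */
       void processRight(String key, String value, long ts) {
           if (value == null) {
               rightStore.remove(key); // the tombstone's timestamp is lost here
           } else {
               rightStore.put(key, new Stamped(value, ts));
           }
           Stamped left = leftStore.get(key);
           if (left == null) {
               return; // left join: nothing to emit without an A-side value
           }
           emitted.add(new JoinResult(left.value, value, Math.max(ts, left.timestamp)));
       }

       /** Replays the four records from the example, in offset order. */
       static List<JoinResult> simulate() {
           LeftJoinTimestampSketch join = new LeftJoinTimestampSketch();
           join.processRight("k", "b", 3);
           join.processLeft("k", "a1", 1);
           join.processRight("k", null, 4);
           join.processLeft("k", "a2", 2);
           return join.emitted;
       }

       /** What a store ordered purely by timestamp would treat as the latest result. */
       static JoinResult latestByTimestamp(List<JoinResult> results) {
           JoinResult best = null;
           for (JoinResult r : results) {
               if (best == null || r.timestamp >= best.timestamp) {
                   best = r;
               }
           }
           return best;
       }

       public static void main(String[] args) {
           List<JoinResult> results = simulate();
           System.out.println("emitted:             " + results);
           System.out.println("latest by offset:    " + results.get(results.size() - 1));
           System.out.println("latest by timestamp: " + latestByTimestamp(results));
       }
   }
   ```

   Under these assumptions the last emitted result is `(a2, null, ts=2)`, while the result with the highest timestamp is `(a1, null, ts=4)`, which is the mismatch between offset order and timestamp order raised in the comment.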