vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171929315


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -139,7 +139,7 @@ public void process(final Record<K, Change<V1>> record) {
                 oldValue = joiner.apply(record.value().oldValue, valueRight);
             }
 
-            context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
+            context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)).withTimestamp(resultTimestamp));

Review Comment:
   Thought of an interesting edge case earlier involving nulls. It doesn't apply to inner joins, but suppose we have a left join:
   ```
   B: (k, b, ts=3)
   A: (k, a1, ts=1) --> emit join result (a1, b, ts=3)
   B: (k, null, ts=4) --> emit join result (a1, null, ts=4)
   A: (k, a2, ts=2) --> emit join result (a2, null, ts=2)
   ```
   The last join result is emitted with timestamp 2 instead of 4 because, when the null is looked up as the latest value from the B store, there is no timestamp associated with it, so the A record timestamp is used.
   
   This example is interesting because the final join result `(a2, null)` is the "most recent" join result in the sense that it is the join of the latest record from the A side (`a2`) with the latest record from the B side (`null`), but it does not have the largest timestamp among the join result records (the previous join result has timestamp 4). So, which join result should be considered the latest? It should probably be the last one, right? In that case it would be extra wrong to materialize the join result as a versioned store, since a versioned store would treat the second-to-last join result, `(a1, null)` at timestamp 4, as the latest.
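
To make the sequence above easy to replay, here is a minimal toy model of the left join in plain Java. It is a sketch, not the Kafka Streams processors: `LeftJoinTimestampSketch`, `Stamped`, `processA`, `processB`, and `run` are hypothetical names, and the timestamp rule (max of both sides when both values are present, the A timestamp alone when the B lookup returns null) is an assumption chosen to match the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a KTable-KTable left join (A left-join B); not the Kafka Streams implementation.
class LeftJoinTimestampSketch {

    // Hypothetical holder for a stored value and its record timestamp.
    static final class Stamped {
        final String value;
        final long ts;
        Stamped(String value, long ts) { this.value = value; this.ts = ts; }
    }

    private final Map<String, Stamped> storeA = new HashMap<>();
    private final Map<String, Stamped> storeB = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    // Update on the left (A) side. A-side tombstones are not modeled here.
    void processA(String key, String value, long ts) {
        storeA.put(key, new Stamped(value, ts));
        Stamped right = storeB.get(key);
        // Assumption: max of both timestamps when B has a value; otherwise only the
        // A timestamp is available, because a deleted B entry carries no timestamp.
        long resultTs = (right == null) ? ts : Math.max(ts, right.ts);
        emit(value, right == null ? null : right.value, resultTs);
    }

    // Update on the right (B) side; a null value is a tombstone.
    void processB(String key, String value, long ts) {
        if (value == null) {
            storeB.remove(key);
        } else {
            storeB.put(key, new Stamped(value, ts));
        }
        Stamped left = storeA.get(key);
        if (left == null) {
            return; // left join: no result without an A-side value
        }
        emit(left.value, value, Math.max(ts, left.ts));
    }

    private void emit(String left, String right, long ts) {
        emitted.add("(" + left + ", " + right + ", ts=" + ts + ")");
    }

    // Replays the four records from the example and returns the emitted results in order.
    static List<String> run() {
        LeftJoinTimestampSketch join = new LeftJoinTimestampSketch();
        join.processB("k", "b", 3L);
        join.processA("k", "a1", 1L);
        join.processB("k", null, 4L);
        join.processA("k", "a2", 2L);
        return join.emitted;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints the three join results in emission order: `(a1, b, ts=3)`, `(a1, null, ts=4)`, `(a2, null, ts=2)`. The last emitted result carries a smaller timestamp than the one before it, which is the mismatch between "latest by offset" and "latest by timestamp" described above.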


