vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163422113

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   Sounds like your instinct is still that it's better to formalize our understanding that the output of a table-table join today is not versioned, by having the value getter reflect that. I trust your instinct here and am on board -- just made the changes.

   A benefit of this is that it is now easier to explain to users that any stateful transformation (aggregate, join, and also suppress -- if suppress should be considered stateful in the same sense) will cause a versioned table to no longer be versioned (unless the result is explicitly materialized with a versioned store, which should not be done for joins anyway).

   The cost is that a stream-(table-table) join does not get versioned semantics, but I think that's acceptable since a (stream-table)-table join does.
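
For readers following the thread, here is a minimal sketch of the contract discussed above: a getter over a versioned source answers timestamped lookups, while the getter for a table-table join reports itself as not versioned and rejects them. The names `VersionedGetterSketch`, `ValueGetter`, `versionedGetter`, and `joinGetter` are hypothetical, simplified stand-ins made up for illustration; this is not the code in the PR and not the actual Kafka Streams interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-ins; NOT the actual Kafka Streams classes.
class VersionedGetterSketch {

    /** Latest-value lookup always works; timestamped lookup only for versioned sources. */
    interface ValueGetter<K, V> {
        V get(K key);

        // Assumption for this sketch: non-versioned getters reject timestamped lookups.
        default V get(K key, long asOfTimestamp) {
            throw new UnsupportedOperationException("timestamped lookup requires a versioned source");
        }

        boolean isVersioned();
    }

    /** Getter backed by a per-key version history (timestamp -> value). */
    static <K, V> ValueGetter<K, V> versionedGetter(Map<K, TreeMap<Long, V>> history) {
        return new ValueGetter<K, V>() {
            @Override
            public V get(K key) {
                TreeMap<Long, V> versions = history.get(key);
                return versions == null || versions.isEmpty() ? null : versions.lastEntry().getValue();
            }

            @Override
            public V get(K key, long asOfTimestamp) {
                TreeMap<Long, V> versions = history.get(key);
                if (versions == null) {
                    return null;
                }
                // Latest version whose timestamp is <= asOfTimestamp, if any.
                Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
                return entry == null ? null : entry.getValue();
            }

            @Override
            public boolean isVersioned() {
                return true;
            }
        };
    }

    /** Inner-join getter: joins latest values only and is never versioned, whatever its inputs are. */
    static <K, V1, V2, R> ValueGetter<K, R> joinGetter(ValueGetter<K, V1> left,
                                                       ValueGetter<K, V2> right,
                                                       BiFunction<V1, V2, R> joiner) {
        return new ValueGetter<K, R>() {
            @Override
            public R get(K key) {
                V1 l = left.get(key);
                V2 r = right.get(key);
                return l == null || r == null ? null : joiner.apply(l, r);
            }

            @Override
            public boolean isVersioned() {
                return false;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, TreeMap<Long, String>> leftHistory = new HashMap<>();
        leftHistory.put("a", new TreeMap<>(Map.of(10L, "v1", 20L, "v2")));
        Map<String, TreeMap<Long, String>> rightHistory = new HashMap<>();
        rightHistory.put("a", new TreeMap<>(Map.of(15L, "x")));

        ValueGetter<String, String> left = versionedGetter(leftHistory);
        ValueGetter<String, String> right = versionedGetter(rightHistory);
        ValueGetter<String, String> join = joinGetter(left, right, (l, r) -> l + "+" + r);

        System.out.println(left.get("a", 12L));   // timestamped lookup on a versioned input
        System.out.println(join.get("a"));        // join of the latest values
        System.out.println(join.isVersioned());   // the join result is not versioned
    }
}
```

In this sketch, a downstream operator would check `isVersioned()` before attempting a timestamped lookup, which is one way to read the point that a join result loses versioned semantics even when both inputs are versioned.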