mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1160906740


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = 
valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long 
asOfTimestamp) {

Review Comment:
   > For example, is caching a functional change?
   
   I would say, yes, because it impacts what intermediate results one gets. But 
I agree that the answer is not "black or white", because -> `In either case, 
the guarantee is that the latest (by offset) record is the "correct" value for 
the key, and the others should be effectively ignored.`)
   
   > but if they somehow had access to the inner/left/outer join processor 
(before the join merge processor) and called get(key, ts) on the 
KTableValueGetter, what should be returned?
   
   For this case, they won't query the result table, but the input table, right 
(and only indirectly, cf my comments below)? -> ` I think it's natural to 
return the correct older join result because we have it available.` You say 
"older join result" -- don't think they would get a join result, would they?
   
   Overall, it seems to be a very unlikely scenario anyway. But I would like to 
call out the following: in the end, the input table is maintained by an 
upstream processor, and the "join processor" just has access to it. Thus, if a 
user would tab into the "join processor" to indirectly access the upstream 
table store, it seem ok to me if they would actually get a different result. If 
they really want to query the table, they should not tab into the "join 
processor" but into the upstream processor that maintains the table.
   
   So to me it seems, the value getter should always use `get(k)` and not 
`get(k, ts)` to re-compute the "latest by timestamp" result, because it is the 
same result that a materialized downstream table would store? Not sure what the 
cleanest way to implement this is, but given how the "join processor" works, it 
basically get a versioned input ktable, and produced a non-versioned ktable and 
drops out-of-order records. So if we would only expose a value-getter that does 
not support `get(k, ts)` it would be reasonable to me. -- Also, we are talking 
about implementation details, not public API -- the public API is two input 
KTables and one output KTable -- the fact that we have 5 processor is internal, 
and we can change it any time -- so if users tap into any of the 5 internal 
processor, they don't have any guarantee that it will be backward compatible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to