vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1163422113


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   Sounds like your instinct is still that it's better to formalize our understanding that the output of a table-table join today is not versioned, by having the value getter reflect that. I trust your instinct here and am on board; I just made the changes.
   
   A benefit of this is that the behavior is now easier to explain to users: any stateful transformation (aggregate, join, and also suppress, if suppress should be considered stateful in the same sense) causes a versioned table to no longer be versioned, unless the result is explicitly materialized with a versioned store, which should not be done for joins anyway.
   
   The cost is that a stream-(table-table) join does not get versioned semantics, but I think that's acceptable since a (stream-table)-table join does.
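
To illustrate the semantics described above, here is a minimal, self-contained sketch. It is not the Kafka Streams implementation: `ValueGetter`, `versioned`, and `innerJoin` are hypothetical, simplified stand-ins, and the idea that a non-versioned getter rejects timestamped lookups with an exception is an assumption made for the example. It only shows that a join over two versioned inputs reports itself as not versioned and serves latest-value lookups.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiFunction;

final class VersionedJoinSketch {

    // Hypothetical, simplified stand-in for a table value getter.
    interface ValueGetter<K, V> {
        V get(K key); // latest value for the key, or null

        // Assumption for this sketch: timestamped lookups fail unless versioned.
        default V get(K key, long asOfTimestamp) {
            throw new UnsupportedOperationException("getter is not versioned");
        }

        boolean isVersioned();
    }

    // A getter backed by a per-key history (timestamp -> value).
    static <K, V> ValueGetter<K, V> versioned(Map<K, NavigableMap<Long, V>> history) {
        return new ValueGetter<K, V>() {
            @Override
            public V get(K key) {
                NavigableMap<Long, V> versions = history.get(key);
                return versions == null || versions.isEmpty()
                        ? null : versions.lastEntry().getValue();
            }

            @Override
            public V get(K key, long asOfTimestamp) {
                NavigableMap<Long, V> versions = history.get(key);
                if (versions == null) {
                    return null;
                }
                // Latest version whose timestamp is <= asOfTimestamp.
                Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
                return entry == null ? null : entry.getValue();
            }

            @Override
            public boolean isVersioned() {
                return true;
            }
        };
    }

    // The join result is never versioned, even if both inputs are:
    // it only combines the latest values and keeps the default timestamped get.
    static <K, V1, V2, R> ValueGetter<K, R> innerJoin(
            ValueGetter<K, V1> left, ValueGetter<K, V2> right, BiFunction<V1, V2, R> joiner) {
        return new ValueGetter<K, R>() {
            @Override
            public R get(K key) {
                V1 l = left.get(key);
                V2 r = right.get(key);
                return (l == null || r == null) ? null : joiner.apply(l, r);
            }

            @Override
            public boolean isVersioned() {
                return false;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, NavigableMap<Long, String>> leftHistory = new HashMap<>();
        leftHistory.put("k", new TreeMap<>());
        leftHistory.get("k").put(10L, "a1");
        leftHistory.get("k").put(20L, "a2");
        Map<String, NavigableMap<Long, String>> rightHistory = new HashMap<>();
        rightHistory.put("k", new TreeMap<>());
        rightHistory.get("k").put(5L, "b1");

        ValueGetter<String, String> left = versioned(leftHistory);
        ValueGetter<String, String> right = versioned(rightHistory);
        ValueGetter<String, String> joined = innerJoin(left, right, (l, r) -> l + "+" + r);

        System.out.println(left.get("k", 15L));   // timestamped lookup on a versioned input
        System.out.println(joined.get("k"));      // latest-value lookup on the join result
        System.out.println(joined.isVersioned()); // the join result reports itself as not versioned
    }
}
```

In this sketch, a downstream stream-table join that consults `isVersioned()` on the joined getter would fall back to latest-value lookups, which mirrors the trade-off noted for stream-(table-table) joins.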



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
