vcrfxia commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1171925337


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##########
@@ -116,10 +118,13 @@ public void process(final Record<KIn, Change<VIn>> record) {
             }
 
             // update the store with the new value
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Is it valid for a computed aggregation to be null? If it is, I think it's 
actually not guaranteed that aggregation result timestamps are always 
nondecreasing. If an out-of-order record arrives after the `newAgg` value is 
set to null, then the aggregation result will be re-initialized, and the 
timestamp will be the timestamp of the out-of-order record, which is earlier 
than the previous aggregation timestamp.
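
   To make the scenario concrete, here is a minimal sketch of it. This is not the 
`KTableAggregate` code: the class `NullAggregateTimestampSketch`, its `update` 
method, and the in-memory map are hypothetical, and the sketch assumes that a 
null aggregate is stored as a delete, so the previous timestamp is forgotten.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a timestamped aggregate store.
// It is NOT the Kafka Streams implementation.
class NullAggregateTimestampSketch {

    static final class ValueAndTs {
        final long value;
        final long timestamp;

        ValueAndTs(final long value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, ValueAndTs> store = new HashMap<>();

    // Applies one update and returns the timestamp assigned to the result.
    // A null newAgg models an aggregator that returned null. Assumption:
    // null is stored as a delete, which drops the previous timestamp.
    long update(final String key, final Long newAgg, final long recordTimestamp) {
        final ValueAndTs old = store.get(key);
        final long newTimestamp = old == null
            ? recordTimestamp
            : Math.max(recordTimestamp, old.timestamp);
        if (newAgg == null) {
            store.remove(key);
        } else {
            store.put(key, new ValueAndTs(newAgg, newTimestamp));
        }
        return newTimestamp;
    }

    public static void main(final String[] args) {
        final NullAggregateTimestampSketch sketch = new NullAggregateTimestampSketch();
        final long t1 = sketch.update("k", 1L, 100L);   // first aggregate
        final long t2 = sketch.update("k", null, 110L); // aggregate becomes null
        final long t3 = sketch.update("k", 1L, 90L);    // out-of-order record
        System.out.println(t1 + " " + t2 + " " + t3);   // prints "100 110 90"
    }
}
```

   The third update is re-initialized from scratch, so its timestamp (90) is 
earlier than the timestamp of the preceding result (110).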
   
   If that scenario is possible, then it's a bit unclear which of the two 
results should be considered the "latest" aggregation value, and we should 
choose whether to keep the current code or to always set `isLatest = true`, 
depending on what semantics we want.
   
   If that scenario is not possible, i.e., if it is not valid for an 
aggregation value to be null, then the two are equivalent and I can update this 
code to clean it up per your suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
