vcrfxia commented on code in PR #13564: URL: https://github.com/apache/kafka/pull/13564#discussion_r1171925337
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##########
@@ -116,10 +118,13 @@ public void process(final Record<KIn, Change<VIn>> record) {
             }

             // update the store with the new value
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Is it valid for a computed aggregation to be null? If it is, I think aggregation result timestamps are not actually guaranteed to be nondecreasing. If an out-of-order record arrives after the `newAgg` value has been set to null, the aggregation result is re-initialized and takes the timestamp of the out-of-order record, which is earlier than the previous aggregation timestamp.

   If that scenario is possible, then it's a bit unclear which of the two should be considered the "latest" aggregation value, and we should choose whether to keep the current code or to always set `isLatest = true`, depending on what semantics we want.

   If that scenario is not possible, i.e., if it is not valid for an aggregation value to be null, then the two approaches are equivalent and I can update this code to clean it up per your suggestion.
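
   To make the scenario concrete, here is a minimal sketch of the timestamp behavior I have in mind. It is a simplified model, not the `KTableAggregate` code: the class `NullableAggregateSketch`, its `update` method, and the rule that a sum of zero is stored as null are all assumptions made for illustration. The only part meant to mirror the discussion is that the new timestamp is the max of the record timestamp and the old aggregate timestamp when a non-null old aggregate exists, and just the record timestamp when the aggregate is re-initialized.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the timestamp logic under discussion.
// It is NOT the KTableAggregate implementation.
class NullableAggregateSketch {

    // A stored aggregate and its timestamp; the value may be null.
    static final class Stamped {
        final Long value;
        final long timestamp;

        Stamped(final Long value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Stamped> store = new HashMap<>();

    // Applies delta to the aggregate for key and returns the timestamp stored with the result.
    // Assumption: an aggregate that sums to zero is represented as null.
    long update(final String key, final long delta, final long recordTimestamp) {
        final Stamped old = store.get(key);
        long newTimestamp = recordTimestamp;
        long base = 0L; // stands in for the initializer

        if (old != null && old.value != null) {
            // A non-null old aggregate exists, so the timestamp cannot go backwards.
            base = old.value;
            newTimestamp = Math.max(recordTimestamp, old.timestamp);
        }
        // Otherwise the aggregate is (re-)initialized and takes the record timestamp as-is.

        final long sum = base + delta;
        final Long newAgg = (sum == 0L) ? null : Long.valueOf(sum);
        store.put(key, new Stamped(newAgg, newTimestamp));
        return newTimestamp;
    }

    public static void main(final String[] args) {
        final NullableAggregateSketch agg = new NullableAggregateSketch();
        System.out.println(agg.update("k", 5, 100));  // 100: first record initializes the aggregate
        System.out.println(agg.update("k", 1, 90));   // 100: out-of-order record, max() keeps 100
        System.out.println(agg.update("k", -6, 110)); // 110: aggregate becomes null
        System.out.println(agg.update("k", 2, 95));   // 95: re-initialized, timestamp moves backwards
    }
}
```

   In this model the last update yields timestamp 95 after an earlier result at 110, which is the case where "latest" becomes ambiguous. If null aggregates cannot occur, the re-initialization branch is only taken for the first record of a key and the timestamps stay nondecreasing.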