Interesting! That issue seems to be cache-specific: two consecutive processors can be backed by the same cache (as in the join case).
I don't think loops are generally allowed in a subtopology, are they? If they were, this code would indeed result in an infinite loop or possibly a `ConcurrentModificationException`.

I was concerned that the remove might be sent to the buffer's changelog record collector, and maybe on to the broker, and that some exception might then happen before the forward, so the record would be forgotten upon restart. I looked at some other processors, and they tend to do (logged) store operations first and forward last. But then again, normal operations forward a value that's a direct consequence of processing the _current_ record, which wouldn't have been committed and would therefore get reprocessed upon restart. The buffer, however, is forwarding some older record, which has already been committed. Reprocessing the new record (which caused the eviction the first time) won't make us remember the old record we were supposed to emit.

Under EOS, if we crash after the changelog update but before the forward, we'll be fine: the changelog update won't be visible on restart (it'll be in an aborted transaction), so the buffer will go back to its correct starting point for reprocessing the new record.

If we can't be sure that Streams subtopologies are acyclic, then I reckon we'd better swap these two lines and tell people to use EOS if they want to be protected from all crash corruption (which I think is true anyway). Otherwise, if subtopologies are acyclic, I think it's better to leave it as is. WDYT?

[ Full content available at: https://github.com/apache/kafka/pull/5693 ]
This message was relayed via gitbox.apache.org.
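To make the ordering argument concrete, here is a toy, in-memory model of a capacity-1 suppression buffer. It is not the Kafka Streams API: `State`, `restore`, `process`, `scenario`, `Crash`, and `CAPACITY` are all hypothetical names. It assumes that under at-least-once every changelog append and downstream send is durable as soon as it happens, while under EOS both are staged in a transaction that a crash aborts. Record A is already buffered and committed; processing B evicts A, we crash between the two eviction steps, and then B is reprocessed.

```python
from dataclasses import dataclass, field

CAPACITY = 1  # hypothetical buffer size: adding a second record forces an eviction


class Crash(Exception):
    """Simulated failure injected between the two eviction steps."""


@dataclass
class State:
    """What survives a restart: the changelog topic and the downstream topic."""
    changelog: list = field(default_factory=list)  # ("put" | "delete", key) entries
    output: list = field(default_factory=list)     # records emitted downstream


def restore(changelog):
    """Rebuild the buffer by replaying the changelog (dicts keep insertion order)."""
    buffer = {}
    for op, key in changelog:
        if op == "put":
            buffer[key] = key
        else:
            buffer.pop(key, None)
    return buffer


def process(state, record, *, eos, forward_first, crash):
    """Buffer `record`, evicting and forwarding the oldest entry when full."""
    buffer = restore(state.changelog)
    # At-least-once: writes hit the durable topics immediately.
    # EOS: writes are staged and only become visible when the transaction commits.
    log = [] if eos else state.changelog
    out = [] if eos else state.output

    def log_write(op, key):
        log.append((op, key))
        if op == "put":
            buffer[key] = key
        else:
            del buffer[key]

    log_write("put", record)
    while len(buffer) > CAPACITY:
        oldest = next(iter(buffer))
        if forward_first:  # forward, then remove
            out.append(oldest)
            if crash:
                raise Crash()
            log_write("delete", oldest)
        else:  # remove (logged), then forward
            log_write("delete", oldest)
            if crash:
                raise Crash()
            out.append(oldest)

    if eos:  # commit: staged writes become visible atomically
        state.changelog.extend(log)
        state.output.extend(out)


def scenario(*, eos, forward_first):
    """A is already buffered and committed; processing B evicts A."""
    state = State(changelog=[("put", "A")])
    try:
        process(state, "B", eos=eos, forward_first=forward_first, crash=True)
    except Crash:
        pass  # restart: B's offset was never committed, so B is reprocessed
    process(state, "B", eos=eos, forward_first=forward_first, crash=False)
    return state.output


if __name__ == "__main__":
    for eos in (False, True):
        for forward_first in (True, False):
            print(f"eos={eos} forward_first={forward_first}:",
                  scenario(eos=eos, forward_first=forward_first))
```

In this model, at-least-once with forward-first emits A twice (a tolerable duplicate), at-least-once with remove-first emits nothing (A is lost), and EOS emits A exactly once under either ordering. The sketch deliberately ignores caching and cycles, so it says nothing about whether subtopologies can loop.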
