Interesting! That issue seems to be cache-specific: two subsequent 
processors can be backed by the same cache (as in the join case).

I don't think loops are generally allowed in the subtopology, are they? If so, 
this code would indeed result in an infinite loop or possibly a concurrent 
modification exception.
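
To illustrate the concurrent-modification case, here is a rough sketch in plain Java. It is not the buffer code from this PR: `ReentrantEvictionSketch`, `buffer`, `forward`, and `demo` are hypothetical names, and the "loop" is simulated by having `forward` write straight back into the map being iterated.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

public class ReentrantEvictionSketch {
    // Hypothetical stand-in for the buffer's contents; not the real buffer class.
    static final TreeMap<String, String> buffer = new TreeMap<>();

    // Hypothetical "forward": assumes the downstream path loops back
    // and ends up buffering a new record in the same buffer.
    static void forward(String key, String value) {
        buffer.put(key + "-looped", value);
    }

    // Returns true if iterating the buffer while forwarding throws a CME.
    static boolean demo() {
        buffer.clear();
        buffer.put("a", "1");
        buffer.put("b", "2");
        try {
            for (Map.Entry<String, String> e : buffer.entrySet()) {
                // Inserting a new key is a structural modification,
                // so the iterator fails on its next step.
                forward(e.getKey(), e.getValue());
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + demo());
    }
}
```

If the iteration were instead done over a structure that tolerates concurrent inserts, the same re-entrancy could keep feeding the loop, which would be the infinite-loop variant.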

I was concerned that the remove might be sent to the buffer's changelog record 
collector and maybe sent to the broker, and then some exception might happen 
before the forward, resulting in the record being forgotten upon restart.

I looked at some other processors, and they tend to do (logged) store 
operations first and then forward last. But then again, normal operations are 
forwarding a value that's a direct consequence of processing the _current_ 
record, which wouldn't have been committed and would therefore get re-processed 
upon restart.

But the buffer is forwarding some older record, which has already been 
committed. Reprocessing the new record (which caused the eviction the first 
time) won't cause us to remember the old record, which we were supposed to emit.

Under EOS, if we crash after the changelog update but before the forward, we'll 
be fine because the changelog update won't be visible (it'll be in an aborted 
transaction) on restart, so the buffer will go back to its correct starting 
point for reprocessing the new record.

If we can't be sure that Streams subtopologies are acyclic, then I reckon we'd 
better swap these two lines and tell people they'd better use EOS if they want 
to be protected from all crash corruption (which I think is true anyway).
Otherwise, if subtopologies are acyclic, then I think it's better to leave it 
as is.
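
To make the trade-off concrete, here is a toy simulation of the two orderings without EOS. It is only a sketch under assumptions: `EvictionOrderSketch`, `changelog`, `downstream`, `evict`, and `run` are made-up names, the changelog write is assumed to have reached the broker before the crash, and a forward that happened before the crash is assumed to have reached downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class EvictionOrderSketch {
    // Hypothetical stand-in for the durable changelog; it survives a "crash".
    static final TreeMap<String, String> changelog = new TreeMap<>();
    // Hypothetical record of everything that was actually emitted downstream.
    static final List<String> downstream = new ArrayList<>();

    static void evict(String key, boolean removeFirst, boolean crashBetween) {
        String value = changelog.get(key);
        if (removeFirst) {
            changelog.remove(key);                     // logged store operation
            if (crashBetween) throw new IllegalStateException("crash");
            downstream.add(key + "=" + value);         // forward
        } else {
            downstream.add(key + "=" + value);         // forward
            if (crashBetween) throw new IllegalStateException("crash");
            changelog.remove(key);                     // logged store operation
        }
    }

    static List<String> run(boolean removeFirst) {
        changelog.clear();
        downstream.clear();
        changelog.put("old", "v1"); // buffered and committed in an earlier cycle
        try {
            evict("old", removeFirst, true);           // crash between the two steps
        } catch (IllegalStateException e) {
            // simulated restart: the buffer is restored from the changelog
        }
        // Reprocessing the new record evicts whatever is still buffered.
        for (String k : new ArrayList<>(changelog.keySet())) {
            evict(k, removeFirst, false);
        }
        return new ArrayList<>(downstream);
    }

    public static void main(String[] args) {
        System.out.println("remove-then-forward: " + run(true));
        System.out.println("forward-then-remove: " + run(false));
    }
}
```

In this toy model, remove-then-forward prints an empty list (the old record is lost), while forward-then-remove prints `[old=v1, old=v1]` (a duplicate, but nothing forgotten).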

WDYT?

[ Full content available at: https://github.com/apache/kafka/pull/5693 ]