[ 
https://issues.apache.org/jira/browse/KAFKA-14260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612164#comment-17612164
 ] 

Guozhang Wang edited comment on KAFKA-14260 at 10/3/22 3:50 AM:
----------------------------------------------------------------

Hello [~aviperksy], sorry for the late reply! I looked at the code again and I 
think I agree with you. We are probably looking at different versions of the 
source code, since in the latest trunk line 125 seems irrelevant 
(https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java#L125),
but the point stands. Consider the moment when this code runs:

{code}
if (forward) {
    this.iter = new TreeSet<>(keySet).iterator();
} else {
    this.iter = new TreeSet<>(keySet).descendingIterator();
}
{code}

Here the constructor of {{TreeSet}} loops over the {{keySet}}, so if that 
{{keySet}}'s underlying map is modified during the copy we will still have an 
issue. As for the fix, I think in the near term we'd have to bite the bullet on 
performance and turn back to ConcurrentSkipListMap (we may tune some initial 
params e.g. using concurrencyLevel of 1). In the long term, I think we could 
borrow from the ideas we are pursuing for transactional state stores: we keep 
two in-memory maps, where the first is read-only for IQ and the second 
maintains deltas within a commit interval. During processing we'd need to read 
both maps, and upon committing we lock the first one to apply the deltas.
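
A minimal sketch of the two-map idea, to make the long-term proposal concrete. Everything here is an assumption for illustration: the class name {{TwoMapStoreSketch}}, its method names, the String key/value types, and the use of a {{ReentrantReadWriteLock}} are hypothetical and are not taken from Kafka Streams. Range scans by the processing thread, which would need to merge both maps, are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch only; not the Kafka Streams implementation. */
final class TwoMapStoreSketch {
    // Committed data: the only map that interactive queries (IQ) read.
    private final NavigableMap<String, String> committed = new TreeMap<>();
    // Uncommitted deltas, touched only by the single processing thread.
    // Optional.empty() marks a pending delete.
    private final Map<String, Optional<String>> deltas = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Processing thread: buffer a write (a null value means delete). */
    void put(String key, String value) {
        deltas.put(key, Optional.ofNullable(value));
    }

    /** Processing thread: read both maps, with pending deltas taking precedence. */
    String get(String key) {
        Optional<String> pending = deltas.get(key);
        if (pending != null) {
            return pending.orElse(null);
        }
        // No lock needed: this thread is the only writer of 'committed'.
        return committed.get(key);
    }

    /** IQ thread: copy the committed map while holding the read lock. */
    NavigableMap<String, String> snapshotForQuery() {
        lock.readLock().lock();
        try {
            return new TreeMap<>(committed);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Processing thread: lock the committed map and apply the deltas. */
    void commit() {
        lock.writeLock().lock();
        try {
            for (Map.Entry<String, Optional<String>> e : deltas.entrySet()) {
                if (e.getValue().isPresent()) {
                    committed.put(e.getKey(), e.getValue().get());
                } else {
                    committed.remove(e.getKey());
                }
            }
        } finally {
            lock.writeLock().unlock();
        }
        deltas.clear();
    }

    public static void main(String[] args) {
        TwoMapStoreSketch store = new TwoMapStoreSketch();
        store.put("a", "1");
        System.out.println(store.get("a"));           // processing thread sees its own write
        System.out.println(store.snapshotForQuery()); // IQ does not see it before commit
        store.commit();
        System.out.println(store.snapshotForQuery()); // IQ sees it after commit
    }
}
```

In this sketch the IQ copy is safe because it can only run while no commit holds the write lock. The cost is that IQ reads lag by up to one commit interval, and each query still pays for a copy of the committed map.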

cc [~ableegoldman] what do you think?


was (Author: guozhang):
Hello [~aviperksy] sorry for the late reply!

> InMemoryKeyValueStore iterator still throws ConcurrentModificationException
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-14260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14260
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 3.2.3
>            Reporter: Avi Cherry
>            Priority: Major
>
> This is the same bug as KAFKA-7912 which was then re-introduced by KAFKA-8802.
> Any iterator returned from {{InMemoryKeyValueStore}} may end up throwing a 
> ConcurrentModificationException because the backing map is not safe for 
> concurrent access. I expect that this only happens when the store is retrieved from 
> {{KafkaStreams.store()}} from outside of the topology since any usage of the 
> store from inside of the topology should be naturally single-threaded.
> To start off, a reminder that this behaviour explicitly violates the 
> interface contract for {{ReadOnlyKeyValueStore}} which states
> {quote}The returned iterator must be safe from 
> java.util.ConcurrentModificationExceptions
> {quote}
> It is often complicated to write code that demonstrates concurrency bugs, but 
> thankfully it is trivial to reason through the source code in 
> {{InMemoryKeyValueStore.java}} to show why this happens:
>  * All of the InMemoryKeyValueStore methods that return iterators do so by 
> passing a keySet based on the backing TreeMap to the InMemoryKeyValueIterator 
> constructor.
>  * These keySets are all VIEWS of the backing map, not copies.
>  * The InMemoryKeyValueIterator then makes a private copy of the keySet by 
> passing the original keySet into the constructor for TreeSet. This copying 
> was implemented in KAFKA-8802, incorrectly intending it to fix the 
> concurrency problem.
>  * TreeSet then iterates over the keySet to make a copy. If the original 
> backing TreeMap in InMemoryKeyValueStore is changed while this copy is being 
> created, the copy will fail fast with a ConcurrentModificationException.
> This bug should be trivially fixable by replacing the backing 
> TreeMap with a ConcurrentSkipListMap, but here's the rub:
> This bug has already been found in KAFKA-7912 and the TreeMap was replaced 
> with a ConcurrentSkipListMap. It was then reverted back to a TreeMap in 
> KAFKA-8802 because of the performance regression. I can [see from one of the 
> PRs|https://github.com/apache/kafka/pull/7212/commits/384c12e40f3a59591f897d916f92253e126820ed]
>  that it was believed the concurrency problem with the TreeMap implementation 
> was fixed by copying the keySet when the iterator is created, but the problem 
> remains, and the fix also creates an extra copy of the iterated portion of the 
> set in memory.
> For what it's worth, the performance difference between TreeMap and 
> ConcurrentSkipListMap does not extend to algorithmic complexity. TreeMap enjoys 
> a similar ~2x speed advantage across all operations at any data size, but at 
> the cost of what turned out to be an easy-to-encounter bug.
> This is all unfortunate since the only time the state stores ever get 
> accessed concurrently is through the {{KafkaStreams.store()}} mechanism, but I 
> would imagine that "correct and slightly slower" is better than "incorrect 
> and faster".
> Too bad BoilerBay's AirConcurrentMap is closed-source and patented.

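The failure mode in the issue description quoted above can be illustrated without threads. The sketch below is an assumption-laden stand-in, not code from Kafka: the class {{KeySetViewDemo}} and its method are hypothetical, and a single-threaded modification between two {{next()}} calls plays the role of a writer racing with the {{TreeSet}} copy. It shows that a {{TreeMap}} key-set view fails fast once the map changes mid-iteration, while a {{ConcurrentSkipListMap}} view does not.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical demo class; not part of Kafka Streams. */
final class KeySetViewDemo {

    /**
     * Returns true if iterating the key-set view throws a
     * ConcurrentModificationException after the map is modified mid-iteration.
     */
    static boolean failsWhenModifiedMidIteration(NavigableMap<String, String> map) {
        map.put("a", "1");
        map.put("b", "2");
        // keySet() is a live view of the map, not a copy.
        Iterator<String> keys = map.keySet().iterator();
        keys.next();
        // Structural modification, standing in for a concurrent writer.
        map.put("c", "3");
        try {
            keys.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("TreeMap fails: "
                + failsWhenModifiedMidIteration(new TreeMap<>()));
        System.out.println("ConcurrentSkipListMap fails: "
                + failsWhenModifiedMidIteration(new ConcurrentSkipListMap<>()));
    }
}
```

With real threads the {{TreeMap}} failure is timing-dependent, since fail-fast detection is best-effort, which is why it may surface only occasionally. Note also that the public constructors of {{ConcurrentSkipListMap}} accept only a comparator or an initial map, so this sketch uses the no-argument constructor.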


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
