This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new f292e1960b9 KAFKA-14260: add `synchronized` to `prefixScan` method 
(#12893)
f292e1960b9 is described below

commit f292e1960b931922a7a4be19424fd1e4580d9264
Author: Lucia Cerchie <[email protected]>
AuthorDate: Tue Dec 6 20:39:32 2022 -0700

    KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
    
    As a result of "14260: InMemoryKeyValueStore iterator still throws 
ConcurrentModificationException", I'm adding synchronized to prefixScan as an 
alternative to going back to the ConcurrentSkipList.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../state/internals/InMemoryKeyValueStore.java     |  2 +-
 .../state/internals/AbstractKeyValueStoreTest.java | 23 +++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 98f377d0b24..7599bff82b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -169,7 +169,7 @@ public class InMemoryKeyValueStore implements 
KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
-    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+    public synchronized <PS extends Serializer<P>, P> KeyValueIterator<Bytes, 
byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
 
         final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
         final Bytes to = Bytes.increment(from);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 9e0fb306f28..4989b6cefb4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -55,7 +56,6 @@ import static org.junit.Assert.fail;
 public abstract class AbstractKeyValueStoreTest {
 
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final 
StateStoreContext context);
-
     protected InternalMockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
     protected KeyValueStoreTestDriver<Integer, String> driver;
@@ -648,4 +648,25 @@ public abstract class AbstractKeyValueStoreTest {
             );
         }
     }
+
+    @Test
+    public void prefixScanShouldNotThrowConcurrentModificationException() {
+
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(222, "two-hundred-twenty-two");
+        store.put(2, "two");
+        store.put(22, "twenty-two");
+        store.put(3, "three");
+
+        try (final KeyValueIterator<Integer, String> iter = 
store.prefixScan(2, new IntegerSerializer())) {
+
+            store.delete(22);
+
+            while (iter.hasNext()) {
+                iter.next();
+            }
+        }
+    }                  
 }
+

Reply via email to