showuon commented on code in PR #13512:
URL: https://github.com/apache/kafka/pull/13512#discussion_r1159573303


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -644,14 +644,15 @@ private int restoreChangelog(final Task task, final 
ChangelogMetadata changelogM
         final int numRecords = changelogMetadata.bufferedLimitIndex;
 
         if (numRecords != 0) {
-            final List<ConsumerRecord<byte[], byte[]>> records = 
changelogMetadata.bufferedRecords.subList(0, numRecords);
+            final List<ConsumerRecord<byte[], byte[]>> records = new 
ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords));

Review Comment:
   We'll get concurrent modification for the list exception in L660 
`records.size()` since there are also other threads updating buffer records. 
Fixing it by creating a clone of the list, and remove all from the 
`bufferedRecords` below. We can optimize it later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to