showuon commented on code in PR #13512:
URL: https://github.com/apache/kafka/pull/13512#discussion_r1159573303

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -644,14 +644,15 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
         final int numRecords = changelogMetadata.bufferedLimitIndex;

         if (numRecords != 0) {
-            final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);
+            final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords));

Review Comment:
   We'll get a `ConcurrentModificationException` at L660 (`records.size()`), because `subList` returns a view of `bufferedRecords` and other threads are also updating the buffered records. This fixes it by copying the sublist into a new list and then removing those records from `bufferedRecords` below. We can optimize it later.
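
For illustration, here is a minimal, single-threaded sketch of the failure mode described in the review comment and of the copy-based fix. It is not the Kafka code: the class and method names (`SubListCopyDemo`, `viewFailsAfterBackingListChanges`, `copyThenDrain`) are hypothetical, plain strings stand in for `ConsumerRecord`s, and the "remove from the buffer" step is only one assumed way of doing what the comment describes. The structural change that another thread would make is simulated here with a direct `add` call so that the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class; names are illustrative and not taken from Kafka.
class SubListCopyDemo {

    // Returns true if reading the subList view throws after the backing list
    // was structurally modified (here: an append standing in for another thread).
    static boolean viewFailsAfterBackingListChanges() {
        List<String> buffered = new ArrayList<>(Arrays.asList("r0", "r1", "r2", "r3"));
        List<String> view = buffered.subList(0, 2); // a view over buffered, not a copy
        buffered.add("r4");                         // structural modification of the backing list
        try {
            view.size();                            // the view detects the modification
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Copies the first numRecords entries into an independent list, then removes
    // them from the buffer. Later changes to the buffer cannot affect the copy.
    static List<String> copyThenDrain(List<String> buffered, int numRecords) {
        List<String> head = buffered.subList(0, numRecords);
        List<String> records = new ArrayList<>(head); // independent snapshot
        head.clear();                                 // drops the same range from buffered
        return records;
    }

    public static void main(String[] args) {
        System.out.println("view fails: " + viewFailsAfterBackingListChanges());

        List<String> buffered = new ArrayList<>(Arrays.asList("r0", "r1", "r2", "r3"));
        List<String> records = copyThenDrain(buffered, 2);
        buffered.add("r4"); // safe: records no longer depends on buffered
        System.out.println("records: " + records + ", buffered: " + buffered);
    }
}
```

Note that the copy only decouples `records` from later changes to the buffer. `ArrayList` itself is not thread-safe, so the copy and the removal would still need whatever synchronization the surrounding code relies on; this sketch does not model that.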