This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 365e34dd2b0f154a89a2300a2ccf00cc7b25fc7f Author: Micah Paul Ramos <[email protected]> AuthorDate: Mon Sep 28 12:56:36 2020 -0700 KAFKA-9584: Fix Headers ConcurrentModificationException in Streams (#8181) Avoid forwarding a shared reference to the record context in punctuate calls. Note, this fix isn't airtight, since all processors triggered by a single punctuate call will still see the same reference to the record context. It's also not a terribly principled approach, since the context is still technically not defined, but this is about the best we can do without significant refactoring. We will probably follow up with a more comprehensive solution, but this should avoid the issue for most programs. Reviewers: Matthias J. Sax <[email protected]>, John Roesler <[email protected]> --- .../kafka/streams/processor/internals/StreamTask.java | 4 ++-- .../kafka/streams/processor/internals/StreamTaskTest.java | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index fe0d7e3..b621420 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -72,7 +72,6 @@ import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordL */ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator { - private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null); // visible for testing static final byte LATEST_MAGIC_BYTE = 1; @@ -484,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix)); } - updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node); + updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null), + timestamp), node); if (log.isTraceEnabled()) { log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 8c4e66d..e399360 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -986,6 +986,20 @@ public class StreamTaskTest { } @Test + public void shouldNotShareHeadersBetweenPunctuateIterations() { + task = createStatelessTask(createConfig(false)); + task.initializeMetadata(); + task.initializeTopology(); + + task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> { + task.processorContext.recordContext().headers().add("dummy", (byte[]) null); + }); + task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> { + assertFalse(task.processorContext.recordContext().headers().iterator().hasNext()); + }); + } + + @Test public void shouldCheckpointOffsetsOnCommit() throws IOException { task = createStatefulTask(createConfig(false), true); task.initializeStateStores();
