This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 365e34dd2b0f154a89a2300a2ccf00cc7b25fc7f
Author: Micah Paul Ramos <[email protected]>
AuthorDate: Mon Sep 28 12:56:36 2020 -0700

    KAFKA-9584: Fix Headers ConcurrentModificationException in Streams (#8181)
    
    Avoid forwarding a shared reference to the record context in punctuate calls.
    Note, this fix isn't airtight, since all processors triggered by a single punctuate
    call will still see the same reference to the record context. It's also not a terribly
    principled approach, since the context is still technically not defined, but this
    is about the best we can do without significant refactoring. We will probably
    follow up with a more comprehensive solution, but this should avoid the issue
    for most programs.
    
    Reviewers: Matthias J. Sax <[email protected]>, John Roesler <[email protected]>
---
 .../kafka/streams/processor/internals/StreamTask.java      |  4 ++--
 .../kafka/streams/processor/internals/StreamTaskTest.java  | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index fe0d7e3..b621420 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -72,7 +72,6 @@ import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordL
  */
 public class StreamTask extends AbstractTask implements ProcessorNodePunctuator {
 
-    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
     // visible for testing
     static final byte LATEST_MAGIC_BYTE = 1;
 
@@ -484,7 +483,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
-        updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
+        updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null),
+            timestamp), node);
 
         if (log.isTraceEnabled()) {
             log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8c4e66d..e399360 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -986,6 +986,20 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldNotShareHeadersBetweenPunctuateIterations() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeMetadata();
+        task.initializeTopology();
+
+        task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
+            task.processorContext.recordContext().headers().add("dummy", (byte[]) null);
+        });
+        task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
+            assertFalse(task.processorContext.recordContext().headers().iterator().hasNext());
+        });
+    }
+
+    @Test
     public void shouldCheckpointOffsetsOnCommit() throws IOException {
         task = createStatefulTask(createConfig(false), true);
         task.initializeStateStores();

Reply via email to