This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95947d2f581 KAFKA-17299: add unit tests for previous fix (#17919)
95947d2f581 is described below

commit 95947d2f581cce85c25ef9eb3501dc13d09ae05a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Nov 25 12:03:57 2024 -0800

    KAFKA-17299: add unit tests for previous fix (#17919)
    
    https://github.com/apache/kafka/pull/17899 fixed the issue, but did not
    add any unit tests.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../processor/internals/PartitionGroup.java        |  3 +-
 .../processor/internals/PartitionGroupTest.java    | 82 ++++++++++++++++++
 .../processor/internals/StreamTaskTest.java        | 96 ++++++++++++++++++----
 3 files changed, 164 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 5e57efb9628..5fb313ff605 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -250,10 +250,11 @@ class PartitionGroup extends AbstractPartitionGroup {
 
         if (queue != null) {
             // get the first record from this queue.
+            final int oldSize = queue.size();
             record = queue.poll(wallClockTime);
 
             if (record != null) {
-                --totalBuffered;
+                totalBuffered -= oldSize - queue.size();
 
                 if (queue.isEmpty()) {
                     // if a certain queue has been drained, reset the flag
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 1f4fec19484..95a5210ae34 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -606,6 +606,88 @@ public class PartitionGroupTest {
         assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), 
nullValue());  // all available records removed
     }
 
+    @Test
+    public void shouldUpdateBufferSizeCorrectlyForSkippedRecords() {
+        final PartitionGroup group = new PartitionGroup(
+            logContext,
+            mkMap(mkEntry(partition1, queue1)),
+            tp -> OptionalLong.of(0L),
+            getValueSensor(metrics, lastLatenessValue),
+            enforcedProcessingSensor,
+            maxTaskIdleMs
+        );
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                11,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                new byte[0], // corrupted key
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                13,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                new byte[0], // corrupted value
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>("topic", 1, 20L, recordKey, recordValue)
+        );
+
+        group.addRawRecords(partition1, list1);
+        assertEquals(7, group.numBuffered());
+
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(6, group.numBuffered());
+
+        // drain corrupted records
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(1, group.numBuffered());
+
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(0, group.numBuffered());
+    }
+
     @Test
     public void shouldNeverWaitIfIdlingIsDisabled() {
         final PartitionGroup group = new PartitionGroup(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8555e4d065b..5dab5329026 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -157,8 +157,8 @@ public class StreamTaskTest {
     private final LogContext logContext = new LogContext("[test] ");
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
-    private final TopicPartition partition1 = new TopicPartition(topic1, 1);
-    private final TopicPartition partition2 = new TopicPartition(topic2, 1);
+    private final TopicPartition partition1 = new TopicPartition(topic1, 0);
+    private final TopicPartition partition2 = new TopicPartition(topic2, 0);
     private final Set<TopicPartition> partitions = new 
HashSet<>(List.of(partition1, partition2));
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new 
IntegerDeserializer();
@@ -1082,6 +1082,70 @@ public class StreamTaskTest {
         assertEquals(0, consumer.paused().size());
     }
 
+    @Test
+    public void shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(
+            StreamsConfig.AT_LEAST_ONCE,
+            "-1",
+            LogAndContinueExceptionHandler.class,
+            LogAndFailProcessingExceptionHandler.class,
+            LogAndSkipOnInvalidTimestamp.class
+        ));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+            getConsumerRecordWithInvalidTimestamp(30),
+            getConsumerRecordWithInvalidTimestamp(40),
+            getConsumerRecordWithInvalidTimestamp(50)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (ie, last invalid 
record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+
+
+        // repeat test for deserialization error
+        task.resumePollingForPartitionsWithAvailableSpace();
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 110),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 120),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(130),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(140),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(150)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (ie, last corrupted 
record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+    }
+
     @Test
     public void shouldPunctuateOnceStreamTimeAfterGap() {
         when(stateManager.taskId()).thenReturn(taskId);
@@ -3314,7 +3378,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> 
getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
@@ -3330,7 +3394,7 @@ public class StreamTaskTest {
     private ConsumerRecord<byte[], byte[]> 
getConsumerRecordWithInvalidTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             -1L, // invalid (negative) timestamp
             TimestampType.CREATE_TIME,
@@ -3347,24 +3411,24 @@ public class StreamTaskTest {
                                                                                
                  final long offset,
                                                                                
                  final int leaderEpoch) {
         return new ConsumerRecord<>(
-                topicPartition.topic(),
-                topicPartition.partition(),
-                offset,
-                offset, // use the offset as the timestamp
-                TimestampType.CREATE_TIME,
-                0,
-                0,
-                recordKey,
-                recordValue,
-                new RecordHeaders(),
-                Optional.of(leaderEpoch)
+            topicPartition.topic(),
+            topicPartition.partition(),
+            offset,
+            offset, // use the offset as the timestamp
+            TimestampType.CREATE_TIME,
+            0,
+            0,
+            recordKey,
+            recordValue,
+            new RecordHeaders(),
+            Optional.of(leaderEpoch)
         );
     }
 
     private ConsumerRecord<byte[], byte[]> 
getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,

Reply via email to