This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 67120f26a140f5390e310a05a2ce0cf1a5c22e8c Author: Matthias J. Sax <[email protected]> AuthorDate: Mon Nov 25 12:03:57 2024 -0800 KAFKA-17299: add unit tests for previous fix (#17919) https://github.com/apache/kafka/pull/17899 fixed the issue, but did not add any unit tests. Reviewers: Bill Bejeck <[email protected]> --- .../processor/internals/PartitionGroup.java | 3 +- .../processor/internals/PartitionGroupTest.java | 82 ++++++++++++++++++ .../processor/internals/StreamTaskTest.java | 96 ++++++++++++++++++---- 3 files changed, 164 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index ac85a17ca0e..a3a624b4b0a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -249,10 +249,11 @@ class PartitionGroup extends AbstractPartitionGroup { if (queue != null) { // get the first record from this queue. + final int oldSize = queue.size(); record = queue.poll(wallClockTime); if (record != null) { - --totalBuffered; + totalBuffered -= oldSize - queue.size(); if (queue.isEmpty()) { // if a certain queue has been drained, reset the flag diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 9396faa1be1..486b0d7a325 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -559,6 +559,88 @@ public class PartitionGroupTest { assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), nullValue()); // all available records removed } + @Test + public void shouldUpdateBufferSizeCorrectlyForSkippedRecords() { + final PartitionGroup group = new PartitionGroup( + logContext, + mkMap(mkEntry(partition1, queue1)), + tp -> OptionalLong.of(0L), + getValueSensor(metrics, lastLatenessValue), + enforcedProcessingSensor, + maxTaskIdleMs + ); + final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue), + new ConsumerRecord<>( + "topic", + 1, + -1, // offset as invalid timestamp + -1, // invalid timestamp + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + recordValue, + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>( + "topic", + 1, + 11, + 0, + TimestampType.CREATE_TIME, + 0, + 0, + new byte[0], // corrupted key + recordValue, + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>( + "topic", + 1, + -1, // offset as invalid timestamp + -1, // invalid timestamp + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + recordValue, + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>( + "topic", + 1, + 13, + 0, + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + new byte[0], // corrupted value + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>("topic", 1, 20L, recordKey, recordValue) + ); + + group.addRawRecords(partition1, list1); + assertEquals(7, group.numBuffered()); + + group.nextRecord(new RecordInfo(), time.milliseconds()); + assertEquals(6, group.numBuffered()); + + // drain corrupted records + group.nextRecord(new RecordInfo(), time.milliseconds()); + assertEquals(1, group.numBuffered()); + + group.nextRecord(new RecordInfo(), time.milliseconds()); + assertEquals(0, group.numBuffered()); + } + @Test public void shouldNeverWaitIfIdlingIsDisabled() { final PartitionGroup group = new PartitionGroup( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 875c7440b0f..3a2ecafe469 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -159,8 +159,8 @@ public class StreamTaskTest { private final LogContext logContext = new LogContext("[test] "); private final String topic1 = "topic1"; private final String topic2 = "topic2"; - private final TopicPartition partition1 = new TopicPartition(topic1, 1); - private final TopicPartition partition2 = new TopicPartition(topic2, 1); + private final TopicPartition partition1 = new TopicPartition(topic1, 0); + private final TopicPartition partition2 = new TopicPartition(topic2, 0); private final Set<TopicPartition> partitions = mkSet(partition1, partition2); private final Serializer<Integer> intSerializer = new IntegerSerializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); @@ -1111,6 +1111,70 @@ public class StreamTaskTest { assertEquals(0, consumer.paused().size()); } + @Test + public void shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + task = createStatelessTask(createConfig( + StreamsConfig.AT_LEAST_ONCE, + "-1", + LogAndContinueExceptionHandler.class, + LogAndFailProcessingExceptionHandler.class, + LogAndSkipOnInvalidTimestamp.class + )); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + + task.addRecords(partition1, asList( + getConsumerRecordWithOffsetAsTimestamp(partition1, 10), + getConsumerRecordWithOffsetAsTimestamp(partition1, 20), + getConsumerRecordWithInvalidTimestamp(30), + getConsumerRecordWithInvalidTimestamp(40), + getConsumerRecordWithInvalidTimestamp(50) + )); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertEquals(0, consumer.paused().size()); + + assertTrue(task.process(0L)); // drain head record (ie, last invalid record) + assertFalse(task.process(0L)); + assertFalse(task.hasRecordsQueued()); + + + // repeat test for deserialization error + task.resumePollingForPartitionsWithAvailableSpace(); + task.addRecords(partition1, asList( + getConsumerRecordWithOffsetAsTimestamp(partition1, 110), + getConsumerRecordWithOffsetAsTimestamp(partition1, 120), + getCorruptedConsumerRecordWithOffsetAsTimestamp(130), + getCorruptedConsumerRecordWithOffsetAsTimestamp(140), + getCorruptedConsumerRecordWithOffsetAsTimestamp(150) + )); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertEquals(0, consumer.paused().size()); + + assertTrue(task.process(0L)); // drain head record (ie, last corrupted record) + assertFalse(task.process(0L)); + assertFalse(task.hasRecordsQueued()); + } + @Test public void shouldPunctuateOnceStreamTimeAfterGap() { when(stateManager.taskId()).thenReturn(taskId); @@ -3306,7 +3370,7 @@ public class StreamTaskTest { private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) { return new ConsumerRecord<>( topic1, - 1, + 0, offset, offset, // use the offset as the timestamp TimestampType.CREATE_TIME, @@ -3322,7 +3386,7 @@ public class StreamTaskTest { private ConsumerRecord<byte[], byte[]> getConsumerRecordWithInvalidTimestamp(final long offset) { return new ConsumerRecord<>( topic1, - 1, + 0, offset, -1L, // invalid (negative) timestamp TimestampType.CREATE_TIME, @@ -3339,24 +3403,24 @@ public class StreamTaskTest { final long offset, final int leaderEpoch) { return new ConsumerRecord<>( - topicPartition.topic(), - topicPartition.partition(), - offset, - offset, // use the offset as the timestamp - TimestampType.CREATE_TIME, - 0, - 0, - recordKey, - recordValue, - new RecordHeaders(), - Optional.of(leaderEpoch) + topicPartition.topic(), + topicPartition.partition(), + offset, + offset, // use the offset as the timestamp + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + recordValue, + new RecordHeaders(), + Optional.of(leaderEpoch) ); } private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) { return new ConsumerRecord<>( topic1, - 1, + 0, offset, offset, // use the offset as the timestamp TimestampType.CREATE_TIME,
