This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 96d64af3d80eadacf082e3e8ed5725420407471b Author: Matthias J. Sax <[email protected]> AuthorDate: Mon Nov 25 12:03:57 2024 -0800 KAFKA-17299: add unit tests for previous fix (#17919) https://github.com/apache/kafka/pull/17899 fixed the issue, but did not add any unit tests. Reviewers: Bill Bejeck <[email protected]> --- .../processor/internals/PartitionGroup.java | 3 +- .../processor/internals/PartitionGroupTest.java | 85 ++++++++++++++++++++++ .../processor/internals/StreamTaskTest.java | 71 ++++++++++++++++-- 3 files changed, 153 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 4852ba97932..ea88500ba0f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -248,10 +248,11 @@ class PartitionGroup extends AbstractPartitionGroup { if (queue != null) { // get the first record from this queue. + final int oldSize = queue.size(); record = queue.poll(wallClockTime); if (record != null) { - --totalBuffered; + totalBuffered -= oldSize - queue.size(); if (queue.isEmpty()) { // if a certain queue has been drained, reset the flag diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index ecdf5b7fff0..64b4918e56a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -19,9 +19,11 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Value; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -43,6 +45,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.OptionalLong; import java.util.UUID; @@ -558,6 +561,88 @@ public class PartitionGroupTest { assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), nullValue()); // all available records removed } + @Test + public void shouldUpdateBufferSizeCorrectlyForSkippedRecords() { + final PartitionGroup group = new PartitionGroup( + logContext, + mkMap(mkEntry(partition1, queue1)), + tp -> OptionalLong.of(0L), + getValueSensor(metrics, lastLatenessValue), + enforcedProcessingSensor, + maxTaskIdleMs + ); + final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue), + new ConsumerRecord<>( + "topic", + 1, + -1, // offset as invalid timestamp + -1, // invalid timestamp + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + recordValue, + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>( + "topic", + 1, + 11, + 0, + TimestampType.CREATE_TIME, + 0, + 0, + new byte[0], // corrupted key + recordValue, + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>( + "topic", + 1, + -1, // offset as invalid timestamp + -1, // invalid timestamp + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + recordValue, + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>( + "topic", + 1, + 13, + 0, + TimestampType.CREATE_TIME, + 0, + 0, + recordKey, + new byte[0], // corrupted value + new RecordHeaders(), + Optional.empty() + ), + new ConsumerRecord<>("topic", 1, 20L, recordKey, recordValue) + ); + + group.addRawRecords(partition1, list1); + assertEquals(7, group.numBuffered()); + + group.nextRecord(new RecordInfo(), time.milliseconds()); + assertEquals(6, group.numBuffered()); + + // drain corrupted records + group.nextRecord(new RecordInfo(), time.milliseconds()); + assertEquals(1, group.numBuffered()); + + group.nextRecord(new RecordInfo(), time.milliseconds()); + assertEquals(0, group.numBuffered()); + } + @Test public void shouldNeverWaitIfIdlingIsDisabled() { final PartitionGroup group = new PartitionGroup( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 3cc4be8c73c..6d70fdb4744 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -145,8 +145,8 @@ public class StreamTaskTest { private final LogContext logContext = new LogContext("[test] "); private final String topic1 = "topic1"; private final String topic2 = "topic2"; - private final TopicPartition partition1 = new TopicPartition(topic1, 1); - private final TopicPartition partition2 = new TopicPartition(topic2, 1); + private final TopicPartition partition1 = new TopicPartition(topic1, 0); + private final TopicPartition partition2 = new TopicPartition(topic2, 0); private final Set<TopicPartition> partitions = mkSet(partition1, partition2); private final Serializer<Integer> intSerializer = new IntegerSerializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); @@ -1064,6 +1064,67 @@ public class StreamTaskTest { assertEquals(0, consumer.paused().size()); } + @Test + public void shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs() { + task = createStatelessTask(createConfig( + StreamsConfig.AT_LEAST_ONCE, + "-1", + LogAndContinueExceptionHandler.class, + LogAndSkipOnInvalidTimestamp.class + )); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + + task.addRecords(partition1, asList( + getConsumerRecordWithOffsetAsTimestamp(partition1, 10), + getConsumerRecordWithOffsetAsTimestamp(partition1, 20), + getConsumerRecordWithInvalidTimestamp(30), + getConsumerRecordWithInvalidTimestamp(40), + getConsumerRecordWithInvalidTimestamp(50) + )); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertEquals(0, consumer.paused().size()); + + assertTrue(task.process(0L)); // drain head record (ie, last invalid record) + assertFalse(task.process(0L)); + assertFalse(task.hasRecordsQueued()); + + + // repeat test for deserialization error + task.resumePollingForPartitionsWithAvailableSpace(); + task.addRecords(partition1, asList( + getConsumerRecordWithOffsetAsTimestamp(partition1, 110), + getConsumerRecordWithOffsetAsTimestamp(partition1, 120), + getCorruptedConsumerRecordWithOffsetAsTimestamp(130), + getCorruptedConsumerRecordWithOffsetAsTimestamp(140), + getCorruptedConsumerRecordWithOffsetAsTimestamp(150) + )); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertTrue(consumer.paused().contains(partition1)); + + assertTrue(task.process(0L)); + + task.resumePollingForPartitionsWithAvailableSpace(); + assertEquals(0, consumer.paused().size()); + + assertTrue(task.process(0L)); // drain head record (ie, last corrupted record) + assertFalse(task.process(0L)); + assertFalse(task.hasRecordsQueued()); + } + @Test public void shouldPunctuateOnceStreamTimeAfterGap() { task = createStatelessTask(createConfig()); @@ -3145,7 +3206,7 @@ public class StreamTaskTest { private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) { return new ConsumerRecord<>( topic1, - 1, + 0, offset, offset, // use the offset as the timestamp TimestampType.CREATE_TIME, @@ -3161,7 +3222,7 @@ public class StreamTaskTest { private ConsumerRecord<byte[], byte[]> getConsumerRecordWithInvalidTimestamp(final long offset) { return new ConsumerRecord<>( topic1, - 1, + 0, offset, -1L, // invalid (negative) timestamp TimestampType.CREATE_TIME, @@ -3177,7 +3238,7 @@ public class StreamTaskTest { private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) { return new ConsumerRecord<>( topic1, - 1, + 0, offset, offset, // use the offset as the timestamp TimestampType.CREATE_TIME,
