This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d42462b21b93aa475ce9130ab5d26d57b1902027 Author: Zongwen Li <[email protected]> AuthorDate: Wed Jan 5 10:13:50 2022 +0800 [FLINK-25510][Connectors / Kafka][tests] Add using committed offsets test cases for KafkaPartitionSplitReader --- .../reader/KafkaPartitionSplitReaderTest.java | 99 ++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java index 864d8e9..e014d6d 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java @@ -38,11 +38,16 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -248,6 +253,100 @@ public class KafkaPartitionSplitReaderTest { assertTrue(recordsWithSplitIds.finishedSplits().isEmpty()); } + @Test + public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() { + final Properties props = new Properties(); + props.setProperty( + ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset"); + KafkaPartitionSplitReader reader = + createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup()); + // Add a committed offset split and catch kafka exception + final KafkaException undefinedOffsetException = + Assertions.assertThrows( + KafkaException.class, + () -> + reader.handleSplitsChanges( + new SplitsAddition<>( + Collections.singletonList( + new KafkaPartitionSplit( + new TopicPartition(TOPIC1, 0), + KafkaPartitionSplit + .COMMITTED_OFFSET))))); + MatcherAssert.assertThat( + undefinedOffsetException.getMessage(), + CoreMatchers.containsString("Undefined offset with no reset policy for partition")); + } + + @Test + public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable { + MetricListener metricListener = new MetricListener(); + final Properties props = new Properties(); + props.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.name().toLowerCase()); + props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); + props.setProperty( + ConsumerConfig.GROUP_ID_CONFIG, + "using-committed-offset-with-earliest-offset-reset"); + KafkaPartitionSplitReader reader = + createReader( + props, + InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); + // Add a committed offset split + reader.handleSplitsChanges( + new SplitsAddition<>( + Collections.singletonList( + new KafkaPartitionSplit( + new TopicPartition(TOPIC1, 0), + KafkaPartitionSplit.COMMITTED_OFFSET)))); + // pendingRecords should have not been registered because of lazily registration + assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent()); + // Trigger first fetch + reader.fetch(); + final Optional<Gauge<Long>> pendingRecords = + metricListener.getGauge(MetricNames.PENDING_RECORDS); + assertTrue(pendingRecords.isPresent()); + // Validate pendingRecords + assertNotNull(pendingRecords); + assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue()); + for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) { + reader.fetch(); + assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue()); + } + } + + @Test + public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable { + final Properties props = new Properties(); + props.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.LATEST.name().toLowerCase()); + props.setProperty( + ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset"); + KafkaPartitionSplitReader reader = + createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup()); + // Add empty latest offset reset split + final KafkaPartitionSplit latestOffsetResetEmptySplit = + new KafkaPartitionSplit( + new TopicPartition(TOPIC1, 0), + KafkaPartitionSplit.COMMITTED_OFFSET, + KafkaPartitionSplit.LATEST_OFFSET); + final KafkaPartitionSplit latestOffsetResetNormalSplit = + new KafkaPartitionSplit( + new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET); + + reader.handleSplitsChanges( + new SplitsAddition<>( + Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit))); + + // Fetch and check latest offset reset split is added to finished splits + RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch(); + assertTrue( + recordsWithSplitIds + .finishedSplits() + .contains(latestOffsetResetEmptySplit.splitId())); + } + // ------------------ private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, int readerId)
