This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e615106b38a289bc624a8554b86c83f9785352d3 Author: Zongwen Li <[email protected]> AuthorDate: Thu Jan 6 16:31:54 2022 +0800 [FLINK-25510][Connectors / Kafka][tests] Update the validation method and add comments --- .../source/reader/KafkaPartitionSplitReader.java | 6 ++ .../reader/KafkaPartitionSplitReaderTest.java | 80 +++++----------------- 2 files changed, 23 insertions(+), 63 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index ebadef3..ee7183d 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kafka.source.reader; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -220,6 +221,11 @@ public class KafkaPartitionSplitReader consumer.commitAsync(offsetsToCommit, offsetCommitCallback); } + @VisibleForTesting + KafkaConsumer<byte[], byte[]> consumer() { + return consumer; + } + // --------------- private helper method ---------------------- private void parseStartingOffsets( diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java index e014d6d..7aecc33 100644 --- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -51,6 +50,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.EmptySource; import org.junit.jupiter.params.provider.ValueSource; @@ -260,7 +260,10 @@ public class KafkaPartitionSplitReaderTest { ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset"); KafkaPartitionSplitReader reader = createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup()); - // Add a committed offset split and catch kafka exception + // The split asks to start from the committed offset, but the group has no committed + // offset and the offset reset strategy is "none" (throw an exception to the consumer + // if no previous offset is found for the consumer's group), + // so an exception reporting the missing committed offset is expected.
final KafkaException undefinedOffsetException = Assertions.assertThrows( KafkaException.class, @@ -277,74 +280,25 @@ public class KafkaPartitionSplitReaderTest { CoreMatchers.containsString("Undefined offset with no reset policy for partition")); } - @Test - public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable { - MetricListener metricListener = new MetricListener(); + @ParameterizedTest + @CsvSource({"earliest, 0", "latest, 10"}) + public void testUsingCommittedOffsetsWithEarliestOrLatestOffsetResetStrategy( + String offsetResetStrategy, Long expectedOffset) { final Properties props = new Properties(); - props.setProperty( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - OffsetResetStrategy.EARLIEST.name().toLowerCase()); - props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); - props.setProperty( - ConsumerConfig.GROUP_ID_CONFIG, - "using-committed-offset-with-earliest-offset-reset"); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetResetStrategy); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset"); KafkaPartitionSplitReader reader = - createReader( - props, - InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); - // Add a committed offset split + createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup()); + // Add committed offset split + final TopicPartition partition = new TopicPartition(TOPIC1, 0); reader.handleSplitsChanges( new SplitsAddition<>( Collections.singletonList( new KafkaPartitionSplit( - new TopicPartition(TOPIC1, 0), - KafkaPartitionSplit.COMMITTED_OFFSET)))); - // pendingRecords should have not been registered because of lazily registration - assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent()); - // Trigger first fetch - reader.fetch(); - final Optional<Gauge<Long>> pendingRecords = - metricListener.getGauge(MetricNames.PENDING_RECORDS); - assertTrue(pendingRecords.isPresent()); - // Validate 
pendingRecords - assertNotNull(pendingRecords); - assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue()); - for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) { - reader.fetch(); - assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue()); - } - } - - @Test - public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable { - final Properties props = new Properties(); - props.setProperty( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - OffsetResetStrategy.LATEST.name().toLowerCase()); - props.setProperty( - ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset"); - KafkaPartitionSplitReader reader = - createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup()); - // Add empty latest offset reset split - final KafkaPartitionSplit latestOffsetResetEmptySplit = - new KafkaPartitionSplit( - new TopicPartition(TOPIC1, 0), - KafkaPartitionSplit.COMMITTED_OFFSET, - KafkaPartitionSplit.LATEST_OFFSET); - final KafkaPartitionSplit latestOffsetResetNormalSplit = - new KafkaPartitionSplit( - new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET); + partition, KafkaPartitionSplit.COMMITTED_OFFSET)))); - reader.handleSplitsChanges( - new SplitsAddition<>( - Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit))); - - // Fetch and check latest offset reset split is added to finished splits - RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch(); - assertTrue( - recordsWithSplitIds - .finishedSplits() - .contains(latestOffsetResetEmptySplit.splitId())); + // Verify that the current offset of the consumer is the expected offset + assertEquals(expectedOffset, reader.consumer().position(partition)); } // ------------------
