This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch kafka-10866-consumerrecords-metadata in repository https://gitbox.apache.org/repos/asf/kafka.git
commit dc27e3ff0fddf3b7fd80fef126449bdcae4f42a6 Author: John Roesler <[email protected]> AuthorDate: Wed Jan 6 14:07:22 2021 -0600 tests --- build.gradle | 1 + .../kafka/clients/consumer/KafkaConsumerTest.java | 184 ++++++++++++++++++--- 2 files changed, 163 insertions(+), 22 deletions(-) diff --git a/build.gradle b/build.gradle index dd490be..11660fc 100644 --- a/build.gradle +++ b/build.gradle @@ -1070,6 +1070,7 @@ project(':clients') { testCompile libs.junitJupiterApi testCompile libs.junitVintageEngine testCompile libs.mockitoCore + testCompile libs.hamcrest testRuntime libs.slf4jlog4j testRuntime libs.jacksonDatabind diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 7abcc2a..8472306 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -102,7 +102,6 @@ import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; @@ -129,10 +128,18 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -596,7 +603,7 @@ public class KafkaConsumerTest { initMetadata(client, Collections.singletonMap(topic, 2)); KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata); - consumer.assign(Arrays.asList(tp0, tp1)); + consumer.assign(asList(tp0, tp1)); consumer.seekToEnd(singleton(tp0)); consumer.seekToBeginning(singleton(tp1)); @@ -756,7 +763,7 @@ public class KafkaConsumerTest { client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator); assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); - consumer.assign(Arrays.asList(tp0, tp1)); + consumer.assign(asList(tp0, tp1)); // fetch offset for two topics Map<TopicPartition, Long> offsets = new HashMap<>(); @@ -828,14 +835,14 @@ public class KafkaConsumerTest { ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - consumer.assign(Arrays.asList(tp0, tp1)); + consumer.assign(asList(tp0, tp1)); // lookup coordinator client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // fetch offset for one topic - client.prepareResponseFrom(offsetResponse(Utils.mkMap(Utils.mkEntry(tp0, offset1), Utils.mkEntry(tp1, -1L)), Errors.NONE), coordinator); + client.prepareResponseFrom(offsetResponse(mkMap(mkEntry(tp0, offset1), mkEntry(tp1, -1L)), Errors.NONE), coordinator); final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Utils.mkSet(tp0, tp1)); assertEquals(2, committed.size()); assertEquals(offset1, committed.get(tp0).offset()); @@ -1045,6 +1052,7 @@ public class KafkaConsumerTest { ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO); assertEquals(0, records.count()); + assertThat(records.metadata(), equalTo(emptyMap())); consumer.close(Duration.ofMillis(0)); } @@ -1074,7 +1082,7 @@ public class KafkaConsumerTest { KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); // initial subscription - consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer)); + consumer.subscribe(asList(topic, topic2), getConsumerRebalanceListener(consumer)); // verify that subscription has changed but assignment is still unchanged assertEquals(2, consumer.subscription().size()); @@ -1082,7 +1090,7 @@ public class KafkaConsumerTest { assertTrue(consumer.assignment().isEmpty()); // mock rebalance responses - Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); + Node coordinator = prepareRebalance(client, node, assignor, asList(tp0, t2p0), null); consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)); consumer.poll(Duration.ZERO); @@ -1111,10 +1119,12 @@ public class KafkaConsumerTest { // verify that the fetch occurred as expected assertEquals(11, records.count()); assertEquals(1L, consumer.position(tp0)); + assertEquals(1L, records.metadata().get(tp0).position()); assertEquals(10L, consumer.position(t2p0)); + assertEquals(10L, records.metadata().get(t2p0).position()); // subscription change - consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer)); + consumer.subscribe(asList(topic, topic3), getConsumerRebalanceListener(consumer)); // verify that subscription has changed but assignment is still unchanged assertEquals(2, consumer.subscription().size()); @@ -1129,7 +1139,7 @@ public class KafkaConsumerTest { AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets1); // mock rebalance responses - prepareRebalance(client, node, assignor, Arrays.asList(tp0, t3p0), coordinator); + prepareRebalance(client, node, assignor, asList(tp0, t3p0), coordinator); // mock a response to the next fetch from the new assignment Map<TopicPartition, FetchInfo> fetches2 = new HashMap<>(); @@ -1142,7 +1152,9 @@ public class KafkaConsumerTest { // verify that the fetch occurred as expected assertEquals(101, records.count()); assertEquals(2L, consumer.position(tp0)); + assertEquals(2L, records.metadata().get(tp0).position()); assertEquals(100L, consumer.position(t3p0)); + assertEquals(100L, records.metadata().get(t3p0).position()); // verify that the offset commits occurred as expected assertTrue(commitReceived.get()); @@ -1485,7 +1497,7 @@ public class KafkaConsumerTest { response.put(tp0, Errors.NONE); OffsetCommitResponse commitResponse = offsetCommitResponse(response); LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())); - consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse), 0, false); + consumerCloseTest(5000, asList(commitResponse, leaveGroupResponse), 0, false); } @Test @@ -1856,12 +1868,12 @@ public class KafkaConsumerTest { ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); + initMetadata(client, mkMap(mkEntry(topic, 1), mkEntry(topic2, 1), mkEntry(topic3, 1))); - consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer)); + consumer.subscribe(asList(topic, topic2), getConsumerRebalanceListener(consumer)); Node node = metadata.fetch().nodes().get(0); - Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); + Node coordinator = prepareRebalance(client, node, assignor, asList(tp0, t2p0), null); // a poll with non-zero milliseconds would complete three round-trips (discover, join, sync) TestUtils.waitForCondition(() -> { @@ -1892,7 +1904,7 @@ public class KafkaConsumerTest { client.respondFrom(fetchResponse(fetches1), node); // subscription change - consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer)); + consumer.subscribe(asList(topic, topic3), getConsumerRebalanceListener(consumer)); // verify that subscription has changed but assignment is still unchanged assertEquals(Utils.mkSet(topic, topic3), consumer.subscription()); @@ -1938,7 +1950,7 @@ public class KafkaConsumerTest { client.respondFrom(fetchResponse(fetches1), node); // now complete the rebalance - client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator); + client.respondFrom(syncGroupResponse(asList(tp0, t3p0), Errors.NONE), coordinator); AtomicInteger count = new AtomicInteger(0); TestUtils.waitForCondition(() -> { @@ -2036,6 +2048,122 @@ public class KafkaConsumerTest { assertThrows(IllegalStateException.class, consumer::groupMetadata); } + @Test + public void testPollMetadata() { + final Time time = new MockTime(); + final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + final ConsumerMetadata metadata = createMetadata(subscription); + final MockClient client = new MockClient(time, metadata); + + initMetadata(client, singletonMap(topic, 1)); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + + final KafkaConsumer<String, String> consumer = + newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + + consumer.assign(singleton(tp0)); + consumer.seek(tp0, 50L); + + final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); + client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo))); + + final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1)); + assertEquals(5, records.count()); + assertEquals(55L, consumer.position(tp0)); + + // verify that the consumer computes the correct metadata based on the fetch response + final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0); + assertEquals(1L, actualMetadata.beginningOffset()); + assertEquals(100L, actualMetadata.endOffset()); + assertEquals(55L, actualMetadata.position()); + assertEquals(45L, actualMetadata.lag()); + consumer.close(Duration.ZERO); + } + + + @Test + public void testPollMetadataWithExtraPartitions() { + final Time time = new MockTime(); + final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + final ConsumerMetadata metadata = createMetadata(subscription); + final MockClient client = new MockClient(time, metadata); + + initMetadata(client, singletonMap(topic, 2)); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + + final KafkaConsumer<String, String> consumer = + newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + + consumer.assign(asList(tp0, tp1)); + consumer.seek(tp0, 50L); + consumer.seek(tp1, 10L); + + client.prepareResponse( + fetchResponse( + mkMap( + mkEntry(tp0, new FetchInfo(1L, 99L, 50L, 5)), + mkEntry(tp1, new FetchInfo(0L, 29L, 10L, 0)) + ) + ) + ); + + final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1)); + assertEquals(5, records.count()); + assertEquals(55L, consumer.position(tp0)); + + assertEquals(5, records.records(tp0).size()); + final ConsumerRecords.Metadata tp0Metadata = records.metadata().get(tp0); + assertEquals(1L, tp0Metadata.beginningOffset()); + assertEquals(100L, tp0Metadata.endOffset()); + assertEquals(55L, tp0Metadata.position()); + assertEquals(45L, tp0Metadata.lag()); + + // we may get back metadata for other assigned partitions even if we don't get records for them + assertEquals(0, records.records(tp1).size()); + final ConsumerRecords.Metadata tp1Metadata = records.metadata().get(tp1); + assertEquals(0L, tp1Metadata.beginningOffset()); + assertEquals(30L, tp1Metadata.endOffset()); + assertEquals(10L, tp1Metadata.position()); + assertEquals(20L, tp1Metadata.lag()); + + consumer.close(Duration.ZERO); + } + + @Test + public void testPollMetadataWithNoRecords() { + final Time time = new MockTime(); + final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + final ConsumerMetadata metadata = createMetadata(subscription); + final MockClient client = new MockClient(time, metadata); + + initMetadata(client, singletonMap(topic, 1)); + final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + + final KafkaConsumer<String, String> consumer = + newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + + consumer.assign(singleton(tp0)); + consumer.seek(tp0, 50L); + + final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 0); + client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo))); + + final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1)); + + // we got no records back ... + assertEquals(0, records.count()); + assertEquals(50L, consumer.position(tp0)); + + // ... but we can still get metadata that was in the fetch response + final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0); + assertEquals(1L, actualMetadata.beginningOffset()); + assertEquals(100L, actualMetadata.endOffset()); + assertEquals(50L, actualMetadata.position()); + assertEquals(50L, actualMetadata.lag()); + + consumer.close(Duration.ZERO); + } + private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() { Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); @@ -2201,7 +2329,7 @@ public class KafkaConsumerTest { } private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) { - return listOffsetsResponse(offsets, Collections.emptyMap()); + return listOffsetsResponse(offsets, emptyMap()); } private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> partitionOffsets, @@ -2237,6 +2365,8 @@ public class KafkaConsumerTest { TopicPartition partition = fetchEntry.getKey(); long fetchOffset = fetchEntry.getValue().offset; int fetchCount = fetchEntry.getValue().count; + final long highWatermark = fetchEntry.getValue().logLastOffset + 1; + final long logStartOffset = fetchEntry.getValue().logFirstOffset; final MemoryRecords records; if (fetchCount == 0) { records = MemoryRecords.EMPTY; @@ -2248,8 +2378,8 @@ public class KafkaConsumerTest { records = builder.build(); } tpResponses.put(partition, new FetchResponse.PartitionData<>( - Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, - 0L, null, records)); + Errors.NONE, highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, + logStartOffset, null, records)); } return new FetchResponse<>(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID); } @@ -2374,10 +2504,20 @@ public class KafkaConsumerTest { } private static class FetchInfo { + long logFirstOffset; + long logLastOffset; long offset; int count; FetchInfo(long offset, int count) { + this(0L, offset + count, offset, count); + } + + FetchInfo(long logFirstOffset, long logLastOffset, long offset, int count) { + assertThat(logFirstOffset, lessThanOrEqualTo(offset)); + assertThat(logLastOffset, greaterThanOrEqualTo(offset + count)); + this.logFirstOffset = logFirstOffset; + this.logLastOffset = logLastOffset; this.offset = offset; this.count = count; } @@ -2516,7 +2656,7 @@ public class KafkaConsumerTest { } private static boolean consumerMetricPresent(KafkaConsumer<String, String> consumer, String name) { - MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap()); + MetricName metricName = new MetricName(name, "consumer-metrics", "", emptyMap()); return consumer.metrics.metrics().containsKey(metricName); } @@ -2556,11 +2696,11 @@ public class KafkaConsumerTest { ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); MockRebalanceListener countingRebalanceListener = new MockRebalanceListener(); - initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); + initMetadata(client, mkMap(mkEntry(topic, 1), mkEntry(topic2, 1), mkEntry(topic3, 1))); - consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener); + consumer.subscribe(asList(topic, topic2), countingRebalanceListener); Node node = metadata.fetch().nodes().get(0); - prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); + prepareRebalance(client, node, assignor, asList(tp0, t2p0), null); // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group consumer.poll(Duration.ZERO);
