This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kafka-10866-consumerrecords-metadata
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit dc27e3ff0fddf3b7fd80fef126449bdcae4f42a6
Author: John Roesler <[email protected]>
AuthorDate: Wed Jan 6 14:07:22 2021 -0600

    tests
---
 build.gradle                                       |   1 +
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 184 ++++++++++++++++++---
 2 files changed, 163 insertions(+), 22 deletions(-)

diff --git a/build.gradle b/build.gradle
index dd490be..11660fc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1070,6 +1070,7 @@ project(':clients') {
     testCompile libs.junitJupiterApi
     testCompile libs.junitVintageEngine
     testCompile libs.mockitoCore
+    testCompile libs.hamcrest
 
     testRuntime libs.slf4jlog4j
     testRuntime libs.jacksonDatabind
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7abcc2a..8472306 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -102,7 +102,6 @@ import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -129,10 +128,18 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -596,7 +603,7 @@ public class KafkaConsumerTest {
         initMetadata(client, Collections.singletonMap(topic, 2));
 
         KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
-        consumer.assign(Arrays.asList(tp0, tp1));
+        consumer.assign(asList(tp0, tp1));
         consumer.seekToEnd(singleton(tp0));
         consumer.seekToBeginning(singleton(tp1));
 
@@ -756,7 +763,7 @@ public class KafkaConsumerTest {
         client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator);
         assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
 
-        consumer.assign(Arrays.asList(tp0, tp1));
+        consumer.assign(asList(tp0, tp1));
 
         // fetch offset for two topics
         Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -828,14 +835,14 @@ public class KafkaConsumerTest {
         ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
-        consumer.assign(Arrays.asList(tp0, tp1));
+        consumer.assign(asList(tp0, tp1));
 
         // lookup coordinator
         client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // fetch offset for one topic
-        client.prepareResponseFrom(offsetResponse(Utils.mkMap(Utils.mkEntry(tp0, offset1), Utils.mkEntry(tp1, -1L)), Errors.NONE), coordinator);
+        client.prepareResponseFrom(offsetResponse(mkMap(mkEntry(tp0, offset1), mkEntry(tp1, -1L)), Errors.NONE), coordinator);
         final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Utils.mkSet(tp0, tp1));
         assertEquals(2, committed.size());
         assertEquals(offset1, committed.get(tp0).offset());
@@ -1045,6 +1052,7 @@ public class KafkaConsumerTest {
 
         ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
         assertEquals(0, records.count());
+        assertThat(records.metadata(), equalTo(emptyMap()));
         consumer.close(Duration.ofMillis(0));
     }
 
@@ -1074,7 +1082,7 @@ public class KafkaConsumerTest {
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
         // initial subscription
-        consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
+        consumer.subscribe(asList(topic, topic2), getConsumerRebalanceListener(consumer));
 
         // verify that subscription has changed but assignment is still unchanged
         assertEquals(2, consumer.subscription().size());
@@ -1082,7 +1090,7 @@ public class KafkaConsumerTest {
         assertTrue(consumer.assignment().isEmpty());
 
         // mock rebalance responses
-        Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+        Node coordinator = prepareRebalance(client, node, assignor, asList(tp0, t2p0), null);
 
         consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
         consumer.poll(Duration.ZERO);
@@ -1111,10 +1119,12 @@ public class KafkaConsumerTest {
         // verify that the fetch occurred as expected
         assertEquals(11, records.count());
         assertEquals(1L, consumer.position(tp0));
+        assertEquals(1L, records.metadata().get(tp0).position());
         assertEquals(10L, consumer.position(t2p0));
+        assertEquals(10L, records.metadata().get(t2p0).position());
 
         // subscription change
-        consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
+        consumer.subscribe(asList(topic, topic3), getConsumerRebalanceListener(consumer));
 
         // verify that subscription has changed but assignment is still unchanged
         assertEquals(2, consumer.subscription().size());
@@ -1129,7 +1139,7 @@ public class KafkaConsumerTest {
         AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets1);
 
         // mock rebalance responses
-        prepareRebalance(client, node, assignor, Arrays.asList(tp0, t3p0), coordinator);
+        prepareRebalance(client, node, assignor, asList(tp0, t3p0), coordinator);
 
         // mock a response to the next fetch from the new assignment
         Map<TopicPartition, FetchInfo> fetches2 = new HashMap<>();
@@ -1142,7 +1152,9 @@ public class KafkaConsumerTest {
         // verify that the fetch occurred as expected
         assertEquals(101, records.count());
         assertEquals(2L, consumer.position(tp0));
+        assertEquals(2L, records.metadata().get(tp0).position());
         assertEquals(100L, consumer.position(t3p0));
+        assertEquals(100L, records.metadata().get(t3p0).position());
 
         // verify that the offset commits occurred as expected
         assertTrue(commitReceived.get());
@@ -1485,7 +1497,7 @@ public class KafkaConsumerTest {
         response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
         LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
-        consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse), 0, false);
+        consumerCloseTest(5000, asList(commitResponse, leaveGroupResponse), 0, false);
     }
 
     @Test
@@ -1856,12 +1868,12 @@ public class KafkaConsumerTest {
         ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
 
-        initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+        initMetadata(client, mkMap(mkEntry(topic, 1), mkEntry(topic2, 1), mkEntry(topic3, 1)));
 
-        consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
+        consumer.subscribe(asList(topic, topic2), getConsumerRebalanceListener(consumer));
 
         Node node = metadata.fetch().nodes().get(0);
-        Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+        Node coordinator = prepareRebalance(client, node, assignor, asList(tp0, t2p0), null);
 
         // a poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
         TestUtils.waitForCondition(() -> {
@@ -1892,7 +1904,7 @@ public class KafkaConsumerTest {
         client.respondFrom(fetchResponse(fetches1), node);
 
         // subscription change
-        consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
+        consumer.subscribe(asList(topic, topic3), getConsumerRebalanceListener(consumer));
 
         // verify that subscription has changed but assignment is still unchanged
         assertEquals(Utils.mkSet(topic, topic3), consumer.subscription());
@@ -1938,7 +1950,7 @@ public class KafkaConsumerTest {
         client.respondFrom(fetchResponse(fetches1), node);
 
         // now complete the rebalance
-        client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator);
+        client.respondFrom(syncGroupResponse(asList(tp0, t3p0), Errors.NONE), coordinator);
 
         AtomicInteger count = new AtomicInteger(0);
         TestUtils.waitForCondition(() -> {
@@ -2036,6 +2048,122 @@ public class KafkaConsumerTest {
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
+    @Test
+    public void testPollMetadata() {
+        final Time time = new MockTime();
+        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer =
+            newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
+        consumer.assign(singleton(tp0));
+        consumer.seek(tp0, 50L);
+
+        final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
+        client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+
+        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(tp0));
+
+        // verify that the consumer computes the correct metadata based on the fetch response
+        final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0);
+        assertEquals(1L, actualMetadata.beginningOffset());
+        assertEquals(100L, actualMetadata.endOffset());
+        assertEquals(55L, actualMetadata.position());
+        assertEquals(45L, actualMetadata.lag());
+        consumer.close(Duration.ZERO);
+    }
+
+
+    @Test
+    public void testPollMetadataWithExtraPartitions() {
+        final Time time = new MockTime();
+        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 2));
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer =
+            newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
+        consumer.assign(asList(tp0, tp1));
+        consumer.seek(tp0, 50L);
+        consumer.seek(tp1, 10L);
+
+        client.prepareResponse(
+            fetchResponse(
+                mkMap(
+                    mkEntry(tp0, new FetchInfo(1L, 99L, 50L, 5)),
+                    mkEntry(tp1, new FetchInfo(0L, 29L, 10L, 0))
+                )
+            )
+        );
+
+        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(tp0));
+
+        assertEquals(5, records.records(tp0).size());
+        final ConsumerRecords.Metadata tp0Metadata = records.metadata().get(tp0);
+        assertEquals(1L, tp0Metadata.beginningOffset());
+        assertEquals(100L, tp0Metadata.endOffset());
+        assertEquals(55L, tp0Metadata.position());
+        assertEquals(45L, tp0Metadata.lag());
+
+        // we may get back metadata for other assigned partitions even if we don't get records for them
+        assertEquals(0, records.records(tp1).size());
+        final ConsumerRecords.Metadata tp1Metadata = records.metadata().get(tp1);
+        assertEquals(0L, tp1Metadata.beginningOffset());
+        assertEquals(30L, tp1Metadata.endOffset());
+        assertEquals(10L, tp1Metadata.position());
+        assertEquals(20L, tp1Metadata.lag());
+
+        consumer.close(Duration.ZERO);
+    }
+
+    @Test
+    public void testPollMetadataWithNoRecords() {
+        final Time time = new MockTime();
+        final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        final ConsumerMetadata metadata = createMetadata(subscription);
+        final MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer =
+            newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+
+        consumer.assign(singleton(tp0));
+        consumer.seek(tp0, 50L);
+
+        final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 0);
+        client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+
+        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+
+        // we got no records back ...
+        assertEquals(0, records.count());
+        assertEquals(50L, consumer.position(tp0));
+
+        // ... but we can still get metadata that was in the fetch response
+        final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0);
+        assertEquals(1L, actualMetadata.beginningOffset());
+        assertEquals(100L, actualMetadata.endOffset());
+        assertEquals(50L, actualMetadata.position());
+        assertEquals(50L, actualMetadata.lag());
+
+        consumer.close(Duration.ZERO);
+    }
+
     private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
         Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
@@ -2201,7 +2329,7 @@ public class KafkaConsumerTest {
     }
 
     private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> offsets) {
-        return listOffsetsResponse(offsets, Collections.emptyMap());
+        return listOffsetsResponse(offsets, emptyMap());
     }
 
     private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> partitionOffsets,
@@ -2237,6 +2365,8 @@ public class KafkaConsumerTest {
             TopicPartition partition = fetchEntry.getKey();
             long fetchOffset = fetchEntry.getValue().offset;
             int fetchCount = fetchEntry.getValue().count;
+            final long highWatermark = fetchEntry.getValue().logLastOffset + 1;
+            final long logStartOffset = fetchEntry.getValue().logFirstOffset;
             final MemoryRecords records;
             if (fetchCount == 0) {
                 records = MemoryRecords.EMPTY;
@@ -2248,8 +2378,8 @@ public class KafkaConsumerTest {
                 records = builder.build();
             }
             tpResponses.put(partition, new FetchResponse.PartitionData<>(
-                    Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                    0L, null, records));
+                    Errors.NONE, highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+                    logStartOffset, null, records));
         }
         return new FetchResponse<>(Errors.NONE, tpResponses, 0, INVALID_SESSION_ID);
     }
@@ -2374,10 +2504,20 @@ public class KafkaConsumerTest {
     }
 
     private static class FetchInfo {
+        long logFirstOffset;
+        long logLastOffset;
         long offset;
         int count;
 
         FetchInfo(long offset, int count) {
+            this(0L, offset + count, offset, count);
+        }
+
+        FetchInfo(long logFirstOffset, long logLastOffset, long offset, int count) {
+            assertThat(logFirstOffset, lessThanOrEqualTo(offset));
+            assertThat(logLastOffset, greaterThanOrEqualTo(offset + count));
+            this.logFirstOffset = logFirstOffset;
+            this.logLastOffset = logLastOffset;
             this.offset = offset;
             this.count = count;
         }
@@ -2516,7 +2656,7 @@ public class KafkaConsumerTest {
     }
 
     private static boolean consumerMetricPresent(KafkaConsumer<String, String> consumer, String name) {
-        MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap());
+        MetricName metricName = new MetricName(name, "consumer-metrics", "", emptyMap());
         return consumer.metrics.metrics().containsKey(metricName);
     }
 
@@ -2556,11 +2696,11 @@ public class KafkaConsumerTest {
         ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
         KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
         MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
-        initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+        initMetadata(client, mkMap(mkEntry(topic, 1), mkEntry(topic2, 1), mkEntry(topic3, 1)));
 
-        consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener);
+        consumer.subscribe(asList(topic, topic2), countingRebalanceListener);
         Node node = metadata.fetch().nodes().get(0);
-        prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+        prepareRebalance(client, node, assignor, asList(tp0, t2p0), null);
 
         // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group
         consumer.poll(Duration.ZERO);

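For context, here is a minimal sketch of how application code might use the ConsumerRecords.metadata() API that these tests exercise. The API shape (a Map<TopicPartition, ConsumerRecords.Metadata> exposing position(), lag(), beginningOffset(), and endOffset()) is taken from the assertions above; the bootstrap server, group id, topic name, and class name are illustrative placeholders, and this interface lives on the in-progress KAFKA-10866 branch rather than a released Kafka version.

    import java.time.Duration;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import static java.util.Collections.singletonList;

    public class PollMetadataSketch {
        public static void main(String[] args) {
            Properties config = new Properties();
            // illustrative connection settings, not part of the commit
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-demo");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
                consumer.subscribe(singletonList("demo-topic")); // hypothetical topic

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // As testPollMetadataWithExtraPartitions shows, metadata may be
                // returned even for assigned partitions that contributed no records.
                for (Map.Entry<TopicPartition, ConsumerRecords.Metadata> entry : records.metadata().entrySet()) {
                    ConsumerRecords.Metadata md = entry.getValue();
                    System.out.printf("%s: position=%d lag=%d beginning=%d end=%d%n",
                            entry.getKey(), md.position(), md.lag(), md.beginningOffset(), md.endOffset());
                }
            }
        }
    }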