This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kafka-10866-consumerrecords-metadata
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ecb1de11e6307610a3a6c8dcb32f220c90e199f9
Author: John Roesler <[email protected]>
AuthorDate: Wed Nov 18 13:30:37 2020 -0600

    KAFKA-10866: Add fetched metadata to ConsumerRecords
    
    Exposed the fetched metadata via the ConsumerRecords
    object as described in KIP-695
---
 checkstyle/suppressions.xml                        |   5 +-
 .../kafka/clients/consumer/ConsumerRecords.java    | 105 ++++++++++++++++++++-
 .../kafka/clients/consumer/KafkaConsumer.java      |   7 +-
 .../kafka/clients/consumer/MockConsumer.java       |  18 +++-
 .../clients/consumer/internals/FetchedRecords.java | 100 ++++++++++++++++++++
 .../kafka/clients/consumer/internals/Fetcher.java  |  59 ++++++++----
 .../clients/consumer/internals/FetcherTest.java    |  28 +++---
 .../processor/internals/StreamTaskTest.java        |   2 +-
 8 files changed, 283 insertions(+), 41 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0e348d7..3592a44 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -100,7 +100,10 @@
               
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
 
     <suppress checks="NPathComplexity"
-              
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
+              
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="MockConsumer"/>
 
     <suppress 
checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 4d0f62c..4e0e0a6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.consumer.internals.FetchedRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.AbstractIterator;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -32,14 +34,99 @@ import java.util.Set;
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-
-    @SuppressWarnings("unchecked")
-    public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(Collections.EMPTY_MAP);
+    public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, Metadata> metadata;
+
+    public static final class Metadata {
+
+        private final long receivedTimestamp;
+        private final long position;
+        private final long beginningOffset;
+        private final long endOffset;
+
+        public Metadata(final long receivedTimestamp,
+                        final long position,
+                        final long beginningOffset,
+                        final long endOffset) {
+            this.receivedTimestamp = receivedTimestamp;
+            this.position = position;
+            this.beginningOffset = beginningOffset;
+            this.endOffset = endOffset;
+        }
+
+        /**
+         * @return The timestamp of the broker response that contained this 
metadata
+         */
+        public long receivedTimestamp() {
+            return receivedTimestamp;
+        }
+
+        /**
+         * @return The next position the consumer will fetch
+         */
+        public long position() {
+            return position;
+        }
+
+        /**
+         * @return The lag between the next position to fetch and the current 
end of the partition
+         */
+        public long lag() {
+            return endOffset - position;
+        }
+
+        /**
+         * @return The current first offset in the partition.
+         */
+        public long beginningOffset() {
+            return beginningOffset;
+        }
+
+        /**
+         * @return The current last offset in the partition. The determination 
of the "last" offset
+         * depends on the Consumer's isolation level. Under 
"read_uncommitted," this is the last successfully
+         * replicated offset plus one. Under "read_committed," this is the 
minimum of the last successfully
+         * replicated offset plus one or the smallest offset of any open 
transaction.
+         */
+        public long endOffset() {
+            return endOffset;
+        }
+    }
+
+    private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final 
FetchedRecords<K, V> fetchedRecords) {
+        final Map<TopicPartition, Metadata> metadata = new HashMap<>();
+        for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> 
entry : fetchedRecords.metadata().entrySet()) {
+            metadata.put(
+                entry.getKey(),
+                new Metadata(
+                    entry.getValue().receivedTimestamp(),
+                    entry.getValue().position().offset,
+                    entry.getValue().beginningOffset(),
+                    entry.getValue().endOffset()
+                )
+            );
+        }
+        return metadata;
+    }
+
+    public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, 
V>>> records) {
+        this.records = records;
+        this.metadata = new HashMap<>();
+    }
 
-    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> 
records) {
+    public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, 
V>>> records,
+                           final Map<TopicPartition, Metadata> metadata) {
         this.records = records;
+        this.metadata = metadata;
+    }
+
+    public ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) {
+        this(fetchedRecords.records(), extractMetadata(fetchedRecords));
     }
 
     /**
@@ -56,6 +143,16 @@ public class ConsumerRecords<K, V> implements 
Iterable<ConsumerRecord<K, V>> {
     }
 
     /**
+     * Get the updated metadata returned by the brokers along with this record 
set.
+     * May be empty or partial depending on the responses from the broker 
during this particular poll.
+     * May also include metadata for additional partitions than the ones for 
which there are records
+     * in this {@code ConsumerRecords} object.
+     */
+    public Map<TopicPartition, Metadata> metadata() {
+        return Collections.unmodifiableMap(metadata);
+    }
+
+    /**
      * Get just the records for the given topic
      */
     public Iterable<ConsumerRecord<K, V>> records(String topic) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b6bebc1..e60eebe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.consumer.internals.FetchedRecords;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
@@ -1234,7 +1235,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records 
= pollForFetches(timer);
+                final FetchedRecords<K, V> records = pollForFetches(timer);
                 if (!records.isEmpty()) {
                     // before returning the fetched records, we can send off 
the next round of fetches
                     // and avoid block waiting for their responses to enable 
pipelining while the user
@@ -1268,12 +1269,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> 
pollForFetches(Timer timer) {
+    private FetchedRecords<K, V> pollForFetches(Timer timer) {
         long pollTimeout = coordinator == null ? timer.remainingMs() :
                 Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), 
timer.remainingMs());
 
         // if data is available already, return it immediately
-        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = 
fetcher.fetchedRecords();
+        final FetchedRecords<K, V> records = fetcher.fetchedRecords();
         if (!records.isEmpty()) {
             return records;
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 7bf4c3f..ecbada7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -218,7 +218,22 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         }
 
         toClear.forEach(p -> this.records.remove(p));
-        return new ConsumerRecords<>(results);
+
+        final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new 
HashMap<>();
+        for (final TopicPartition partition : 
subscriptions.assignedPartitions()) {
+            if (subscriptions.hasValidPosition(partition) && 
beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) {
+                final SubscriptionState.FetchPosition position = 
subscriptions.position(partition);
+                final long offset = position.offset;
+                final long beginningOffset = beginningOffsets.get(partition);
+                final long endOffset = endOffsets.get(partition);
+                metadata.put(
+                    partition,
+                    new ConsumerRecords.Metadata(System.currentTimeMillis(), 
offset, beginningOffset, endOffset)
+                );
+            }
+        }
+
+        return new ConsumerRecords<>(results, metadata);
     }
 
     public synchronized void addRecord(ConsumerRecord<K, V> record) {
@@ -229,6 +244,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             throw new IllegalStateException("Cannot add records for a 
partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k 
-> new ArrayList<>());
         recs.add(record);
+        endOffsets.compute(tp, (ignore, offset) -> offset == null ? 
record.offset() : Math.max(offset, record.offset()));
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java
new file mode 100644
index 0000000..82d32f1
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FetchedRecords<K, V> {
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, FetchMetadata> metadata;
+
+    public static final class FetchMetadata {
+
+        private final long receivedTimestamp;
+        private final long beginningOffset;
+        private final SubscriptionState.FetchPosition position;
+        private final long endOffset;
+
+        public FetchMetadata(final long receivedTimestamp,
+                             final SubscriptionState.FetchPosition position,
+                             final long beginningOffset,
+                             final long endOffset) {
+            this.receivedTimestamp = receivedTimestamp;
+            this.position = position;
+            this.beginningOffset = beginningOffset;
+            this.endOffset = endOffset;
+        }
+
+        public long receivedTimestamp() {
+            return receivedTimestamp;
+        }
+
+        public SubscriptionState.FetchPosition position() {
+            return position;
+        }
+
+        public long beginningOffset() {
+            return beginningOffset;
+        }
+
+        public long endOffset() {
+            return endOffset;
+        }
+    }
+
+    public FetchedRecords() {
+        records = new HashMap<>();
+        metadata = new HashMap<>();
+    }
+
+    public void addRecords(final TopicPartition topicPartition, final 
List<ConsumerRecord<K, V>> records) {
+        if (this.records.containsKey(topicPartition)) {
+            // this case shouldn't usually happen because we only send one 
fetch at a time per partition,
+            // but it might conceivably happen in some rare cases (such as 
partition leader changes).
+            // we have to copy to a new list because the old one may be 
immutable
+            final List<ConsumerRecord<K, V>> currentRecords = 
this.records.get(topicPartition);
+            final List<ConsumerRecord<K, V>> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
+            newRecords.addAll(currentRecords);
+            newRecords.addAll(records);
+            this.records.put(topicPartition, newRecords);
+        } else {
+            this.records.put(topicPartition, records);
+        }
+    }
+
+    public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() {
+        return records;
+    }
+
+    public void addMetadata(final TopicPartition partition, final 
FetchMetadata fetchMetadata) {
+        metadata.put(partition, fetchMetadata);
+    }
+
+    public Map<TopicPartition, FetchMetadata> metadata() {
+        return metadata;
+    }
+
+    public boolean isEmpty() {
+        return records.isEmpty() && metadata.isEmpty();
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index d5dbf22..9c235cf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -319,7 +319,7 @@ public class Fetcher<K, V> implements Closeable {
                                     short responseVersion = 
resp.requestHeader().apiVersion();
 
                                     completedFetches.add(new 
CompletedFetch(partition, partitionData,
-                                            metricAggregator, batches, 
fetchOffset, responseVersion));
+                                            metricAggregator, batches, 
fetchOffset, responseVersion, resp.receivedTimeMs()));
                                 }
                             }
 
@@ -598,8 +598,8 @@ public class Fetcher<K, V> implements Closeable {
      *         the defaultResetPolicy is NONE
      * @throws TopicAuthorizationException If there is TopicAuthorization 
error in fetchResponse.
      */
-    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new 
HashMap<>();
+    public FetchedRecords<K, V> fetchedRecords() {
+        FetchedRecords<K, V> fetched = new FetchedRecords<>();
         Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
         int recordsRemaining = maxPollRecords;
 
@@ -637,20 +637,42 @@ public class Fetcher<K, V> implements Closeable {
                 } else {
                     List<ConsumerRecord<K, V>> records = 
fetchRecords(nextInLineFetch, recordsRemaining);
 
-                    if (!records.isEmpty()) {
-                        TopicPartition partition = nextInLineFetch.partition;
-                        List<ConsumerRecord<K, V>> currentRecords = 
fetched.get(partition);
-                        if (currentRecords == null) {
-                            fetched.put(partition, records);
-                        } else {
-                            // this case shouldn't usually happen because we 
only send one fetch at a time per partition,
-                            // but it might conceivably happen in some rare 
cases (such as partition leader changes).
-                            // we have to copy to a new list because the old 
one may be immutable
-                            List<ConsumerRecord<K, V>> newRecords = new 
ArrayList<>(records.size() + currentRecords.size());
-                            newRecords.addAll(currentRecords);
-                            newRecords.addAll(records);
-                            fetched.put(partition, newRecords);
+                    TopicPartition partition = nextInLineFetch.partition;
+
+                    if (subscriptions.isAssigned(partition)) {
+                        final long receivedTimestamp = 
nextInLineFetch.receivedTimestamp;
+
+                        final long startOffset = 
nextInLineFetch.partitionData.logStartOffset();
+
+                        // read_uncommitted:
+                        // that is, the offset of the last successfully 
replicated message plus one
+                        final long hwm = 
nextInLineFetch.partitionData.highWatermark();
+                        // read_committed:
+                        // the minimum of the high watermark and the smallest 
offset of any open transaction
+                        final long lso = 
nextInLineFetch.partitionData.lastStableOffset();
+
+                        // end offset is:
+                        final long endOffset =
+                            isolationLevel == IsolationLevel.READ_UNCOMMITTED 
? hwm : lso;
+
+                        final FetchPosition fetchPosition = 
subscriptions.position(partition);
+
+                        final FetchedRecords.FetchMetadata fetchMetadata = 
fetched.metadata().get(partition);
+
+                        if (fetchMetadata == null
+                            || 
!fetchMetadata.position().offsetEpoch.isPresent()
+                            || fetchPosition.offsetEpoch.isPresent()
+                            && fetchMetadata.position().offsetEpoch.get() <= 
fetchPosition.offsetEpoch.get()) {
+
+                            fetched.addMetadata(
+                                partition,
+                                new 
FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, startOffset, 
endOffset)
+                            );
                         }
+                    }
+
+                    if (!records.isEmpty()) {
+                        fetched.addRecords(partition, records);
                         recordsRemaining -= records.size();
                     }
                 }
@@ -1459,6 +1481,7 @@ public class Fetcher<K, V> implements Closeable {
         private final FetchResponse.PartitionData<Records> partitionData;
         private final FetchResponseMetricAggregator metricAggregator;
         private final short responseVersion;
+        private final long receivedTimestamp;
 
         private int recordsRead;
         private int bytesRead;
@@ -1477,13 +1500,15 @@ public class Fetcher<K, V> implements Closeable {
                                FetchResponseMetricAggregator metricAggregator,
                                Iterator<? extends RecordBatch> batches,
                                Long fetchOffset,
-                               short responseVersion) {
+                               short responseVersion,
+                               final long receivedTimestamp) {
             this.partition = partition;
             this.partitionData = partitionData;
             this.metricAggregator = metricAggregator;
             this.batches = batches;
             this.nextFetchOffset = fetchOffset;
             this.responseVersion = responseVersion;
+            this.receivedTimestamp = receivedTimestamp;
             this.lastEpoch = Optional.empty();
             this.abortedProducerIds = new HashSet<>();
             this.abortedTransactions = abortedTransactions(partitionData);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b8f9e4c..c71c19b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -524,7 +524,7 @@ public class FetcherTest {
         consumerClient.poll(time.timer(0));
 
         // the first fetchedRecords() should return the first valid message
-        assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
+        assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size());
         assertEquals(1, subscriptions.position(tp0).offset);
 
         ensureBlockOnRecord(1L);
@@ -928,7 +928,7 @@ public class FetcherTest {
 
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
-        assertNull(fetcher.fetchedRecords().get(tp0));
+        assertNull(fetcher.fetchedRecords().records().get(tp0));
     }
 
     @Test
@@ -1117,7 +1117,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
         consumerClient.poll(time.timer(0));
-        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0, fetcher.fetchedRecords().records().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
     }
 
@@ -1130,7 +1130,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
         consumerClient.poll(time.timer(0));
-        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0, fetcher.fetchedRecords().records().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
     }
 
@@ -1144,7 +1144,7 @@ public class FetcherTest {
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.FENCED_LEADER_EPOCH, 100L, 0));
         consumerClient.poll(time.timer(0));
 
-        assertEquals("Should not return any records", 0, 
fetcher.fetchedRecords().size());
+        assertEquals("Should not return any records", 0, 
fetcher.fetchedRecords().records().size());
         assertEquals("Should have requested metadata update", 0L, 
metadata.timeToNextUpdate(time.milliseconds()));
     }
 
@@ -1158,7 +1158,7 @@ public class FetcherTest {
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.UNKNOWN_LEADER_EPOCH, 100L, 0));
         consumerClient.poll(time.timer(0));
 
-        assertEquals("Should not return any records", 0, 
fetcher.fetchedRecords().size());
+        assertEquals("Should not return any records", 0, 
fetcher.fetchedRecords().records().size());
         assertNotEquals("Should not have requested metadata update", 0L, 
metadata.timeToNextUpdate(time.milliseconds()));
     }
 
@@ -1200,7 +1200,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(time.timer(0));
-        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0, fetcher.fetchedRecords().records().size());
         assertTrue(subscriptions.isOffsetResetNeeded(tp0));
         assertNull(subscriptions.validPosition(tp0));
         assertNull(subscriptions.position(tp0));
@@ -1218,7 +1218,7 @@ public class FetcherTest {
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         subscriptions.seek(tp0, 1);
         consumerClient.poll(time.timer(0));
-        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0, fetcher.fetchedRecords().records().size());
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         assertEquals(1, subscriptions.position(tp0).offset);
     }
@@ -1236,7 +1236,7 @@ public class FetcherTest {
         consumerClient.poll(time.timer(0));
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
         subscriptions.seek(tp0, 2);
-        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0, fetcher.fetchedRecords().records().size());
     }
 
     @Test
@@ -1392,7 +1392,7 @@ public class FetcherTest {
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
 
-        assertEquals(2, fetcher.fetchedRecords().get(tp0).size());
+        assertEquals(2, fetcher.fetchedRecords().records().get(tp0).size());
 
         subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
         subscriptions.seekUnvalidated(tp1, new 
SubscriptionState.FetchPosition(1, Optional.empty(), 
metadata.currentLeader(tp1)));
@@ -1403,11 +1403,11 @@ public class FetcherTest {
                 FetchResponse.INVALID_LAST_STABLE_OFFSET, 
FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, 
MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse<>(Errors.NONE, new 
LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
         consumerClient.poll(time.timer(0));
-        assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
+        assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size());
 
         subscriptions.seek(tp1, 10);
         // Should not throw OffsetOutOfRangeException after the seek
-        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0, fetcher.fetchedRecords().records().size());
     }
 
     @Test
@@ -1420,7 +1420,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.NONE, 100L, 0), true);
         consumerClient.poll(time.timer(0));
-        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0, fetcher.fetchedRecords().records().size());
 
         // disconnects should have no effect on subscription state
         assertFalse(subscriptions.isOffsetResetNeeded(tp0));
@@ -4513,7 +4513,7 @@ public class FetcherTest {
 
     @SuppressWarnings("unchecked")
     private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> 
fetchedRecords() {
-        return (Map) fetcher.fetchedRecords();
+        return (Map) fetcher.fetchedRecords().records();
     }
 
     private void buildFetcher(int maxPollRecords) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 81d3db9..a72f0fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1582,7 +1582,7 @@ public class StreamTaskTest {
         task.postCommit(true);
         EasyMock.verify(stateManager);
     }
-    
+
     @Test
     public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
         EasyMock.expect(stateManager.changelogOffsets())

Reply via email to