This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch kafka-10866-consumerrecords-metadata in repository https://gitbox.apache.org/repos/asf/kafka.git
commit ecb1de11e6307610a3a6c8dcb32f220c90e199f9 Author: John Roesler <[email protected]> AuthorDate: Wed Nov 18 13:30:37 2020 -0600 KAFKA-10866: Add fetched metadata to ConsumerRecords Exposed the fetched metadata via the ConsumerRecords object as described in KIP-695 --- checkstyle/suppressions.xml | 5 +- .../kafka/clients/consumer/ConsumerRecords.java | 105 ++++++++++++++++++++- .../kafka/clients/consumer/KafkaConsumer.java | 7 +- .../kafka/clients/consumer/MockConsumer.java | 18 +++- .../clients/consumer/internals/FetchedRecords.java | 100 ++++++++++++++++++++ .../kafka/clients/consumer/internals/Fetcher.java | 59 ++++++++---- .../clients/consumer/internals/FetcherTest.java | 28 +++--- .../processor/internals/StreamTaskTest.java | 2 +- 8 files changed, 283 insertions(+), 41 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 0e348d7..3592a44 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -100,7 +100,10 @@ files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/> <suppress checks="NPathComplexity" - files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/> + files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/> + + <suppress checks="CyclomaticComplexity" + files="MockConsumer"/> <suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)" files="Murmur3Test.java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 4d0f62c..4e0e0a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.consumer.internals.FetchedRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.AbstractIterator; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -32,14 +34,99 @@ import java.util.Set; * partition returned by a {@link Consumer#poll(java.time.Duration)} operation. */ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> { - - @SuppressWarnings("unchecked") - public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP); + public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>( + Collections.emptyMap(), + Collections.emptyMap() + ); private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records; + private final Map<TopicPartition, Metadata> metadata; + + public static final class Metadata { + + private final long receivedTimestamp; + private final long position; + private final long beginningOffset; + private final long endOffset; + + public Metadata(final long receivedTimestamp, + final long position, + final long beginningOffset, + final long endOffset) { + this.receivedTimestamp = receivedTimestamp; + this.position = position; + this.beginningOffset = beginningOffset; + this.endOffset = endOffset; + } + + /** + * @return The timestamp of the broker response that contained this metadata + */ + public long receivedTimestamp() { + return receivedTimestamp; + } + + /** + * @return The next position the consumer will fetch + */ + public long position() { + return position; + } + + /** + * @return The lag between the next position to fetch and the current end of the partition + */ + public long lag() { + return endOffset - position; + } + + /** + * @return The current first offset in the partition. + */ + public long beginningOffset() { + return beginningOffset; + } + + /** + * @return The current last offset in the partition. The determination of the "last" offset + * depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully + * replicated offset plus one. Under "read_committed," this is the minimum of the last successfully + * replicated offset plus one or the smallest offset of any open transaction. + */ + public long endOffset() { + return endOffset; + } + } + + private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) { + final Map<TopicPartition, Metadata> metadata = new HashMap<>(); + for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) { + metadata.put( + entry.getKey(), + new Metadata( + entry.getValue().receivedTimestamp(), + entry.getValue().position().offset, + entry.getValue().beginningOffset(), + entry.getValue().endOffset() + ) + ); + } + return metadata; + } + + public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records) { + this.records = records; + this.metadata = new HashMap<>(); + } - public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) { + public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records, + final Map<TopicPartition, Metadata> metadata) { this.records = records; + this.metadata = metadata; + } + + public ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) { + this(fetchedRecords.records(), extractMetadata(fetchedRecords)); } /** @@ -56,6 +143,16 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> { } /** + * Get the updated metadata returned by the brokers along with this record set. + * May be empty or partial depending on the responses from the broker during this particular poll. + * May also include metadata for additional partitions than the ones for which there are records + * in this {@code ConsumerRecords} object. + */ + public Map<TopicPartition, Metadata> metadata() { + return Collections.unmodifiableMap(metadata); + } + + /** * Get just the records for the given topic */ public Iterable<ConsumerRecord<K, V>> records(String topic) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b6bebc1..e60eebe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.clients.consumer.internals.FetchedRecords; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry; import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics; @@ -1234,7 +1235,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } } - final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); + final FetchedRecords<K, V> records = pollForFetches(timer); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user @@ -1268,12 +1269,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { /** * @throws KafkaException if the rebalance callback throws exception */ - private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { + private FetchedRecords<K, V> pollForFetches(Timer timer) { long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); // if data is available already, return it immediately - final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); + final FetchedRecords<K, V> records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 7bf4c3f..ecbada7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -218,7 +218,22 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } toClear.forEach(p -> this.records.remove(p)); - return new ConsumerRecords<>(results); + + final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>(); + for (final TopicPartition partition : subscriptions.assignedPartitions()) { + if (subscriptions.hasValidPosition(partition) && beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) { + final SubscriptionState.FetchPosition position = subscriptions.position(partition); + final long offset = position.offset; + final long beginningOffset = beginningOffsets.get(partition); + final long endOffset = endOffsets.get(partition); + metadata.put( + partition, + new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, beginningOffset, endOffset) + ); + } + } + + return new ConsumerRecords<>(results, metadata); } public synchronized void addRecord(ConsumerRecord<K, V> record) { @@ -229,6 +244,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer"); List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>()); recs.add(record); + endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset())); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java new file mode 100644 index 0000000..82d32f1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FetchedRecords<K, V> { + private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records; + private final Map<TopicPartition, FetchMetadata> metadata; + + public static final class FetchMetadata { + + private final long receivedTimestamp; + private final long beginningOffset; + private final SubscriptionState.FetchPosition position; + private final long endOffset; + + public FetchMetadata(final long receivedTimestamp, + final SubscriptionState.FetchPosition position, + final long beginningOffset, + final long endOffset) { + this.receivedTimestamp = receivedTimestamp; + this.position = position; + this.beginningOffset = beginningOffset; + this.endOffset = endOffset; + } + + public long receivedTimestamp() { + return receivedTimestamp; + } + + public SubscriptionState.FetchPosition position() { + return position; + } + + public long beginningOffset() { + return beginningOffset; + } + + public long endOffset() { + return endOffset; + } + } + + public FetchedRecords() { + records = new HashMap<>(); + metadata = new HashMap<>(); + } + + public void addRecords(final TopicPartition topicPartition, final List<ConsumerRecord<K, V>> records) { + if (this.records.containsKey(topicPartition)) { + // this case shouldn't usually happen because we only send one fetch at a time per partition, + // but it might conceivably happen in some rare cases (such as partition leader changes). + // we have to copy to a new list because the old one may be immutable + final List<ConsumerRecord<K, V>> currentRecords = this.records.get(topicPartition); + final List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); + newRecords.addAll(currentRecords); + newRecords.addAll(records); + this.records.put(topicPartition, newRecords); + } else { + this.records.put(topicPartition, records); + } + } + + public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() { + return records; + } + + public void addMetadata(final TopicPartition partition, final FetchMetadata fetchMetadata) { + metadata.put(partition, fetchMetadata); + } + + public Map<TopicPartition, FetchMetadata> metadata() { + return metadata; + } + + public boolean isEmpty() { + return records.isEmpty() && metadata.isEmpty(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d5dbf22..9c235cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -319,7 +319,7 @@ public class Fetcher<K, V> implements Closeable { short responseVersion = resp.requestHeader().apiVersion(); completedFetches.add(new CompletedFetch(partition, partitionData, - metricAggregator, batches, fetchOffset, responseVersion)); + metricAggregator, batches, fetchOffset, responseVersion, resp.receivedTimeMs())); } } @@ -598,8 +598,8 @@ public class Fetcher<K, V> implements Closeable { * the defaultResetPolicy is NONE * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse. */ - public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { - Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); + public FetchedRecords<K, V> fetchedRecords() { + FetchedRecords<K, V> fetched = new FetchedRecords<>(); Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>(); int recordsRemaining = maxPollRecords; @@ -637,20 +637,42 @@ public class Fetcher<K, V> implements Closeable { } else { List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining); - if (!records.isEmpty()) { - TopicPartition partition = nextInLineFetch.partition; - List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition); - if (currentRecords == null) { - fetched.put(partition, records); - } else { - // this case shouldn't usually happen because we only send one fetch at a time per partition, - // but it might conceivably happen in some rare cases (such as partition leader changes). - // we have to copy to a new list because the old one may be immutable - List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); - newRecords.addAll(currentRecords); - newRecords.addAll(records); - fetched.put(partition, newRecords); + TopicPartition partition = nextInLineFetch.partition; + + if (subscriptions.isAssigned(partition)) { + final long receivedTimestamp = nextInLineFetch.receivedTimestamp; + + final long startOffset = nextInLineFetch.partitionData.logStartOffset(); + + // read_uncommitted: + //that is, the offset of the last successfully replicated message plus one + final long hwm = nextInLineFetch.partitionData.highWatermark(); + // read_committed: + //the minimum of the high watermark and the smallest offset of any open transaction + final long lso = nextInLineFetch.partitionData.lastStableOffset(); + + // end offset is: + final long endOffset = + isolationLevel == IsolationLevel.READ_UNCOMMITTED ? hwm : lso; + + final FetchPosition fetchPosition = subscriptions.position(partition); + + final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition); + + if (fetchMetadata == null + || !fetchMetadata.position().offsetEpoch.isPresent() + || fetchPosition.offsetEpoch.isPresent() + && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) { + + fetched.addMetadata( + partition, + new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, startOffset, endOffset) + ); } + } + + if (!records.isEmpty()) { + fetched.addRecords(partition, records); recordsRemaining -= records.size(); } } @@ -1459,6 +1481,7 @@ public class Fetcher<K, V> implements Closeable { private final FetchResponse.PartitionData<Records> partitionData; private final FetchResponseMetricAggregator metricAggregator; private final short responseVersion; + private final long receivedTimestamp; private int recordsRead; private int bytesRead; @@ -1477,13 +1500,15 @@ public class Fetcher<K, V> implements Closeable { FetchResponseMetricAggregator metricAggregator, Iterator<? extends RecordBatch> batches, Long fetchOffset, - short responseVersion) { + short responseVersion, + final long receivedTimestamp) { this.partition = partition; this.partitionData = partitionData; this.metricAggregator = metricAggregator; this.batches = batches; this.nextFetchOffset = fetchOffset; this.responseVersion = responseVersion; + this.receivedTimestamp = receivedTimestamp; this.lastEpoch = Optional.empty(); this.abortedProducerIds = new HashSet<>(); this.abortedTransactions = abortedTransactions(partitionData); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index b8f9e4c..c71c19b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -524,7 +524,7 @@ public class FetcherTest { consumerClient.poll(time.timer(0)); // the first fetchedRecords() should return the first valid message - assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); + assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size()); assertEquals(1, subscriptions.position(tp0).offset); ensureBlockOnRecord(1L); @@ -928,7 +928,7 @@ public class FetcherTest { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); - assertNull(fetcher.fetchedRecords().get(tp0)); + assertNull(fetcher.fetchedRecords().records().get(tp0)); } @Test @@ -1117,7 +1117,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().size()); + assertEquals(0, fetcher.fetchedRecords().records().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); } @@ -1130,7 +1130,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().size()); + assertEquals(0, fetcher.fetchedRecords().records().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); } @@ -1144,7 +1144,7 @@ public class FetcherTest { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.FENCED_LEADER_EPOCH, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals("Should not return any records", 0, fetcher.fetchedRecords().size()); + assertEquals("Should not return any records", 0, fetcher.fetchedRecords().records().size()); assertEquals("Should have requested metadata update", 0L, metadata.timeToNextUpdate(time.milliseconds())); } @@ -1158,7 +1158,7 @@ public class FetcherTest { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.UNKNOWN_LEADER_EPOCH, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals("Should not return any records", 0, fetcher.fetchedRecords().size()); + assertEquals("Should not return any records", 0, fetcher.fetchedRecords().records().size()); assertNotEquals("Should not have requested metadata update", 0L, metadata.timeToNextUpdate(time.milliseconds())); } @@ -1200,7 +1200,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().size()); + assertEquals(0, fetcher.fetchedRecords().records().size()); assertTrue(subscriptions.isOffsetResetNeeded(tp0)); assertNull(subscriptions.validPosition(tp0)); assertNull(subscriptions.position(tp0)); @@ -1218,7 +1218,7 @@ public class FetcherTest { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); subscriptions.seek(tp0, 1); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().size()); + assertEquals(0, fetcher.fetchedRecords().records().size()); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertEquals(1, subscriptions.position(tp0).offset); } @@ -1236,7 +1236,7 @@ public class FetcherTest { consumerClient.poll(time.timer(0)); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); subscriptions.seek(tp0, 2); - assertEquals(0, fetcher.fetchedRecords().size()); + assertEquals(0, fetcher.fetchedRecords().records().size()); } @Test @@ -1392,7 +1392,7 @@ public class FetcherTest { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(2, fetcher.fetchedRecords().get(tp0).size()); + assertEquals(2, fetcher.fetchedRecords().records().get(tp0).size()); subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); @@ -1403,11 +1403,11 @@ public class FetcherTest { FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY)); client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID)); consumerClient.poll(time.timer(0)); - assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); + assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size()); subscriptions.seek(tp1, 10); // Should not throw OffsetOutOfRangeException after the seek - assertEquals(0, fetcher.fetchedRecords().size()); + assertEquals(0, fetcher.fetchedRecords().records().size()); } @Test @@ -1420,7 +1420,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0), true); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().size()); + assertEquals(0, fetcher.fetchedRecords().records().size()); // disconnects should have no affect on subscription state assertFalse(subscriptions.isOffsetResetNeeded(tp0)); @@ -4513,7 +4513,7 @@ public class FetcherTest { @SuppressWarnings("unchecked") private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() { - return (Map) fetcher.fetchedRecords(); + return (Map) fetcher.fetchedRecords().records(); } private void buildFetcher(int maxPollRecords) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 81d3db9..a72f0fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1582,7 +1582,7 @@ public class StreamTaskTest { task.postCommit(true); EasyMock.verify(stateManager); } - + @Test public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() { EasyMock.expect(stateManager.changelogOffsets())
