kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1358693714


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -0,0 +1,3573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.test.TestUtils.assertOptional;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class FetchRequestManagerTest {
+
+    // Tolerance constant; presumably used for floating-point metric comparisons
+    // elsewhere in this class — TODO confirm, no use is visible in this section.
+    private static final double EPSILON = 0.0001;
+
+    private ConsumerRebalanceListener listener = new 
NoOpConsumerRebalanceListener();
+    private String topicName = "test";
+    private String groupId = "test-group";
+    private Uuid topicId = Uuid.randomUuid();
+    // Topic name -> topic id for the single test topic, populated through an
+    // instance-initializer block on an anonymous HashMap subclass.
+    private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
+        {
+            put(topicName, topicId);
+        }
+    };
+    // Reverse lookup (topic id -> topic name).
+    private Map<Uuid, String> topicNames = singletonMap(topicId, topicName);
+    // NOTE(review): "consumer" and groupId are concatenated without a separator,
+    // giving "consumertest-group-fetch-manager-metrics" — confirm this is intended.
+    private final String metricGroup = "consumer" + groupId + 
"-fetch-manager-metrics";
+    // Partitions 0-3 of the test topic, first by name only and then paired with topicId.
+    private TopicPartition tp0 = new TopicPartition(topicName, 0);
+    private TopicPartition tp1 = new TopicPartition(topicName, 1);
+    private TopicPartition tp2 = new TopicPartition(topicName, 2);
+    private TopicPartition tp3 = new TopicPartition(topicName, 3);
+    private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0);
+    private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1);
+    private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2);
+    private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3);
+    // Leader epoch handed to the metadata updates built in assignFromUser(...).
+    private int validLeaderEpoch = 0;
+    // Metadata response mapping topicName to the count 4 (matching tp0..tp3) with topicIds;
+    // the leading 1 is presumably the node count — verify against RequestTestUtils.
+    private MetadataResponse initialUpdateResponse =
+            RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 
4), topicIds);
+
+    // Fetch settings; fetchSize is also the max-bytes value the request matchers expect
+    // in each PartitionData.
+    private int minBytes = 1;
+    private int maxBytes = Integer.MAX_VALUE;
+    private int maxWaitMs = 0;
+    private int fetchSize = 1000;
+    private long retryBackoffMs = 100;
+    private long requestTimeoutMs = 30000;
+    private MockTime time = new MockTime(1);
+    // The collaborators below have no initializer here; presumably buildFetcher()
+    // (not visible in this section) assigns them — TODO confirm.
+    private SubscriptionState subscriptions;
+    private ConsumerMetadata metadata;
+    private FetchMetricsRegistry metricsRegistry;
+    private FetchMetricsManager metricsManager;
+    private MockClient client;
+    private Metrics metrics;
+    private ApiVersions apiVersions = new ApiVersions();
+    private TestableFetchRequestManager<?, ?> fetcher;
+    private TestableNetworkClientDelegate networkClientDelegate;
+    private OffsetFetcher offsetFetcher;
+
+    // Record fixtures, rebuilt before every test by setup().
+    private MemoryRecords records;
+    private MemoryRecords nextRecords;
+    private MemoryRecords emptyRecords;
+    private MemoryRecords partialRecords;
+
+    /**
+     * Rebuilds the shared {@link MemoryRecords} fixtures before every test.
+     */
+    @BeforeEach
+    public void setup() {
+        this.records = buildRecords(1L, 3, 1);
+        this.nextRecords = buildRecords(4L, 2, 4);
+        this.emptyRecords = buildRecords(0L, 0, 0);
+
+        // Overwrite the int at Records.SIZE_OFFSET with 10000; presumably this makes the
+        // batch claim more bytes than the buffer holds, i.e. a partial batch — TODO confirm.
+        final MemoryRecords truncated = buildRecords(4L, 1, 0);
+        truncated.buffer().putInt(Records.SIZE_OFFSET, 10000);
+        this.partialRecords = truncated;
+    }
+
+    private void assignFromUser(Set<TopicPartition> partitions) {
+        subscriptions.assignFromUser(partitions);
+        client.updateMetadata(initialUpdateResponse);
+
+        // A dummy metadata update to ensure valid leader epoch.
+        
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy",
 1,
+                Collections.emptyMap(), singletonMap(topicName, 4),
+                tp -> validLeaderEpoch, topicIds), false, 0L);
+    }
+
+    private void assignFromUser(TopicPartition partition) {
+        subscriptions.assignFromUser(singleton(partition));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, 
singletonMap(partition.topic(), 1), Collections.emptyMap()));
+
+        // A dummy metadata update to ensure valid leader epoch.
+        metadata.update(9, RequestTestUtils.metadataUpdateWithIds("dummy", 1,
+                Collections.emptyMap(), singletonMap(partition.topic(), 1),
+                tp -> validLeaderEpoch, topicIds), false, 0L);
+    }
+
+    /**
+     * Releases the per-test resources after every test.
+     *
+     * <p>The fetcher is closed in a {@code finally} block so that a failure while closing
+     * the metrics registry cannot leave the fetcher open.
+     *
+     * @throws Exception if closing either resource fails
+     */
+    @AfterEach
+    public void teardown() throws Exception {
+        try {
+            if (metrics != null) {
+                metrics.close();
+            }
+        } finally {
+            // Either field may still be null if the test never built a fetcher.
+            if (fetcher != null) {
+                fetcher.close();
+            }
+        }
+    }
+
+    private int sendFetches() {
+        offsetFetcher.validatePositionsOnMetadataChange();
+        return fetcher.sendFetches();
+    }
+
+    /**
+     * Sends one fetch for {@code tp0} from offset 0, completes it with the {@code records}
+     * fixture, and checks the returned record offsets and the advanced position.
+     */
+    @Test
+    public void testFetchNormal() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch: one request goes out, nothing is completed until the response arrives
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        FetchResponse response = fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0);
+        client.prepareResponse(response);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> byPartition = fetchedRecords();
+        assertTrue(byPartition.containsKey(tp0));
+
+        List<ConsumerRecord<byte[], byte[]>> fetched = byPartition.get(tp0);
+        assertEquals(3, fetched.size());
+        // this is the next fetching position
+        assertEquals(4L, subscriptions.position(tp0).offset);
+        // Offsets are expected to be consecutive, starting at 1.
+        for (int i = 0; i < fetched.size(); i++) {
+            assertEquals(1L + i, fetched.get(i).offset());
+        }
+    }
+
+    @Test
+    public void testInflightFetchOnPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, sendFetches());
+        subscriptions.markPendingRevocation(singleton(tp0));
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertNull(fetchedRecords().get(tp0));
+    }
+
+    /**
+     * Calls {@code close()} three times and verifies that {@code closeInternal} ran once.
+     */
+    @Test
+    public void testCloseShouldBeIdempotent() {
+        buildFetcher();
+
+        for (int attempt = 0; attempt < 3; attempt++) {
+            fetcher.close();
+        }
+
+        // verify(...) implies the fetcher is a Mockito mock or spy, presumably created in
+        // buildFetcher() — TODO confirm.
+        verify(fetcher, times(1)).closeInternal(any(Timer.class));
+    }
+
+    /**
+     * Completes one normal fetch, then drives the close path through
+     * {@code ConsumerNetworkThread.runAtClose} and checks that the last request sent
+     * reuses the response's session id, carries {@code FetchMetadata.FINAL_EPOCH}, and
+     * has no partition data.
+     */
+    @Test
+    public void testFetcherCloseClosesFetchSessionsInBroker() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        final FetchResponse fetchResponse = fullFetchResponse(tidp0, records, 
Errors.NONE, 100L, 0);
+        client.prepareResponse(fetchResponse);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        // Nothing may be left queued before the close path is exercised.
+        assertEquals(0, networkClientDelegate.pendingRequestCount());
+
+        final ArgumentCaptor<NetworkClientDelegate.UnsentRequest> argument = 
ArgumentCaptor.forClass(NetworkClientDelegate.UnsentRequest.class);
+
+        // send request to close the fetcher
+        Timer timer = time.timer(Duration.ofSeconds(10));
+        // fetcher.close(timer);
+        //
+        // NOTE: by design the FetchRequestManager doesn't perform network I/O 
internally. That means that calling
+        // close with a Timer will NOT send out the close session requests on 
close. The network I/O logic is
+        // handled inside ConsumerNetworkThread.runAtClose, so we need to run 
that logic here.
+        ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), 
networkClientDelegate, timer);
+
+        // Hand whatever the fetcher queued during close to the delegate and poll it.
+        NetworkClientDelegate.PollResult pollResult = 
fetcher.poll(time.milliseconds());
+        networkClientDelegate.addAll(pollResult.unsentRequests);
+        networkClientDelegate.poll(timer);
+
+        // validate that Fetcher.close() has sent a request with final epoch. 
2 requests are sent, one for the normal
+        // fetch earlier and another for the finish fetch here.
+        verify(networkClientDelegate, times(2)).doSend(argument.capture(), 
any(Long.class));
+        // getValue() yields the most recently captured argument, i.e. the second send.
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
argument.getValue();
+        FetchRequest.Builder builder = (FetchRequest.Builder) 
unsentRequest.requestBuilder();
+        // session Id is the same
+        assertEquals(fetchResponse.sessionId(), 
builder.metadata().sessionId());
+        // contains final epoch
+        assertEquals(FetchMetadata.FINAL_EPOCH, builder.metadata().epoch());  
// final epoch indicates we want to close the session
+        assertTrue(builder.fetchData().isEmpty()); // partition data should be 
empty
+    }
+
+    /**
+     * Fetches once for {@code tp0}, then marks it pending revocation and checks that no
+     * further fetch is sent and the position stays where the first fetch left it.
+     */
+    @Test
+    public void testFetchingPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // this is the next fetching position once the first fetch has been consumed
+        final long positionAfterFetch = 4L;
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        FetchResponse response = fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0);
+        client.prepareResponse(response);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(positionAfterFetch, subscriptions.position(tp0).offset);
+
+        // mark partition unfetchable
+        subscriptions.markPendingRevocation(singleton(tp0));
+        assertEquals(0, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        assertFalse(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(positionAfterFetch, subscriptions.position(tp0).offset);
+    }
+
+    /**
+     * Fetches a partition whose topic id is {@code Uuid.ZERO_UUID} and checks that the
+     * request is sent with version 12 and that the records are returned normally.
+     */
+    @Test
+    public void testFetchWithNoTopicId() {
+        // Should work and default to using old request type.
+        buildFetcher();
+
+        TopicPartition noIdPartition = new TopicPartition("noId", 0);
+        TopicIdPartition noId = new TopicIdPartition(Uuid.ZERO_UUID, noIdPartition);
+        assignFromUser(noIdPartition);
+        subscriptions.seek(noIdPartition, 0);
+
+        // Fetch should use request version 12
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        MockClient.RequestMatcher expectVersion12 =
+                fetchRequestMatcher((short) 12, noId, 0, Optional.of(validLeaderEpoch));
+        client.prepareResponse(
+                expectVersion12,
+                fullFetchResponse(noId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> byPartition = fetchedRecords();
+        assertTrue(byPartition.containsKey(noIdPartition));
+
+        List<ConsumerRecord<byte[], byte[]>> fetched = byPartition.get(noIdPartition);
+        assertEquals(3, fetched.size());
+        // this is the next fetching position
+        assertEquals(4L, subscriptions.position(noIdPartition).offset);
+        // Offsets are expected to be consecutive, starting at 1.
+        for (int i = 0; i < fetched.size(); i++) {
+            assertEquals(1L + i, fetched.get(i).offset());
+        }
+    }
+
+    /**
+     * Fetches a partition that has a topic id and checks that the request is sent with
+     * the latest FETCH version and that the records are returned normally.
+     */
+    @Test
+    public void testFetchWithTopicId() {
+        buildFetcher();
+
+        TopicPartition partition = new TopicPartition(topicName, 0);
+        TopicIdPartition tp = new TopicIdPartition(topicId, partition);
+        assignFromUser(singleton(partition));
+        subscriptions.seek(partition, 0);
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Fetch should use latest version
+        MockClient.RequestMatcher expectLatestVersion = fetchRequestMatcher(
+                ApiKeys.FETCH.latestVersion(), tp, 0, Optional.of(validLeaderEpoch));
+        client.prepareResponse(
+                expectLatestVersion,
+                fullFetchResponse(tp, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> byPartition = fetchedRecords();
+        assertTrue(byPartition.containsKey(partition));
+
+        List<ConsumerRecord<byte[], byte[]>> fetched = byPartition.get(partition);
+        assertEquals(3, fetched.size());
+        // this is the next fetching position
+        assertEquals(4L, subscriptions.position(partition).offset);
+        // Offsets are expected to be consecutive, starting at 1.
+        for (int i = 0; i < fetched.size(); i++) {
+            assertEquals(1L + i, fetched.get(i).offset());
+        }
+    }
+
+    /**
+     * Fetches {@code foo}, then replaces the assignment with {@code bar} and checks that
+     * the next fetch request requests {@code bar} and lists {@code foo} as forgotten.
+     */
+    @Test
+    public void testFetchForgetTopicIdWhenUnassigned() {
+        buildFetcher();
+
+        TopicIdPartition foo =
+                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition bar =
+                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
+
+        // Assign foo only; bar is assigned further down.
+        subscriptions.assignFromUser(singleton(foo.topicPartition()));
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, singleton(foo), tp -> validLeaderEpoch));
+        subscriptions.seek(foo.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+
+        // First request: foo from offset 0, nothing forgotten.
+        client.prepareResponse(
+                fetchRequestMatcher(
+                        ApiKeys.FETCH.latestVersion(), foo, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, foo, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Assign bar and unassign foo.
+        subscriptions.assignFromUser(singleton(bar.topicPartition()));
+        client.updateMetadata(
+                RequestTestUtils.metadataUpdateWithIds(1, singleton(bar), tp -> validLeaderEpoch));
+        subscriptions.seek(bar.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Second request: bar from offset 0, with foo in the forgotten list.
+        PartitionData barData = new PartitionData(
+                bar.topicId(),
+                0,
+                FetchRequest.INVALID_LOG_START_OFFSET,
+                fetchSize,
+                Optional.of(validLeaderEpoch));
+        client.prepareResponse(
+                fetchRequestMatcher(
+                        ApiKeys.FETCH.latestVersion(),
+                        singletonMap(bar, barData),
+                        singletonList(foo)),
+                fullFetchResponse(1, bar, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+    }
+
+    /**
+     * Fetches topic {@code foo} under one topic id, then switches the metadata to a new
+     * topic id for the same name and checks that the next fetch request uses the new id
+     * and lists the old one as forgotten.
+     */
+    @Test
+    public void testFetchForgetTopicIdWhenReplaced() {
+        buildFetcher();
+
+        TopicIdPartition fooWithOldTopicId =
+                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition fooWithNewTopicId =
+                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+
+        // Assign foo with old topic id.
+        subscriptions.assignFromUser(singleton(fooWithOldTopicId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+                1, singleton(fooWithOldTopicId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithOldTopicId.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+
+        // First request: foo under the old id, nothing forgotten.
+        client.prepareResponse(
+                fetchRequestMatcher(
+                        ApiKeys.FETCH.latestVersion(),
+                        fooWithOldTopicId,
+                        0,
+                        Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithOldTopicId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Replace foo with old topic id with foo with new topic id.
+        subscriptions.assignFromUser(singleton(fooWithNewTopicId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+                1, singleton(fooWithNewTopicId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithNewTopicId.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // foo with old topic id should be removed from the session.
+        PartitionData newIdData = new PartitionData(
+                fooWithNewTopicId.topicId(),
+                0,
+                FetchRequest.INVALID_LOG_START_OFFSET,
+                fetchSize,
+                Optional.of(validLeaderEpoch));
+        client.prepareResponse(
+                fetchRequestMatcher(
+                        ApiKeys.FETCH.latestVersion(),
+                        singletonMap(fooWithNewTopicId, newIdData),
+                        singletonList(fooWithOldTopicId)),
+                fullFetchResponse(1, fooWithNewTopicId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+    }
+
+    /**
+     * Moves topic {@code foo} from having no topic id, to having one, and back, checking
+     * the FETCH version used at each step: 12 without an id, the latest version with one.
+     * In every step the matcher expects an empty forgotten-topics list.
+     */
+    @Test
+    public void testFetchTopicIdUpgradeDowngrade() {
+        buildFetcher();
+
+        TopicIdPartition fooWithoutId =
+                new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("foo", 0));
+        TopicIdPartition fooWithId =
+                new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+
+        // Assign foo without a topic id.
+        subscriptions.assignFromUser(singleton(fooWithoutId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+                1, singleton(fooWithoutId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithoutId.topicPartition(), 0);
+
+        // Fetch should use version 12.
+        assertEquals(1, sendFetches());
+
+        client.prepareResponse(
+                fetchRequestMatcher((short) 12, fooWithoutId, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithoutId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Upgrade.
+        subscriptions.assignFromUser(singleton(fooWithId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+                1, singleton(fooWithId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithId.topicPartition(), 0);
+
+        // Fetch should use latest version.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // The request carries foo under its new id; the forgotten list is expected empty.
+        client.prepareResponse(
+                fetchRequestMatcher(
+                        ApiKeys.FETCH.latestVersion(),
+                        fooWithId,
+                        0,
+                        Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+
+        // Downgrade.
+        subscriptions.assignFromUser(singleton(fooWithoutId.topicPartition()));
+        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(
+                1, singleton(fooWithoutId), tp -> validLeaderEpoch));
+        subscriptions.seek(fooWithoutId.topicPartition(), 0);
+
+        // Fetch should use version 12.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // The request carries foo without an id; the forgotten list is expected empty.
+        client.prepareResponse(
+                fetchRequestMatcher((short) 12, fooWithoutId, 0, Optional.of(validLeaderEpoch)),
+                fullFetchResponse(1, fooWithoutId, records, Errors.NONE, 100L, 0)
+        );
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+    }
+
+    /**
+     * Builds a matcher for a fetch request that asks for exactly one partition and
+     * forgets none.
+     *
+     * @param expectedVersion the request version the matcher asserts
+     * @param tp the single partition expected in the request
+     * @param expectedFetchOffset the fetch offset expected for {@code tp}
+     * @param expectedCurrentLeaderEpoch the leader epoch expected for {@code tp}
+     * @return a matcher delegating to the map-based overload with an empty forgotten list
+     */
+    private MockClient.RequestMatcher fetchRequestMatcher(
+            short expectedVersion,
+            TopicIdPartition tp,
+            long expectedFetchOffset,
+            Optional<Integer> expectedCurrentLeaderEpoch
+    ) {
+        PartitionData expectedData = new PartitionData(
+                tp.topicId(),
+                expectedFetchOffset,
+                FetchRequest.INVALID_LOG_START_OFFSET,
+                fetchSize,
+                expectedCurrentLeaderEpoch
+        );
+        Map<TopicIdPartition, PartitionData> expectedFetch = singletonMap(tp, expectedData);
+        List<TopicIdPartition> nothingForgotten = emptyList();
+        return fetchRequestMatcher(expectedVersion, expectedFetch, nothingForgotten);
+    }
+
+    /**
+     * Builds a matcher that asserts a request is a {@link FetchRequest} with the given
+     * version, fetch data and forgotten topics.
+     *
+     * @param expectedVersion the request version the matcher asserts
+     * @param fetch the expected fetch data, keyed by partition
+     * @param forgotten the expected forgotten partitions
+     * @return a matcher that returns {@code true} when all assertions pass and fails the
+     *         test for any other request type
+     */
+    private MockClient.RequestMatcher fetchRequestMatcher(
+            short expectedVersion,
+            Map<TopicIdPartition, PartitionData> fetch,
+            List<TopicIdPartition> forgotten
+    ) {
+        return body -> {
+            if (!(body instanceof FetchRequest)) {
+                fail("Should have seen FetchRequest");
+                return false;
+            }
+
+            FetchRequest fetchRequest = (FetchRequest) body;
+            assertEquals(expectedVersion, fetchRequest.version());
+
+            // Each lookup gets an id -> name map built from the partitions it is compared to.
+            Map<Uuid, String> fetchedTopicNames = topicNames(new ArrayList<>(fetch.keySet()));
+            assertEquals(fetch, fetchRequest.fetchData(fetchedTopicNames));
+
+            Map<Uuid, String> forgottenTopicNames = topicNames(forgotten);
+            assertEquals(forgotten, fetchRequest.forgottenTopics(forgottenTopicNames));
+            return true;
+        };
+    }
+
+    /**
+     * Collects a topic id to topic name map from the given partitions; the first name
+     * seen for an id wins.
+     *
+     * @param partitions the partitions to read ids and names from
+     * @return a new mutable map from topic id to topic name
+     */
+    private Map<Uuid, String> topicNames(List<TopicIdPartition> partitions) {
+        Map<Uuid, String> namesById = new HashMap<>();
+        for (TopicIdPartition partition : partitions) {
+            namesById.putIfAbsent(partition.topicId(), partition.topic());
+        }
+        return namesById;
+    }
+
+    /**
+     * Fetches a magic-v0 batch built with {@code RecordBatch.NO_PARTITION_LEADER_EPOCH}
+     * and checks that every returned {@link ConsumerRecord} has an empty leader epoch.
+     */
+    @Test
+    public void testMissingLeaderEpochInRecords() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
+                CompressionType.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(),
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
+        // Encode with an explicit charset so the payload bytes never depend on the
+        // platform default.
+        builder.append(0L, "key".getBytes(StandardCharsets.UTF_8),
+                "1".getBytes(StandardCharsets.UTF_8));
+        builder.append(0L, "key".getBytes(StandardCharsets.UTF_8),
+                "2".getBytes(StandardCharsets.UTF_8));
+        MemoryRecords legacyRecords = builder.build();
+
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, legacyRecords, Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords =
+                fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertEquals(2, partitionRecords.get(tp0).size());
+
+        for (ConsumerRecord<byte[], byte[]> record : partitionRecords.get(tp0)) {
+            assertEquals(Optional.empty(), record.leaderEpoch());
+        }
+    }
+    /** Each returned ConsumerRecord must report the partition leader epoch its builder was created with. */
+    @Test
+    public void testLeaderEpochInConsumerRecord() {
+        buildFetcher();
+        // Assign tp0 and seek it to offset 0.
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        // Every record's value is the current epoch as a string; the final loop parses it back.
+        Integer partitionLeaderEpoch = 1;
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE,
+                CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 
System.currentTimeMillis(),
+                partitionLeaderEpoch);
+        builder.append(0L, "key".getBytes(), 
partitionLeaderEpoch.toString().getBytes());
+        builder.append(0L, "key".getBytes(), 
partitionLeaderEpoch.toString().getBytes());
+        builder.close();
+        // Second builder over the same buffer: epoch 8 (1 + 7), one record.
+        partitionLeaderEpoch += 7;
+
+        builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 2L, System.currentTimeMillis(), 
partitionLeaderEpoch);
+        builder.append(0L, "key".getBytes(), 
partitionLeaderEpoch.toString().getBytes());
+        builder.close();
+        // Third builder over the same buffer: epoch 13 (8 + 5), three records.
+        partitionLeaderEpoch += 5;
+        builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 3L, System.currentTimeMillis(), 
partitionLeaderEpoch);
+        builder.append(0L, "key".getBytes(), 
partitionLeaderEpoch.toString().getBytes());
+        builder.append(0L, "key".getBytes(), 
partitionLeaderEpoch.toString().getBytes());
+        builder.append(0L, "key".getBytes(), 
partitionLeaderEpoch.toString().getBytes());
+        builder.close();
+        // Flip the buffer so everything written above is read back as one MemoryRecords.
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // One fetch is sent; no fetch is completed before a response is polled.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        // All six records (2 + 1 + 3) are returned for tp0.
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertEquals(6, partitionRecords.get(tp0).size());
+        // The epoch reported by each record must equal the one encoded in its value.
+        for (ConsumerRecord<byte[], byte[]> record : 
partitionRecords.get(tp0)) {
+            int expectedLeaderEpoch = 
Integer.parseInt(Utils.utf8(record.value()));
+            assertEquals(Optional.of(expectedLeaderEpoch), 
record.leaderEpoch());
+        }
+    }
+    /** A completed fetch for tp0 must be gone after clearBufferedDataForUnassignedPartitions is given a set without tp0. */
+    @Test
+    public void testClearBufferedDataForTopicPartitions() {
+        buildFetcher();
+        // Assign tp0 and seek it to offset 0.
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // `records` is not declared in this method; presumably a shared fixture of the test class.
+        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 
100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Set<TopicPartition> newAssignedTopicPartitions = new HashSet<>();
+        newAssignedTopicPartitions.add(tp1);
+        // The set passed in holds only tp1, so the completed fetch for tp0 must be dropped.
+        
fetcher.clearBufferedDataForUnassignedPartitions(newAssignedTopicPartitions);
+        assertFalse(fetcher.hasCompletedFetches());
+    }
+    /** No fetch is sent to a node that has been backed off; one is sent again after `time` advances by the backoff. */
+    @Test
+    public void testFetchSkipsBlackedOutNodes() {
+        buildFetcher();
+        // Assign tp0, seek it to offset 0, and pick the first broker from the initial metadata update.
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        Node node = initialUpdateResponse.brokers().iterator().next();
+        // Back the node off for 500 (presumably milliseconds - confirm); no fetch may be sent.
+        client.backoff(node, 500);
+        assertEquals(0, sendFetches());
+        // Advance `time` by the same 500; the fetch must now be sent.
+        time.sleep(500);
+        assertEquals(1, sendFetches());
+    }
+    /** An ABORT end-transaction marker in the response is not returned as a record, yet the position ends at 2. */
+    @Test
+    public void testFetcherIgnoresControlRecords() {
+        buildFetcher();
+        // Assign tp0 and seek it to offset 0.
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Values shared by the idempotent builder and the end-transaction marker below.
+        long producerId = 1;
+        short producerEpoch = 0;
+        int baseSequence = 0;
+        int partitionLeaderEpoch = 0;
+        // One data record with key "key" and a null value.
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.idempotentBuilder(buffer, 
CompressionType.NONE, 0L, producerId,
+                producerEpoch, baseSequence);
+        builder.append(0L, "key".getBytes(), null);
+        builder.close();
+        // Then an ABORT marker in the same buffer (the 1L argument is presumably its offset - confirm).
+        MemoryRecords.writeEndTransactionalMarker(buffer, 1L, 
time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
+                new EndTransactionMarker(ControlRecordType.ABORT, 0));
+        // Flip the buffer so the data record and the marker can be read back.
+        buffer.flip();
+
+        client.prepareResponse(fullFetchResponse(tidp0, 
MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        // Exactly one record is returned, while the position for tp0 is 2.
+        List<ConsumerRecord<byte[], byte[]>> records = 
partitionRecords.get(tp0);
+        assertEquals(1, records.size());
+        assertEquals(2L, subscriptions.position(tp0).offset);
+        // The single returned record is the data record appended above.
+        ConsumerRecord<byte[], byte[]> record = records.get(0);
+        assertArrayEquals("key".getBytes(), record.key());
+    }
+    /** A response carrying NOT_LEADER_OR_FOLLOWER for tidp0 completes the fetch but yields no records for tp0. */
+    @Test
+    public void testFetchError() {
+        buildFetcher();
+        // Assign tp0 and seek it to offset 0.
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        // One fetch is sent; no fetch is completed before a response is polled.
+        assertEquals(1, sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // Respond with the partition-level error; the fetch still counts as completed.
+        client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        // The returned map must have no entry for tp0.
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchedRecords();
+        assertFalse(partitionRecords.containsKey(tp0));
+    }
+    /** Returns a matcher that casts the body to FetchRequest and accepts it only if it fetches tp at exactly offset. */
+    private MockClient.RequestMatcher matchesOffset(final TopicIdPartition tp, 
final long offset) {
+        return body -> {
+            FetchRequest fetch = (FetchRequest) body;
+            Map<TopicIdPartition, FetchRequest.PartitionData> fetchData = 
fetch.fetchData(topicNames);
+            return fetchData.containsKey(tp) &&
+                    fetchData.get(tp).fetchOffset == offset;
+        };
+    }
+    /** collectFetch() must throw SerializationException on each attempt while the position for tp0 stays at 1. */
+    @Test
+    public void testFetchedRecordsRaisesOnSerializationErrors() {
+        // raise an exception from somewhere in the middle of the fetch 
response
+        // so that we can verify that our position does not advance after 
raising
+        ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
+            int i = 0;
+            @Override
+            public byte[] deserialize(String topic, byte[] data) {
+                if (i++ % 2 == 1) {
+                    // Should be blocked on the value deserialization of the 
first record.
+                    assertEquals("value-1", new String(data, 
StandardCharsets.UTF_8));
+                    throw new SerializationException();
+                }
+                return data;
+            }
+        };
+        // One instance is passed twice (presumably as key and value deserializer); every second call throws.
+        buildFetcher(deserializer, deserializer);
+        // Assign tp0 and seek it to offset 1.
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
+        // The prepared response is tied to a matcher for a fetch of tidp0 at offset 1.
+        client.prepareResponse(matchesOffset(tidp0, 1), 
fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
+        // Send the fetch and poll so the prepared response is processed.
+        assertEquals(1, sendFetches());
+        networkClientDelegate.poll(time.timer(0));
+        // The fetcher should block on Deserialization error
+        for (int i = 0; i < 2; i++) {
+            try {
+                fetcher.collectFetch();
+                fail("fetchedRecords should have raised");
+            } catch (SerializationException e) {
+                // the position should not advance since no data has been 
returned
+                assertEquals(1, subscriptions.position(tp0).offset);
+            }
+        }
+    }
+    /** Hand-writes magic-v1 records, one with a wrong CRC and one with size 1, and checks where reading stops. */
+    @Test
+    public void testParseCorruptedRecord() throws Exception {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        // Records are written by hand through a DataOutputStream over the buffer.
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        DataOutputStream out = new DataOutputStream(new 
ByteBufferOutputStream(buffer));
+        // Field values shared by every record written below.
+        byte magic = RecordBatch.MAGIC_VALUE_V1;
+        byte[] key = "foo".getBytes();
+        byte[] value = "baz".getBytes();
+        long offset = 0;
+        long timestamp = 500L;
+        // Size and checksum are computed once; the corrupt record reuses them with crc + 1.
+        int size = LegacyRecord.recordSize(magic, key.length, value.length);
+        byte attributes = LegacyRecord.computeAttributes(magic, 
CompressionType.NONE, TimestampType.CREATE_TIME);
+        long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, 
key, value);
+        // NOTE(review): the write() calls below recompute the attributes instead of reusing `attributes`.
+        // write one valid record
+        out.writeLong(offset);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, 
LegacyRecord.computeAttributes(magic, CompressionType.NONE, 
TimestampType.CREATE_TIME), timestamp, key, value);
+
+        // and one invalid record (note the crc)
+        out.writeLong(offset + 1);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc + 1, 
LegacyRecord.computeAttributes(magic, CompressionType.NONE, 
TimestampType.CREATE_TIME), timestamp, key, value);
+
+        // write one valid record
+        out.writeLong(offset + 2);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, 
LegacyRecord.computeAttributes(magic, CompressionType.NONE, 
TimestampType.CREATE_TIME), timestamp, key, value);
+
+        // Write a record whose size field is invalid.
+        out.writeLong(offset + 3);
+        out.writeInt(1);
+
+        // write one valid record
+        out.writeLong(offset + 4);
+        out.writeInt(size);
+        LegacyRecord.write(out, magic, crc, 
LegacyRecord.computeAttributes(magic, CompressionType.NONE, 
TimestampType.CREATE_TIME), timestamp, key, value);
+        // Flip the buffer so the bytes written above can be read back.
+        buffer.flip();
+        // Position tp0 at offset 0 with no offset epoch, against the current leader from metadata.
+        subscriptions.seekUnvalidated(tp0, new 
SubscriptionState.FetchPosition(0, Optional.empty(), 
metadata.currentLeader(tp0)));
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, 
MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        networkClientDelegate.poll(time.timer(0));
+
+        // the first fetchedRecords() should return the first valid message
+        assertEquals(1, fetchedRecords().get(tp0).size());
+        assertEquals(1, subscriptions.position(tp0).offset);
+        // Per the helper names: blocked at offset 1, offset 2 consumable after a seek, blocked again at 3.
+        ensureBlockOnRecord(1L);
+        seekAndConsumeRecord(buffer, 2L);
+        ensureBlockOnRecord(3L);
+        try {
+            // For a record that cannot be retrieved from the iterator, we 
cannot seek over it within the batch.
+            seekAndConsumeRecord(buffer, 4L);
+            fail("Should have thrown exception when fail to retrieve a record 
from iterator.");
+        } catch (KafkaException ke) {
+            // let it go
+        }
+        ensureBlockOnRecord(4L);
+    }
+
+    private void ensureBlockOnRecord(long blockedOffset) {
+        // the fetchedRecords() should always throw exception due to the 
invalid message at the starting offset.

Review Comment:
   `fetchedRecords()` calls `fetcher.collectFetch()`, so it's the other way 
around. I updated the call sites and renamed `fetchedRecords()` to 
`fetchRecords()` so that it's hopefully a little clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to