junrao commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1334821243
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -0,0 +1,3565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import 
org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import 
org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.DelayedReceive; +import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FetchRequestManagerTest { + + private static final double EPSILON = 0.0001; + + private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener(); + private String topicName = "test"; + private String groupId = "test-group"; + private Uuid topicId = Uuid.randomUuid(); + private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() { + { + put(topicName, topicId); + } + }; + private Map<Uuid, String> topicNames = singletonMap(topicId, topicName); + private final String metricGroup = "consumer" + groupId + 
"-fetch-manager-metrics"; + private TopicPartition tp0 = new TopicPartition(topicName, 0); + private TopicPartition tp1 = new TopicPartition(topicName, 1); + private TopicPartition tp2 = new TopicPartition(topicName, 2); + private TopicPartition tp3 = new TopicPartition(topicName, 3); + private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0); + private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1); + private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2); + private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3); + private int validLeaderEpoch = 0; + private MetadataResponse initialUpdateResponse = + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds); + + private int minBytes = 1; + private int maxBytes = Integer.MAX_VALUE; + private int maxWaitMs = 0; + private int fetchSize = 1000; + private long retryBackoffMs = 100; + private long requestTimeoutMs = 30000; + private MockTime time = new MockTime(1); + private SubscriptionState subscriptions; + private ConsumerMetadata metadata; + private FetchMetricsRegistry metricsRegistry; + private FetchMetricsManager metricsManager; + private MockClient client; + private Metrics metrics; + private ApiVersions apiVersions = new ApiVersions(); + private ConsumerNetworkClient oldConsumerClient; + private TestableFetchRequestManager<?, ?> fetcher; + private TestableNetworkClientDelegate consumerClient; + private OffsetFetcher offsetFetcher; + + private MemoryRecords records; + private MemoryRecords nextRecords; + private MemoryRecords emptyRecords; + private MemoryRecords partialRecords; + + @BeforeEach + public void setup() { + records = buildRecords(1L, 3, 1); + nextRecords = buildRecords(4L, 2, 4); + emptyRecords = buildRecords(0L, 0, 0); + partialRecords = buildRecords(4L, 1, 0); + partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000); + } + + private void assignFromUser(Set<TopicPartition> partitions) { + 
subscriptions.assignFromUser(partitions); + client.updateMetadata(initialUpdateResponse); + + // A dummy metadata update to ensure valid leader epoch. + metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, + Collections.emptyMap(), singletonMap(topicName, 4), + tp -> validLeaderEpoch, topicIds), false, 0L); + } + + private void assignFromUserNoId(Set<TopicPartition> partitions) { Review Comment: The NoId part is a bit confusing. assignFromUserSingleTopic? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +/** + * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the + * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition + * assignment. 
+ */ +public class FetchRequestManager extends AbstractFetch implements RequestManager { + + private final Logger log; + private final BackgroundEventHandler backgroundEventHandler; + private final NetworkClientDelegate networkClientDelegate; + private final List<CompletableFuture<Queue<CompletedFetch>>> futures; + + FetchRequestManager(final LogContext logContext, + final Time time, + final BackgroundEventHandler backgroundEventHandler, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final FetchMetricsManager metricsManager, + final NetworkClientDelegate networkClientDelegate) { + super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time); + this.log = logContext.logger(FetchRequestManager.class); + this.backgroundEventHandler = backgroundEventHandler; + this.networkClientDelegate = networkClientDelegate; + this.futures = new ArrayList<>(); + } + + @Override + protected boolean isUnavailable(Node node) { + return networkClientDelegate.isUnavailable(node); + } + + @Override + protected void maybeThrowAuthFailure(Node node) { + networkClientDelegate.maybeThrowAuthFailure(node); + } + + /** + * Adds a new {@link Future future} to the list of futures awaiting results. Per the comments on + * {@link #forwardResults()}, there is no guarantee that this particular future will be provided with + * a non-empty result, but it is guaranteed to be completed with a result, assuming that it does not time out. + * + * @param future Future that will be {@link CompletableFuture#complete(Object) completed} if not timed out + */ + public void requestFetch(CompletableFuture<Queue<CompletedFetch>> future) { + futures.add(future); + } + + @Override + public PollResult poll(long currentTimeMs) { Review Comment: Could we describe what `poll` does? 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -0,0 +1,3565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import 
org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import 
org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.DelayedReceive; +import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FetchRequestManagerTest { + + private static final double EPSILON = 0.0001; + + private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener(); + private String topicName = "test"; + private String groupId = "test-group"; + private Uuid topicId = Uuid.randomUuid(); + private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() { + { + put(topicName, topicId); + } + }; + private Map<Uuid, String> topicNames = singletonMap(topicId, topicName); + private final String metricGroup = "consumer" + groupId + 
"-fetch-manager-metrics"; + private TopicPartition tp0 = new TopicPartition(topicName, 0); + private TopicPartition tp1 = new TopicPartition(topicName, 1); + private TopicPartition tp2 = new TopicPartition(topicName, 2); + private TopicPartition tp3 = new TopicPartition(topicName, 3); + private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0); + private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1); + private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2); + private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3); + private int validLeaderEpoch = 0; + private MetadataResponse initialUpdateResponse = + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds); + + private int minBytes = 1; + private int maxBytes = Integer.MAX_VALUE; + private int maxWaitMs = 0; + private int fetchSize = 1000; + private long retryBackoffMs = 100; + private long requestTimeoutMs = 30000; + private MockTime time = new MockTime(1); + private SubscriptionState subscriptions; + private ConsumerMetadata metadata; + private FetchMetricsRegistry metricsRegistry; + private FetchMetricsManager metricsManager; + private MockClient client; + private Metrics metrics; + private ApiVersions apiVersions = new ApiVersions(); + private ConsumerNetworkClient oldConsumerClient; + private TestableFetchRequestManager<?, ?> fetcher; + private TestableNetworkClientDelegate consumerClient; + private OffsetFetcher offsetFetcher; + + private MemoryRecords records; + private MemoryRecords nextRecords; + private MemoryRecords emptyRecords; + private MemoryRecords partialRecords; + + @BeforeEach + public void setup() { + records = buildRecords(1L, 3, 1); + nextRecords = buildRecords(4L, 2, 4); + emptyRecords = buildRecords(0L, 0, 0); + partialRecords = buildRecords(4L, 1, 0); + partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000); + } + + private void assignFromUser(Set<TopicPartition> partitions) { + 
subscriptions.assignFromUser(partitions); + client.updateMetadata(initialUpdateResponse); + + // A dummy metadata update to ensure valid leader epoch. + metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, + Collections.emptyMap(), singletonMap(topicName, 4), + tp -> validLeaderEpoch, topicIds), false, 0L); + } + + private void assignFromUserNoId(Set<TopicPartition> partitions) { + subscriptions.assignFromUser(partitions); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singletonMap("noId", 1), Collections.emptyMap())); + + // A dummy metadata update to ensure valid leader epoch. + metadata.update(9, RequestTestUtils.metadataUpdateWithIds("dummy", 1, + Collections.emptyMap(), singletonMap("noId", 1), + tp -> validLeaderEpoch, topicIds), false, 0L); + } + + @AfterEach + public void teardown() throws Exception { + if (metrics != null) + this.metrics.close(); + if (fetcher != null) + this.fetcher.close(); + } + + private int sendFetches() { + offsetFetcher.validatePositionsOnMetadataChange(); + return fetcher.sendFetches(); + } + + @Test + public void testFetchNormal() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + // normal fetch + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0)); Review Comment: Could we be consistent with the use of `this`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -0,0 +1,3565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import 
org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.DelayedReceive; +import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static 
java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FetchRequestManagerTest { + + private static final double EPSILON = 0.0001; + + private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener(); + private String topicName = "test"; + private String groupId = "test-group"; + private Uuid topicId = Uuid.randomUuid(); + private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() { + { + put(topicName, topicId); + } + }; + private Map<Uuid, String> topicNames = singletonMap(topicId, topicName); + private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; + private TopicPartition tp0 = new TopicPartition(topicName, 0); + private TopicPartition tp1 = new TopicPartition(topicName, 1); + private TopicPartition tp2 = new TopicPartition(topicName, 2); + private TopicPartition tp3 = new TopicPartition(topicName, 3); + private 
TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0); + private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1); + private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2); + private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3); + private int validLeaderEpoch = 0; + private MetadataResponse initialUpdateResponse = + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds); + + private int minBytes = 1; + private int maxBytes = Integer.MAX_VALUE; + private int maxWaitMs = 0; + private int fetchSize = 1000; + private long retryBackoffMs = 100; + private long requestTimeoutMs = 30000; + private MockTime time = new MockTime(1); + private SubscriptionState subscriptions; + private ConsumerMetadata metadata; + private FetchMetricsRegistry metricsRegistry; + private FetchMetricsManager metricsManager; + private MockClient client; + private Metrics metrics; + private ApiVersions apiVersions = new ApiVersions(); + private ConsumerNetworkClient oldConsumerClient; + private TestableFetchRequestManager<?, ?> fetcher; + private TestableNetworkClientDelegate consumerClient; + private OffsetFetcher offsetFetcher; + + private MemoryRecords records; + private MemoryRecords nextRecords; + private MemoryRecords emptyRecords; + private MemoryRecords partialRecords; + + @BeforeEach + public void setup() { + records = buildRecords(1L, 3, 1); + nextRecords = buildRecords(4L, 2, 4); + emptyRecords = buildRecords(0L, 0, 0); + partialRecords = buildRecords(4L, 1, 0); + partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000); + } + + private void assignFromUser(Set<TopicPartition> partitions) { + subscriptions.assignFromUser(partitions); + client.updateMetadata(initialUpdateResponse); + + // A dummy metadata update to ensure valid leader epoch. 
+ metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Review Comment: `client.updateMetadata` eventually calls `metadata.updateWithCurrentRequestVersion`. So, not sure why we are updating cluster metadata twice with different values. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -693,15 +1015,14 @@ private boolean refreshCommittedOffsetsIfNeeded(Timer timer) { log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); try { - final Map<TopicPartition, OffsetAndMetadata> offsets = eventHandler.addAndGet(new OffsetFetchApplicationEvent(initializingPartitions), timer); + final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(new OffsetFetchApplicationEvent(initializingPartitions), timer); Review Comment: If `initializingPartitions` is empty, do we still send the OffsetFetch request? I didn't see the logic for short-circuiting. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ########## @@ -37,7 +36,7 @@ * * <em>Note</em>: this class is not thread-safe and is intended to only be used from a single thread. */ -public class FetchBuffer implements Closeable { +public class FetchBuffer implements AutoCloseable { Review Comment: The above comment that this class is only used from a single thread seems incorrect. The background thread adds fetched data into `completedFetches` and `PrototypeAsyncConsumer` drains `completedFetches` through `fetchCollector.collectFetch(fetchBuffer)`. There is actually very subtle synchronization between the two threads. Could we document the correct way of using this class and how we achieve synchronization so that future developers don't break it? 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -395,12 +523,12 @@ private void maybeThrowInvalidGroupIdException() { @Override public Map<MetricName, ? extends Metric> metrics() { - throw new KafkaException("method not implemented"); + return Collections.unmodifiableMap(this.metrics.metrics()); Review Comment: Is `this` needed? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -0,0 +1,3565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import 
org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import 
org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.DelayedReceive; +import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static 
org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FetchRequestManagerTest { + + private static final double EPSILON = 0.0001; + + private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener(); + private String topicName = "test"; + private String groupId = "test-group"; + private Uuid topicId = Uuid.randomUuid(); + private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() { + { + put(topicName, topicId); + } + }; + private Map<Uuid, String> topicNames = singletonMap(topicId, topicName); + private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; + private TopicPartition tp0 = new TopicPartition(topicName, 0); + private TopicPartition tp1 = new TopicPartition(topicName, 1); + private TopicPartition tp2 = new TopicPartition(topicName, 2); + private TopicPartition tp3 = new TopicPartition(topicName, 3); + private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0); + private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1); + private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2); + private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3); + private int validLeaderEpoch = 0; + private MetadataResponse initialUpdateResponse = + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds); + + private int minBytes = 1; + private int maxBytes = Integer.MAX_VALUE; + private int maxWaitMs = 0; + private int fetchSize = 1000; + private long retryBackoffMs = 100; + private long 
requestTimeoutMs = 30000; + private MockTime time = new MockTime(1); + private SubscriptionState subscriptions; + private ConsumerMetadata metadata; + private FetchMetricsRegistry metricsRegistry; + private FetchMetricsManager metricsManager; + private MockClient client; + private Metrics metrics; + private ApiVersions apiVersions = new ApiVersions(); + private ConsumerNetworkClient oldConsumerClient; + private TestableFetchRequestManager<?, ?> fetcher; + private TestableNetworkClientDelegate consumerClient; Review Comment: consumerClient => networkClientDelegate? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -621,56 +825,174 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. - // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); - // assignment change event will trigger autocommit if it is configured and the group id is specified. 
This is - // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance - eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); + // make sure the offsets of topic partitions the consumer is unsubscribing from + // are committed since there will be no following rebalance + applicationEventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) - eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); + applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + /** + * Send the requests for fetch data to the background thread and set up to collect the results in + * {@link #fetchResults}. 
+ */ + private void sendFetches() { + FetchEvent event = new FetchEvent(); + applicationEventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (error != null) + log.warn("An error occurred during poll: {}", error.getMessage(), error); + else + fetchResults.addAll(completedFetches); + }); + } + + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we don't have any waiting data here; it's a 'best case' effort. The + // data may not be immediately available, but the calling method (poll) will correctly + // handle the overall timeout. 
+ try { + while (pollTimer.notExpired()) { + CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS); + + if (completedFetch != null) + fetchBuffer.add(completedFetch); + + pollTimer.update(); + } + } catch (InterruptedException e) { + log.trace("Timeout during fetch", e); + } finally { + timer.update(pollTimer.currentTimeMs()); + } + + return fetchCollector.collectFetch(fetchBuffer); + } + + /** + * Set the fetch position to the committed position (if there is one) + * or reset it using the offset reset policy the user has configured. + * + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined + * @return true iff the operation completed without timing out + */ + private boolean updateFetchPositions(final Timer timer) { + // If any partitions have been truncated due to a leader change, we need to validate the offsets + applicationEventHandler.add(new ValidatePositionsApplicationEvent()); + + cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); + if (cachedSubscriptionHasAllFetchPositions) return true; - clusterResourceListeners.maybeAdd(keyDeserializer); - clusterResourceListeners.maybeAdd(valueDeserializer); - return clusterResourceListeners; + // If there are any partitions which do not have a valid position and are not + // awaiting reset, then we need to fetch committed offsets. We will only do a + // coordinator lookup if there are partitions which have missing positions, so + // a consumer with manually assigned partitions can avoid a coordinator dependence + // by always ensuring that assigned partitions have an initial position. 
+ if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer)) + return false; + + // If there are partitions still needing a position and a reset policy is defined, + // request reset using the default policy. If no reset strategy is defined and there Review Comment: Earlier, we have "do not have a valid position and are not awaiting reset". The reset there and the reset here mean different things. The former refers to resetting the offset based on OffsetForLeaderEpoch and the latter refers to resetting the offset to either the earliest or latest. It would be useful to make this clear. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -621,56 +825,174 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. - // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); - // assignment change event will trigger autocommit if it is configured and the group id is specified. 
This is - // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance - eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); + // make sure the offsets of topic partitions the consumer is unsubscribing from + // are committed since there will be no following rebalance + applicationEventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) - eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); + applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + /** + * Send the requests for fetch data to the background thread and set up to collect the results in + * {@link #fetchResults}. 
+ */ + private void sendFetches() { + FetchEvent event = new FetchEvent(); + applicationEventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (error != null) + log.warn("An error occurred during poll: {}", error.getMessage(), error); + else + fetchResults.addAll(completedFetches); + }); + } + + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we don't have any waiting data here; it's a 'best case' effort. The + // data may not be immediately available, but the calling method (poll) will correctly + // handle the overall timeout. + try { + while (pollTimer.notExpired()) { + CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS); + + if (completedFetch != null) + fetchBuffer.add(completedFetch); Review Comment: Hmm, with this logic, there is a short window between `completedFetch` being in `fetchResults` and `completedFetch` being added back to `fetchBuffer`. During this window, `BackgroundThread` could make a `poll` call, see no buffered data in `fetchBuffer` and fetch the same offset again? 
Then the consumer could see duplicated data because of this. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -489,20 +623,44 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition if (partitions.isEmpty()) { return Collections.emptyMap(); } - Map<TopicPartition, Long> timestampToSearch = - partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> timestamp)); + Map<TopicPartition, Long> timestampToSearch = partitions + .stream() + .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); final ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent( timestampToSearch, - false); - Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = - eventHandler.addAndGet(listOffsetsEvent, time.timer(timeout)); - return offsetAndTimestampMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), - e -> e.getValue().offset())); + false + ); Review Comment: To be consistent with other places, it seems that we want to combine this with the previous line. Ditto below. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -621,56 +825,174 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. 
- // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); - // assignment change event will trigger autocommit if it is configured and the group id is specified. This is - // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance - eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); + // make sure the offsets of topic partitions the consumer is unsubscribing from + // are committed since there will be no following rebalance + applicationEventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) - eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); + applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().isEmpty()) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? 
+ "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... 
candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + /** + * Send the requests for fetch data to the background thread and set up to collect the results in + * {@link #fetchResults}. + */ + private void sendFetches() { + FetchEvent event = new FetchEvent(); + applicationEventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (error != null) + log.warn("An error occurred during poll: {}", error.getMessage(), error); + else + fetchResults.addAll(completedFetches); + }); + } + + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we don't have any waiting data here; it's a 'best case' effort. The + // data may not be immediately available, but the calling method (poll) will correctly + // handle the overall timeout. 
+ try { + while (pollTimer.notExpired()) { + CompletedFetch completedFetch = fetchResults.poll(pollTimer.remainingMs(), TimeUnit.MILLISECONDS); + + if (completedFetch != null) + fetchBuffer.add(completedFetch); + + pollTimer.update(); + } + } catch (InterruptedException e) { + log.trace("Timeout during fetch", e); + } finally { + timer.update(pollTimer.currentTimeMs()); + } + + return fetchCollector.collectFetch(fetchBuffer); + } + + /** + * Set the fetch position to the committed position (if there is one) + * or reset it using the offset reset policy the user has configured. + * + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined + * @return true iff the operation completed without timing out + */ + private boolean updateFetchPositions(final Timer timer) { + // If any partitions have been truncated due to a leader change, we need to validate the offsets + applicationEventHandler.add(new ValidatePositionsApplicationEvent()); + + cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); + if (cachedSubscriptionHasAllFetchPositions) return true; - clusterResourceListeners.maybeAdd(keyDeserializer); - clusterResourceListeners.maybeAdd(valueDeserializer); - return clusterResourceListeners; + // If there are any partitions which do not have a valid position and are not + // awaiting reset, then we need to fetch committed offsets. We will only do a + // coordinator lookup if there are partitions which have missing positions, so + // a consumer with manually assigned partitions can avoid a coordinator dependence + // by always ensuring that assigned partitions have an initial position. 
+ if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer)) Review Comment: It still seems weird that we only use the timer for `refreshCommittedOffsetsIfNeeded`, but not for other cases where we don't have valid fetch positions. For example, if all partitions are in AWAIT_VALIDATION state, it seems that PrototypeAsyncConsumer.poll() will just go in a busy loop, which is not efficient. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ########## @@ -222,58 +99,158 @@ public void run() { } catch (final Throwable t) { log.error("The background thread failed due to unexpected error", t); throw new KafkaException(t); - } finally { - close(); - log.debug("{} closed", getClass()); } } + void initializeResources() { + applicationEventProcessor = applicationEventProcessorSupplier.get(); + networkClientDelegate = networkClientDelegateSupplier.get(); + requestManagers = requestManagersSupplier.get(); + } + /** - * Poll and process an {@link ApplicationEvent}. It performs the following tasks: - * 1. Drains and try to process all the requests in the queue. - * 2. Iterate through the registry, poll, and get the next poll time for the network poll - * 3. Poll the networkClient to send and retrieve the response. + * Poll and process the {@link ApplicationEvent application events}. 
It performs the following tasks: + * + * <ol> + * <li> + * Drains and processes all the events from the application thread's application event queue via + * {@link ApplicationEventProcessor} + * </li> + * <li> + * Iterate through the {@link RequestManager} list and invoke {@link RequestManager#poll(long)} to get + * the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll + * </li> + * <li> + * Stage each {@link AbstractRequest.Builder request} to be sent via + * {@link NetworkClientDelegate#addAll(List)} + * </li> + * <li> + * Poll the client via {@link KafkaClient#poll(long, long)} to send the requests, as well as + * retrieve any available responses + * </li> + * </ol> */ void runOnce() { - if (!applicationEventQueue.isEmpty()) { - LinkedList<ApplicationEvent> res = new LinkedList<>(); - this.applicationEventQueue.drainTo(res); - - for (ApplicationEvent event : res) { - log.debug("Consuming application event: {}", event); - Objects.requireNonNull(event); - applicationEventProcessor.process(event); - } - } + // If there are errors processing any events, the error will be thrown immediately. This will have + // the effect of closing the background thread. 
+ applicationEventProcessor.process(); final long currentTimeMs = time.milliseconds(); final long pollWaitTimeMs = requestManagers.entries().stream() .filter(Optional::isPresent) - .map(m -> m.get().poll(currentTimeMs)) - .map(this::handlePollResult) + .map(Optional::get) + .map(rm -> rm.poll(currentTimeMs)) + .map(networkClientDelegate::addAll) .reduce(MAX_POLL_TIMEOUT_MS, Math::min); networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); } - long handlePollResult(NetworkClientDelegate.PollResult res) { - if (!res.unsentRequests.isEmpty()) { - networkClientDelegate.addAll(res.unsentRequests); + /** + * Performs any network I/O that is needed at the time of close for the consumer: + * + * <ol> + * <li> + * Iterate through the {@link RequestManager} list and invoke {@link RequestManager#pollOnClose()} + * to get the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll + * </li> + * <li> + * Stage each {@link AbstractRequest.Builder request} to be sent via + * {@link NetworkClientDelegate#addAll(List)} + * </li> + * <li> + * {@link KafkaClient#poll(long, long) Poll the client} to send the requests, as well as + * retrieve any available responses + * </li> + * <li> + * Continuously {@link KafkaClient#poll(long, long) poll the client} as long as the + * {@link Timer#notExpired() timer hasn't expired} to retrieve the responses + * </li> + * </ol> + */ + // Visible for testing + static void runAtClose(final Time time, + final Collection<Optional<? 
extends RequestManager>> requestManagers, + final NetworkClientDelegate networkClientDelegate, + final Timer timer) { + long currentTimeMs = time.milliseconds(); + + // These are the optional outgoing requests at the + List<NetworkClientDelegate.PollResult> pollResults = requestManagers.stream() + .filter(Optional::isPresent) + .map(Optional::get) + .map(RequestManager::pollOnClose) + .collect(Collectors.toList()); + long pollWaitTimeMs = pollResults.stream() + .map(networkClientDelegate::addAll) + .reduce(MAX_POLL_TIMEOUT_MS, Math::min); + pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs()); + networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); + timer.update(); + + List<Future<?>> requestFutures = pollResults.stream() + .flatMap(fads -> fads.unsentRequests.stream()) + .map(NetworkClientDelegate.UnsentRequest::future) + .collect(Collectors.toList()); + + // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until + // all requests have received a response. + while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) { + networkClientDelegate.poll(timer.remainingMs(), currentTimeMs); Review Comment: Instead of using `currentTimeMs`, we need to use `timer.currentTimeMs` to pick up the latest time, right? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.LogContext; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An {@link EventProcessor} that is created and executes in the application thread for the purpose of processing + * {@link BackgroundEvent background events} generated by {@link DefaultBackgroundThread the background thread}. + * Those events are generally of two types: + * + * <ul> + * <li>Errors that occur in the background thread that need to be propagated to the application thread</li> + * <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li> + * </ul> + */ +public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> { + + public BackgroundEventProcessor(final LogContext logContext, + final BlockingQueue<BackgroundEvent> backgroundEventQueue) { + super(logContext, backgroundEventQueue); + } + + /** + * Process the events—if any—that were produced by the {@link DefaultBackgroundThread background thread}. + * It is possible that when processing the events that a given event will + * {@link ErrorBackgroundEvent represent an error directly}, or it could be that processing an event generates + * an error. 
In such cases, the processor will continue to process the remaining events. In this case, we + * provide the caller to provide a callback handler that "collects" the errors. We grab the first error that Review Comment: Where is the callback handler? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ########## @@ -222,58 +99,158 @@ public void run() { } catch (final Throwable t) { log.error("The background thread failed due to unexpected error", t); throw new KafkaException(t); - } finally { - close(); - log.debug("{} closed", getClass()); } } + void initializeResources() { + applicationEventProcessor = applicationEventProcessorSupplier.get(); + networkClientDelegate = networkClientDelegateSupplier.get(); + requestManagers = requestManagersSupplier.get(); + } + /** - * Poll and process an {@link ApplicationEvent}. It performs the following tasks: - * 1. Drains and try to process all the requests in the queue. - * 2. Iterate through the registry, poll, and get the next poll time for the network poll - * 3. Poll the networkClient to send and retrieve the response. + * Poll and process the {@link ApplicationEvent application events}. 
It performs the following tasks: + * + * <ol> + * <li> + * Drains and processes all the events from the application thread's application event queue via + * {@link ApplicationEventProcessor} + * </li> + * <li> + * Iterate through the {@link RequestManager} list and invoke {@link RequestManager#poll(long)} to get + * the {@link NetworkClientDelegate.UnsentRequest} list and the poll time for the network poll + * </li> + * <li> + * Stage each {@link AbstractRequest.Builder request} to be sent via + * {@link NetworkClientDelegate#addAll(List)} + * </li> + * <li> + * Poll the client via {@link KafkaClient#poll(long, long)} to send the requests, as well as + * retrieve any available responses + * </li> + * </ol> */ void runOnce() { - if (!applicationEventQueue.isEmpty()) { - LinkedList<ApplicationEvent> res = new LinkedList<>(); - this.applicationEventQueue.drainTo(res); - - for (ApplicationEvent event : res) { - log.debug("Consuming application event: {}", event); - Objects.requireNonNull(event); - applicationEventProcessor.process(event); - } - } + // If there are errors processing any events, the error will be thrown immediately. This will have + // the effect of closing the background thread. Review Comment: If the background thread dies, should we throw an exception to the user thread on the next `poll` call? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +/** + * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the + * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition + * assignment. 
+ */ +public class FetchRequestManager extends AbstractFetch implements RequestManager { + + private final Logger log; + private final BackgroundEventHandler backgroundEventHandler; + private final NetworkClientDelegate networkClientDelegate; + private final List<CompletableFuture<Queue<CompletedFetch>>> futures; + + FetchRequestManager(final LogContext logContext, + final Time time, + final BackgroundEventHandler backgroundEventHandler, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final FetchMetricsManager metricsManager, + final NetworkClientDelegate networkClientDelegate) { + super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time); + this.log = logContext.logger(FetchRequestManager.class); + this.backgroundEventHandler = backgroundEventHandler; + this.networkClientDelegate = networkClientDelegate; + this.futures = new ArrayList<>(); + } + + @Override + protected boolean isUnavailable(Node node) { + return networkClientDelegate.isUnavailable(node); + } + + @Override + protected void maybeThrowAuthFailure(Node node) { + networkClientDelegate.maybeThrowAuthFailure(node); + } + + /** + * Adds a new {@link Future future} to the list of futures awaiting results. Per the comments on + * {@link #forwardResults()}, there is no guarantee that this particular future will be provided with + * a non-empty result, but it is guaranteed to be completed with a result, assuming that it does not time out. 
+ * + * @param future Future that will be {@link CompletableFuture#complete(Object) completed} if not timed out + */ + public void requestFetch(CompletableFuture<Queue<CompletedFetch>> future) { + futures.add(future); + } + + @Override + public PollResult poll(long currentTimeMs) { + List<UnsentRequest> requests = prepareFetchRequests().entrySet().stream().map(entry -> { + final Node fetchTarget = entry.getKey(); + final FetchSessionHandler.FetchRequestData data = entry.getValue(); + final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); + final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, t) -> { + if (t != null) { + handleFetchResponse(fetchTarget, t); + log.debug("Attempt to fetch data from node {} failed due to fatal exception", fetchTarget, t); + backgroundEventHandler.add(new ErrorBackgroundEvent(t)); Review Comment: At this stage, we are just propagating connection level errors like DisconnectException, which is retriable. So, it seems that we shouldn't throw this error back to the application. Ditto in `pollOnClose`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? 
+ "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. + * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... 
candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + }); + } + + /** + * @throws KafkaException if the rebalance callback throws exception + */ + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + + // Attempt to fetch any data. It's OK if we time out here; it's a best case effort. The + // data may not be immediately available, but the calling method (poll) will correctly + // handle the overall timeout. 
+ try { + Queue<CompletedFetch> completedFetches = eventHandler.addAndGet(new FetchEvent(), pollTimer); + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + } catch (TimeoutException e) { + log.trace("Timeout during fetch", e); + } finally { + timer.update(pollTimer.currentTimeMs()); + } - clusterResourceListeners.maybeAdd(keyDeserializer); - clusterResourceListeners.maybeAdd(valueDeserializer); - return clusterResourceListeners; + return fetchCollector.collectFetch(fetchBuffer); + } + + /** + * Set the fetch position to the committed position (if there is one) + * or reset it using the offset reset policy the user has configured. + * + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined + * @return true iff the operation completed without timing out + */ + private boolean updateFetchPositions(final Timer timer) { + // If any partitions have been truncated due to a leader change, we need to validate the offsets + eventHandler.add(new ValidatePositionsApplicationEvent()); Review Comment: @lianetm : Thanks for the reply. That's my understanding too. My question was what happens before the new metadata is received. Will the consumer continuously fetch from the older leader? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -622,12 +837,18 @@ public void assign(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } - // TODO: implementation of refactored Fetcher will be included in forthcoming commits. 
- // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + // Clear the buffered data which are not a part of newly assigned topics + final Set<TopicPartition> currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (partitions.contains(tp)) + currentTopicPartitions.add(tp); + } + + fetchBuffer.retainAll(currentTopicPartitions); - // assignment change event will trigger autocommit if it is configured and the group id is specified. This is - // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will - // be no following rebalance + // make sure the offsets of topic partitions the consumer is unsubscribing from + // are committed since there will be no following rebalance Review Comment: Was this comment addressed? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -636,42 +857,148 @@ public void assign(Collection<TopicPartition> partitions) { } @Override - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { - throw new KafkaException("method not implemented"); + public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + maybeThrowInvalidGroupIdException(); + if (pattern == null || pattern.toString().equals("")) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? + "null" : "empty")); + + throwIfNoAssignorsConfigured(); + log.info("Subscribed to pattern: '{}'", pattern); + this.subscriptions.subscribe(pattern, listener); + this.updatePatternSubscription(metadata.fetch()); + this.metadata.requestUpdateForNewTopics(); + } + + /** + * TODO: remove this when we implement the KIP-848 protocol. 
+ * + * <p> + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? + * + * @param cluster Cluster from which we get the topics + */ + private void updatePatternSubscription(Cluster cluster) { + final Set<String> topicsToSubscribe = cluster.topics().stream() + .filter(subscriptions::matchesSubscribedPattern) + .collect(Collectors.toSet()); + if (subscriptions.subscribeFromPattern(topicsToSubscribe)) + metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { - throw new KafkaException("method not implemented"); + subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { - throw new KafkaException("method not implemented"); + fetchBuffer.retainAll(Collections.emptySet()); + this.subscriptions.unsubscribe(); } @Override @Deprecated - public ConsumerRecords<K, V> poll(long timeout) { - throw new KafkaException("method not implemented"); + public ConsumerRecords<K, V> poll(final long timeoutMs) { + return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } - private static <K, V> ClusterResourceListeners configureClusterResourceListeners( - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final List<?>... 
candidateLists) { - ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); - for (List<?> candidateList: candidateLists) - clusterResourceListeners.maybeAddAll(candidateList); + private void sendFetches() { + FetchEvent event = new FetchEvent(); + eventHandler.add(event); + + event.future().whenComplete((completedFetches, error) -> { + if (completedFetches != null && !completedFetches.isEmpty()) { + fetchBuffer.addAll(completedFetches); + } + }); + } + + /** + * @throws KafkaException if the rebalance callback throws exception + */ + private Fetch<K, V> pollForFetches(Timer timer) { + long pollTimeout = timer.remainingMs(); + + // if data is available already, return it immediately + final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + sendFetches(); + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { Review Comment: fetchPosition is updated in the user thread. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -192,71 +321,38 @@ public PrototypeAsyncConsumer(final ConsumerConfig config, @Override public ConsumerRecords<K, V> poll(final Duration timeout) { Timer timer = time.timer(timeout); + try { - do { - if (!eventHandler.isEmpty()) { - final Optional<BackgroundEvent> backgroundEvent = eventHandler.poll(); - // processEvent() may process 3 types of event: - // 1. Errors - // 2. Callback Invocation - // 3. Fetch responses - // Errors will be handled or rethrown. 
- // Callback invocation will trigger callback function execution, which is blocking until completion. - // Successful fetch responses will be added to the completedFetches in the fetcher, which will then - // be processed in the collectFetches(). - backgroundEvent.ifPresent(event -> processEvent(event, timeout)); - } + backgroundEventProcessor.process(); - updateFetchPositionsIfNeeded(timer); + this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs()); - // The idea here is to have the background thread sending fetches autonomously, and the fetcher - // uses the poll loop to retrieve successful fetchResponse and process them on the polling thread. - final Fetch<K, V> fetch = collectFetches(); - if (!fetch.isEmpty()) { - return processFetchResults(fetch); - } - // We will wait for retryBackoffMs - } while (time.timer(timeout).notExpired()); - } catch (final Exception e) { - throw new RuntimeException(e); - } - // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask(); + if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { + throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); + } - return ConsumerRecords.empty(); - } + do { + updateAssignmentMetadataIfNeeded(timer); + final Fetch<K, V> fetch = pollForFetches(timer); - /** - * Set the fetch position to the committed position (if there is one) or reset it using the - * offset reset policy the user has configured (if partitions require reset) - * - * @return true if the operation completed without timing out - * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. 
See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined - */ - private boolean updateFetchPositionsIfNeeded(final Timer timer) { - // If any partitions have been truncated due to a leader change, we need to validate the offsets - ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent(); - eventHandler.add(validatePositionsEvent); + if (!fetch.isEmpty()) { + sendFetches(); - // If there are any partitions which do not have a valid position and are not - // awaiting reset, then we need to fetch committed offsets. We will only do a - // coordinator lookup if there are partitions which have missing positions, so - // a consumer with manually assigned partitions can avoid a coordinator dependence - // by always ensuring that assigned partitions have an initial position. - if (isCommittedOffsetsManagementEnabled() && !refreshCommittedOffsetsIfNeeded(timer)) - return false; + if (fetch.records().isEmpty()) { + log.trace("Returning empty records from `poll()` " + + "since the consumer's position has advanced for at least one topic partition"); + } - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); + return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records())); + } + // We will wait for retryBackoffMs + } while (timer.notExpired()); - // Finally send an asynchronous request to look up and update the positions of any - // partitions which are awaiting reset. 
- ResetPositionsApplicationEvent resetPositionsEvent = new ResetPositionsApplicationEvent(); - eventHandler.add(resetPositionsEvent); - return true; + return ConsumerRecords.empty(); + } finally { + this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); + } + // TODO: Once we implement poll(), clear wakeupTrigger in a finally block: wakeupTrigger.clearActiveTask(); Review Comment: Is this comment addressed? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -0,0 +1,3565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; +import 
org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FetchMetadata; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import 
org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.DelayedReceive; +import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static 
org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FetchRequestManagerTest { + + private static final double EPSILON = 0.0001; + + private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener(); + private String topicName = "test"; + private String groupId = "test-group"; + private Uuid topicId = Uuid.randomUuid(); + private Map<String, Uuid> topicIds = new HashMap<String, Uuid>() { + { + put(topicName, topicId); + } + }; + private Map<Uuid, String> topicNames = singletonMap(topicId, topicName); + private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; + private TopicPartition tp0 = new TopicPartition(topicName, 0); + private TopicPartition tp1 = new TopicPartition(topicName, 1); + private TopicPartition tp2 = new TopicPartition(topicName, 2); + private TopicPartition tp3 = new TopicPartition(topicName, 3); + private TopicIdPartition tidp0 = new TopicIdPartition(topicId, tp0); + private TopicIdPartition tidp1 = new TopicIdPartition(topicId, tp1); + private TopicIdPartition tidp2 = new TopicIdPartition(topicId, tp2); + private TopicIdPartition tidp3 = new TopicIdPartition(topicId, tp3); + private int validLeaderEpoch = 0; + private MetadataResponse initialUpdateResponse = + RequestTestUtils.metadataUpdateWithIds(1, singletonMap(topicName, 4), topicIds); + + private int minBytes = 1; + private int maxBytes = Integer.MAX_VALUE; + private int maxWaitMs = 0; + private int fetchSize = 1000; + private long retryBackoffMs = 100; + private long 
requestTimeoutMs = 30000; + private MockTime time = new MockTime(1); + private SubscriptionState subscriptions; + private ConsumerMetadata metadata; + private FetchMetricsRegistry metricsRegistry; + private FetchMetricsManager metricsManager; + private MockClient client; + private Metrics metrics; + private ApiVersions apiVersions = new ApiVersions(); + private ConsumerNetworkClient oldConsumerClient; + private TestableFetchRequestManager<?, ?> fetcher; + private TestableNetworkClientDelegate consumerClient; + private OffsetFetcher offsetFetcher; + + private MemoryRecords records; + private MemoryRecords nextRecords; + private MemoryRecords emptyRecords; + private MemoryRecords partialRecords; + + @BeforeEach + public void setup() { + records = buildRecords(1L, 3, 1); + nextRecords = buildRecords(4L, 2, 4); + emptyRecords = buildRecords(0L, 0, 0); + partialRecords = buildRecords(4L, 1, 0); + partialRecords.buffer().putInt(Records.SIZE_OFFSET, 10000); + } + + private void assignFromUser(Set<TopicPartition> partitions) { + subscriptions.assignFromUser(partitions); + client.updateMetadata(initialUpdateResponse); + + // A dummy metadata update to ensure valid leader epoch. + metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, + Collections.emptyMap(), singletonMap(topicName, 4), + tp -> validLeaderEpoch, topicIds), false, 0L); + } + + private void assignFromUserNoId(Set<TopicPartition> partitions) { + subscriptions.assignFromUser(partitions); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singletonMap("noId", 1), Collections.emptyMap())); + + // A dummy metadata update to ensure valid leader epoch. 
+ metadata.update(9, RequestTestUtils.metadataUpdateWithIds("dummy", 1, + Collections.emptyMap(), singletonMap("noId", 1), + tp -> validLeaderEpoch, topicIds), false, 0L); + } + + @AfterEach + public void teardown() throws Exception { + if (metrics != null) + this.metrics.close(); + if (fetcher != null) + this.fetcher.close(); + } + + private int sendFetches() { + offsetFetcher.validatePositionsOnMetadataChange(); + return fetcher.sendFetches(); + } + + @Test + public void testFetchNormal() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + // normal fetch + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0)); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + + List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp0); + assertEquals(3, records.size()); + assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position + long offset = 1; + for (ConsumerRecord<byte[], byte[]> record : records) { + assertEquals(offset, record.offset()); + offset += 1; + } + } + + @Test + public void testInflightFetchOnPendingPartitions() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + assertEquals(1, sendFetches()); + subscriptions.markPendingRevocation(singleton(tp0)); + + client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0)); + consumerClient.poll(time.timer(0)); + assertNull(fetchedRecords().get(tp0)); + } + + @Test + public void testCloseShouldBeIdempotent() { + buildFetcher(); + + fetcher.close(); + fetcher.close(); + fetcher.close(); + + verify(fetcher, times(1)).closeInternal(any(Timer.class)); + } + + @Test + public void 
testFetcherCloseClosesFetchSessionsInBroker() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + // normal fetch + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + final FetchResponse fetchResponse = fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0); + client.prepareResponse(fetchResponse); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + assertEquals(0, consumerClient.pendingRequestCount()); + + final ArgumentCaptor<NetworkClientDelegate.UnsentRequest> argument = ArgumentCaptor.forClass(NetworkClientDelegate.UnsentRequest.class); + + // send request to close the fetcher + Timer timer = time.timer(Duration.ofSeconds(10)); + this.fetcher.close(timer); + NetworkClientDelegate.PollResult pollResult = fetcher.poll(time.milliseconds()); + consumerClient.addAll(pollResult.unsentRequests); + consumerClient.poll(timer); + + // validate that Fetcher.close() has sent a request with final epoch. 2 requests are sent, one for the normal + // fetch earlier and another for the finish fetch here. 
+ verify(consumerClient, times(2)).doSend(argument.capture(), any(Long.class)); + NetworkClientDelegate.UnsentRequest unsentRequest = argument.getValue(); + FetchRequest.Builder builder = (FetchRequest.Builder) unsentRequest.requestBuilder(); + // session Id is the same + assertEquals(fetchResponse.sessionId(), builder.metadata().sessionId()); + // contains final epoch + assertEquals(FetchMetadata.FINAL_EPOCH, builder.metadata().epoch()); // final epoch indicates we want to close the session + assertTrue(builder.fetchData().isEmpty()); // partition data should be empty + } + + @Test + public void testFetchingPendingPartitions() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + // normal fetch + assertEquals(1, sendFetches()); + client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0)); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchedRecords(); + assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position + + // mark partition unfetchable + subscriptions.markPendingRevocation(singleton(tp0)); + assertEquals(0, sendFetches()); + consumerClient.poll(time.timer(0)); + assertFalse(fetcher.hasCompletedFetches()); + fetchedRecords(); + assertEquals(4L, subscriptions.position(tp0).offset); + } + + @Test + public void testFetchWithNoTopicId() { + // Should work and default to using old request type. 
+ buildFetcher(); + + TopicIdPartition noId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("noId", 0)); + assignFromUserNoId(singleton(noId.topicPartition())); + subscriptions.seek(noId.topicPartition(), 0); + + // Fetch should use request version 12 + assertEquals(1, sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + client.prepareResponse( + fetchRequestMatcher((short) 12, noId, 0, Optional.of(validLeaderEpoch)), + fullFetchResponse(noId, this.records, Errors.NONE, 100L, 0) + ); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords(); + assertTrue(partitionRecords.containsKey(noId.topicPartition())); + + List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(noId.topicPartition()); + assertEquals(3, records.size()); + assertEquals(4L, subscriptions.position(noId.topicPartition()).offset); // this is the next fetching position + long offset = 1; + for (ConsumerRecord<byte[], byte[]> record : records) { + assertEquals(offset, record.offset()); + offset += 1; + } + } + + @Test + public void testFetchWithTopicId() { + buildFetcher(); + + TopicIdPartition tp = new TopicIdPartition(topicId, new TopicPartition(topicName, 0)); + assignFromUser(singleton(tp.topicPartition())); + subscriptions.seek(tp.topicPartition(), 0); + + // Fetch should use latest version Review Comment: Should this comment be moved to above the `client.prepareResponse` below? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org