This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 012880d  KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)
012880d is described below
commit 012880d4246fedb5c1ea7621e86217c57fd217e2
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue May 21 21:46:04 2019 +0100
KAFKA-8052; Ensure fetch session epoch is updated before new request (#6582)
    Reviewers: Jason Gustafson <[email protected]>, Colin Patrick McCabe <[email protected]>, Andrew Olson <[email protected]>, José Armando García Sancio <[email protected]>
---
.../kafka/clients/consumer/internals/Fetcher.java | 103 ++++++++++++---------
.../clients/consumer/internals/FetcherTest.java | 56 +++++++++++
2 files changed, 116 insertions(+), 43 deletions(-)
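
For readers skimming the diff below: the fix serializes fetch requests per broker node. A node with an unprocessed response is tracked in a set guarded by the Fetcher lock and is removed only after the response listener has run, so any session state (such as the fetch session epoch) updated while handling a response is visible before the next request to that node is built, even across threads. The following is a minimal standalone sketch of that pattern with invented names (PendingNodeTracker, maybeSendRequest); it is an illustration, not code from this commit:

    import java.util.HashSet;
    import java.util.Set;

    // Minimal sketch (invented names): serialize requests per node so that
    // state updated while handling a response is visible when the next
    // request is built, even on a different thread.
    public class PendingNodeTracker {
        private final Set<Integer> nodesWithPendingRequests = new HashSet<>();
        private int sessionEpoch = 0; // stands in for per-node fetch session state

        // Request-building side: returns false while a response is outstanding.
        public synchronized boolean maybeSendRequest(int nodeId) {
            if (nodesWithPendingRequests.contains(nodeId))
                return false; // previous response not fully processed yet
            int epochForRequest = sessionEpoch; // latest epoch, read under the lock
            // ... build and send the request carrying epochForRequest ...
            nodesWithPendingRequests.add(nodeId);
            return true;
        }

        // Response-handling side, possibly invoked from another thread.
        public synchronized void onResponse(int nodeId, boolean success) {
            try {
                if (success)
                    sessionEpoch++; // the update the next request must observe
            } finally {
                nodesWithPendingRequests.remove(nodeId); // always unblock the node
            }
        }
    }

The try/finally mirrors the diff: the node must be unblocked even if response handling throws, otherwise fetches to it would stall forever.
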
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4ea9b0b..7b633d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -111,6 +111,10 @@ import static java.util.Collections.emptyList;
  *     the caller.</li>
  * <li>Responses that collate partial responses from multiple brokers (e.g. to list offsets) are
  *     synchronized on the response future.</li>
+ * <li>At most one request is pending for each node at any time. Nodes with pending requests are
+ *     tracked and updated after processing the response. This ensures that any state (e.g. epoch)
+ *     updated while processing responses on one thread are visible while creating the subsequent request
+ *     on a different thread.</li>
  * </ul>
  */
 public class Fetcher<K, V> implements Closeable {
@@ -139,6 +143,7 @@ public class Fetcher<K, V> implements Closeable {
     private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
     private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
     private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final Set<Integer> nodesWithPendingFetchRequests;

     private PartitionRecords nextInLineRecords = null;
@@ -183,6 +188,7 @@ public class Fetcher<K, V> implements Closeable {
         this.isolationLevel = isolationLevel;
         this.sessionHandlers = new HashMap<>();
         this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+        this.nodesWithPendingFetchRequests = new HashSet<>();
     }

     /**
@@ -237,63 +243,73 @@ public class Fetcher<K, V> implements Closeable {
                 @Override
                 public void onSuccess(ClientResponse resp) {
                     synchronized (Fetcher.this) {
-                        @SuppressWarnings("unchecked")
-                        FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
-                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
-                        if (handler == null) {
-                            log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
-                                fetchTarget.id());
-                            return;
-                        }
-                        if (!handler.handleResponse(response)) {
-                            return;
-                        }
+                        try {
+                            @SuppressWarnings("unchecked")
+                            FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
+                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                            if (handler == null) {
+                                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
+                                    fetchTarget.id());
+                                return;
+                            }
+                            if (!handler.handleResponse(response)) {
+                                return;
+                            }

-                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
-                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
-
-                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
-                            TopicPartition partition = entry.getKey();
-                            FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
-                            if (requestData == null) {
-                                String message;
-                                if (data.metadata().isFull()) {
-                                    message = MessageFormatter.arrayFormat(
-                                            "Response for missing full request partition: partition={}; metadata={}",
-                                            new Object[]{partition, data.metadata()}).getMessage();
+                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
+                                TopicPartition partition = entry.getKey();
+                                FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
+                                if (requestData == null) {
+                                    String message;
+                                    if (data.metadata().isFull()) {
+                                        message = MessageFormatter.arrayFormat(
+                                                "Response for missing full request partition: partition={}; metadata={}",
+                                                new Object[]{partition, data.metadata()}).getMessage();
+                                    } else {
+                                        message = MessageFormatter.arrayFormat(
+                                                "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
+                                                new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
+                                    }
+
+                                    // Received fetch response for missing session partition
+                                    throw new IllegalStateException(message);
                                 } else {
-                                    message = MessageFormatter.arrayFormat(
-                                            "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
-                                            new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
-                                }
-
-                                // Received fetch response for missing session partition
-                                throw new IllegalStateException(message);
-                            } else {
-                                long fetchOffset = requestData.fetchOffset;
-                                FetchResponse.PartitionData<Records> fetchData = entry.getValue();
+                                    long fetchOffset = requestData.fetchOffset;
+                                    FetchResponse.PartitionData<Records> fetchData = entry.getValue();

-                                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
-                                        isolationLevel, fetchOffset, partition, fetchData);
-                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
+                                    log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
+                                            isolationLevel, fetchOffset, partition, fetchData);
+                                    completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                             resp.requestHeader().apiVersion()));
+                                }
                             }
-                        }
-
-                        sensors.fetchLatency.record(resp.requestLatencyMs());
+
+                            sensors.fetchLatency.record(resp.requestLatencyMs());
+                        } finally {
+                            nodesWithPendingFetchRequests.remove(fetchTarget.id());
+                        }
                     }
                 }

                 @Override
                 public void onFailure(RuntimeException e) {
                     synchronized (Fetcher.this) {
-                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
-                        if (handler != null) {
-                            handler.handleError(e);
+                        try {
+                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
+                            if (handler != null) {
+                                handler.handleError(e);
+                            }
+                        } finally {
+                            nodesWithPendingFetchRequests.remove(fetchTarget.id());
                         }
                     }
                 }
             });
+
+            this.nodesWithPendingFetchRequests.add(entry.getKey().id());
         }
         return fetchRequestMap.size();
     }
@@ -1055,8 +1071,9 @@ public class Fetcher<K, V> implements Closeable {
                 // If we try to send during the reconnect blackout window, then the request is just
                 // going to be failed anyway before being sent, so skip the send for now
                 log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
-            } else if (client.hasPendingRequests(node)) {
-                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
+
+            } else if (this.nodesWithPendingFetchRequests.contains(node.id())) {
+                log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 FetchSessionHandler.Builder builder = fetchable.get(node);
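
A note on the hunk above: client.hasPendingRequests(node) can report false as soon as the network layer has received the response bytes, which may be before the Fetcher's response listener has run and advanced the fetch session epoch. A fetch built inside that window carries a stale epoch, consistent with the intermittent INVALID_FETCH_SESSION_EPOCH errors tracked by KAFKA-8052. The following is a single-threaded re-enactment of that interleaving, with invented names, just to make the window concrete (the real race involves two threads):

    // Re-enactment of the race window (invented names; illustration only).
    public class StaleEpochWindow {
        public static void main(String[] args) {
            int sessionEpoch = 1;                    // FetchSessionHandler-like state
            // 1. Network layer finishes reading the response; the old guard
            //    (hasPendingRequests == false) would now allow a new fetch.
            int epochUsedByNextFetch = sessionEpoch; // 2. request built with OLD epoch
            sessionEpoch++;                          // 3. listener finally runs
            System.out.printf("sent epoch %d, session expects %d -> rejected%n",
                    epochUsedByNextFetch, sessionEpoch);
        }
    }
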
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1dfdf23..2f40ffc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2941,6 +2941,62 @@ public class FetcherTest {
     }

     @Test
+    public void testFetcherSessionEpochUpdate() throws Exception {
+        buildFetcher(2);
+
+        MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
+        client.updateMetadata(initialMetadataResponse);
+        assignFromUser(Collections.singleton(tp0));
+        subscriptions.seek(tp0, 0L);
+
+        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
+        executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(() -> {
+            long nextOffset = 0;
+            long nextEpoch = 0;
+            while (fetchesRemaining.get() > 0) {
+                synchronized (consumerClient) {
+                    if (!client.requests().isEmpty()) {
+                        ClientRequest request = client.requests().peek();
+                        FetchRequest fetchRequest = (FetchRequest) request.requestBuilder().build();
+                        int epoch = fetchRequest.metadata().epoch();
+                        assertTrue(String.format("Unexpected epoch expected %d got %d", nextEpoch, epoch), epoch == 0 || epoch == nextEpoch);
+                        nextEpoch++;
+                        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseMap = new LinkedHashMap<>();
+                        responseMap.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, nextOffset + 2L, nextOffset + 2,
+                                0L, null, buildRecords(nextOffset, 2, nextOffset)));
+                        nextOffset += 2;
+                        client.respondToRequest(request, new FetchResponse<>(Errors.NONE, responseMap, 0, 123));
+                        consumerClient.poll(time.timer(0));
+                    }
+                }
+            }
+            return fetchesRemaining.get();
+        });
+
+        long nextFetchOffset = 0;
+        while (fetchesRemaining.get() > 0 && !future.isDone()) {
+            if (fetcher.sendFetches() == 1) {
+                synchronized (consumerClient) {
+                    consumerClient.poll(time.timer(0));
+                }
+            }
+            if (fetcher.hasCompletedFetches()) {
+                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+                if (!fetchedRecords.isEmpty()) {
+                    fetchesRemaining.decrementAndGet();
+                    List<ConsumerRecord<byte[], byte[]>> records = fetchedRecords.get(tp0);
+                    assertEquals(2, records.size());
+                    assertEquals(nextFetchOffset, records.get(0).offset());
+                    assertEquals(nextFetchOffset + 1, records.get(1).offset());
+                    nextFetchOffset += 2;
+                }
+                assertTrue(fetchedRecords().isEmpty());
+            }
+        }
+        assertEquals(0, future.get());
+    }
+
+    @Test
     public void testEmptyControlBatch() {
         buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
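
For reference, the new test above exercises the fix end to end: a background thread answers each fetch and asserts that every FetchRequest carries either epoch 0 (a new full session request) or the expected next session epoch, while the foreground thread keeps calling sendFetches() and draining records two at a time; the 1000 round trips give the old race ample opportunity to surface if the epoch update were not ordered before the next request.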