This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f3da8f500eb KAFKA-18936: Fix share fetch when records are larger than
max bytes (#19145)
f3da8f500eb is described below
commit f3da8f500eb3ecd64440bf72ad46e8cf89eb5f68
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Mar 12 09:03:35 2025 +0000
KAFKA-18936: Fix share fetch when records are larger than max bytes (#19145)
The PR fixes the behaviour when records are fetched which are larger
than `fetch.max.bytes` config.
The usage of `hardMaxBytesLimit` is in ReplicaManager where it decides
whether to fetch a single record or not. The file records get sliced
based on the bytes requested. However, if `hardMaxBytesLimit` is false
then at least one record is fetched and bytes are adjusted accordingly in
`localLog`.
Reviewers: Jun Rao <[email protected]>, Andrew Schofield
<[email protected]>, Abhinav Dixit <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 5 +----
core/src/main/scala/kafka/server/KafkaApis.scala | 3 ---
.../scala/kafka/server/LocalLeaderEndPoint.scala | 1 -
.../main/scala/kafka/server/ReplicaManager.scala | 10 +++++-----
.../kafka/log/remote/RemoteLogManagerTest.java | 8 ++++----
.../java/kafka/log/remote/RemoteLogReaderTest.java | 4 ++--
.../kafka/server/share/DelayedShareFetchTest.java | 15 +++++++-------
.../kafka/server/share/ShareFetchUtilsTest.java | 3 +--
.../server/share/SharePartitionManagerTest.java | 2 +-
.../java/kafka/test/api/ShareConsumerTest.java | 19 ++++++++++++++++++
.../kafka/server/DelayedFetchTest.scala | 3 +--
.../kafka/server/DelayedRemoteFetchTest.scala | 13 ++++++------
.../unit/kafka/cluster/PartitionLockTest.scala | 2 --
.../scala/unit/kafka/cluster/PartitionTest.scala | 2 --
.../server/ReplicaAlterLogDirsThreadTest.scala | 3 +--
.../server/ReplicaManagerConcurrencyTest.scala | 3 +--
.../kafka/server/ReplicaManagerQuotasTest.scala | 3 ---
.../unit/kafka/server/ReplicaManagerTest.scala | 13 ++++++------
.../kafka/server/storage/log/FetchParams.java | 23 ++++++----------------
.../internals/log/RemoteStorageFetchInfo.java | 6 +-----
20 files changed, 62 insertions(+), 79 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5bdc9f34c4a..250e064e059 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1716,10 +1716,7 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
// An empty record is sent instead of an incomplete batch when
// - there is no minimum-one-message constraint and
// - the first batch size is more than maximum bytes that can be
sent and
- // - for FetchRequest version 3 or above.
- if (!remoteStorageFetchInfo.minOneMessage &&
- !remoteStorageFetchInfo.hardMaxBytesLimit &&
- firstBatchSize > maxBytes) {
+ if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize >
maxBytes) {
return new FetchDataInfo(new LogOffsetMetadata(offset),
MemoryRecords.EMPTY);
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index e77a67b35b3..02c1cb5b73b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -723,7 +723,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val params = new FetchParams(
- versionId,
fetchRequest.replicaId,
fetchRequest.replicaEpoch,
fetchRequest.maxWait,
@@ -3153,7 +3152,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val shareFetchRequest = request.body[ShareFetchRequest]
val clientId = request.header.clientId
- val versionId = request.header.apiVersion
val groupId = shareFetchRequest.data.groupId
if (interestedWithMaxBytes.isEmpty) {
@@ -3176,7 +3174,6 @@ class KafkaApis(val requestChannel: RequestChannel,
request.context.listenerName.value))
val params = new FetchParams(
- versionId,
FetchRequest.CONSUMER_REPLICA_ID,
-1,
shareFetchRequest.maxWait,
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 1e2a6cd033e..22d3ba2c0c1 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -92,7 +92,6 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
val fetchData = request.fetchData(topicNames.asJava)
val fetchParams = new FetchParams(
- request.version,
FetchRequest.FUTURE_LOCAL_REPLICA_ID,
-1,
0L, // timeout is 0 so that the callback will be executed immediately
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 885cfc82be5..9fdd42b3174 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1685,9 +1685,9 @@ class ReplicaManager(val config: KafkaConfig,
if (params.isFromFollower && shouldLeaderThrottle(quota, partition,
params.replicaId)) {
// If the partition is being throttled, simply return an empty set.
new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata,
MemoryRecords.EMPTY)
- } else if (!params.hardMaxBytesLimit &&
givenFetchedDataInfo.firstEntryIncomplete) {
- // For FetchRequest version 3, we replace incomplete message sets with
an empty one as consumers can make
- // progress in such cases and don't need to report a
`RecordTooLargeException`
+ } else if (givenFetchedDataInfo.firstEntryIncomplete) {
+ // Replace incomplete message sets with an empty one as consumers can
make progress in such
+ // cases and don't need to report a `RecordTooLargeException`
new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata,
MemoryRecords.EMPTY)
} else {
givenFetchedDataInfo
@@ -1799,7 +1799,7 @@ class ReplicaManager(val config: KafkaConfig,
var limitBytes = params.maxBytes
val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
- var minOneMessage = !params.hardMaxBytesLimit
+ var minOneMessage = true
readPartitionInfo.foreach { case (tp, fetchInfo) =>
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
val recordBatchSize = readResult.info.records.sizeInBytes
@@ -1855,7 +1855,7 @@ class ReplicaManager(val config: KafkaConfig,
// For the first topic-partition that needs remote data, we will use
this information to read the data in another thread.
new FetchDataInfo(new LogOffsetMetadata(offset),
MemoryRecords.EMPTY, false, Optional.empty(),
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes,
minOneMessage, tp.topicPartition(),
- fetchInfo, params.isolation, params.hardMaxBytesLimit())))
+ fetchInfo, params.isolation)))
}
LogReadResult(fetchDataInfo,
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 272c78fce84..acb22dd5765 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -3129,7 +3129,7 @@ public class RemoteLogManagerTest {
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED,
false
+ 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3206,7 +3206,7 @@ public class RemoteLogManagerTest {
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, minOneMessage, tp, partitionData,
FetchIsolation.HIGH_WATERMARK, false
+ 0, minOneMessage, tp, partitionData,
FetchIsolation.HIGH_WATERMARK
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3290,7 +3290,7 @@ public class RemoteLogManagerTest {
when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
doNothing().when(firstBatch).writeTo(capture.capture());
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK,
false
+ 0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK
);
@@ -3674,7 +3674,7 @@ public class RemoteLogManagerTest {
Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(
1048576, true, leaderTopicIdPartition.topicPartition(),
- partitionData, FetchIsolation.HIGH_WATERMARK, false);
+ partitionData, FetchIsolation.HIGH_WATERMARK);
FetchDataInfo fetchDataInfo =
remoteLogManager.read(remoteStorageFetchInfo);
// firstBatch baseOffset may not be equal to the fetchOffset
assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index 400cf3c2dff..53905e6d114 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -70,7 +70,7 @@ public class RemoteLogReaderTest {
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
- RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null,
false);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
RemoteLogReader remoteLogReader =
new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback,
brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();
@@ -103,7 +103,7 @@ public class RemoteLogReaderTest {
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new
RuntimeException("error"));
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
- RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null,
false);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
RemoteLogReader remoteLogReader =
new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback,
brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 27aae04f176..aa0e855d931 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
@@ -96,7 +95,7 @@ public class DelayedShareFetchTest {
private static final int MAX_WAIT_MS = 5000;
private static final int BATCH_SIZE = 500;
private static final int MAX_FETCH_RECORDS = 100;
- private static final FetchParams FETCH_PARAMS = new
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
+ private static final FetchParams FETCH_PARAMS = new FetchParams(
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024,
FetchIsolation.HIGH_WATERMARK,
Optional.empty(), true);
private static final BrokerTopicStats BROKER_TOPIC_STATS = new
BrokerTopicStats();
@@ -180,7 +179,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(
- new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE,
MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
@@ -253,7 +252,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(
- new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE,
MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
@@ -627,7 +626,7 @@ public class DelayedShareFetchTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(
- new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
@@ -684,7 +683,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0);
ShareFetch shareFetch = new ShareFetch(
- new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE,
MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
@@ -866,7 +865,7 @@ public class DelayedShareFetchTest {
CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(
- new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
@@ -1103,7 +1102,7 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp2, sp2);
ShareFetch shareFetch = new ShareFetch(
- new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+ new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE,
MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 6baa3b05b53..f4bd75971ee 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
-import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
@@ -80,7 +79,7 @@ import static org.mockito.Mockito.when;
public class ShareFetchUtilsTest {
- private static final FetchParams FETCH_PARAMS = new
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
+ private static final FetchParams FETCH_PARAMS = new FetchParams(
FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, 1, 1024 * 1024,
FetchIsolation.HIGH_WATERMARK,
Optional.empty(), true);
private static final int BATCH_SIZE = 500;
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 4f550c6751b..832303a58ef 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -152,7 +152,7 @@ public class SharePartitionManagerTest {
private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
private static final int BATCH_SIZE = 500;
- private static final FetchParams FETCH_PARAMS = new
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
+ private static final FetchParams FETCH_PARAMS = new FetchParams(
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
private static final String TIMER_NAME_PREFIX = "share-partition-manager";
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index f485e4f816e..44509f7bb26 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -289,6 +289,25 @@ public class ShareConsumerTest {
}
}
+ @ClusterTest
+ public void testPollRecordsGreaterThanMaxBytes() {
+ setup();
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1))
+ ) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record);
+ producer.flush();
+ shareConsumer.subscribe(List.of(tp.topic()));
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ assertEquals(1, records.count());
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+ }
+
@ClusterTest
public void testAcknowledgementSentOnSubscriptionChange() throws
ExecutionException, InterruptedException {
setup();
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 34000b44173..9c5fcf90779 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -22,7 +22,7 @@ import kafka.cluster.Partition
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import org.apache.kafka.common.errors.{FencedLeaderEpochException,
NotLeaderOrFollowerException}
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
@@ -230,7 +230,6 @@ class DelayedFetchTest {
minBytes: Int = 1,
): FetchParams = {
new FetchParams(
- ApiKeys.FETCH.latestVersion,
replicaId,
1,
maxWaitMs,
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index 264f5310c2d..424c8cc04ec 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -18,7 +18,7 @@ package kafka.server
import kafka.cluster.Partition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.{TopicIdPartition, Uuid}
@@ -61,7 +61,7 @@ class DelayedRemoteFetchTest {
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
val highWatermark = 100
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
@@ -97,7 +97,7 @@ class DelayedRemoteFetchTest {
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
val highWatermark = 100
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
@@ -122,7 +122,7 @@ class DelayedRemoteFetchTest {
.thenThrow(new NotLeaderOrFollowerException(s"Replica for
$topicIdPartition not available"))
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
val logReadInfo = buildReadResult(Errors.NONE)
@@ -152,7 +152,7 @@ class DelayedRemoteFetchTest {
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
// build a read result with error
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
@@ -183,7 +183,7 @@ class DelayedRemoteFetchTest {
val remoteFetchTask = mock(classOf[Future[Void]])
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null, false)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future,
fetchInfo, remoteFetchMaxWaitMs,
@@ -220,7 +220,6 @@ class DelayedRemoteFetchTest {
private def buildFetchParams(replicaId: Int,
maxWaitMs: Int): FetchParams = {
new FetchParams(
- ApiKeys.FETCH.latestVersion,
replicaId,
1,
maxWaitMs,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 356496fe9d4..a74cd745627 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -25,7 +25,6 @@ import kafka.log._
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.{FetchRequest, LeaderAndIsrRequest}
import org.apache.kafka.common.utils.Utils
@@ -395,7 +394,6 @@ class PartitionLockTest extends Logging {
while (fetchOffset < numRecords) {
val fetchParams = new FetchParams(
- ApiKeys.FETCH.latestVersion,
followerId,
1,
0L,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 1bb32f1d6c2..837f3abd93b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -129,7 +129,6 @@ object PartitionTest {
maxBytes: Int = Int.MaxValue
): FetchParams = {
new FetchParams(
- ApiKeys.FETCH.latestVersion,
replicaId,
replicaEpoch,
maxWaitMs,
@@ -148,7 +147,6 @@ object PartitionTest {
isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK
): FetchParams = {
new FetchParams(
- ApiKeys.FETCH.latestVersion,
FetchRequest.CONSUMER_REPLICA_ID,
-1,
maxWaitMs,
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index f60d0f0e3fd..be4152a0a6d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.errors.KafkaStorageException
import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
@@ -503,7 +503,6 @@ class ReplicaAlterLogDirsThreadTest {
ArgumentCaptor.forClass(classOf[Seq[(TopicIdPartition,
FetchPartitionData)] => Unit])
val expectedFetchParams = new FetchParams(
- ApiKeys.FETCH.latestVersion,
FetchRequest.FUTURE_LOCAL_REPLICA_ID,
-1,
0L,
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 98e872d331d..a7395dccde3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -26,7 +26,7 @@ import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.metadata.{FeatureLevelRecord,
PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.{FetchRequest, ProduceResponse}
@@ -253,7 +253,6 @@ class ReplicaManagerConcurrencyTest extends Logging {
}
val fetchParams = new FetchParams(
- ApiKeys.FETCH.latestVersion,
replicaId,
defaultBrokerEpoch(replicaId),
random.nextInt(100),
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index f0a4be811bb..d48a35637e4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -24,7 +24,6 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils._
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -175,7 +174,6 @@ class ReplicaManagerQuotasTest {
new LogOffsetMetadata(50L, 0L, 250),
new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = new FetchParams(
- ApiKeys.FETCH.latestVersion,
1,
1,
600,
@@ -227,7 +225,6 @@ class ReplicaManagerQuotasTest {
new LogOffsetMetadata(50L, 0L, 250),
new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty()))
val fetchParams = new FetchParams(
- ApiKeys.FETCH.latestVersion,
FetchRequest.CONSUMER_REPLICA_ID,
-1,
600L,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 59d9b4b1a63..32677f7c4c2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3169,7 +3169,6 @@ class ReplicaManagerTest {
clientMetadata: Option[ClientMetadata] = None
): Unit = {
val params = new FetchParams(
- requestVersion,
replicaId,
1,
maxWaitMs,
@@ -3670,7 +3669,7 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
- val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1,
1000, 0, 100, FetchIsolation.LOG_END, None.asJava)
+ val params = new FetchParams(replicaId, 1, 1000, 0, 100,
FetchIsolation.LOG_END, None.asJava)
// when reading log, it'll throw OffsetOutOfRangeException, which will
be handled separately
val result = replicaManager.readFromLog(params, Seq(tidp0 -> new
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch),
Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
@@ -3725,7 +3724,7 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
- val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1,
1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+ val params = new FetchParams(replicaId, 1, 1000, 10, 100,
FetchIsolation.LOG_END, None.asJava)
val fetchOffset = 1
def fetchCallback(responseStatus: Seq[(TopicIdPartition,
FetchPartitionData)]): Unit = {
@@ -3817,7 +3816,7 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
- val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1,
1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+ val params = new FetchParams(replicaId, 1, 1000, 10, 100,
FetchIsolation.LOG_END, None.asJava)
val fetchOffset = 1
val responseLatch = new CountDownLatch(5)
@@ -3943,7 +3942,7 @@ class ReplicaManagerTest {
endOffsetMetadata,
endOffsetMetadata))
- val params = new FetchParams(ApiKeys.FETCH.latestVersion, replicaId, 1,
1000, 10, 100, FetchIsolation.LOG_END, None.asJava)
+ val params = new FetchParams(replicaId, 1, 1000, 10, 100,
FetchIsolation.LOG_END, None.asJava)
val fetchOffset = 1
def fetchCallback(responseStatus: Seq[(TopicIdPartition,
FetchPartitionData)]): Unit = {
@@ -6092,7 +6091,7 @@ class ReplicaManagerTest {
val future = new CompletableFuture[util.Map[TopicIdPartition,
ShareFetchResponseData.PartitionData]]
val shareFetch = new ShareFetch(
- new FetchParams(ApiKeys.SHARE_FETCH.latestVersion,
FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 * 1024,
FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
+ new FetchParams(FetchRequest.ORDINARY_CONSUMER_ID, -1, 500, 1, 1024 *
1024, FetchIsolation.HIGH_WATERMARK, Optional.empty, true),
groupId,
Uuid.randomUuid.toString,
future,
@@ -6159,7 +6158,7 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) =>
())
- val params = new FetchParams(ApiKeys.FETCH.latestVersion, -1, 1, 1000,
0, 100, FetchIsolation.HIGH_WATERMARK, None.asJava)
+ val params = new FetchParams(-1, 1, 1000, 0, 100,
FetchIsolation.HIGH_WATERMARK, None.asJava)
replicaManager.readFromLog(
params,
Seq(new TopicIdPartition(topicId, 0, topic) -> new
PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch),
Optional.of(leaderEpoch))),
diff --git
a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
index 9829ce76aed..5de55023580 100644
---
a/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
+++
b/server-common/src/main/java/org/apache/kafka/server/storage/log/FetchParams.java
@@ -25,7 +25,6 @@ import java.util.Optional;
import static
org.apache.kafka.common.requests.FetchRequest.FUTURE_LOCAL_REPLICA_ID;
public class FetchParams {
- public final short requestVersion;
public final int replicaId;
public final long replicaEpoch;
public final long maxWaitMs;
@@ -35,19 +34,17 @@ public class FetchParams {
public final Optional<ClientMetadata> clientMetadata;
public final boolean shareFetchRequest;
- public FetchParams(short requestVersion,
- int replicaId,
+ public FetchParams(int replicaId,
long replicaEpoch,
long maxWaitMs,
int minBytes,
int maxBytes,
FetchIsolation isolation,
Optional<ClientMetadata> clientMetadata) {
- this(requestVersion, replicaId, replicaEpoch, maxWaitMs, minBytes,
maxBytes, isolation, clientMetadata, false);
+ this(replicaId, replicaEpoch, maxWaitMs, minBytes, maxBytes,
isolation, clientMetadata, false);
}
- public FetchParams(short requestVersion,
- int replicaId,
+ public FetchParams(int replicaId,
long replicaEpoch,
long maxWaitMs,
int minBytes,
@@ -57,7 +54,6 @@ public class FetchParams {
boolean shareFetchRequest) {
Objects.requireNonNull(isolation);
Objects.requireNonNull(clientMetadata);
- this.requestVersion = requestVersion;
this.replicaId = replicaId;
this.replicaEpoch = replicaEpoch;
this.maxWaitMs = maxWaitMs;
@@ -84,17 +80,12 @@ public class FetchParams {
return isFromFollower() || (isFromConsumer() &&
clientMetadata.isEmpty()) || shareFetchRequest;
}
- public boolean hardMaxBytesLimit() {
- return requestVersion <= 2;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FetchParams that = (FetchParams) o;
- return requestVersion == that.requestVersion
- && replicaId == that.replicaId
+ return replicaId == that.replicaId
&& replicaEpoch == that.replicaEpoch
&& maxWaitMs == that.maxWaitMs
&& minBytes == that.minBytes
@@ -106,8 +97,7 @@ public class FetchParams {
@Override
public int hashCode() {
- int result = requestVersion;
- result = 31 * result + replicaId;
+ int result = replicaId;
result = 31 * result + (int) replicaEpoch;
result = 31 * result + Long.hashCode(32);
result = 31 * result + minBytes;
@@ -121,8 +111,7 @@ public class FetchParams {
@Override
public String toString() {
return "FetchParams(" +
- "requestVersion=" + requestVersion +
- ", replicaId=" + replicaId +
+ "replicaId=" + replicaId +
", replicaEpoch=" + replicaEpoch +
", maxWaitMs=" + maxWaitMs +
", minBytes=" + minBytes +
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
index cf908afce83..c110e750d7c 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
@@ -27,17 +27,14 @@ public class RemoteStorageFetchInfo {
public final TopicPartition topicPartition;
public final FetchRequest.PartitionData fetchInfo;
public final FetchIsolation fetchIsolation;
- public final boolean hardMaxBytesLimit;
public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage,
TopicPartition topicPartition,
- FetchRequest.PartitionData fetchInfo,
FetchIsolation fetchIsolation,
- boolean hardMaxBytesLimit) {
+ FetchRequest.PartitionData fetchInfo,
FetchIsolation fetchIsolation) {
this.fetchMaxBytes = fetchMaxBytes;
this.minOneMessage = minOneMessage;
this.topicPartition = topicPartition;
this.fetchInfo = fetchInfo;
this.fetchIsolation = fetchIsolation;
- this.hardMaxBytesLimit = hardMaxBytesLimit;
}
@Override
@@ -48,7 +45,6 @@ public class RemoteStorageFetchInfo {
", topicPartition=" + topicPartition +
", fetchInfo=" + fetchInfo +
", fetchIsolation=" + fetchIsolation +
- ", hardMaxBytesLimit=" + hardMaxBytesLimit +
'}';
}
}