This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 8625d0e MINOR: Raft max batch size needs to propagate to log config
(#10256)
8625d0e is described below
commit 8625d0e0a6f3d2dfae62036e2e3a245501b05dc4
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Mar 4 14:40:31 2021 -0800
MINOR: Raft max batch size needs to propagate to log config (#10256)
This patch ensures that the constant max batch size defined in
`KafkaRaftClient` is propagated to the constructed log configuration in
`KafkaMetadataLog`. We also ensure that the fetch max size is set consistently,
with appropriate testing.
Reviewers: Chia-Ping Tsai <[email protected]>, David Arthur
<[email protected]>
---
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 84 ++++++----
core/src/main/scala/kafka/raft/RaftManager.scala | 29 +---
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 181 +++++++++++++--------
.../scala/unit/kafka/raft/RaftManagerTest.scala | 2 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 24 +--
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 12 +-
.../apache/kafka/raft/RaftClientTestContext.java | 4 +-
.../apache/kafka/raft/RaftEventSimulationTest.java | 2 +-
8 files changed, 197 insertions(+), 141 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 1d347cb..64ae636 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,29 +16,20 @@
*/
package kafka.raft
-import java.nio.file.Files
-import java.nio.file.NoSuchFileException
-import java.util.NoSuchElementException
-import java.util.Optional
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.ConcurrentSkipListSet
+import java.util.{NoSuchElementException, Optional, Properties}
-import kafka.log.{AppendOrigin, Log, SnapshotGenerated, LogOffsetSnapshot}
-import kafka.server.{FetchHighWatermark, FetchLogEnd}
+import kafka.api.ApiVersion
+import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot,
SnapshotGenerated}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd,
LogDirFailureChannel}
+import kafka.utils.Scheduler
import org.apache.kafka.common.record.{MemoryRecords, Records}
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.raft.Isolation
-import org.apache.kafka.raft.LogAppendInfo
-import org.apache.kafka.raft.LogFetchInfo
-import org.apache.kafka.raft.LogOffsetMetadata
-import org.apache.kafka.raft.OffsetAndEpoch
-import org.apache.kafka.raft.OffsetMetadata
-import org.apache.kafka.raft.ReplicatedLog
-import org.apache.kafka.snapshot.FileRawSnapshotReader
-import org.apache.kafka.snapshot.FileRawSnapshotWriter
-import org.apache.kafka.snapshot.RawSnapshotReader
-import org.apache.kafka.snapshot.RawSnapshotWriter
-import org.apache.kafka.snapshot.SnapshotPath
-import org.apache.kafka.snapshot.Snapshots
+import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo,
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
+import org.apache.kafka.snapshot.{FileRawSnapshotReader,
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath,
Snapshots}
import scala.compat.java8.OptionConverters._
@@ -297,11 +288,53 @@ final class KafkaMetadataLog private (
}
object KafkaMetadataLog {
+
def apply(
- log: Log,
topicPartition: TopicPartition,
- maxFetchSizeInBytes: Int = 1024 * 1024
+ dataDir: File,
+ time: Time,
+ scheduler: Scheduler,
+ maxBatchSizeInBytes: Int,
+ maxFetchSizeInBytes: Int
): KafkaMetadataLog = {
+ val props = new Properties()
+ props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+ props.put(LogConfig.MessageFormatVersionProp,
ApiVersion.latestVersion.toString)
+
+ LogConfig.validateValues(props)
+ val defaultLogConfig = LogConfig(props)
+
+ val log = Log(
+ dir = dataDir,
+ config = defaultLogConfig,
+ logStartOffset = 0L,
+ recoveryPoint = 0L,
+ scheduler = scheduler,
+ brokerTopicStats = new BrokerTopicStats,
+ time = time,
+ maxProducerIdExpirationMs = Int.MaxValue,
+ producerIdExpirationCheckIntervalMs = Int.MaxValue,
+ logDirFailureChannel = new LogDirFailureChannel(5),
+ keepPartitionMetadataFile = false
+ )
+
+ val metadataLog = new KafkaMetadataLog(
+ log,
+ recoverSnapshots(log),
+ topicPartition,
+ maxFetchSizeInBytes
+ )
+
+ // When recovering, truncate fully if the latest snapshot is after the log
end offset. This can happen to a follower
+ // when the follower crashes after downloading a snapshot from the leader
but before it could truncate the log fully.
+ metadataLog.truncateToLatestSnapshot()
+
+ metadataLog
+ }
+
+ private def recoverSnapshots(
+ log: Log
+ ): ConcurrentSkipListSet[OffsetAndEpoch] = {
val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
// Scan the log directory; deleting partial snapshots and remembering
immutable snapshots
Files
@@ -322,12 +355,7 @@ object KafkaMetadataLog {
}
}
}
-
- val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition,
maxFetchSizeInBytes)
- // When recovering, truncate fully if the latest snapshot is after the log
end offset. This can happen to a follower
- // when the follower crashes after downloading a snapshot from the leader
but before it could truncate the log fully.
- replicatedLog.truncateToLatestSnapshot()
-
- replicatedLog
+ snapshotIds
}
+
}
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala
b/core/src/main/scala/kafka/raft/RaftManager.scala
index 3b714f3..4a5837a 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -22,9 +22,9 @@ import java.util
import java.util.OptionalInt
import java.util.concurrent.CompletableFuture
-import kafka.log.{Log, LogConfig, LogManager}
+import kafka.log.Log
import kafka.raft.KafkaRaftManager.RaftIoThread
-import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel,
MetaProperties}
+import kafka.server.{KafkaConfig, MetaProperties}
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup,
ManualMetadataUpdater, NetworkClient}
@@ -241,25 +241,14 @@ class KafkaRaftManager[T](
}
private def buildMetadataLog(): KafkaMetadataLog = {
- val defaultProps = LogConfig.extractLogConfigMap(config)
- LogConfig.validateValues(defaultProps)
- val defaultLogConfig = LogConfig(defaultProps)
-
- val log = Log(
- dir = dataDir,
- config = defaultLogConfig,
- logStartOffset = 0L,
- recoveryPoint = 0L,
- scheduler = scheduler,
- brokerTopicStats = new BrokerTopicStats,
- time = time,
- maxProducerIdExpirationMs = config.transactionalIdExpirationMs,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
- logDirFailureChannel = new LogDirFailureChannel(5),
- keepPartitionMetadataFile = config.usesTopicId
+ KafkaMetadataLog(
+ topicPartition,
+ dataDir,
+ time,
+ scheduler,
+ maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
)
-
- KafkaMetadataLog(log, topicPartition)
}
private def buildNetworkClient(): NetworkClient = {
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index ce55a2b..08e7ea4 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -17,35 +17,23 @@
package kafka.raft
import java.io.File
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.Optional
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Path}
+import java.util.{Collections, Optional}
+
import kafka.log.Log
-import kafka.log.LogManager
-import kafka.log.LogTest
-import kafka.server.BrokerTopicStats
-import kafka.server.LogDirFailureChannel
-import kafka.utils.MockTime
-import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.record.SimpleRecord
+import kafka.server.KafkaRaftServer
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException,
RecordTooLargeException}
+import org.apache.kafka.common.protocol
+import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
SimpleRecord}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft.LogAppendInfo
-import org.apache.kafka.raft.LogOffsetMetadata
-import org.apache.kafka.raft.OffsetAndEpoch
-import org.apache.kafka.raft.ReplicatedLog
-import org.apache.kafka.snapshot.SnapshotPath
-import org.apache.kafka.snapshot.Snapshots
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertThrows
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.apache.kafka.raft.internals.BatchBuilder
+import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo,
LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
+import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
final class KafkaMetadataLogTest {
import KafkaMetadataLogTest._
@@ -65,8 +53,7 @@ final class KafkaMetadataLogTest {
@Test
def testUnexpectedAppendOffset(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
val recordFoo = new SimpleRecord("foo".getBytes())
val currentEpoch = 3
@@ -100,11 +87,10 @@ final class KafkaMetadataLogTest {
@Test
def testCreateSnapshot(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
val numberOfRecords = 10
val epoch = 0
val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
@@ -120,16 +106,14 @@ final class KafkaMetadataLogTest {
@Test
def testReadMissingSnapshot(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
assertEquals(Optional.empty(), log.readSnapshot(new OffsetAndEpoch(10, 0)))
}
@Test
def testUpdateLogStartOffset(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
val offset = 10
val epoch = 0
val snapshotId = new OffsetAndEpoch(offset, epoch)
@@ -160,8 +144,7 @@ final class KafkaMetadataLogTest {
@Test
def testUpdateLogStartOffsetWithMissingSnapshot(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
val offset = 10
val epoch = 0
@@ -177,8 +160,7 @@ final class KafkaMetadataLogTest {
@Test
def testFailToIncreaseLogStartPastHighWatermark(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
val offset = 10
val epoch = 0
val snapshotId = new OffsetAndEpoch(2 * offset, 1 + epoch)
@@ -198,8 +180,7 @@ final class KafkaMetadataLogTest {
@Test
def testTruncateFullyToLatestSnapshot(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 0
val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch)
@@ -233,8 +214,7 @@ final class KafkaMetadataLogTest {
@Test
def testDoesntTruncateFully(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
@@ -259,8 +239,7 @@ final class KafkaMetadataLogTest {
@Test
def testCleanupSnapshots(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime,
topicPartition)
+ val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
val snapshotId = new OffsetAndEpoch(1, epoch)
@@ -277,7 +256,7 @@ final class KafkaMetadataLogTest {
Snapshots.createTempFile(logDir, new OffsetAndEpoch(1, epoch))
Snapshots.createTempFile(logDir, new OffsetAndEpoch(2, epoch + 1))
- val secondLog = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val secondLog = buildMetadataLog(tempDir, mockTime)
assertEquals(snapshotId, secondLog.latestSnapshotId.get)
assertEquals(0, log.startOffset)
@@ -297,8 +276,7 @@ final class KafkaMetadataLogTest {
@Test
def testCreateReplicatedLogTruncatesFully(): Unit = {
- val topicPartition = new TopicPartition("cluster-metadata", 0)
- val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime,
topicPartition)
+ val log = buildMetadataLog(tempDir, mockTime)
val numberOfRecords = 10
val epoch = 1
val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1)
@@ -310,7 +288,7 @@ final class KafkaMetadataLogTest {
log.close()
- val secondLog = buildMetadataLog(tempDir, mockTime, topicPartition)
+ val secondLog = buildMetadataLog(tempDir, mockTime)
assertEquals(snapshotId, secondLog.latestSnapshotId.get)
assertEquals(snapshotId.offset, secondLog.startOffset)
@@ -318,36 +296,101 @@ final class KafkaMetadataLogTest {
assertEquals(snapshotId.offset, secondLog.endOffset().offset)
assertEquals(snapshotId.offset, secondLog.highWatermark.offset)
}
+
+ @Test
+ def testMaxBatchSize(): Unit = {
+ val leaderEpoch = 5
+ val maxBatchSizeInBytes = 16384
+ val recordSize = 64
+ val log = buildMetadataLog(tempDir, mockTime, maxBatchSizeInBytes)
+
+ val oversizeBatch = buildFullBatch(leaderEpoch, recordSize,
maxBatchSizeInBytes + recordSize)
+ assertThrows(classOf[RecordTooLargeException], () => {
+ log.appendAsLeader(oversizeBatch, leaderEpoch)
+ })
+
+ val undersizeBatch = buildFullBatch(leaderEpoch, recordSize,
maxBatchSizeInBytes)
+ val appendInfo = log.appendAsLeader(undersizeBatch, leaderEpoch)
+ assertEquals(0L, appendInfo.firstOffset)
+ }
+
+ private def buildFullBatch(
+ leaderEpoch: Int,
+ recordSize: Int,
+ maxBatchSizeInBytes: Int
+ ): MemoryRecords = {
+ val buffer = ByteBuffer.allocate(maxBatchSizeInBytes)
+ val batchBuilder = new BatchBuilder[Array[Byte]](
+ buffer,
+ new ByteArraySerde,
+ CompressionType.NONE,
+ 0L,
+ mockTime.milliseconds(),
+ false,
+ leaderEpoch,
+ maxBatchSizeInBytes
+ )
+
+ val serializationCache = new ObjectSerializationCache
+ val records = Collections.singletonList(new Array[Byte](recordSize))
+ while (!batchBuilder.bytesNeeded(records, serializationCache).isPresent) {
+ batchBuilder.appendRecord(records.get(0), serializationCache)
+ }
+
+ batchBuilder.build()
+ }
+
}
object KafkaMetadataLogTest {
- def buildMetadataLogAndDir(tempDir: File, time: MockTime, topicPartition:
TopicPartition): (Path, KafkaMetadataLog) = {
-
- val logDir = createLogDirectory(tempDir, Log.logDirName(topicPartition))
- val logConfig = LogTest.createLogConfig()
-
- val log = Log(
- dir = logDir,
- config = logConfig,
- logStartOffset = 0,
- recoveryPoint = 0,
- scheduler = time.scheduler,
- brokerTopicStats = new BrokerTopicStats,
- time = time,
- maxProducerIdExpirationMs = 60 * 60 * 1000,
- producerIdExpirationCheckIntervalMs =
LogManager.ProducerIdExpirationCheckIntervalMs,
- logDirFailureChannel = new LogDirFailureChannel(5)
+ class ByteArraySerde extends RecordSerde[Array[Byte]] {
+ override def recordSize(data: Array[Byte], serializationCache:
ObjectSerializationCache): Int = {
+ data.length
+ }
+ override def write(data: Array[Byte], serializationCache:
ObjectSerializationCache, out: Writable): Unit = {
+ out.writeByteArray(data)
+ }
+ override def read(input: protocol.Readable, size: Int): Array[Byte] = {
+ val array = new Array[Byte](size)
+ input.readArray(array)
+ array
+ }
+ }
+
+ def buildMetadataLogAndDir(
+ tempDir: File,
+ time: MockTime,
+ maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ ): (Path, KafkaMetadataLog) = {
+
+ val logDir = createLogDirectory(
+ tempDir,
+ Log.logDirName(KafkaRaftServer.MetadataPartition)
)
- (logDir.toPath, KafkaMetadataLog(log, topicPartition))
+ val metadataLog = KafkaMetadataLog(
+ KafkaRaftServer.MetadataPartition,
+ logDir,
+ time,
+ time.scheduler,
+ maxBatchSizeInBytes,
+ maxFetchSizeInBytes
+ )
+
+ (logDir.toPath, metadataLog)
}
- def buildMetadataLog(tempDir: File, time: MockTime, topicPartition:
TopicPartition): KafkaMetadataLog = {
- val (_, log) = buildMetadataLogAndDir(tempDir, time, topicPartition)
+ def buildMetadataLog(
+ tempDir: File,
+ time: MockTime,
+ maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ ): KafkaMetadataLog = {
+ val (_, log) = buildMetadataLogAndDir(tempDir, time, maxBatchSizeInBytes,
maxFetchSizeInBytes)
log
}
-
def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int,
initialOffset: Long = 0L): LogAppendInfo = {
log.appendAsLeader(
MemoryRecords.withRecords(
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index f7f8a06..3afb75b 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.CompletableFuture
import kafka.raft.KafkaRaftManager.RaftIoThread
import org.apache.kafka.raft.KafkaRaftClient
import org.junit.jupiter.api.Assertions._
-import org.mockito.Mockito._
import org.junit.jupiter.api.Test
+import org.mockito.Mockito._
class RaftManagerTest {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 672b967..b8022da 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -133,16 +133,15 @@ import static
org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
*/
public class KafkaRaftClient<T> implements RaftClient<T> {
private static final int RETRY_BACKOFF_BASE_MS = 100;
- private static final int FETCH_MAX_WAIT_MS = 500;
- static final int MAX_BATCH_SIZE = 8 * 1024 * 1024;
+ public static final int MAX_FETCH_WAIT_MS = 500;
+ public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
+ public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
private final AtomicReference<GracefulShutdown> shutdown = new
AtomicReference<>();
private final Logger logger;
- private final LogContext logContext;
private final Time time;
private final int fetchMaxWaitMs;
private final String clusterId;
- private final OptionalInt nodeId;
private final NetworkChannel channel;
private final ReplicatedLog log;
private final Random random;
@@ -151,8 +150,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final RecordSerde<T> serde;
private final MemoryPool memoryPool;
private final RaftMessageQueue messageQueue;
- private final QuorumStateStore quorumStateStore;
- private final Metrics metrics;
private final RaftConfig raftConfig;
private final KafkaRaftMetrics kafkaRaftMetrics;
private final QuorumState quorum;
@@ -187,11 +184,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
new BlockingMessageQueue(),
log,
quorumStateStore,
- new BatchMemoryPool(5, MAX_BATCH_SIZE),
+ new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
time,
metrics,
expirationService,
- FETCH_MAX_WAIT_MS,
+ MAX_FETCH_WAIT_MS,
clusterId,
nodeId,
logContext,
@@ -220,16 +217,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
this.channel = channel;
this.messageQueue = messageQueue;
this.log = log;
- this.quorumStateStore = quorumStateStore;
this.memoryPool = memoryPool;
this.fetchPurgatory = new ThresholdPurgatory<>(expirationService);
this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
this.time = time;
this.clusterId = clusterId;
- this.nodeId = nodeId;
- this.metrics = metrics;
this.fetchMaxWaitMs = fetchMaxWaitMs;
- this.logContext = logContext;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
this.raftConfig = raftConfig;
@@ -413,7 +406,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
quorum.epoch(),
log.endOffset().offset,
raftConfig.appendLingerMs(),
- MAX_BATCH_SIZE,
+ MAX_BATCH_SIZE_BYTES,
memoryPool,
time,
CompressionType.NONE,
@@ -1775,8 +1768,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
.setFetchOffset(log.endOffset().offset);
});
return request
+ .setMaxBytes(MAX_FETCH_SIZE_BYTES)
.setMaxWaitMs(fetchMaxWaitMs)
- .setClusterId(clusterId.toString())
+ .setClusterId(clusterId)
.setReplicaId(quorum.localIdOrSentinel());
}
@@ -2245,7 +2239,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
public SnapshotWriter<T> createSnapshot(OffsetAndEpoch snapshotId) throws
IOException {
return new SnapshotWriter<>(
log.createSnapshot(snapshotId),
- MAX_BATCH_SIZE,
+ MAX_BATCH_SIZE_BYTES,
memoryPool,
time,
CompressionType.NONE,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 8093c1f..1c716f4 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -471,8 +471,8 @@ public class KafkaRaftClientTest {
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
- ByteBuffer buffer =
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE);
- Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE))
+ ByteBuffer buffer =
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
+
Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES))
.thenReturn(buffer);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -500,8 +500,8 @@ public class KafkaRaftClientTest {
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
- ByteBuffer buffer =
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE);
- Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE))
+ ByteBuffer buffer =
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
+
Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES))
.thenReturn(buffer);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -530,8 +530,8 @@ public class KafkaRaftClientTest {
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
- ByteBuffer buffer =
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE);
- Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE))
+ ByteBuffer buffer =
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
+
Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES))
.thenReturn(buffer);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index e57995c..3683a8e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -87,7 +87,7 @@ public final class RaftClientTestContext {
final TopicPartition metadataPartition = Builder.METADATA_PARTITION;
final int electionBackoffMaxMs = Builder.ELECTION_BACKOFF_MAX_MS;
- final int electionFetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
+ final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
@@ -870,6 +870,8 @@ public final class RaftClientTestContext {
assertTrue(
message.data() instanceof FetchRequestData, "Unexpected request
type " + message.data());
FetchRequestData request = (FetchRequestData) message.data();
+ assertEquals(KafkaRaftClient.MAX_FETCH_SIZE_BYTES, request.maxBytes());
+ assertEquals(fetchMaxWaitMs, request.maxWaitMs());
assertEquals(1, request.topics().size());
assertEquals(metadataPartition.topic(),
request.topics().get(0).topic());
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index a1af912..28a6618 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -747,7 +747,7 @@ public class RaftEventSimulationTest {
persistentState.log.reopen();
IntSerde serde = new IntSerde();
- MemoryPool memoryPool = new BatchMemoryPool(2,
KafkaRaftClient.MAX_BATCH_SIZE);
+ MemoryPool memoryPool = new BatchMemoryPool(2,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
KafkaRaftClient<Integer> client = new KafkaRaftClient<>(
serde,