This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 8625d0e  MINOR: Raft max batch size needs to propagate to log config 
(#10256)
8625d0e is described below

commit 8625d0e0a6f3d2dfae62036e2e3a245501b05dc4
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Mar 4 14:40:31 2021 -0800

    MINOR: Raft max batch size needs to propagate to log config (#10256)
    
    This patch ensures that the constant max batch size defined in 
`KafkaRaftClient` is propagated to the constructed log configuration in 
`KafkaMetadataLog`. We also ensure that the fetch max size is set consistently 
with appropriate testing.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, David Arthur 
<[email protected]>
---
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  84 ++++++----
 core/src/main/scala/kafka/raft/RaftManager.scala   |  29 +---
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    | 181 +++++++++++++--------
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |   2 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  24 +--
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  12 +-
 .../apache/kafka/raft/RaftClientTestContext.java   |   4 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |   2 +-
 8 files changed, 197 insertions(+), 141 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 1d347cb..64ae636 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -16,29 +16,20 @@
  */
 package kafka.raft
 
-import java.nio.file.Files
-import java.nio.file.NoSuchFileException
-import java.util.NoSuchElementException
-import java.util.Optional
+import java.io.File
+import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent.ConcurrentSkipListSet
+import java.util.{NoSuchElementException, Optional, Properties}
 
-import kafka.log.{AppendOrigin, Log, SnapshotGenerated, LogOffsetSnapshot}
-import kafka.server.{FetchHighWatermark, FetchLogEnd}
+import kafka.api.ApiVersion
+import kafka.log.{AppendOrigin, Log, LogConfig, LogOffsetSnapshot, 
SnapshotGenerated}
+import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchLogEnd, 
LogDirFailureChannel}
+import kafka.utils.Scheduler
 import org.apache.kafka.common.record.{MemoryRecords, Records}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.raft.Isolation
-import org.apache.kafka.raft.LogAppendInfo
-import org.apache.kafka.raft.LogFetchInfo
-import org.apache.kafka.raft.LogOffsetMetadata
-import org.apache.kafka.raft.OffsetAndEpoch
-import org.apache.kafka.raft.OffsetMetadata
-import org.apache.kafka.raft.ReplicatedLog
-import org.apache.kafka.snapshot.FileRawSnapshotReader
-import org.apache.kafka.snapshot.FileRawSnapshotWriter
-import org.apache.kafka.snapshot.RawSnapshotReader
-import org.apache.kafka.snapshot.RawSnapshotWriter
-import org.apache.kafka.snapshot.SnapshotPath
-import org.apache.kafka.snapshot.Snapshots
+import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, 
LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog}
+import org.apache.kafka.snapshot.{FileRawSnapshotReader, 
FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, 
Snapshots}
 
 import scala.compat.java8.OptionConverters._
 
@@ -297,11 +288,53 @@ final class KafkaMetadataLog private (
 }
 
 object KafkaMetadataLog {
+
   def apply(
-    log: Log,
     topicPartition: TopicPartition,
-    maxFetchSizeInBytes: Int = 1024 * 1024
+    dataDir: File,
+    time: Time,
+    scheduler: Scheduler,
+    maxBatchSizeInBytes: Int,
+    maxFetchSizeInBytes: Int
   ): KafkaMetadataLog = {
+    val props = new Properties()
+    props.put(LogConfig.MaxMessageBytesProp, maxBatchSizeInBytes.toString)
+    props.put(LogConfig.MessageFormatVersionProp, 
ApiVersion.latestVersion.toString)
+
+    LogConfig.validateValues(props)
+    val defaultLogConfig = LogConfig(props)
+
+    val log = Log(
+      dir = dataDir,
+      config = defaultLogConfig,
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = scheduler,
+      brokerTopicStats = new BrokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = Int.MaxValue,
+      producerIdExpirationCheckIntervalMs = Int.MaxValue,
+      logDirFailureChannel = new LogDirFailureChannel(5),
+      keepPartitionMetadataFile = false
+    )
+
+    val metadataLog = new KafkaMetadataLog(
+      log,
+      recoverSnapshots(log),
+      topicPartition,
+      maxFetchSizeInBytes
+    )
+
+    // When recovering, truncate fully if the latest snapshot is after the log 
end offset. This can happen to a follower
+    // when the follower crashes after downloading a snapshot from the leader 
but before it could truncate the log fully.
+    metadataLog.truncateToLatestSnapshot()
+
+    metadataLog
+  }
+
+  private def recoverSnapshots(
+    log: Log
+  ): ConcurrentSkipListSet[OffsetAndEpoch] = {
     val snapshotIds = new ConcurrentSkipListSet[OffsetAndEpoch]()
     // Scan the log directory; deleting partial snapshots and remembering 
immutable snapshots
     Files
@@ -322,12 +355,7 @@ object KafkaMetadataLog {
           }
         }
       }
-
-    val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, 
maxFetchSizeInBytes)
-    // When recovering, truncate fully if the latest snapshot is after the log 
end offset. This can happen to a follower
-    // when the follower crashes after downloading a snapshot from the leader 
but before it could truncate the log fully.
-    replicatedLog.truncateToLatestSnapshot()
-
-    replicatedLog
+    snapshotIds
   }
+
 }
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index 3b714f3..4a5837a 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -22,9 +22,9 @@ import java.util
 import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
 
-import kafka.log.{Log, LogConfig, LogManager}
+import kafka.log.Log
 import kafka.raft.KafkaRaftManager.RaftIoThread
-import kafka.server.{BrokerTopicStats, KafkaConfig, LogDirFailureChannel, 
MetaProperties}
+import kafka.server.{KafkaConfig, MetaProperties}
 import kafka.utils.timer.SystemTimer
 import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
 import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, 
ManualMetadataUpdater, NetworkClient}
@@ -241,25 +241,14 @@ class KafkaRaftManager[T](
   }
 
   private def buildMetadataLog(): KafkaMetadataLog = {
-    val defaultProps = LogConfig.extractLogConfigMap(config)
-    LogConfig.validateValues(defaultProps)
-    val defaultLogConfig = LogConfig(defaultProps)
-
-    val log = Log(
-      dir = dataDir,
-      config = defaultLogConfig,
-      logStartOffset = 0L,
-      recoveryPoint = 0L,
-      scheduler = scheduler,
-      brokerTopicStats = new BrokerTopicStats,
-      time = time,
-      maxProducerIdExpirationMs = config.transactionalIdExpirationMs,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
-      logDirFailureChannel = new LogDirFailureChannel(5),
-      keepPartitionMetadataFile = config.usesTopicId
+    KafkaMetadataLog(
+      topicPartition,
+      dataDir,
+      time,
+      scheduler,
+      maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+      maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
     )
-
-    KafkaMetadataLog(log, topicPartition)
   }
 
   private def buildNetworkClient(): NetworkClient = {
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala 
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index ce55a2b..08e7ea4 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -17,35 +17,23 @@
 package kafka.raft
 
 import java.io.File
-import java.nio.file.Files
-import java.nio.file.Path
-import java.util.Optional
+import java.nio.ByteBuffer
+import java.nio.file.{Files, Path}
+import java.util.{Collections, Optional}
+
 import kafka.log.Log
-import kafka.log.LogManager
-import kafka.log.LogTest
-import kafka.server.BrokerTopicStats
-import kafka.server.LogDirFailureChannel
-import kafka.utils.MockTime
-import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.record.SimpleRecord
+import kafka.server.KafkaRaftServer
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, 
RecordTooLargeException}
+import org.apache.kafka.common.protocol
+import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
SimpleRecord}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft.LogAppendInfo
-import org.apache.kafka.raft.LogOffsetMetadata
-import org.apache.kafka.raft.OffsetAndEpoch
-import org.apache.kafka.raft.ReplicatedLog
-import org.apache.kafka.snapshot.SnapshotPath
-import org.apache.kafka.snapshot.Snapshots
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertThrows
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.apache.kafka.raft.internals.BatchBuilder
+import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, 
LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog}
+import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
 final class KafkaMetadataLogTest {
   import KafkaMetadataLogTest._
@@ -65,8 +53,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testUnexpectedAppendOffset(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
 
     val recordFoo = new SimpleRecord("foo".getBytes())
     val currentEpoch = 3
@@ -100,11 +87,10 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testCreateSnapshot(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
     val numberOfRecords = 10
     val epoch = 0
     val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
 
     append(log, numberOfRecords, epoch)
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
@@ -120,16 +106,14 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testReadMissingSnapshot(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
 
     assertEquals(Optional.empty(), log.readSnapshot(new OffsetAndEpoch(10, 0)))
   }
 
   @Test
   def testUpdateLogStartOffset(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
     val offset = 10
     val epoch = 0
     val snapshotId = new OffsetAndEpoch(offset, epoch)
@@ -160,8 +144,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testUpdateLogStartOffsetWithMissingSnapshot(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
     val offset = 10
     val epoch = 0
 
@@ -177,8 +160,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testFailToIncreaseLogStartPastHighWatermark(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
     val offset = 10
     val epoch = 0
     val snapshotId = new OffsetAndEpoch(2 * offset, 1 + epoch)
@@ -198,8 +180,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testTruncateFullyToLatestSnapshot(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
     val numberOfRecords = 10
     val epoch = 0
     val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch)
@@ -233,8 +214,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testDoesntTruncateFully(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val log = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
     val numberOfRecords = 10
     val epoch = 1
 
@@ -259,8 +239,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testCleanupSnapshots(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime, 
topicPartition)
+    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime)
     val numberOfRecords = 10
     val epoch = 1
     val snapshotId = new OffsetAndEpoch(1, epoch)
@@ -277,7 +256,7 @@ final class KafkaMetadataLogTest {
     Snapshots.createTempFile(logDir, new OffsetAndEpoch(1, epoch))
     Snapshots.createTempFile(logDir, new OffsetAndEpoch(2, epoch + 1))
 
-    val secondLog = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val secondLog = buildMetadataLog(tempDir, mockTime)
 
     assertEquals(snapshotId, secondLog.latestSnapshotId.get)
     assertEquals(0, log.startOffset)
@@ -297,8 +276,7 @@ final class KafkaMetadataLogTest {
 
   @Test
   def testCreateReplicatedLogTruncatesFully(): Unit = {
-    val topicPartition = new TopicPartition("cluster-metadata", 0)
-    val (logDir, log) = buildMetadataLogAndDir(tempDir, mockTime, 
topicPartition)
+    val log = buildMetadataLog(tempDir, mockTime)
     val numberOfRecords = 10
     val epoch = 1
     val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1)
@@ -310,7 +288,7 @@ final class KafkaMetadataLogTest {
 
     log.close()
 
-    val secondLog = buildMetadataLog(tempDir, mockTime, topicPartition)
+    val secondLog = buildMetadataLog(tempDir, mockTime)
 
     assertEquals(snapshotId, secondLog.latestSnapshotId.get)
     assertEquals(snapshotId.offset, secondLog.startOffset)
@@ -318,36 +296,101 @@ final class KafkaMetadataLogTest {
     assertEquals(snapshotId.offset, secondLog.endOffset().offset)
     assertEquals(snapshotId.offset, secondLog.highWatermark.offset)
   }
+
+  @Test
+  def testMaxBatchSize(): Unit = {
+    val leaderEpoch = 5
+    val maxBatchSizeInBytes = 16384
+    val recordSize = 64
+    val log = buildMetadataLog(tempDir, mockTime, maxBatchSizeInBytes)
+
+    val oversizeBatch = buildFullBatch(leaderEpoch, recordSize, 
maxBatchSizeInBytes + recordSize)
+    assertThrows(classOf[RecordTooLargeException], () => {
+      log.appendAsLeader(oversizeBatch, leaderEpoch)
+    })
+
+    val undersizeBatch = buildFullBatch(leaderEpoch, recordSize, 
maxBatchSizeInBytes)
+    val appendInfo = log.appendAsLeader(undersizeBatch, leaderEpoch)
+    assertEquals(0L, appendInfo.firstOffset)
+  }
+
+  private def buildFullBatch(
+    leaderEpoch: Int,
+    recordSize: Int,
+    maxBatchSizeInBytes: Int
+  ): MemoryRecords = {
+    val buffer = ByteBuffer.allocate(maxBatchSizeInBytes)
+    val batchBuilder = new BatchBuilder[Array[Byte]](
+      buffer,
+      new ByteArraySerde,
+      CompressionType.NONE,
+      0L,
+      mockTime.milliseconds(),
+      false,
+      leaderEpoch,
+      maxBatchSizeInBytes
+    )
+
+    val serializationCache = new ObjectSerializationCache
+    val records = Collections.singletonList(new Array[Byte](recordSize))
+    while (!batchBuilder.bytesNeeded(records, serializationCache).isPresent) {
+      batchBuilder.appendRecord(records.get(0), serializationCache)
+    }
+
+    batchBuilder.build()
+  }
+
 }
 
 object KafkaMetadataLogTest {
-  def buildMetadataLogAndDir(tempDir: File, time: MockTime, topicPartition: 
TopicPartition): (Path, KafkaMetadataLog) = {
-
-    val logDir = createLogDirectory(tempDir, Log.logDirName(topicPartition))
-    val logConfig = LogTest.createLogConfig()
-
-    val log = Log(
-      dir = logDir,
-      config = logConfig,
-      logStartOffset = 0,
-      recoveryPoint = 0,
-      scheduler = time.scheduler,
-      brokerTopicStats = new BrokerTopicStats,
-      time = time,
-      maxProducerIdExpirationMs = 60 * 60 * 1000,
-      producerIdExpirationCheckIntervalMs = 
LogManager.ProducerIdExpirationCheckIntervalMs,
-      logDirFailureChannel = new LogDirFailureChannel(5)
+  class ByteArraySerde extends RecordSerde[Array[Byte]] {
+    override def recordSize(data: Array[Byte], serializationCache: 
ObjectSerializationCache): Int = {
+      data.length
+    }
+    override def write(data: Array[Byte], serializationCache: 
ObjectSerializationCache, out: Writable): Unit = {
+      out.writeByteArray(data)
+    }
+    override def read(input: protocol.Readable, size: Int): Array[Byte] = {
+      val array = new Array[Byte](size)
+      input.readArray(array)
+      array
+    }
+  }
+
+  def buildMetadataLogAndDir(
+    tempDir: File,
+    time: MockTime,
+    maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+    maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+  ): (Path, KafkaMetadataLog) = {
+
+    val logDir = createLogDirectory(
+      tempDir,
+      Log.logDirName(KafkaRaftServer.MetadataPartition)
     )
 
-    (logDir.toPath, KafkaMetadataLog(log, topicPartition))
+    val metadataLog = KafkaMetadataLog(
+      KafkaRaftServer.MetadataPartition,
+      logDir,
+      time,
+      time.scheduler,
+      maxBatchSizeInBytes,
+      maxFetchSizeInBytes
+    )
+
+    (logDir.toPath, metadataLog)
   }
 
-  def buildMetadataLog(tempDir: File, time: MockTime, topicPartition: 
TopicPartition): KafkaMetadataLog = {
-    val (_, log) = buildMetadataLogAndDir(tempDir, time, topicPartition)
+  def buildMetadataLog(
+    tempDir: File,
+    time: MockTime,
+    maxBatchSizeInBytes: Int = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+    maxFetchSizeInBytes: Int = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+  ): KafkaMetadataLog = {
+    val (_, log) = buildMetadataLogAndDir(tempDir, time, maxBatchSizeInBytes, 
maxFetchSizeInBytes)
     log
   }
 
-
   def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int, 
initialOffset: Long = 0L): LogAppendInfo = {
     log.appendAsLeader(
       MemoryRecords.withRecords(
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala 
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index f7f8a06..3afb75b 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.CompletableFuture
 import kafka.raft.KafkaRaftManager.RaftIoThread
 import org.apache.kafka.raft.KafkaRaftClient
 import org.junit.jupiter.api.Assertions._
-import org.mockito.Mockito._
 import org.junit.jupiter.api.Test
+import org.mockito.Mockito._
 
 class RaftManagerTest {
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 672b967..b8022da 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -133,16 +133,15 @@ import static 
org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
  */
 public class KafkaRaftClient<T> implements RaftClient<T> {
     private static final int RETRY_BACKOFF_BASE_MS = 100;
-    private static final int FETCH_MAX_WAIT_MS = 500;
-    static final int MAX_BATCH_SIZE = 8 * 1024 * 1024;
+    public static final int MAX_FETCH_WAIT_MS = 500;
+    public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
+    public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
 
     private final AtomicReference<GracefulShutdown> shutdown = new 
AtomicReference<>();
     private final Logger logger;
-    private final LogContext logContext;
     private final Time time;
     private final int fetchMaxWaitMs;
     private final String clusterId;
-    private final OptionalInt nodeId;
     private final NetworkChannel channel;
     private final ReplicatedLog log;
     private final Random random;
@@ -151,8 +150,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     private final RecordSerde<T> serde;
     private final MemoryPool memoryPool;
     private final RaftMessageQueue messageQueue;
-    private final QuorumStateStore quorumStateStore;
-    private final Metrics metrics;
     private final RaftConfig raftConfig;
     private final KafkaRaftMetrics kafkaRaftMetrics;
     private final QuorumState quorum;
@@ -187,11 +184,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             new BlockingMessageQueue(),
             log,
             quorumStateStore,
-            new BatchMemoryPool(5, MAX_BATCH_SIZE),
+            new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
             time,
             metrics,
             expirationService,
-            FETCH_MAX_WAIT_MS,
+            MAX_FETCH_WAIT_MS,
             clusterId,
             nodeId,
             logContext,
@@ -220,16 +217,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         this.channel = channel;
         this.messageQueue = messageQueue;
         this.log = log;
-        this.quorumStateStore = quorumStateStore;
         this.memoryPool = memoryPool;
         this.fetchPurgatory = new ThresholdPurgatory<>(expirationService);
         this.appendPurgatory = new ThresholdPurgatory<>(expirationService);
         this.time = time;
         this.clusterId = clusterId;
-        this.nodeId = nodeId;
-        this.metrics = metrics;
         this.fetchMaxWaitMs = fetchMaxWaitMs;
-        this.logContext = logContext;
         this.logger = logContext.logger(KafkaRaftClient.class);
         this.random = random;
         this.raftConfig = raftConfig;
@@ -413,7 +406,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             quorum.epoch(),
             log.endOffset().offset,
             raftConfig.appendLingerMs(),
-            MAX_BATCH_SIZE,
+            MAX_BATCH_SIZE_BYTES,
             memoryPool,
             time,
             CompressionType.NONE,
@@ -1775,8 +1768,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 .setFetchOffset(log.endOffset().offset);
         });
         return request
+            .setMaxBytes(MAX_FETCH_SIZE_BYTES)
             .setMaxWaitMs(fetchMaxWaitMs)
-            .setClusterId(clusterId.toString())
+            .setClusterId(clusterId)
             .setReplicaId(quorum.localIdOrSentinel());
     }
 
@@ -2245,7 +2239,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     public SnapshotWriter<T> createSnapshot(OffsetAndEpoch snapshotId) throws 
IOException {
         return new SnapshotWriter<>(
             log.createSnapshot(snapshotId),
-            MAX_BATCH_SIZE,
+            MAX_BATCH_SIZE_BYTES,
             memoryPool,
             time,
             CompressionType.NONE,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 8093c1f..1c716f4 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -471,8 +471,8 @@ public class KafkaRaftClientTest {
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
-        ByteBuffer buffer = 
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE);
-        Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE))
+        ByteBuffer buffer = 
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
+        
Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES))
             .thenReturn(buffer);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -500,8 +500,8 @@ public class KafkaRaftClientTest {
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
-        ByteBuffer buffer = 
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE);
-        Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE))
+        ByteBuffer buffer = 
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
+        
Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES))
             .thenReturn(buffer);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -530,8 +530,8 @@ public class KafkaRaftClientTest {
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
-        ByteBuffer buffer = 
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE);
-        Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE))
+        ByteBuffer buffer = 
ByteBuffer.allocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
+        
Mockito.when(memoryPool.tryAllocate(KafkaRaftClient.MAX_BATCH_SIZE_BYTES))
             .thenReturn(buffer);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index e57995c..3683a8e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -87,7 +87,7 @@ public final class RaftClientTestContext {
 
     final TopicPartition metadataPartition = Builder.METADATA_PARTITION;
     final int electionBackoffMaxMs = Builder.ELECTION_BACKOFF_MAX_MS;
-    final int electionFetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
+    final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
     final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
     final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
 
@@ -870,6 +870,8 @@ public final class RaftClientTestContext {
         assertTrue(
             message.data() instanceof FetchRequestData, "Unexpected request 
type " + message.data());
         FetchRequestData request = (FetchRequestData) message.data();
+        assertEquals(KafkaRaftClient.MAX_FETCH_SIZE_BYTES, request.maxBytes());
+        assertEquals(fetchMaxWaitMs, request.maxWaitMs());
 
         assertEquals(1, request.topics().size());
         assertEquals(metadataPartition.topic(), 
request.topics().get(0).topic());
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index a1af912..28a6618 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -747,7 +747,7 @@ public class RaftEventSimulationTest {
             persistentState.log.reopen();
 
             IntSerde serde = new IntSerde();
-            MemoryPool memoryPool = new BatchMemoryPool(2, 
KafkaRaftClient.MAX_BATCH_SIZE);
+            MemoryPool memoryPool = new BatchMemoryPool(2, 
KafkaRaftClient.MAX_BATCH_SIZE_BYTES);
 
             KafkaRaftClient<Integer> client = new KafkaRaftClient<>(
                 serde,

Reply via email to