This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a5cba8  KAFKA-7286; Avoid getting stuck loading large metadata records (#5500)
4a5cba8 is described below

commit 4a5cba87bcd4621b999f7290d17d88d6859afb84
Author: Flavien Raynaud <[email protected]>
AuthorDate: Sun Sep 9 08:05:49 2018 +0100

    KAFKA-7286; Avoid getting stuck loading large metadata records (#5500)
    
    If a group metadata record is larger than offsets.load.buffer.size,
    loading offsets and group metadata from __consumer_offsets would hang
    forever, because the fixed-size read buffer could never fit a record
    bigger than the configured maximum. This patch grows the buffer as
    needed so that large records fit and loading can proceed. A similar
    change was made to the state-loading logic in the transaction
    coordinator.
    
    Reviewers: John Roesler <[email protected]>, lambdaliu <[email protected]>, Dhruvil Shah <[email protected]>, Jason Gustafson <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.scala   | 19 +++++++++--
 .../transaction/TransactionStateManager.scala      | 16 ++++++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +--
 .../group/GroupMetadataManagerTest.scala           | 39 ++++++++++++++++++++--
 .../TransactionCoordinatorConcurrencyTest.scala    |  2 ++
 .../transaction/TransactionStateManagerTest.scala  |  2 ++
 6 files changed, 75 insertions(+), 7 deletions(-)
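
In both coordinators the fix follows the same pattern: start from an empty buffer and, before copying a batch of file records into memory, grow the buffer whenever the batch is larger than the configured load buffer size. Below is a minimal illustrative Scala sketch of that pattern; the names loadBufferSize and batchSizeInBytes are stand-ins for the coordinator's config value and FileRecords.sizeInBytes, not identifiers taken verbatim from the patch.

    import java.nio.ByteBuffer

    // Sketch: return a buffer large enough to hold the next batch, reusing the
    // existing buffer when it already has sufficient capacity.
    def ensureCapacity(buffer: ByteBuffer, loadBufferSize: Int, batchSizeInBytes: Int): ByteBuffer = {
      val bytesNeeded = math.max(loadBufferSize, batchSizeInBytes)
      if (buffer.capacity < bytesNeeded) {
        // An oversized batch no longer stalls the load loop: allocate a bigger
        // buffer (exceeding the configured soft limit) so progress can be made.
        ByteBuffer.allocate(bytesNeeded)
      } else {
        // Normal case: clear and reuse the previously allocated buffer.
        buffer.clear()
        buffer
      }
    }

In the load loop the coordinator would then obtain the buffer with something like ensureCapacity(buffer, config.loadBufferSize, fileRecords.sizeInBytes) before calling fileRecords.readInto(buffer, 0), which is exactly the shape of the changes in the diff below.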

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 6bd0a5a..940ec71 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -510,7 +510,9 @@ class GroupMetadataManager(brokerId: Int,
 
       case Some(log) =>
         var currOffset = log.logStartOffset
-        lazy val buffer = ByteBuffer.allocate(config.loadBufferSize)
+
+        // buffer may not be needed if records are read from memory
+        var buffer = ByteBuffer.allocate(0)
 
        // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
@@ -524,7 +526,20 @@ class GroupMetadataManager(brokerId: Int,
           val memRecords = fetchDataInfo.records match {
             case records: MemoryRecords => records
             case fileRecords: FileRecords =>
-              buffer.clear()
+              val sizeInBytes = fileRecords.sizeInBytes
+              val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes)
+
+              // minOneMessage = true in the above log.read means that the buffer may need to be grown to ensure progress can be made
+              if (buffer.capacity < bytesNeeded) {
+                if (config.loadBufferSize < bytesNeeded)
+                  warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " +
+                    s"configured offsets.load.buffer.size (${config.loadBufferSize} bytes)")
+
+                buffer = ByteBuffer.allocate(bytesNeeded)
+              } else {
+                buffer.clear()
+              }
+
               fileRecords.readInto(buffer, 0)
               MemoryRecords.readableRecords(buffer)
           }
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index a358515..50d96c3 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -296,7 +296,8 @@ class TransactionStateManager(brokerId: Int,
         warn(s"Attempted to load offsets and group metadata from 
$topicPartition, but found no log")
 
       case Some(log) =>
-        lazy val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize)
+        // buffer may not be needed if records are read from memory
+        var buffer = ByteBuffer.allocate(0)
 
        // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
         var currOffset = log.logStartOffset
@@ -311,6 +312,19 @@ class TransactionStateManager(brokerId: Int,
             val memRecords = fetchDataInfo.records match {
               case records: MemoryRecords => records
               case fileRecords: FileRecords =>
+                val sizeInBytes = fileRecords.sizeInBytes
+                val bytesNeeded = Math.max(config.transactionLogLoadBufferSize, sizeInBytes)
+
+                // minOneMessage = true in the above log.read means that the buffer may need to be grown to ensure progress can be made
+                if (buffer.capacity < bytesNeeded) {
+                  if (config.transactionLogLoadBufferSize < bytesNeeded)
+                    warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " +
+                      s"configured transaction.state.log.load.buffer.size (${config.transactionLogLoadBufferSize} bytes)")
+
+                  buffer = ByteBuffer.allocate(bytesNeeded)
+                } else {
+                  buffer.clear()
+                }
                 buffer.clear()
                 fileRecords.readInto(buffer, 0)
                 MemoryRecords.readableRecords(buffer)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 753a5b9..1bc9707 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -660,7 +660,7 @@ object KafkaConfig {
   val GroupInitialRebalanceDelayMsDoc = "The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
-  val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
+  val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large)."
   val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " +
   "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)"
@@ -677,7 +677,7 @@ object KafkaConfig {
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
     "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
-  val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache."
+  val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading producer ids and transactions into the cache (soft-limit, overridden if records are too large)."
   val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +
     "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3e462db..a0dfbda 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -646,6 +646,38 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testLoadGroupWithLargeGroupMetadataRecord() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    // create a GroupMetadata record larger than offsets.load.buffer.size (here at least 16 bytes larger)
+    val assignmentSize = OffsetConfig.DefaultLoadBufferSize + 16
+    val memberId = "98098230493"
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+      protocolType = "consumer", protocol = "range", memberId, assignmentSize)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
   def testOffsetWriteAfterGroupRemoved(): Unit = {
     // this test case checks the following scenario:
    // 1. the group exists at some point in time, but is later removed (because all members left)
@@ -1603,7 +1635,7 @@ class GroupMetadataManagerTest {
     val apiVersion = KAFKA_1_1_IV0
    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTime = Some(100))
     val memberId = "98098230493"
-    val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion)
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
       offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
 
@@ -1708,13 +1740,14 @@ class GroupMetadataManagerTest {
                                                protocolType: String,
                                                protocol: String,
                                                memberId: String,
+                                               assignmentSize: Int = 0,
                                                apiVersion: ApiVersion = 
ApiVersion.latestVersion): SimpleRecord = {
     val memberProtocols = List((protocol, Array.emptyByteArray))
    val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
    val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
      if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None, Seq(member), time)
     val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
-    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]), apiVersion)
+    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> new Array[Byte](assignmentSize)), apiVersion)
     new SimpleRecord(groupMetadataKey, groupMetadataValue)
   }
 
@@ -1751,6 +1784,8 @@ class GroupMetadataManagerTest {
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
+    EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
     val bufferCapture = EasyMock.newCapture[ByteBuffer]
    fileRecordsMock.readInto(EasyMock.capture(bufferCapture), EasyMock.anyInt())
     EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 873b88d..060e07e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -259,6 +259,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
+    EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
     val bufferCapture = EasyMock.newCapture[ByteBuffer]
    fileRecordsMock.readInto(EasyMock.capture(bufferCapture), EasyMock.anyInt())
     EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 34b82d9..74bbe33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -586,6 +586,8 @@ class TransactionStateManagerTest {
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
 
+    EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
     val bufferCapture = EasyMock.newCapture[ByteBuffer]
    fileRecordsMock.readInto(EasyMock.capture(bufferCapture), EasyMock.anyInt())
     EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
