This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 35c2c22  KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly 
by group coordinator (#9202)
35c2c22 is described below

commit 35c2c226bf403e1dd1b47084f13327ed7cc00ee4
Author: Luke Chen <[email protected]>
AuthorDate: Tue Sep 22 04:35:16 2020 +0800

    KAFKA-10401; Ensure `currentStateTimeStamp` is set correctly by group 
coordinator (#9202)
    
    Fix the `currentStateTimeStamp` doesn't get set in 
`GROUP_METADATA_VALUE_SCHEMA_V3`, and did a small refactor to use the 
`GROUP_VALUE_SCHEMAS.size - 1` replace the default hard-coded max version 
number. Also add test for it.
    
    Reviewers: Jason Gustafson <[email protected]>, Ismael Juma 
<[email protected]>
---
 .../coordinator/group/GroupMetadataManager.scala   | 18 ++++-----
 .../group/GroupMetadataManagerTest.scala           | 43 +++++++++++++++++++++-
 2 files changed, 48 insertions(+), 13 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index ef44604..e6561d3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1074,6 +1074,7 @@ object GroupMetadataManager {
 
   private val CURRENT_OFFSET_KEY_SCHEMA = 
schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = 
schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
GROUP_VALUE_SCHEMAS.keySet.max
 
   private def schemaForKey(version: Int) = {
     val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
@@ -1338,23 +1339,18 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
-      if (version >= 0 && version <= 3) {
+      if (version >= 0 && version <= 
CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] = version match {
-          case version if version == 2 =>
-            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-              if (timestamp == -1) None else Some(timestamp)
-            } else
-              None
-          case _ =>
-            None
-        }
+        val currentStateTimestamp: Option[Long] =
+          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
+            val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
+            if (timestamp == -1) None else Some(timestamp)
+          } else None
 
         val members = memberMetadataArray.map { memberMetadataObj =>
           val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a07a628..726ff20 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -30,8 +30,9 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.KafkaException
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue, 
assertThrows}
 import org.junit.{Before, Test}
 import org.scalatest.Assertions.fail
 import java.nio.ByteBuffer
@@ -802,7 +803,45 @@ class GroupMetadataManagerTest {
   }
 
   @Test
-  def testReadFromOldGroupMetadata() {
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unsupportedVersion = Short.MinValue
+
+    // put the unsupported version as the version value
+    val groupMetadataRecordValue = 
buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unsupportedVersion)
+    // reset the position to the starting position 0 so that it can read the 
data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, 
groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unsupportedVersion}", 
e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, 
protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = 
GroupMetadataManager.readGroupMessageValue(groupId, 
groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the 
currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(s"the apiVersion $apiVersion doesn't set the 
currentStateTimestamp correctly.",
+          Some(time.milliseconds()), 
deserializedGroupMetadata.currentStateTimestamp)
+      else
+        assertTrue(s"the apiVersion $apiVersion should not set the 
currentStateTimestamp.",
+          deserializedGroupMetadata.currentStateTimestamp.isEmpty)
+    }
+  }
+
+  @Test
+  def testReadFromOldGroupMetadata(): Unit = {
     val generation = 1
     val protocol = "range"
     val memberId = "memberId"

Reply via email to