dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168341032


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +140,88 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeTransactionLogValueWithUnknownTaggedFields(): Unit = {

Review Comment:
   nit: Should we say `FutureTransactionLogValue`?



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +140,88 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeTransactionLogValueWithUnknownTaggedFields(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        0, new Field("partition_foo", Type.STRING, ""),
+        1, new Field("partition_bar", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue.PartitionsSchema with tagged fields
+    val txnPartitions = new Struct(futurePartitionsSchema)
+    txnPartitions.set("topic", "topic")
+    txnPartitions.set("partition_ids", Array(Integer.valueOf(1)))
+    val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnPartitionsTaggedFields.put(0, "foo")
+    txnPartitionsTaggedFields.put(1, 4000)
+    txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields)
+
+    // Copy of TransactionLogValue.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futureTransactionLogValueSchema = new Schema(
+      new Field("producer_id", Type.INT64, ""),
+      new Field("producer_epoch", Type.INT16, ""),
+      new Field("transaction_timeout_ms", Type.INT32, ""),
+      new Field("transaction_status", Type.INT8, ""),
+      new Field("transaction_partitions", 
CompactArrayOf.nullable(futurePartitionsSchema), ""),
+      new Field("transaction_last_update_timestamp_ms", Type.INT64, ""),
+      new Field("transaction_start_timestamp_ms", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        0, new Field("txn_foo", Type.STRING, ""),
+        1, new Field("txn_bar", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue with tagged fields
+    val transactionLogValue = new Struct(futureTransactionLogValueSchema)
+    transactionLogValue.set("producer_id", 1000L)
+    transactionLogValue.set("producer_epoch", 100.toShort)
+    transactionLogValue.set("transaction_timeout_ms", 1000)
+    transactionLogValue.set("transaction_status", CompleteCommit.id)
+    transactionLogValue.set("transaction_partitions", Array(txnPartitions))
+    transactionLogValue.set("transaction_last_update_timestamp_ms", 2000L)
+    transactionLogValue.set("transaction_start_timestamp_ms", 3000L)
+    val txnLogValueTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnLogValueTaggedFields.put(0, "foo")
+    txnLogValueTaggedFields.put(1, 4000)
+    transactionLogValue.set("_tagged_fields", txnLogValueTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(transactionLogValue.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(1.toByte) // Add 1 as version.
+    transactionLogValue.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that tagged
+    // fields were read but ignored.
+    buffer.getShort() // Skip version.
+    val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 1.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))

Review Comment:
   nit: Could we also assert the topic's unknown tagged fields?
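
   A sketch of what that extra assertion could look like (assuming the generated `TransactionLogValue` exposes the partition entries via `transactionPartitions()` and that each entry carries its own `unknownTaggedFields()`, as generated message classes usually do):

   ```scala
   // Hypothetical follow-up check: the partition-level struct should also
   // surface the two unknown tagged fields written above.
   val partition = value.transactionPartitions().asScala.head
   assertEquals(Seq(0, 1), partition.unknownTaggedFields().asScala.map(_.tag))
   ```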



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2616,7 +2675,8 @@ class GroupMetadataManagerTest {
                                                memberId: String,
                                               assignmentBytes: Array[Byte] = Array.emptyByteArray,
                                               metadataVersion: MetadataVersion = MetadataVersion.latest): SimpleRecord = {
-    val memberProtocols = List((protocol, Array.emptyByteArray))
+    val subscription = new Subscription(List("topic").asJava)
+    val memberProtocols = List((protocol, ConsumerProtocol.serializeSubscription(subscription).array()))

Review Comment:
   Ack. I would revert this and fix it in a separate PR with the relevant context; otherwise this is confusing.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2467,64 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value().getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value().getShort)
+    }
+  }
+

Review Comment:
   I am perhaps a bit paranoid here, but those tests are based on a copy of the schema. Using the correct one seems like a good idea.
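
   As a sketch of leaning on the real generated schema for the bytes (this assumes `MessageUtil.toVersionPrefixedByteBuffer` from `org.apache.kafka.common.protocol`, and that the generated `OffsetCommitValue` is in scope here; both are assumptions, not lines from this PR):

   ```scala
   // Build the record value from the real generated class rather than a
   // hand-copied Schema, then prefix it with the version under test.
   val offsetCommitValue = new OffsetCommitValue()
     .setOffset(1000L)
     .setLeaderEpoch(100)
     .setMetadata("metadata")
     .setCommitTimestamp(2000L)
   val buffer = MessageUtil.toVersionPrefixedByteBuffer(4.toShort, offsetCommitValue)
   ```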



##########
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java:
##########
@@ -231,9 +231,9 @@ public static void processDirectories(String packageName,
             for (Path inputPath : directoryStream) {
                 try {
                     MessageSpec spec = JSON_SERDE.
-                        readValue(inputPath.toFile(), MessageSpec.class);
+                            readValue(inputPath.toFile(), MessageSpec.class);

Review Comment:
   nit: Let's fully revert changes in this file.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,28 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+

Review Comment:
   I meant a test which ensures that a TransactionLogValue serialised with version 1 can be deserialised with the current code.
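
   A rough sketch of that round trip (assuming `MessageUtil.toVersionPrefixedBytes` and the `TransactionLog.readTxnRecordValue` parser; the names come from the surrounding code and Kafka's protocol utilities, not from this PR):

   ```scala
   // Serialize a TransactionLogValue at version 1, then feed it through the
   // current deserialization path and verify the fields survive.
   val value = new TransactionLogValue()
     .setProducerId(1000L)
     .setProducerEpoch(100.toShort)
     .setTransactionTimeoutMs(1000)
     .setTransactionStatus(CompleteCommit.id)
     .setTransactionPartitions(java.util.Collections.emptyList())
     .setTransactionLastUpdateTimestampMs(2000L)
     .setTransactionStartTimestampMs(3000L)
   val bytes = MessageUtil.toVersionPrefixedBytes(1.toShort, value)
   val txnMetadata = TransactionLog.readTxnRecordValue("transactional-id", ByteBuffer.wrap(bytes))
   assertEquals(1000L, txnMetadata.get.producerId)
   ```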



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2469,168 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value.getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value.getShort)
+    }
+  }
+
+  @Test
+  def testDeserializeOffsetCommitValueWithUnknownTaggedFields(): Unit = {
+    // Copy of OffsetCommitValue.SCHEMA_4 with a few
+    // additional tagged fields.
+    val futureOffsetCommitSchema = new Schema(
+      new Field("offset", Type.INT64, ""),
+      new Field("leader_epoch", Type.INT32, ""),
+      new Field("metadata", Type.COMPACT_STRING, ""),
+      new Field("commit_timestamp", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        0, new Field("offset_foo", Type.STRING, ""),
+        1, new Field("offset_bar", Type.INT32, "")
+      )
+    )
+
+    // create OffsetCommitValue with tagged fields
+    val offsetCommit = new Struct(futureOffsetCommitSchema)
+    offsetCommit.set("offset", 1000L)
+    offsetCommit.set("leader_epoch", 100)
+    offsetCommit.set("metadata", "metadata")
+    offsetCommit.set("commit_timestamp", 2000L)
+    val offsetCommitTaggedFields = new java.util.TreeMap[Integer, Any]()
+    offsetCommitTaggedFields.put(0, "foo")
+    offsetCommitTaggedFields.put(1, 4000)
+    offsetCommit.set("_tagged_fields", offsetCommitTaggedFields)
+
+    // Prepare the buffer.
+    val buffer = ByteBuffer.allocate(offsetCommit.sizeOf() + 2)
+    buffer.put(0.toByte)
+    buffer.put(4.toByte) // Add 4 as version.
+    offsetCommit.writeTo(buffer)
+    buffer.flip()
+
+    // Read the buffer with the real schema and verify that tagged
+    // fields were read but ignored.
+    buffer.getShort() // Skip version.
+    val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 4.toShort)
+    assertEquals(Seq(0, 1), value.unknownTaggedFields().asScala.map(_.tag))
+
+    // Read the buffer with readOffsetMessageValue.
+    buffer.rewind()
+    val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+    assertEquals(1000L, offsetAndMetadata.offset)
+    assertEquals(100, offsetAndMetadata.leaderEpoch.get)
+    assertEquals("metadata", offsetAndMetadata.metadata)
+    assertEquals(2000L, offsetAndMetadata.commitTimestamp)
+  }
+
+  @Test
+  def testDeserializeGroupMetadataValueWithUnknownTaggedFields(): Unit = {

Review Comment:
   nit: ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2466,6 +2469,168 @@ class GroupMetadataManagerTest {
       verifySerde(version)
   }
 
+  @Test
+  def testSerializeGroupMetadataValueToHighestNonFlexibleVersion(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava, null)
+    ))
+    val record = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    assertEquals(3, record.value.getShort)
+  }
+
+  @Test
+  def testSerializeOffsetCommitValueToHighestNonFlexibleVersion(): Unit = {
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    offsetCommitRecords.foreach { record =>
+      assertEquals(3, record.value.getShort)
+    }
+  }
+
+  @Test
+  def testDeserializeOffsetCommitValueWithUnknownTaggedFields(): Unit = {

Review Comment:
   nit: Should we say `FutureOffsetCommitValue`?


