dajac commented on code in PR #13526:
URL: https://github.com/apache/kafka/pull/13526#discussion_r1168982888


##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +141,119 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, 
CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = 
ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+    val txnPartitions = new TransactionLogValue.PartitionsSchema()
+      .setTopic("topic")
+      .setPartitionIds(java.util.Collections.singletonList(0))
+
+    val txnLogValue = new TransactionLogValue()
+      .setProducerId(100)
+      .setProducerEpoch(50.toShort)
+      .setTransactionStatus(CompleteCommit.id)
+      .setTransactionStartTimestampMs(750L)
+      .setTransactionLastUpdateTimestampMs(1000L)
+      .setTransactionTimeoutMs(500)
+      
.setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+    val deserialized = TransactionLog.readTxnRecordValue("transactionId", 
serialized).get
+
+    assertEquals(100, deserialized.producerId)
+    assertEquals(50, deserialized.producerEpoch)
+    assertEquals(CompleteCommit, deserialized.state)
+    assertEquals(750L, deserialized.txnStartTimestamp)
+    assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+    assertEquals(500, deserialized.txnTimeoutMs)
+
+    val actualTxnPartitions = deserialized.topicPartitions
+    assertEquals(1, actualTxnPartitions.size)
+    assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        0, new Field("partition_foo", Type.STRING, ""),
+        1, new Field("partition_foo", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue.PartitionsSchema with tagged fields

Review Comment:
   nit: `Create`.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +141,119 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
+    val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, 
CompleteCommit, Set.empty, 500, 500)
+    val txnLogValueBuffer = 
ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata))
+    assertEquals(0, txnLogValueBuffer.getShort)
+  }
+
+  @Test
+  def testDeserializeHighestSupportedTransactionLogValue(): Unit = {
+    val txnPartitions = new TransactionLogValue.PartitionsSchema()
+      .setTopic("topic")
+      .setPartitionIds(java.util.Collections.singletonList(0))
+
+    val txnLogValue = new TransactionLogValue()
+      .setProducerId(100)
+      .setProducerEpoch(50.toShort)
+      .setTransactionStatus(CompleteCommit.id)
+      .setTransactionStartTimestampMs(750L)
+      .setTransactionLastUpdateTimestampMs(1000L)
+      .setTransactionTimeoutMs(500)
+      
.setTransactionPartitions(java.util.Collections.singletonList(txnPartitions))
+
+    val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue)
+    val deserialized = TransactionLog.readTxnRecordValue("transactionId", 
serialized).get
+
+    assertEquals(100, deserialized.producerId)
+    assertEquals(50, deserialized.producerEpoch)
+    assertEquals(CompleteCommit, deserialized.state)
+    assertEquals(750L, deserialized.txnStartTimestamp)
+    assertEquals(1000L, deserialized.txnLastUpdateTimestamp)
+    assertEquals(500, deserialized.txnTimeoutMs)
+
+    val actualTxnPartitions = deserialized.topicPartitions
+    assertEquals(1, actualTxnPartitions.size)
+    assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0)))
+  }
+
+  @Test
+  def testDeserializeFutureTransactionLogValue(): Unit = {
+    // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futurePartitionsSchema = new Schema(
+      new Field("topic", Type.COMPACT_STRING, ""),
+      new Field("partition_ids", new CompactArrayOf(Type.INT32), ""),
+      TaggedFieldsSection.of(
+        0, new Field("partition_foo", Type.STRING, ""),
+        1, new Field("partition_foo", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue.PartitionsSchema with tagged fields
+    val txnPartitions = new Struct(futurePartitionsSchema)
+    txnPartitions.set("topic", "topic")
+    txnPartitions.set("partition_ids", Array(Integer.valueOf(1)))
+    val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]()
+    txnPartitionsTaggedFields.put(0, "foo")
+    txnPartitionsTaggedFields.put(1, 4000)
+    txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields)
+
+    // Copy of TransactionLogValue.SCHEMA_1 with a few
+    // additional tagged fields.
+    val futureTransactionLogValueSchema = new Schema(
+      new Field("producer_id", Type.INT64, ""),
+      new Field("producer_epoch", Type.INT16, ""),
+      new Field("transaction_timeout_ms", Type.INT32, ""),
+      new Field("transaction_status", Type.INT8, ""),
+      new Field("transaction_partitions", 
CompactArrayOf.nullable(futurePartitionsSchema), ""),
+      new Field("transaction_last_update_timestamp_ms", Type.INT64, ""),
+      new Field("transaction_start_timestamp_ms", Type.INT64, ""),
+      TaggedFieldsSection.of(
+        0, new Field("txn_foo", Type.STRING, ""),
+        1, new Field("txn_bar", Type.INT32, "")
+      )
+    )
+
+    // create TransactionLogValue with tagged fields

Review Comment:
   nit: `Create`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to