dajac commented on code in PR #13526: URL: https://github.com/apache/kafka/pull/13526#discussion_r1168982888
########## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala: ########## @@ -135,4 +141,119 @@ class TransactionLogTest { assertEquals(Some("<DELETE>"), valueStringOpt) } + @Test + def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = { + val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500) + val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata)) + assertEquals(0, txnLogValueBuffer.getShort) + } + + @Test + def testDeserializeHighestSupportedTransactionLogValue(): Unit = { + val txnPartitions = new TransactionLogValue.PartitionsSchema() + .setTopic("topic") + .setPartitionIds(java.util.Collections.singletonList(0)) + + val txnLogValue = new TransactionLogValue() + .setProducerId(100) + .setProducerEpoch(50.toShort) + .setTransactionStatus(CompleteCommit.id) + .setTransactionStartTimestampMs(750L) + .setTransactionLastUpdateTimestampMs(1000L) + .setTransactionTimeoutMs(500) + .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions)) + + val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue) + val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get + + assertEquals(100, deserialized.producerId) + assertEquals(50, deserialized.producerEpoch) + assertEquals(CompleteCommit, deserialized.state) + assertEquals(750L, deserialized.txnStartTimestamp) + assertEquals(1000L, deserialized.txnLastUpdateTimestamp) + assertEquals(500, deserialized.txnTimeoutMs) + + val actualTxnPartitions = deserialized.topicPartitions + assertEquals(1, actualTxnPartitions.size) + assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0))) + } + + @Test + def testDeserializeFutureTransactionLogValue(): Unit = { + // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few + // additional tagged fields. + val futurePartitionsSchema = new Schema( + new Field("topic", Type.COMPACT_STRING, ""), + new Field("partition_ids", new CompactArrayOf(Type.INT32), ""), + TaggedFieldsSection.of( + 0, new Field("partition_foo", Type.STRING, ""), + 1, new Field("partition_foo", Type.INT32, "") + ) + ) + + // create TransactionLogValue.PartitionsSchema with tagged fields Review Comment: nit: `Create`. ########## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala: ########## @@ -135,4 +141,119 @@ class TransactionLogTest { assertEquals(Some("<DELETE>"), valueStringOpt) } + @Test + def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = { + val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500) + val txnLogValueBuffer = ByteBuffer.wrap(TransactionLog.valueToBytes(txnTransitMetadata)) + assertEquals(0, txnLogValueBuffer.getShort) + } + + @Test + def testDeserializeHighestSupportedTransactionLogValue(): Unit = { + val txnPartitions = new TransactionLogValue.PartitionsSchema() + .setTopic("topic") + .setPartitionIds(java.util.Collections.singletonList(0)) + + val txnLogValue = new TransactionLogValue() + .setProducerId(100) + .setProducerEpoch(50.toShort) + .setTransactionStatus(CompleteCommit.id) + .setTransactionStartTimestampMs(750L) + .setTransactionLastUpdateTimestampMs(1000L) + .setTransactionTimeoutMs(500) + .setTransactionPartitions(java.util.Collections.singletonList(txnPartitions)) + + val serialized = MessageUtil.toVersionPrefixedByteBuffer(1, txnLogValue) + val deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get + + assertEquals(100, deserialized.producerId) + assertEquals(50, deserialized.producerEpoch) + assertEquals(CompleteCommit, deserialized.state) + assertEquals(750L, deserialized.txnStartTimestamp) + assertEquals(1000L, deserialized.txnLastUpdateTimestamp) + assertEquals(500, deserialized.txnTimeoutMs) + + val actualTxnPartitions = deserialized.topicPartitions + assertEquals(1, actualTxnPartitions.size) + assertTrue(actualTxnPartitions.contains(new TopicPartition("topic", 0))) + } + + @Test + def testDeserializeFutureTransactionLogValue(): Unit = { + // Copy of TransactionLogValue.PartitionsSchema.SCHEMA_1 with a few + // additional tagged fields. + val futurePartitionsSchema = new Schema( + new Field("topic", Type.COMPACT_STRING, ""), + new Field("partition_ids", new CompactArrayOf(Type.INT32), ""), + TaggedFieldsSection.of( + 0, new Field("partition_foo", Type.STRING, ""), + 1, new Field("partition_foo", Type.INT32, "") + ) + ) + + // create TransactionLogValue.PartitionsSchema with tagged fields + val txnPartitions = new Struct(futurePartitionsSchema) + txnPartitions.set("topic", "topic") + txnPartitions.set("partition_ids", Array(Integer.valueOf(1))) + val txnPartitionsTaggedFields = new java.util.TreeMap[Integer, Any]() + txnPartitionsTaggedFields.put(0, "foo") + txnPartitionsTaggedFields.put(1, 4000) + txnPartitions.set("_tagged_fields", txnPartitionsTaggedFields) + + // Copy of TransactionLogValue.SCHEMA_1 with a few + // additional tagged fields. + val futureTransactionLogValueSchema = new Schema( + new Field("producer_id", Type.INT64, ""), + new Field("producer_epoch", Type.INT16, ""), + new Field("transaction_timeout_ms", Type.INT32, ""), + new Field("transaction_status", Type.INT8, ""), + new Field("transaction_partitions", CompactArrayOf.nullable(futurePartitionsSchema), ""), + new Field("transaction_last_update_timestamp_ms", Type.INT64, ""), + new Field("transaction_start_timestamp_ms", Type.INT64, ""), + TaggedFieldsSection.of( + 0, new Field("txn_foo", Type.STRING, ""), + 1, new Field("txn_bar", Type.INT32, "") + ) + ) + + // create TransactionLogValue with tagged fields Review Comment: nit: `Create`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org