This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 50598191dcd MINOR: Add tests on TxnOffsetCommit and EndTxnMarker protection against invalid producer epoch when TV2 is used (#20024) 50598191dcd is described below commit 50598191dcd4763384ef1798fc07db899f731f3d Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com> AuthorDate: Sun Jul 20 18:34:29 2025 -0400 MINOR: Add tests on TxnOffsetCommit and EndTxnMarker protection against invalid producer epoch when TV2 is used (#20024) This patch adds an API level integration test for the producer epoch verification when processing transactional offset commit and end txn markers. Reviewers: PoAn Yang <pay...@apache.org>, TengYao Chi <kiting...@gmail.com>, Sean Quah <sq...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com> --- .../server/GroupCoordinatorBaseRequestTest.scala | 56 ++++++- .../kafka/server/TxnOffsetCommitRequestTest.scala | 141 +++++++++++++---- .../kafka/server/WriteTxnMarkersRequestTest.scala | 172 +++++++++++++++++++++ 3 files changed, 338 insertions(+), 31 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 67e4cb8df53..431e431504f 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -19,14 +19,16 @@ package kafka.server import kafka.network.SocketServer import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.{TopicCollection, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import 
org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment -import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRes [...] +import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic} +import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRes [...] import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...] 
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...] import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.test.ClusterInstance import org.apache.kafka.common.utils.ProducerIdAndEpoch @@ -352,6 +354,35 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { assertEquals(expectedError, connectAndReceive[EndTxnResponse](request).error) } + protected def writeTxnMarkers( + producerId: Long, + producerEpoch: Short, + committed: Boolean, + expectedError: Errors = Errors.NONE, + version: Short = ApiKeys.WRITE_TXN_MARKERS.latestVersion(isUnstableApiEnabled) + ): Unit = { + val request = new WriteTxnMarkersRequest.Builder( + new WriteTxnMarkersRequestData() + .setMarkers(List( + new WritableTxnMarker() + .setProducerId(producerId) + .setProducerEpoch(producerEpoch) + .setTransactionResult(committed) + .setTopics(List( + new WritableTxnMarkerTopic() + .setName(Topic.GROUP_METADATA_TOPIC_NAME) + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava) + .setCoordinatorEpoch(0) + ).asJava) + ).build(version) + + assertEquals( + expectedError.code, + connectAndReceive[WriteTxnMarkersResponse](request).data.markers.get(0).topics.get(0).partitions.get(0).errorCode + ) + } + protected def fetchOffsets( groups: List[OffsetFetchRequestData.OffsetFetchRequestGroup], requireStable: Boolean, @@ -422,6 +453,27 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { groupResponse } + protected def fetchOffset( + groupId: String, + topic: String, + partition: Int + ): Long = { + val 
groupIdRecord = fetchOffsets( + group = new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(topic) + .setPartitionIndexes(List[Integer](partition).asJava) + ).asJava), + requireStable = true, + version = 9 + ) + val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head + val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head + partitionRecord.committedOffset + } + protected def deleteOffset( groupId: String, topic: String, diff --git a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala index b0f1bee2333..aef40390d85 100644 --- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala @@ -16,19 +16,16 @@ */ package kafka.server -import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import kafka.utils.TestUtils import org.apache.kafka.common.errors.UnsupportedVersionException -import org.apache.kafka.common.message.OffsetFetchRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.JoinGroupRequest +import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest} import org.apache.kafka.common.test.ClusterInstance +import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import org.apache.kafka.common.utils.ProducerIdAndEpoch import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig -import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue} - -import scala.jdk.CollectionConverters._ +import org.junit.jupiter.api.Assertions.{assertNotEquals, assertThrows} @ClusterTestDefaults( types = 
Array(Type.KRAFT), @@ -51,6 +48,16 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat testTxnOffsetCommit(false) } + @ClusterTest + def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithNewConsumerGroupProtocol(): Unit = { + testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(true) + } + + @ClusterTest + def testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithOldConsumerGroupProtocol(): Unit = { + testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(false) + } + private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = { val topic = "topic" val partition = 0 @@ -65,8 +72,8 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat // Join the consumer group. Note that we don't heartbeat here so we must use // a session long enough for the duration of the test. val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol) - assertTrue(memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) - assertTrue(memberEpoch != JoinGroupRequest.UNKNOWN_GENERATION_ID) + assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId) + assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch) createTopic(topic, 1) @@ -178,7 +185,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat transactionalId = transactionalId ) - val originalOffset = fetchOffset(topic, partition, groupId) + val originalOffset = fetchOffset(groupId, topic, partition) commitTxnOffset( groupId = groupId, @@ -207,31 +214,107 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat TestUtils.waitUntilTrue(() => try { - fetchOffset(topic, partition, groupId) == expectedOffset + fetchOffset(groupId, topic, partition) == expectedOffset } catch { case _: Throwable => false }, "txn commit offset validation failed" ) } - private def fetchOffset( - topic: String, - partition: Int, - groupId: String - ): Long = { - val groupIdRecord = fetchOffsets( - group = new 
OffsetFetchRequestData.OffsetFetchRequestGroup() - .setGroupId(groupId) - .setTopics(List( - new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName(topic) - .setPartitionIndexes(List[Integer](partition).asJava) - ).asJava), - requireStable = true, - version = 9 - ) - val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head - val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex == partition).head - partitionRecord.committedOffset + private def testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(useNewProtocol: Boolean): Unit = { + val topic = "topic" + val partition = 0 + val transactionalId = "txn" + val groupId = "group" + val offset = 100L + + // Creates the __consumer_offsets and __transaction_state topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + createTransactionStateTopic() + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol) + assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId) + assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch) + + createTopic(topic, 1) + + for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) { + val useTV2 = version > EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 + + // Initialize producer. Wait until the coordinator finishes loading. 
+ var producerIdAndEpoch: ProducerIdAndEpoch = null + TestUtils.waitUntilTrue(() => + try { + producerIdAndEpoch = initProducerId( + transactionalId = transactionalId, + producerIdAndEpoch = ProducerIdAndEpoch.NONE, + expectedError = Errors.NONE + ) + true + } catch { + case _: Throwable => false + }, "initProducerId request failed" + ) + + addOffsetsToTxn( + groupId = groupId, + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId + ) + + // Complete the transaction. + endTxn( + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId, + isTransactionV2Enabled = useTV2, + committed = true, + expectedError = Errors.NONE + ) + + // Start a new transaction. Wait for the previous transaction to complete. + TestUtils.waitUntilTrue(() => + try { + addOffsetsToTxn( + groupId = groupId, + producerId = producerIdAndEpoch.producerId, + producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch, + transactionalId = transactionalId + ) + true + } catch { + case _: Throwable => false + }, "addOffsetsToTxn request failed" + ) + + // Committing offset with old epoch succeeds for TV1 and fails for TV2. + commitTxnOffset( + groupId = groupId, + memberId = if (version >= 3) memberId else JoinGroupRequest.UNKNOWN_MEMBER_ID, + generationId = if (version >= 3) 1 else JoinGroupRequest.UNKNOWN_GENERATION_ID, + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId, + topic = topic, + partition = partition, + offset = offset, + expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else Errors.NONE, + version = version.toShort + ) + + // Complete the transaction. 
+ endTxn( + producerId = producerIdAndEpoch.producerId, + producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch, + transactionalId = transactionalId, + isTransactionV2Enabled = useTV2, + committed = true, + expectedError = Errors.NONE + ) + } } } diff --git a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala new file mode 100644 index 00000000000..a68de4dacc0 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest} +import org.apache.kafka.common.test.ClusterInstance +import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import org.apache.kafka.common.utils.ProducerIdAndEpoch +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.transaction.TransactionLogConfig +import org.junit.jupiter.api.Assertions.assertNotEquals + +@ClusterTestDefaults( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + ) +) +class WriteTxnMarkersRequestTest(cluster:ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest + def testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithNewConsumerGroupProtocol(): Unit = { + testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(true) + } + + @ClusterTest + def testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithOldConsumerGroupProtocol(): Unit = { + testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(false) + } + + private def testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(useNewProtocol: Boolean): Unit = { + val topic = "topic" + val partition = 0 + val transactionalId = "txn" + val groupId = "group" + val offset = 100L + + // Creates the __consumer_offsets and __transaction_state topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. 
+ createOffsetsTopic() + createTransactionStateTopic() + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, useNewProtocol) + assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId) + assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch) + + createTopic(topic, 1) + + for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) { + val useTV2 = version > EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 + + // Initialize producer. Wait until the coordinator finishes loading. + var producerIdAndEpoch: ProducerIdAndEpoch = null + TestUtils.waitUntilTrue(() => + try { + producerIdAndEpoch = initProducerId( + transactionalId = transactionalId, + producerIdAndEpoch = ProducerIdAndEpoch.NONE, + expectedError = Errors.NONE + ) + true + } catch { + case _: Throwable => false + }, "initProducerId request failed" + ) + + addOffsetsToTxn( + groupId = groupId, + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId + ) + + // Complete the transaction. + endTxn( + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + transactionalId = transactionalId, + isTransactionV2Enabled = useTV2, + committed = true, + expectedError = Errors.NONE + ) + + // Start a new transaction. Wait for the previous transaction to complete. 
+ TestUtils.waitUntilTrue(() => + try { + addOffsetsToTxn( + groupId = groupId, + producerId = producerIdAndEpoch.producerId, + producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch, + transactionalId = transactionalId + ) + true + } catch { + case _: Throwable => false + }, "addOffsetsToTxn request failed" + ) + + commitTxnOffset( + groupId = groupId, + memberId = if (version >= 3) memberId else JoinGroupRequest.UNKNOWN_MEMBER_ID, + generationId = if (version >= 3) 1 else JoinGroupRequest.UNKNOWN_GENERATION_ID, + producerId = producerIdAndEpoch.producerId, + producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch, + transactionalId = transactionalId, + topic = topic, + partition = partition, + offset = offset + version, + expectedError = Errors.NONE, + version = version.toShort + ) + + // Delayed txn marker should be accepted for TV1 and rejected for TV2. + // Note that ideally a marker with producer epoch + 1 should also be rejected for TV2; + // that fix is still in progress. + writeTxnMarkers( + producerId = producerIdAndEpoch.producerId, + producerEpoch = producerIdAndEpoch.epoch, + committed = true, + expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else Errors.NONE + ) + + // The offset is committed for TV1 and not committed for TV2. + TestUtils.waitUntilTrue(() => + try { + fetchOffset(groupId, topic, partition) == (if (useTV2) -1L else offset + version) + } catch { + case _: Throwable => false + }, "unexpected txn commit offset" + ) + + // Complete the transaction. + endTxn( + producerId = producerIdAndEpoch.producerId, + producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort else producerIdAndEpoch.epoch, + transactionalId = transactionalId, + isTransactionV2Enabled = useTV2, + committed = true, + expectedError = Errors.NONE + ) + + // The offset is committed for TV2. 
+ TestUtils.waitUntilTrue(() => + try { + fetchOffset(groupId, topic, partition) == offset + version + } catch { + case _: Throwable => false + }, "txn commit offset validation failed" + ) + } + } +}