This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 9d68b8e KAFKA-8803: Remove timestamp check in completeTransitionTo
(#8278)
9d68b8e is described below
commit 9d68b8e3db2df135c799fc9523c99570d1ed6a26
Author: Guozhang Wang <[email protected]>
AuthorDate: Tue Mar 17 14:40:02 2020 -0700
KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)
In prepareAddPartitions the txnStartTimestamp could be updated to the
updateTimestamp, which is assumed to always be larger than the original
startTimestamp. However, due to an NTP time shift the clock may go backwards and
hence the newStartTimestamp may be smaller than the original one. Then later in
completeTransitionTo the time check would fail with an IllegalStateException,
and the txn would not transition to Ongoing.
An indirect result of this is that the txn would NEVER expire, because
only Ongoing transactions are checked for expiration.
We should do the same as in #3286 and remove this check.
Also added test coverage for both KAFKA-5415 and KAFKA-8803.
Reviewers: Jason Gustafson<[email protected]>
---
.../transaction/TransactionMetadata.scala | 3 +-
.../transaction/TransactionMetadataTest.scala | 215 ++++++++++++++++++---
2 files changed, 194 insertions(+), 24 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 4b57abf..24b418a 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -378,8 +378,7 @@ private[transaction] class TransactionMetadata(val
transactionalId: String,
case Ongoing => // from addPartitions
if (!validProducerEpoch(transitMetadata) ||
!topicPartitions.subsetOf(transitMetadata.topicPartitions) ||
- txnTimeoutMs != transitMetadata.txnTimeoutMs ||
- txnStartTimestamp > transitMetadata.txnStartTimestamp) {
+ txnTimeoutMs != transitMetadata.txnTimeoutMs) {
throwStateTransitionFailure(transitMetadata)
} else {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index 506e68d..85ee263 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -17,6 +17,7 @@
package kafka.coordinator.transaction
import kafka.utils.MockTime
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.junit.Assert._
@@ -27,11 +28,11 @@ import scala.collection.mutable
class TransactionMetadataTest {
val time = new MockTime()
+ val producerId = 23423L
+ val transactionalId = "txnlId"
@Test
def testInitializeEpoch(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = RecordBatch.NO_PRODUCER_EPOCH
val txnMetadata = new TransactionMetadata(
@@ -55,8 +56,6 @@ class TransactionMetadataTest {
@Test
def testNormalEpochBump(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@@ -79,8 +78,6 @@ class TransactionMetadataTest {
@Test(expected = classOf[IllegalStateException])
def testBumpEpochNotAllowedIfEpochsExhausted(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@@ -99,9 +96,197 @@ class TransactionMetadataTest {
}
@Test
+ def testTolerateUpdateTimeShiftDuringEpochBump(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Empty,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ val transitMetadata = txnMetadata.prepareIncrementProducerEpoch(30000,
Option(producerEpoch), time.milliseconds() - 1).right.get
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
+ assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateUpdateTimeResetDuringProducerIdRotation(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Empty,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ val transitMetadata = txnMetadata.prepareProducerIdRotation(producerId +
1, 30000, time.milliseconds() - 1, recordLastEpoch = true)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(producerId + 1, txnMetadata.producerId)
+ assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
+ assertEquals(0, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringAddPartitions(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Empty,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = time.milliseconds(),
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller; when transting from Empty the start time would
be updated to the update-time
+ var transitMetadata =
txnMetadata.prepareAddPartitions(Set[TopicPartition](new
TopicPartition("topic1", 0)), time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0)),
txnMetadata.topicPartitions)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+
+ // add another partition, check that in Ongoing state the start timestamp
would not change to update time
+ transitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](new
TopicPartition("topic2", 0)), time.milliseconds() - 2)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(Set[TopicPartition](new TopicPartition("topic1", 0), new
TopicPartition("topic2", 0)), txnMetadata.topicPartitions)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 2, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringPrepareCommit(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Ongoing,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareCommit,
time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(PrepareCommit, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringPrepareAbort(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = Ongoing,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareAbort,
time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(PrepareAbort, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringCompleteCommit(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = PrepareCommit,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(CompleteCommit, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
+ def testTolerateTimeShiftDuringCompleteAbort(): Unit = {
+ val producerEpoch: Short = 1
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = producerEpoch,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = 30000,
+ state = PrepareAbort,
+ topicPartitions = mutable.Set.empty,
+ txnStartTimestamp = 1L,
+ txnLastUpdateTimestamp = time.milliseconds())
+
+ // let new time be smaller
+ var transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
+ txnMetadata.completeTransitionTo(transitMetadata)
+ assertEquals(CompleteAbort, txnMetadata.state)
+ assertEquals(producerId, txnMetadata.producerId)
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(1L, txnMetadata.txnStartTimestamp)
+ assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
+ }
+
+ @Test
def testFenceProducerAfterEpochsExhausted(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@@ -131,8 +316,6 @@ class TransactionMetadataTest {
@Test(expected = classOf[IllegalStateException])
def testFenceProducerNotAllowedIfItWouldOverflow(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = Short.MaxValue
val txnMetadata = new TransactionMetadata(
@@ -151,8 +334,6 @@ class TransactionMetadataTest {
@Test
def testRotateProducerId(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@@ -192,8 +373,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithNewlyCreatedMetadata(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@@ -217,8 +396,6 @@ class TransactionMetadataTest {
@Test
def testEpochBumpWithCurrentEpochProvided(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@@ -242,8 +419,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithLastEpoch(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val lastProducerEpoch = (producerEpoch - 1).toShort
@@ -268,8 +443,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithFencedEpoch(): Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = 735.toShort
val lastProducerEpoch = (producerEpoch - 1).toShort
@@ -290,8 +463,6 @@ class TransactionMetadataTest {
}
private def testRotateProducerIdInOngoingState(state: TransactionState):
Unit = {
- val transactionalId = "txnlId"
- val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(