Repository: kafka Updated Branches: refs/heads/trunk 29994dd10 -> e71dce89c
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala new file mode 100644 index 0000000..e8c918d --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -0,0 +1,562 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.io.File

import kafka.server.LogOffsetMetadata
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite

/**
 * Unit tests for `ProducerStateManager`.
 *
 * Every test runs against a fresh manager backed by its own temporary directory (created in
 * `setUp`, deleted in `tearDown`). Appends go through the private `append` / `appendEndTxnMarker`
 * helpers at the bottom of the class, which also advance the manager's map end offset to
 * `offset + 1`.
 */
class ProducerStateManagerTest extends JUnitSuite {
  // Assigned in setUp() before each test.
  var idMappingDir: File = _
  var idMapping: ProducerStateManager = _
  val partition = new TopicPartition("test", 0)
  val pid = 1L
  val maxPidExpirationMs = 60 * 1000
  val time = new MockTime

  /** Creates the temporary snapshot directory and a manager that writes into it. */
  @Before
  def setUp(): Unit = {
    idMappingDir = TestUtils.tempDir()
    idMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
  }

  /** Removes the temporary snapshot directory. */
  @After
  def tearDown(): Unit = {
    Utils.delete(idMappingDir)
  }

  /** Sequence and epoch validation for a single producer id. */
  @Test
  def testBasicIdMapping(): Unit = {
    val epoch = 0.toShort

    // First entry for id 0 added
    append(idMapping, pid, 0, epoch, 0L, 0L)

    // Second entry for id 0 added
    append(idMapping, pid, 1, epoch, 0L, 1L)

    // Duplicate sequence number (matches previous sequence number)
    assertThrows[DuplicateSequenceNumberException] {
      append(idMapping, pid, 1, epoch, 0L, 1L)
    }

    // Invalid sequence number (greater than next expected sequence number)
    assertThrows[OutOfOrderSequenceException] {
      append(idMapping, pid, 5, epoch, 0L, 2L)
    }

    // Change epoch
    append(idMapping, pid, 0, (epoch + 1).toShort, 0L, 3L)

    // Incorrect epoch
    assertThrows[ProducerFencedException] {
      append(idMapping, pid, 0, epoch, 0L, 4L)
    }
  }

  /** With `isLoadingFromLog = true` the first entry is accepted even at a non-zero sequence. */
  @Test
  def testNoValidationOnFirstEntryWhenLoadingLog(): Unit = {
    val epoch = 5.toShort
    val sequence = 16
    val offset = 735L
    append(idMapping, pid, sequence, epoch, offset, isLoadingFromLog = true)

    val maybeLastEntry = idMapping.lastEntry(pid)
    assertTrue(maybeLastEntry.isDefined)

    val lastEntry = maybeLastEntry.get
    assertEquals(epoch, lastEntry.producerEpoch)
    assertEquals(sequence, lastEntry.firstSeq)
    assertEquals(sequence, lastEntry.lastSeq)
    assertEquals(offset, lastEntry.lastOffset)
    assertEquals(offset, lastEntry.firstOffset)
  }

  /** An end-transaction marker carrying a higher epoch bumps the epoch and resets the sequence. */
  @Test
  def testControlRecordBumpsEpoch(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L)

    val bumpedEpoch = 1.toShort
    val (completedTxn, lastStableOffset) = appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L)
    assertEquals(1L, completedTxn.firstOffset)
    assertEquals(1L, completedTxn.lastOffset)
    assertEquals(2L, lastStableOffset)
    assertTrue(completedTxn.isAborted)
    assertEquals(pid, completedTxn.producerId)

    val maybeLastEntry = idMapping.lastEntry(pid)
    assertTrue(maybeLastEntry.isDefined)

    val lastEntry = maybeLastEntry.get
    assertEquals(bumpedEpoch, lastEntry.producerEpoch)
    assertEquals(None, lastEntry.currentTxnFirstOffset)
    assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.firstSeq)
    assertEquals(RecordBatch.NO_SEQUENCE, lastEntry.lastSeq)

    // should be able to append with the new epoch if we start at sequence 0
    append(idMapping, pid, 0, bumpedEpoch, 2L)
    assertEquals(Some(0), idMapping.lastEntry(pid).map(_.firstSeq))
  }

  /** Offset metadata matching the transaction's first offset is exposed as the first unstable offset. */
  @Test
  def testTxnFirstOffsetMetadataCached(): Unit = {
    val producerEpoch = 0.toShort
    val offset = 992342L
    val seq = 0
    val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)

    val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
      relativePositionInSegment = 234224)
    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
    idMapping.update(producerAppendInfo)

    assertEquals(Some(logOffsetMetadata), idMapping.firstUnstableOffset)
  }

  /** Offset metadata for a different offset is ignored; only the bare message offset is reported. */
  @Test
  def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
    val producerEpoch = 0.toShort
    val offset = 992342L
    val seq = 0
    val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)

    // use some other offset to simulate a follower append where the log offset metadata won't typically
    // match any of the transaction first offsets
    val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset - 23429, segmentBaseOffset = 990000L,
      relativePositionInSegment = 234224)
    producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
    idMapping.update(producerAppendInfo)

    assertEquals(Some(LogOffsetMetadata(offset)), idMapping.firstUnstableOffset)
  }

  /** Walks one `ProducerAppendInfo` through two transactional batches and a COMMIT marker. */
  @Test
  def updateProducerTransactionState(): Unit = {
    val producerEpoch = 0.toShort
    val coordinatorEpoch = 15
    val offset = 9L
    append(idMapping, pid, 0, producerEpoch, offset)

    val appendInfo = new ProducerAppendInfo(pid, idMapping.lastEntry(pid), loadingFromLog = false)
    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
    var lastEntry = appendInfo.lastEntry
    assertEquals(producerEpoch, lastEntry.producerEpoch)
    assertEquals(1, lastEntry.firstSeq)
    assertEquals(5, lastEntry.lastSeq)
    assertEquals(16L, lastEntry.firstOffset)
    assertEquals(20L, lastEntry.lastOffset)
    assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)

    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true)
    lastEntry = appendInfo.lastEntry
    assertEquals(producerEpoch, lastEntry.producerEpoch)
    assertEquals(6, lastEntry.firstSeq)
    assertEquals(10, lastEntry.lastSeq)
    assertEquals(26L, lastEntry.firstOffset)
    assertEquals(30L, lastEntry.lastOffset)
    assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)

    val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch)
    val completedTxn = appendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, 40L, time.milliseconds())
    assertEquals(pid, completedTxn.producerId)
    assertEquals(16L, completedTxn.firstOffset)
    assertEquals(40L, completedTxn.lastOffset)
    assertFalse(completedTxn.isAborted)

    lastEntry = appendInfo.lastEntry
    assertEquals(producerEpoch, lastEntry.producerEpoch)
    assertEquals(10, lastEntry.firstSeq)
    assertEquals(10, lastEntry.lastSeq)
    assertEquals(40L, lastEntry.firstOffset)
    assertEquals(40L, lastEntry.lastOffset)
    assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
    assertEquals(None, lastEntry.currentTxnFirstOffset)
    assertEquals(List(new TxnMetadata(pid, 16L)), appendInfo.startedTransactions)
  }

  /** After a marker bumps the epoch, continuing the old sequence must be rejected. */
  @Test(expected = classOf[OutOfOrderSequenceException])
  def testOutOfSequenceAfterControlRecordEpochBump(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L)
    append(idMapping, pid, 1, epoch, 1L)

    val bumpedEpoch = 1.toShort
    appendEndTxnMarker(idMapping, pid, bumpedEpoch, ControlRecordType.ABORT, 1L)

    // next append is invalid since we expect the sequence to be reset
    append(idMapping, pid, 2, bumpedEpoch, 2L)
  }

  /** A non-transactional append while a transaction is open must be rejected. */
  @Test(expected = classOf[InvalidTxnStateException])
  def testNonTransactionalAppendWithOngoingTransaction(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L, isTransactional = true)
    append(idMapping, pid, 1, epoch, 1L, isTransactional = false)
  }

  /** Taking a snapshot writes exactly one non-empty file into the directory. */
  @Test
  def testTakeSnapshot(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L, 0L)
    append(idMapping, pid, 1, epoch, 1L, 1L)

    // Take snapshot
    idMapping.takeSnapshot()

    // Check that file exists and it is not empty.
    // listFiles() yields File objects, so `length` below is the size on disk; the previous
    // list().head.length measured the file *name* and therefore could never fail.
    val snapshotFiles = idMappingDir.listFiles()
    assertEquals("Directory doesn't contain a single file as expected", 1, snapshotFiles.length)
    assertTrue("Snapshot file is empty", snapshotFiles.head.length > 0)
  }

  /** State reloaded from a snapshot accepts the next sequence number. */
  @Test
  def testRecoverFromSnapshot(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L)
    append(idMapping, pid, 1, epoch, 1L)

    idMapping.takeSnapshot()
    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
    recoveredMapping.truncateAndReload(0L, 3L, time.milliseconds)

    // entry added after recovery
    append(recoveredMapping, pid, 2, epoch, 2L)
  }

  /** Producers whose last timestamp is older than the expiration window are dropped on reload. */
  @Test(expected = classOf[OutOfOrderSequenceException])
  def testRemoveExpiredPidsOnReload(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L, 0)
    append(idMapping, pid, 1, epoch, 1L, 1)

    idMapping.takeSnapshot()
    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
    recoveredMapping.truncateAndReload(0L, 1L, 70000)

    // entry added after recovery. The pid should be expired now, and would not exist in the pid mapping. Hence
    // we should get an out of order sequence exception.
    append(recoveredMapping, pid, 2, epoch, 2L, 70001)
  }

  /** `deleteSnapshotsBefore` removes only snapshot files below the given offset. */
  @Test
  def testDeleteSnapshotsBefore(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L)
    append(idMapping, pid, 1, epoch, 1L)
    idMapping.takeSnapshot()
    assertEquals(1, idMappingDir.listFiles().length)
    assertEquals(Set(2), currentSnapshotOffsets)

    append(idMapping, pid, 2, epoch, 2L)
    idMapping.takeSnapshot()
    assertEquals(2, idMappingDir.listFiles().length)
    assertEquals(Set(2, 3), currentSnapshotOffsets)

    idMapping.deleteSnapshotsBefore(3L)
    assertEquals(1, idMappingDir.listFiles().length)
    assertEquals(Set(3), currentSnapshotOffsets)

    idMapping.deleteSnapshotsBefore(4L)
    assertEquals(0, idMappingDir.listFiles().length)
    assertEquals(Set(), currentSnapshotOffsets)
  }

  /** `truncate` removes all snapshots and lets the producer start again from sequence 0. */
  @Test
  def testTruncate(): Unit = {
    val epoch = 0.toShort

    append(idMapping, pid, 0, epoch, 0L)
    append(idMapping, pid, 1, epoch, 1L)
    idMapping.takeSnapshot()
    assertEquals(1, idMappingDir.listFiles().length)
    assertEquals(Set(2), currentSnapshotOffsets)

    append(idMapping, pid, 2, epoch, 2L)
    idMapping.takeSnapshot()
    assertEquals(2, idMappingDir.listFiles().length)
    assertEquals(Set(2, 3), currentSnapshotOffsets)

    idMapping.truncate()

    assertEquals(0, idMappingDir.listFiles().length)
    assertEquals(Set(), currentSnapshotOffsets)

    append(idMapping, pid, 0, epoch, 0L)
    idMapping.takeSnapshot()
    assertEquals(1, idMappingDir.listFiles().length)
    assertEquals(Set(1), currentSnapshotOffsets)
  }

  /** Reloading to an earlier snapshot restores the open transaction recorded in it. */
  @Test
  def testFirstUnstableOffsetAfterTruncation(): Unit = {
    val epoch = 0.toShort
    val sequence = 0

    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
    idMapping.takeSnapshot()

    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 105)
    idMapping.onHighWatermarkUpdated(106)
    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
    idMapping.takeSnapshot()

    append(idMapping, pid, sequence + 1, epoch, offset = 106)
    idMapping.truncateAndReload(0L, 106, time.milliseconds())
    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))

    idMapping.truncateAndReload(0L, 100L, time.milliseconds())
    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
  }

  /** Evicting a producer also drops its open transaction from the first-unstable-offset computation. */
  @Test
  def testFirstUnstableOffsetAfterEviction(): Unit = {
    val epoch = 0.toShort
    val sequence = 0
    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
    assertEquals(Some(99), idMapping.firstUnstableOffset.map(_.messageOffset))
    append(idMapping, 2L, 0, epoch, offset = 106, isTransactional = true)
    idMapping.evictUnretainedProducers(100)
    assertEquals(Some(106), idMapping.firstUnstableOffset.map(_.messageOffset))
  }

  /** Eviction by log start offset removes producers, snapshots, and advances the map end offset. */
  @Test
  def testEvictUnretainedPids(): Unit = {
    val epoch = 0.toShort

    append(idMapping, pid, 0, epoch, 0L)
    append(idMapping, pid, 1, epoch, 1L)
    idMapping.takeSnapshot()

    val anotherPid = 2L
    append(idMapping, anotherPid, 0, epoch, 2L)
    append(idMapping, anotherPid, 1, epoch, 3L)
    idMapping.takeSnapshot()
    assertEquals(Set(2, 4), currentSnapshotOffsets)

    idMapping.evictUnretainedProducers(2)
    assertEquals(Set(4), currentSnapshotOffsets)
    assertEquals(Set(anotherPid), idMapping.activeProducers.keySet)
    assertEquals(None, idMapping.lastEntry(pid))

    val maybeEntry = idMapping.lastEntry(anotherPid)
    assertTrue(maybeEntry.isDefined)
    assertEquals(3L, maybeEntry.get.lastOffset)

    idMapping.evictUnretainedProducers(3)
    assertEquals(Set(anotherPid), idMapping.activeProducers.keySet)
    assertEquals(Set(4), currentSnapshotOffsets)
    assertEquals(4, idMapping.mapEndOffset)

    idMapping.evictUnretainedProducers(5)
    assertEquals(Set(), idMapping.activeProducers.keySet)
    assertEquals(Set(), currentSnapshotOffsets)
    assertEquals(5, idMapping.mapEndOffset)
  }

  /** A second `takeSnapshot` with no intervening append does not create another file. */
  @Test
  def testSkipSnapshotIfOffsetUnchanged(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, 0L, 0L)

    idMapping.takeSnapshot()
    assertEquals(1, idMappingDir.listFiles().length)
    assertEquals(Set(1), currentSnapshotOffsets)

    // nothing changed so there should be no new snapshot
    idMapping.takeSnapshot()
    assertEquals(1, idMappingDir.listFiles().length)
    assertEquals(Set(1), currentSnapshotOffsets)
  }

  /** Reloading with an end offset below the only snapshot leaves no state for the producer. */
  @Test
  def testStartOffset(): Unit = {
    val epoch = 0.toShort
    val pid2 = 2L
    append(idMapping, pid2, 0, epoch, 0L, 1L)
    append(idMapping, pid, 0, epoch, 1L, 2L)
    append(idMapping, pid, 1, epoch, 2L, 3L)
    append(idMapping, pid, 2, epoch, 3L, 4L)
    idMapping.takeSnapshot()

    intercept[OutOfOrderSequenceException] {
      val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
      recoveredMapping.truncateAndReload(0L, 1L, time.milliseconds)
      append(recoveredMapping, pid2, 1, epoch, 4L, 5L)
    }
  }

  /** Once expired producers are removed, continuing their sequence is rejected. */
  @Test(expected = classOf[OutOfOrderSequenceException])
  def testPidExpirationTimeout(): Unit = {
    val epoch = 5.toShort
    val sequence = 37
    append(idMapping, pid, sequence, epoch, 1L)
    time.sleep(maxPidExpirationMs + 1)
    idMapping.removeExpiredProducers(time.milliseconds)
    append(idMapping, pid, sequence + 1, epoch, 1L)
  }

  /** Tracks first undecided / first unstable offsets across two interleaved transactions. */
  @Test
  def testFirstUnstableOffset(): Unit = {
    val epoch = 5.toShort
    val sequence = 0

    assertEquals(None, idMapping.firstUndecidedOffset)

    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))

    val anotherPid = 2L
    append(idMapping, anotherPid, sequence, epoch, offset = 105, isTransactional = true)
    assertEquals(Some(99L), idMapping.firstUndecidedOffset)
    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))

    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 109)
    assertEquals(Some(105L), idMapping.firstUndecidedOffset)
    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))

    idMapping.onHighWatermarkUpdated(100L)
    assertEquals(Some(99L), idMapping.firstUnstableOffset.map(_.messageOffset))

    idMapping.onHighWatermarkUpdated(110L)
    assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset))

    appendEndTxnMarker(idMapping, anotherPid, epoch, ControlRecordType.ABORT, offset = 112)
    assertEquals(None, idMapping.firstUndecidedOffset)
    assertEquals(Some(105L), idMapping.firstUnstableOffset.map(_.messageOffset))

    idMapping.onHighWatermarkUpdated(113L)
    assertEquals(None, idMapping.firstUnstableOffset.map(_.messageOffset))
  }

  /** A producer with an open transaction survives expiration. */
  @Test
  def testProducersWithOngoingTransactionsDontExpire(): Unit = {
    val epoch = 5.toShort
    val sequence = 0

    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
    assertEquals(Some(99L), idMapping.firstUndecidedOffset)

    time.sleep(maxPidExpirationMs + 1)
    idMapping.removeExpiredProducers(time.milliseconds)

    assertTrue(idMapping.lastEntry(pid).isDefined)
    assertEquals(Some(99L), idMapping.firstUndecidedOffset)

    idMapping.removeExpiredProducers(time.milliseconds)
    assertTrue(idMapping.lastEntry(pid).isDefined)
  }

  /** A control record carrying an older producer epoch is fenced. */
  @Test(expected = classOf[ProducerFencedException])
  def testOldEpochForControlRecord(): Unit = {
    val epoch = 5.toShort
    val sequence = 0

    assertEquals(None, idMapping.firstUndecidedOffset)

    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
    appendEndTxnMarker(idMapping, pid, 3.toShort, ControlRecordType.COMMIT, offset = 100)
  }

  /** Markers from the current or a newer coordinator epoch are accepted; older ones are fenced. */
  @Test
  def testCoordinatorFencing(): Unit = {
    val epoch = 5.toShort
    val sequence = 0

    append(idMapping, pid, sequence, epoch, offset = 99, isTransactional = true)
    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)

    val lastEntry = idMapping.lastEntry(pid)
    assertEquals(Some(1), lastEntry.map(_.coordinatorEpoch))

    // writing with the current epoch is allowed
    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 101, coordinatorEpoch = 1)

    // bumping the epoch is allowed
    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 102, coordinatorEpoch = 2)

    // old epochs are not allowed
    assertThrows[TransactionCoordinatorFencedException] {
      appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 103, coordinatorEpoch = 1)
    }
  }

  /** The coordinator epoch recorded in a snapshot still fences older coordinators after reload. */
  @Test(expected = classOf[TransactionCoordinatorFencedException])
  def testCoordinatorFencedAfterReload(): Unit = {
    val epoch = 0.toShort
    append(idMapping, pid, 0, epoch, offset = 99, isTransactional = true)
    appendEndTxnMarker(idMapping, pid, epoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 1)
    idMapping.takeSnapshot()

    // The marker at offset 100 moved the map end offset to 101, so that is where the snapshot
    // lives. Reload up to that offset: a smaller end offset would leave the recovered manager
    // without the producer entry (compare testStartOffset) and nothing would be fenced.
    val recoveredMapping = new ProducerStateManager(partition, idMappingDir, maxPidExpirationMs)
    recoveredMapping.truncateAndReload(0L, 101L, time.milliseconds)
    assertEquals(Some(1), recoveredMapping.lastEntry(pid).map(_.coordinatorEpoch))

    // append from old coordinator should be rejected -- by the *recovered* manager, not the
    // original one, otherwise the reload is never exercised
    appendEndTxnMarker(recoveredMapping, pid, epoch, ControlRecordType.COMMIT, offset = 101, coordinatorEpoch = 0)
  }

  /**
   * Appends an end-transaction marker for `pid` to `mapping`, completes the transaction and
   * advances the map end offset to `offset + 1`.
   *
   * @return the completed transaction together with the value returned by `completeTxn`
   */
  private def appendEndTxnMarker(mapping: ProducerStateManager,
                                 pid: Long,
                                 epoch: Short,
                                 controlType: ControlRecordType,
                                 offset: Long,
                                 coordinatorEpoch: Int = 0,
                                 timestamp: Long = time.milliseconds()): (CompletedTxn, Long) = {
    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty))
    val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
    val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, epoch, offset, timestamp)
    mapping.update(producerAppendInfo)
    val lastStableOffset = mapping.completeTxn(completedTxn)
    mapping.updateMapEndOffset(offset + 1)
    (completedTxn, lastStableOffset)
  }

  /**
   * Appends a single-record batch (first and last sequence both `seq`) for `pid` to `mapping`
   * and advances the map end offset to `offset + 1`.
   */
  private def append(mapping: ProducerStateManager,
                     pid: Long,
                     seq: Int,
                     epoch: Short,
                     offset: Long,
                     timestamp: Long = time.milliseconds(),
                     isTransactional: Boolean = false,
                     isLoadingFromLog: Boolean = false): Unit = {
    val producerAppendInfo = new ProducerAppendInfo(pid, mapping.lastEntry(pid), isLoadingFromLog)
    producerAppendInfo.append(epoch, seq, seq, timestamp, offset, isTransactional)
    mapping.update(producerAppendInfo)
    mapping.updateMapEndOffset(offset + 1)
  }

  /** Offsets parsed from the names of all files currently in the snapshot directory. */
  private def currentSnapshotOffsets =
    idMappingDir.listFiles().map(file => Log.offsetFromFilename(file.getName)).toSet

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala new file mode 100644 index 0000000..4546818 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/
package kafka.log

import java.io.File

import kafka.utils.TestUtils
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite

/**
 * Unit tests for `TransactionIndex`.
 *
 * Each test gets a fresh index opened at start offset 0 over a new temporary file; the index is
 * closed again in `teardown`.
 */
class TransactionIndexTest extends JUnitSuite {
  // Assigned in setup() before each test.
  var file: File = _
  var index: TransactionIndex = _
  val offset = 0L

  /** Opens a new index over a fresh temporary file. */
  @Before
  def setup(): Unit = {
    file = TestUtils.tempFile()
    index = new TransactionIndex(offset, file)
  }

  /** Closes the index opened in `setup`. */
  @After
  def teardown(): Unit = {
    index.close()
  }

  /** Four aborted transactions with increasing last offsets, shared by several tests. */
  private def sampleAbortedTxns: List[AbortedTxn] = List(
    new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
    new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
    new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
    new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))

  /** Entries appended to a reopened index land after the entries already in the file. */
  @Test
  def testPositionSetCorrectlyWhenOpened(): Unit = {
    val abortedTxns = sampleAbortedTxns
    abortedTxns.foreach(index.append)
    index.close()

    val reopenedIndex = new TransactionIndex(0L, file)
    // close the second handle even if an assertion fails; teardown only closes `index`
    try {
      val anotherAbortedTxn = new AbortedTxn(producerId = 3L, firstOffset = 50, lastOffset = 60, lastStableOffset = 55)
      reopenedIndex.append(anotherAbortedTxn)
      assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns)
    } finally {
      reopenedIndex.close()
    }
  }

  /** `sanityCheck` rejects an index whose entries do not fit its start offset. */
  @Test(expected = classOf[IllegalArgumentException])
  def testSanityCheck(): Unit = {
    sampleAbortedTxns.foreach(index.append)
    index.close()

    // open the index with a different starting offset to fake invalid data
    val reopenedIndex = new TransactionIndex(100L, file)
    // sanityCheck() is expected to throw; still release the second handle
    try {
      reopenedIndex.sanityCheck()
    } finally {
      reopenedIndex.close()
    }
  }

  /** Appending an entry with the same last offset as the previous one is rejected. */
  @Test(expected = classOf[IllegalArgumentException])
  def testLastOffsetMustIncrease(): Unit = {
    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
    index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 15, lastStableOffset = 11))
  }

  /** Appending an entry with a smaller last offset than the previous one is rejected. */
  @Test(expected = classOf[IllegalArgumentException])
  def testLastOffsetCannotDecrease(): Unit = {
    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
    index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11))
  }

  /** `collectAbortedTxns` over various fetch ranges: returned transactions and completeness flag. */
  @Test
  def testCollectAbortedTransactions(): Unit = {
    val abortedTxns = sampleAbortedTxns

    abortedTxns.foreach(index.append)

    val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)

    var result = index.collectAbortedTxns(0L, 100L)
    assertEquals(abortedTransactions, result.abortedTransactions)
    assertFalse(result.isComplete)

    result = index.collectAbortedTxns(0L, 32)
    assertEquals(abortedTransactions.take(3), result.abortedTransactions)
    assertTrue(result.isComplete)

    result = index.collectAbortedTxns(0L, 35)
    assertEquals(abortedTransactions, result.abortedTransactions)
    assertTrue(result.isComplete)

    result = index.collectAbortedTxns(10, 35)
    assertEquals(abortedTransactions, result.abortedTransactions)
    assertTrue(result.isComplete)

    result = index.collectAbortedTxns(11, 35)
    assertEquals(abortedTransactions.slice(1, 4), result.abortedTransactions)
    assertTrue(result.isComplete)

    result = index.collectAbortedTxns(20, 41)
    assertEquals(abortedTransactions.slice(2, 4), result.abortedTransactions)
    assertFalse(result.isComplete)
  }

  /** `truncateTo` drops entries at or beyond the given offset; `truncate` drops everything. */
  @Test
  def testTruncate(): Unit = {
    val abortedTxns = List(
      new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2),
      new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16),
      new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
      new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
    val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)

    abortedTxns.foreach(index.append)

    index.truncateTo(51)
    assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)

    index.truncateTo(50)
    assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions)

    index.truncate()
    assertEquals(List.empty[AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
  }

  /** An `AbortedTxn` exposes the constructor values and the current version. */
  @Test
  def testAbortedTxnSerde(): Unit = {
    val pid = 983493L
    val firstOffset = 137L
    val lastOffset = 299L
    val lastStableOffset = 200L

    val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset)
    assertEquals(AbortedTxn.CurrentVersion, abortedTxn.version)
    assertEquals(pid, abortedTxn.producerId)
    assertEquals(firstOffset, abortedTxn.firstOffset)
    assertEquals(lastOffset, abortedTxn.lastOffset)
    assertEquals(lastStableOffset, abortedTxn.lastStableOffset)
  }

  /** The index stays appendable and readable after `renameTo`. */
  @Test
  def testRenameIndex(): Unit = {
    val renamed = TestUtils.tempFile()
    index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2))

    index.renameTo(renamed)
    index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16))

    val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
    assertEquals(2, abortedTxns.size)
    assertEquals(0, abortedTxns(0).firstOffset)
    assertEquals(5, abortedTxns(1).firstOffset)
  }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 5dfcb63..415027c 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -18,9 +18,8 @@ package kafka.server import java.io.File -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicInteger import java.util.{Properties, Random} -import java.lang.{Long => JLong} import kafka.admin.AdminUtils import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo} @@ -32,13 +31,10 @@ import kafka.utils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.{MemoryRecords, Record} -import org.apache.kafka.common.requests.DeleteRecordsRequest import org.apache.kafka.common.utils.{Time, Utils} import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ import org.junit.{After, Before, Test} -import scala.collection.JavaConverters._ class LogOffsetTest extends ZooKeeperTestHarness { val random = new Random() @@ -239,9 +235,9 @@ class LogOffsetTest extends ZooKeeperTestHarness { def testFetchOffsetsBeforeWithChangingSegmentSize() { val log = EasyMock.niceMock(classOf[Log]) val logSegment = EasyMock.niceMock(classOf[LogSegment]) - EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Long] { - private val value = new AtomicLong(0) - def answer: Long = value.getAndIncrement() + EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] { + private val value = new
AtomicInteger(0) + def answer: Int = value.getAndIncrement() }) EasyMock.replay(logSegment) val logSegments = Seq(logSegment) http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index d6b1649..9f7a47a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -24,10 +24,11 @@ import kafka.log.Log import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.easymock.EasyMock import EasyMock._ +import org.apache.kafka.common.requests.IsolationLevel import org.junit.Assert._ import org.junit.{After, Test} @@ -152,14 +153,14 @@ class ReplicaManagerQuotasTest { expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes() //if we ask for len 1 return a message - expect(log.read(anyObject(), geq(1), anyObject(), anyObject())).andReturn( + expect(log.read(anyObject(), geq(1), anyObject(), anyObject(), anyObject())).andReturn( FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.withRecords(CompressionType.NONE, record) )).anyTimes() //if we ask for len = 0, return 0 messages - expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject())).andReturn( + expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject(), anyObject())).andReturn( FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.EMPTY 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index e00c142..4886b94 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -21,7 +21,6 @@ import java.io.File import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean -import kafka.cluster.Broker import kafka.log.LogConfig import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils} import TestUtils.createBroker @@ -29,7 +28,7 @@ import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState} +import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, PartitionState} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.{Node, TopicPartition} @@ -109,6 +108,7 @@ class ReplicaManagerTest { timeout = 0, requiredAcks = 3, internalTopicsAllowed = false, + isFromClient = true, entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes))), responseCallback = callback) @@ -166,6 +166,7 @@ class ReplicaManagerTest { timeout = 1000, requiredAcks = -1, internalTopicsAllowed = false, + isFromClient = true, entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))), responseCallback = produceCallback) @@ -178,7 +179,8 @@ class 
ReplicaManagerTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - responseCallback = fetchCallback) + responseCallback = fetchCallback, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) // Make this replica the follower val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, @@ -192,7 +194,133 @@ class ReplicaManagerTest { rm.shutdown(checkpointHW = false) } } - + + @Test + def testReadCommittedFetchLimitedAtLSO(): Unit = { + val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) + props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) + props.put("broker.id", Int.box(0)) + val config = KafkaConfig.fromProps(props) + val logProps = new Properties() + logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps)) + val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1)) + val metadataCache = EasyMock.createMock(classOf[MetadataCache]) + EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() + EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(0))).andReturn(true).anyTimes() + EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes() + EasyMock.replay(metadataCache) + val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName)) + + try { + val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava + val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava + + val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0)) + partition.getOrCreateReplica(0) + + // Make this replica the leader. 
+ val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => {}) + rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) + + def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = + responseStatus.values.foreach { status => + assertEquals(Errors.NONE, status.error) + } + + val producerId = 234L + val epoch = 5.toShort + + // write a few batches as part of a transaction + val numRecords = 3 + for (sequence <- 0 until numRecords) { + val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence, + new SimpleRecord(s"message $sequence".getBytes)) + rm.appendRecords( + timeout = 1000, + requiredAcks = -1, + internalTopicsAllowed = false, + isFromClient = true, + entriesPerPartition = Map(new TopicPartition(topic, 0) -> records), + responseCallback = produceCallback) + } + + var fetchCallbackFired = false + var fetchError = Errors.NONE + var fetchedRecords: Records = null + def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { + fetchError = responseStatus.map(_._2).head.error + fetchedRecords = responseStatus.map(_._2).head.records + fetchCallbackFired = true + } + + def fetchMessages(fetchInfos: Seq[(TopicPartition, PartitionData)], + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = { + rm.fetchMessages( + timeout = 1000, + replicaId = 1, + fetchMinBytes = 0, + fetchMaxBytes = Int.MaxValue, + hardMaxBytesLimit = false, + fetchInfos = fetchInfos, + responseCallback = fetchCallback, + isolationLevel = isolationLevel) + } + + // fetch as follower to advance the high watermark + fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)), + 
isolationLevel = IsolationLevel.READ_UNCOMMITTED) + + // fetch should return empty since LSO should be stuck at 0 + fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), + isolationLevel = IsolationLevel.READ_COMMITTED) + + assertTrue(fetchCallbackFired) + assertEquals(Errors.NONE, fetchError) + assertTrue(fetchedRecords.batches.asScala.isEmpty) + fetchCallbackFired = false + + // now commit the transaction + val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) + val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) + rm.appendRecords( + timeout = 1000, + requiredAcks = -1, + internalTopicsAllowed = false, + isFromClient = false, + entriesPerPartition = Map(new TopicPartition(topic, 0) -> commitRecordBatch), + responseCallback = produceCallback) + + // the LSO has advanced, but the appended commit marker has not been replicated, so + // none of the data from the transaction should be visible yet + fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), + isolationLevel = IsolationLevel.READ_COMMITTED) + + assertTrue(fetchCallbackFired) + assertEquals(Errors.NONE, fetchError) + assertTrue(fetchedRecords.batches.asScala.isEmpty) + fetchCallbackFired = false + + // fetch as follower to advance the high watermark + fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)), + isolationLevel = IsolationLevel.READ_UNCOMMITTED) + + // now all of the records should be fetchable + fetchMessages(fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), + isolationLevel = IsolationLevel.READ_COMMITTED) + + assertTrue(fetchCallbackFired) + assertEquals(Errors.NONE, fetchError) + assertEquals(numRecords + 1, fetchedRecords.batches.asScala.size) + } finally { + rm.shutdown(checkpointHW = false) + } + } + @Test def 
testFetchBeyondHighWatermarkReturnEmptyResponse() { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) @@ -211,8 +339,8 @@ class ReplicaManagerTest { EasyMock.replay(metadataCache) val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName)) + try { - val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava @@ -234,6 +362,7 @@ class ReplicaManagerTest { timeout = 1000, requiredAcks = -1, internalTopicsAllowed = false, + isFromClient = true, entriesPerPartition = Map(new TopicPartition(topic, 0) -> TestUtils.singletonRecords("message %d".format(i).getBytes)), responseCallback = produceCallback) @@ -254,7 +383,8 @@ class ReplicaManagerTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), - responseCallback = fetchCallback) + responseCallback = fetchCallback, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) assertTrue(fetchCallbackFired) @@ -270,11 +400,12 @@ class ReplicaManagerTest { fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, fetchInfos = Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), - responseCallback = fetchCallback) + responseCallback = fetchCallback, + isolationLevel = IsolationLevel.READ_UNCOMMITTED) - assertTrue(fetchCallbackFired) - assertEquals("Should not give an exception", Errors.NONE, fetchError) - assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords) + assertTrue(fetchCallbackFired) + assertEquals("Should not give an exception", Errors.NONE, fetchError) + assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords) } finally { rm.shutdown(checkpointHW = false) } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 5e91c9b..9270544 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -49,7 +49,7 @@ class RequestQuotaTest extends BaseRequestTest { private var leaderNode: KafkaServer = null // Run tests concurrently since a throttle could be up to 1 second because quota percentage allocated is very low - case class Task(val apiKey: ApiKeys, val future: Future[_]) + case class Task(apiKey: ApiKeys, future: Future[_]) private val executor = Executors.newCachedThreadPool private val tasks = new ListBuffer[Task] @@ -183,7 +183,8 @@ class RequestQuotaTest extends BaseRequestTest { new requests.MetadataRequest.Builder(List(topic).asJava) case ApiKeys.LIST_OFFSETS => - requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava) + requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) + .setTargetTimes(Map(tp -> (0L: java.lang.Long)).asJava) case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue, @@ -285,7 +286,7 @@ class RequestQuotaTest extends BaseRequestTest { apiKey.parseResponse(request.version, responseBuffer) } - case class Client(val clientId: String, val apiKey: ApiKeys) { + case class Client(clientId: String, apiKey: ApiKeys) { var correlationId: Int = 0 val builder = requestBuilder(apiKey) def runUntil(until: (Struct) => Boolean): Boolean = { http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index ba17db6..d7822c1 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -28,7 +28,8 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.requests.IsolationLevel import org.easymock.EasyMock import org.junit.Assert._ @@ -79,12 +80,12 @@ class SimpleFetchTest { EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes() EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes() EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes() - EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn( + EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true, IsolationLevel.READ_UNCOMMITTED)).andReturn( FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.withRecords(CompressionType.NONE, recordToHW) )).anyTimes() - EasyMock.expect(log.read(0, fetchSize, None, true)).andReturn( + EasyMock.expect(log.read(0, fetchSize, None, true, IsolationLevel.READ_UNCOMMITTED)).andReturn( FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.withRecords(CompressionType.NONE, recordToLEO) http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index bedc7bc..5d9e7c1 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -735,7 +735,7 @@ object TestUtils extends Logging { * @return The new leader or assertion failure if timeout is reached. */ def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int, - timeoutMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, + timeoutMs: Long = 30000, oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = { require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader") val startTime = System.currentTimeMillis()
