This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
     new ca33700  KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5133)
ca33700 is described below

commit ca3370091f4cc1f120f3919ebd42b06dee7cc654
Author: Anna Povzner <a...@confluent.io>
AuthorDate: Thu Jun 14 08:26:45 2018 -0700

    KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset (#5133)

    It is possible for the log start offset to fall in the middle of a batch after
    AdminClient#deleteRecords(). This causes a follower that starts fetching from the
    log start offset to fail fetching (all records). Use cases in which a follower
    starts fetching from the log start offset include:
    1) a new replica created due to partition re-assignment;
    2) a new local replica created as a result of AdminClient#AlterReplicaLogDirs();
    3) a broker that was down for some time while AdminClient#deleteRecords() move log start [...]

    Added two integration tests:
    1) produce and then AdminClient#deleteRecords() while one of the followers is down;
       the subsequent restart of the follower requires fetching from the log start offset;
    2) AdminClient#AlterReplicaLogDirs() after AdminClient#deleteRecords()

    Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala |  47 +++++-
 .../kafka/common/OffsetsOutOfOrderException.scala |  25 +++
 .../common/UnexpectedAppendOffsetException.scala  |  29 ++++
 core/src/main/scala/kafka/log/Log.scala           |  50 ++++--
 .../kafka/server/ReplicaAlterLogDirsThread.scala  |   3 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala |   2 +-
 .../kafka/api/AdminClientIntegrationTest.scala    |  77 +++++++++
 .../scala/unit/kafka/cluster/PartitionTest.scala  | 174 +++++++++++++++++++++
 core/src/test/scala/unit/kafka/log/LogTest.scala  |  69 +++++++-
 9 files changed, 449 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b9180a4..55f870e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.LeaderAndIsr
 import kafka.api.Request
+import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.AdminZkClient
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
@@ -187,6 +188,10 @@ class Partition(val topic: String,
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] =
     Option(allReplicasMap.get(replicaId))
 
+  def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+    getReplica(replicaId).getOrElse(
+      throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition"))
+
   def leaderReplicaIfLocal: Option[Replica] =
     leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
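To make the scenario in the commit message concrete before the recovery logic in the next hunk: the log start offset only stops being batch-aligned after a delete-records request, so the condition can be reproduced with nothing more than a producer and AdminClient#deleteRecords(). The sketch below mirrors the integration test added later in this commit; the bootstrap address, topic name, and record count are hypothetical placeholders, and whether all ten records end up in one batch depends on producer batching.

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, RecordsToDelete}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArraySerializer

object DeleteRecordsMidBatchSketch extends App {
  // Hypothetical single-partition topic and cluster address.
  val topicPartition = new TopicPartition("test", 0)
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")

  // Produce a handful of small records; with default producer batching they are
  // typically written to the leader's log as one record batch covering offsets 0-9.
  val producer = new KafkaProducer(props, new ByteArraySerializer, new ByteArraySerializer)
  (0 until 10).foreach { i =>
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topicPartition.topic, s"value-$i".getBytes))
  }
  producer.close()

  // Move the log start offset to 3. If offsets 0-9 form one batch, the new log start
  // offset now points into the middle of that batch.
  val admin = AdminClient.create(props)
  admin.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
    .all().get()
  admin.close()
}

A replica that later starts fetching from the log start offset receives that whole batch, whose base offset (0) is below the replica's expected next offset (3); the appendRecordsToFollowerOrFutureReplica change in the next hunk recovers from exactly this case by truncating fully to the batch's base offset and re-appending.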
@@ -545,15 +550,41 @@ class Partition(val topic: String,
     laggingReplicas
   }
 
-  def appendRecordsToFutureReplica(records: MemoryRecords) {
-    getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records)
+  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
+    if (isFuture)
+      getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
+    else {
+      // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
+      // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+      inReadLock(leaderIsrUpdateLock) {
+        getReplicaOrException().log.get.appendAsFollower(records)
+      }
+    }
   }
 
-  def appendRecordsToFollower(records: MemoryRecords) {
-    // The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
-    // is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
-    inReadLock(leaderIsrUpdateLock) {
-      getReplica().get.log.get.appendAsFollower(records)
+  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
+    try {
+      doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+    } catch {
+      case e: UnexpectedAppendOffsetException =>
+        val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
+        val logEndOffset = replica.logEndOffset.messageOffset
+        if (logEndOffset == replica.logStartOffset &&
+            e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+          // This may happen if the log start offset on the leader (or current replica) falls in
+          // the middle of the batch due to delete records request and the follower tries to
+          // fetch its first offset from the leader.
+          // We handle this case here instead of Log#append() because we will need to remove the
+          // segment that start with log start offset and create a new one with earlier offset
+          // (base offset of the batch), which will move recoveryPoint backwards, so we will need
+          // to checkpoint the new recovery point before we append
+          val replicaName = if (isFuture) "future replica" else "follower"
+          info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset ${replica.logStartOffset}." +
+               s" Since this is the first record to be appended to the $replicaName's log, will start the log from offset ${e.firstOffset}.")
+          truncateFullyAndStartAt(e.firstOffset, isFuture)
+          doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+        } else
+          throw e
     }
   }
diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
new file mode 100644
index 0000000..f8daaa4
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower received records with non-monotonically increasing offsets
+ */
+class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) {
+}
+
diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
new file mode 100644
index 0000000..e719a93
--- /dev/null
+++ b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the follower or the future replica received records from the leader (or current
+ * replica) with first offset less than expected next offset.
+ * @param firstOffset The first offset of the records to append
+ * @param lastOffset The last offset of the records to append
+ */
+class UnexpectedAppendOffsetException(val message: String,
+                                      val firstOffset: Long,
+                                      val lastOffset: Long) extends RuntimeException(message) {
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c7d2a6e..c92beee 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -29,7 +29,7 @@ import java.util.regex.Pattern
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef}
+import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
@@ -49,11 +49,11 @@ import scala.collection.{Seq, Set, mutable}
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-    RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
 
   def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo =
     LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+      RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L)
 }
 
 /**
@@ -72,6 +72,7 @@ object LogAppendInfo {
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
  */
 case class LogAppendInfo(var firstOffset: Option[Long],
                          var lastOffset: Long,
@@ -84,12 +85,15 @@ case class LogAppendInfo(var firstOffset: Option[Long],
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean) {
+                         offsetsMonotonic: Boolean,
+                         lastOffsetOfFirstBatch: Long) {
 
   /**
-   * Get the first offset if it exists, else get the last offset.
-   * @return The offset of first message if it exists; else offset of the last message.
+   * Get the first offset if it exists, else get the last offset of the first batch
+   * For magic versions 2 and newer, this method will return first offset. For magic versions
+   * older than 2, we use the last offset of the first batch as an approximation of the first
+   * offset to avoid decompressing the data.
   */
-  def firstOrLastOffset: Long = firstOffset.getOrElse(lastOffset)
+  def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch)
 
   /**
    * Get the (maximum) number of messages described by LogAppendInfo
@@ -736,6 +740,8 @@ class Log(@volatile var dir: File,
   * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
   * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
   * @throws KafkaStorageException If the append fails due to an I/O error.
+  * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
+  * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
   * @return Information about the appended messages including the first and last offset.
   */
  private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
@@ -798,9 +804,27 @@ class Log(@volatile var dir: File,
         }
       } else {
         // we are taking the offsets we are given
-        if (!appendInfo.offsetsMonotonic || appendInfo.firstOrLastOffset < nextOffsetMetadata.messageOffset)
-          throw new IllegalArgumentException(s"Out of order offsets found in append to $topicPartition: " +
-                                             records.records.asScala.map(_.offset))
+        if (!appendInfo.offsetsMonotonic)
+          throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
+                                               records.records.asScala.map(_.offset))
+
+        if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
+          // we may still be able to recover if the log is empty
+          // one example: fetching from log start offset on the leader which is not batch aligned,
+          // which may happen as a result of AdminClient#deleteRecords()
+          val firstOffset = appendInfo.firstOffset match {
+            case Some(offset) => offset
+            case None => records.batches.asScala.head.baseOffset()
+          }
+
+          val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
+          throw new UnexpectedAppendOffsetException(
+            s"Unexpected offset in append to $topicPartition. $firstOrLast " +
+            s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
+            s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+            s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
+            firstOffset, appendInfo.lastOffset)
+        }
       }
 
       // update the epoch cache with the epoch stamped onto the message by the leader
@@ -830,7 +854,7 @@ class Log(@volatile var dir: File,
       val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
       val logOffsetMetadata = LogOffsetMetadata(
-        messageOffset = appendInfo.firstOrLastOffset,
+        messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
         segmentBaseOffset = segment.baseOffset,
         relativePositionInSegment = segment.size)
 
@@ -970,6 +994,7 @@ class Log(@volatile var dir: File,
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     var readFirstMessage = false
+    var lastOffsetOfFirstBatch = -1L
 
     for (batch <- records.batches.asScala) {
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
@@ -986,6 +1011,7 @@ class Log(@volatile var dir: File,
       if (!readFirstMessage) {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
           firstOffset = Some(batch.baseOffset)
+        lastOffsetOfFirstBatch = batch.lastOffset
         readFirstMessage = true
       }
 
@@ -1024,7 +1050,7 @@ class Log(@volatile var dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
     LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
-      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
   }
 
   private def updateProducers(batch: RecordBatch,
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 5a505c3..e46473b 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -98,8 +98,7 @@ class ReplicaAlterLogDirsThread(name: String,
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
 
-    // Append the leader's messages to the log
-    partition.appendRecordsToFutureReplica(records)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
     val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
     futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
     futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index cf8d829..80940f6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String,
           .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
       // Append the leader's messages to the log
-      partition.appendRecordsToFollower(records)
+      partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
       if (isTraceEnabled)
         trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 50ed7ae..d6f349c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -813,6 +813,83 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = {
+    val leaders = createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1
+
+    def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
+      TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.getReplica(topicPartition) != None,
+                              "Expected follower to create replica for partition")
+
+      // wait until the follower discovers that log start offset moved beyond its HW
+      TestUtils.waitUntilTrue(() => {
+        servers(followerIndex).replicaManager.getReplica(topicPartition).get.logStartOffset == expectedStartOffset
+      }, s"Expected follower to discover new log start offset $expectedStartOffset")
+
+      TestUtils.waitUntilTrue(() => {
+        servers(followerIndex).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset == expectedEndOffset
+      }, s"Expected follower to catch up to log end offset $expectedEndOffset")
+    }
+
+    // we will produce to topic and delete records while one follower is down
+    killBroker(followerIndex)
+
+    client = AdminClient.create(createConfig)
+    sendRecords(producers.head, 100, topicPartition)
+
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+
+    // start the stopped broker to verify that it will be able to fetch from new log start offset
+    restartDeadBrokers()
+
+    waitForFollowerLog(expectedStartOffset=3L, expectedEndOffset=100L)
+
+    // after the new replica caught up, all replicas should have same log start offset
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+
+    // kill the same follower again, produce more records, and delete records beyond follower's LOE
+    killBroker(followerIndex)
+    sendRecords(producers.head, 100, topicPartition)
+    val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
+    result1.all().get()
+    restartDeadBrokers()
+    waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
+  }
+
+  @Test
+  def testAlterLogDirsAfterDeleteRecords(): Unit = {
+    client = AdminClient.create(createConfig)
+    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
+    val expectedLEO = 100
+    sendRecords(producers.head, expectedLEO, topicPartition)
+
+    // delete records to move log start offset
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+    result.all().get()
+    // make sure we are in the expected state after delete records
+    for (i <- 0 until serverCount) {
+      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+      assertEquals(expectedLEO, servers(i).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+    }
+
+    // we will create another dir just for one server
+    val futureLogDir = servers(0).config.logDirs(1)
+    val futureReplica = new TopicPartitionReplica(topic, 0, servers(0).config.brokerId)
+
+    // Verify that replica can be moved to the specified log directory
+    client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get
+    TestUtils.waitUntilTrue(() => {
+      futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent
+    }, "timed out waiting for replica movement")
+
+    // once replica moved, its LSO and LEO should match other replicas
+    assertEquals(3, servers(0).replicaManager.getReplica(topicPartition).get.logStartOffset)
+    assertEquals(expectedLEO, servers(0).replicaManager.getReplica(topicPartition).get.logEndOffset.messageOffset)
+  }
+
+  @Test
   def testOffsetsForTimesAfterDeleteRecords(): Unit = {
     createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
new file mode 100644
index 0000000..fe5d578
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.cluster
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.UnexpectedAppendOffsetException
+import kafka.log.{Log, LogConfig, LogManager, CleanerConfig}
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import kafka.utils.timer.MockTimer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record._
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.Assertions.assertThrows
+import scala.collection.JavaConverters._
+
+class PartitionTest {
+
+  val brokerId = 101
+  val topicPartition = new TopicPartition("test-topic", 0)
+  val time = new MockTime()
+  val brokerTopicStats = new BrokerTopicStats
+  val metrics = new Metrics
+
+  var tmpDir: File = _
+  var logDir: File = _
+  var replicaManager: ReplicaManager = _
+  var logManager: LogManager = _
+  var logConfig: LogConfig = _
+
+  @Before
+  def setup(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 512: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
+    logConfig = LogConfig(logProps)
+
+    tmpDir = TestUtils.tempDir()
+    logDir = TestUtils.randomPartitionLogDir(tmpDir)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
+    logManager.startup()
+
+    val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
+    brokerProps.put("log.dir", logDir.getAbsolutePath)
+    val brokerConfig = KafkaConfig.fromProps(brokerProps)
+    replicaManager = new ReplicaManager(
+      config = brokerConfig, metrics, time, zkClient = null, new MockScheduler(time),
+      logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
+      brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size))
+  }
+
+  @After
+  def tearDown(): Unit = {
+    brokerTopicStats.close()
+    metrics.close()
+
+    logManager.shutdown()
+    Utils.delete(tmpDir)
+    logManager.liveLogDirs.foreach(Utils.delete)
+    replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(Some(replica), partition.getReplica(replica.brokerId))
+
+    val initialLogStartOffset = 5L
+    partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
+    assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after truncate fully and start at $initialLogStartOffset:",
+                 initialLogStartOffset, replica.logStartOffset)
+
+    // verify that we cannot append records that do not contain log start offset even if the log is empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      // append one record with offset = 3
+      partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", initialLogStartOffset, replica.logEndOffset.messageOffset)
+
+    // verify that we can append records that contain log start offset, even when first
+    // offset < log start offset if the log is empty
+    val newLogStartOffset = 4L
+    val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                     new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                     new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                baseOffset = newLogStartOffset)
+    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
+    assertEquals(s"Log end offset after append of 3 records with base offset $newLogStartOffset:", 7L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset after append of 3 records with base offset $newLogStartOffset:", newLogStartOffset, replica.logStartOffset)
+
+    // and we can append more records after that
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 7:", 8L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+
+    // but we cannot append to offset < log start if the log is not empty
+    assertThrows[UnexpectedAppendOffsetException] {
+      val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                        new SimpleRecord("k2".getBytes, "v2".getBytes)),
+                                   baseOffset = 3L)
+      partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = false)
+    }
+    assertEquals(s"Log end offset should not change after failure to append", 8L, replica.logEndOffset.messageOffset)
+
+    // we still can append to next offset
+    partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false)
+    assertEquals(s"Log end offset after append of 1 record at offset 8:", 9L, replica.logEndOffset.messageOffset)
+    assertEquals(s"Log start offset not expected to change:", newLogStartOffset, replica.logStartOffset)
+  }
+
+  @Test
+  def testGetReplica(): Unit = {
+    val log = logManager.getOrCreateLog(topicPartition, logConfig)
+    val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
+    val partition = new
+      Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+
+    assertEquals(None, partition.getReplica(brokerId))
+    assertThrows[ReplicaNotAvailableException] {
+      partition.getReplicaOrException(brokerId)
+    }
+
+    partition.addReplicaIfNotExists(replica)
+    assertEquals(replica, partition.getReplicaOrException(brokerId))
+  }
+
+  @Test
+  def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
+    val partition = new Partition(topicPartition.topic, topicPartition.partition, time, replicaManager)
+    assertThrows[ReplicaNotAvailableException] {
+      partition.appendRecordsToFollowerOrFutureReplica(
+        createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
+    }
+  }
+
+  def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
+    val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val builder = MemoryRecords.builder(
+      buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
+      baseOffset, time.milliseconds, partitionLeaderEpoch)
+    records.foreach(builder.append)
+    builder.build()
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1171e5e..6c62e5e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,7 +22,8 @@ import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
 import java.util.Properties
 
-import kafka.common.KafkaException
+import org.apache.kafka.common.errors._
+import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException}
 import kafka.log.Log.DeleteDirSuffix
 import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel}
@@ -42,6 +43,7 @@ import org.junit.{After, Before, Test}
 import scala.collection.Iterable
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import org.scalatest.Assertions.{assertThrows, intercept, withClue}
 
 class LogTest {
   var config: KafkaConfig = null
@@ -1885,13 +1887,72 @@ class LogTest {
     assertTrue("Message payload should be null.", !head.hasValue)
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test
   def testAppendWithOutOfOrderOffsetsThrowsException() {
     val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+
+    val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L)
+    val buffer = ByteBuffer.allocate(512)
+    for (offset <- appendOffsets) {
+      val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
+                                          TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(),
+                                          1L, 0, 0, false, 0)
+      builder.append(new SimpleRecord("key".getBytes, "value".getBytes))
+      builder.close()
+    }
+    buffer.flip()
+    val memoryRecords = MemoryRecords.readableRecords(buffer)
+
+    assertThrows[OffsetsOutOfOrderException] {
+      log.appendAsFollower(memoryRecords)
+    }
+  }
+
+  @Test
+  def testAppendBelowExpectedOffsetThrowsException() {
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
     val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
     records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
-    val invalidRecord = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1.toString.getBytes))
-    log.appendAsFollower(invalidRecord)
+
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes))
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        assertThrows[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(invalidRecord)
+        }
+      }
+    }
+  }
+
+  @Test
+  def testAppendEmptyLogBelowLogStartOffsetThrowsException() {
+    createEmptyLogs(logDir, 7)
+    val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats)
+    assertEquals(7L, log.logStartOffset)
+    assertEquals(7L, log.logEndOffset)
+
+    val firstOffset = 4L
+    val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
+    val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4)
+    for (magic <- magicVals; compression <- compressionTypes) {
+      val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
+                                         new SimpleRecord("k2".getBytes, "v2".getBytes),
+                                         new SimpleRecord("k3".getBytes, "v3".getBytes)),
+                                    magicValue = magic, codec = compression,
+                                    baseOffset = firstOffset)
+
+      withClue(s"Magic=$magic, compressionType=$compression") {
+        val exception = intercept[UnexpectedAppendOffsetException] {
+          log.appendAsFollower(records = batch)
+        }
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset",
+                     firstOffset, exception.firstOffset)
+        assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset",
+                     firstOffset + 2, exception.lastOffset)
+      }
+    }
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.