http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b7f340f..5722a43 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -30,9 +30,10 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata} import kafka.utils._ import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.ListOffsetRequest +import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest} import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import scala.collection.{Seq, mutable} import com.yammer.metrics.core.Gauge import org.apache.kafka.common.utils.{Time, Utils} @@ -40,10 +41,13 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile} import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction +import java.util.Map.{Entry => JEntry} +import java.lang.{Long => JLong} object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, - NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, Map.empty[Long, ProducerAppendInfo], false) + NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) } /** @@ -59,9 +63,6 @@ object LogAppendInfo { * @param shallowCount The number of shallow messages * @param validBytes The number of valid bytes * @param 
offsetsMonotonic Are the offsets in this message set monotonically increasing - * @param producerAppendInfos A map from a Pid to a ProducerAppendInfo, which is used to validate each Record in a - * RecordBatch and keep track of metadata across Records in a RecordBatch. - * @param isDuplicate Indicates whether the message set is a duplicate of a message at the tail of the log. */ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, @@ -72,9 +73,19 @@ case class LogAppendInfo(var firstOffset: Long, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, - offsetsMonotonic: Boolean, - producerAppendInfos: Map[Long, ProducerAppendInfo], - isDuplicate: Boolean = false) + offsetsMonotonic: Boolean) + +/** + * A class used to hold useful metadata about a completed transaction. This is used to build + * the transaction index after appending to the log. + * + * @param producerId The ID of the producer + * @param firstOffset The first offset (inclusive) of the transaction + * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the + * COMMIT/ABORT control record which indicates the transaction's completion. + * @param isAborted Whether or not the transaction was aborted + */ +case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) /** * An append-only log for storing messages. 
@@ -111,8 +122,7 @@ class Log(@volatile var dir: File, scheduler: Scheduler, time: Time = Time.SYSTEM, val maxPidExpirationMs: Int = 60 * 60 * 1000, - val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000, - val pidSnapshotIntervalMs: Int = 60 * 1000) extends Logging with KafkaMetricsGroup { + val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -133,8 +143,10 @@ class Log(@volatile var dir: File, @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ - /* Construct and load PID map */ - private val pidMap = new ProducerIdMapping(config, topicPartition, dir, maxPidExpirationMs) + /* The earliest offset which is part of an incomplete transaction. This is used to compute the LSO. */ + @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None + + private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs) /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] @@ -147,7 +159,7 @@ class Log(@volatile var dir: File, loadSegments() /* Calculate the offset of the next message */ - nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, + nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset, activeSegment.size.toInt) leaderEpochCache.clearLatest(nextOffsetMetadata.messageOffset) @@ -157,7 +169,7 @@ class Log(@volatile var dir: File, // The earliest leader epoch may not be flushed during a hard failure. Recover it here. 
leaderEpochCache.clearEarliest(logStartOffset) - buildAndRecoverPidMap(logEndOffset) + loadProducerState(logEndOffset) info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms" .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs)) @@ -189,19 +201,12 @@ class Log(@volatile var dir: File, }, tags) - scheduler.schedule(name = "PeriodicPidExpirationCheck", fun = () => { + scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { lock synchronized { - pidMap.removeExpiredPids(time.milliseconds) + producerStateManager.removeExpiredProducers(time.milliseconds) } }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) - scheduler.schedule(name = "PeriodicPidSnapshotTask", fun = () => { - lock synchronized { - pidMap.maybeTakeSnapshot() - } - }, period = pidSnapshotIntervalMs, unit = TimeUnit.MILLISECONDS) - - /** The name of this log */ def name = dir.getName() @@ -212,13 +217,10 @@ class Log(@volatile var dir: File, new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir))) } - /* Load the log segments from the log files on disk */ - private def loadSegments() { + private def removeTempFilesAndCollectSwapFiles(): Set[File] = { var swapFiles = Set[File]() - // first do a pass through the files in the log directory and remove any temporary files - // and find any interrupted swap operations - for(file <- dir.listFiles if file.isFile) { + for (file <- dir.listFiles if file.isFile) { if(!file.canRead) throw new IOException("Could not read file " + file) val filename = file.getName @@ -229,48 +231,51 @@ class Log(@volatile var dir: File, // we crashed in the middle of a swap operation, to recover: // if a log, delete the .index file, complete the swap operation later // if an index just delete it, it will be rebuilt - val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) - if(baseName.getPath.endsWith(IndexFileSuffix)) { + val 
baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) + if (isIndexFile(baseFile)) { Files.deleteIfExists(file.toPath) - } else if(baseName.getPath.endsWith(LogFileSuffix)){ - // delete the index - val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) - Files.deleteIfExists(index.toPath()) + } else if (isLogFile(baseFile)) { + // delete the index files + val offset = offsetFromFilename(baseFile.getName) + Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath) + Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath) + Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath) swapFiles += file } } } + swapFiles + } - // now do a second pass and load all the .log and all index files - for(file <- dir.listFiles if file.isFile) { + private def loadSegmentFiles(): Unit = { + // load segments in ascending order because transactional data from one segment may depend on the + // segments that come before it + for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { val filename = file.getName - if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) { + if (isIndexFile(file)) { // if it is an index file, make sure it has a corresponding .log file - val logFile = - if (filename.endsWith(TimeIndexFileSuffix)) - new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix)) - else - new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) - - if(!logFile.exists) { + val offset = offsetFromFilename(filename) + val logFile = logFilename(dir, offset) + if (!logFile.exists) { warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) Files.deleteIfExists(file.toPath) } - } else if(filename.endsWith(LogFileSuffix)) { - // if its a log file, load the corresponding log segment + } else if (isLogFile(file)) { + // if it's a log file, load the corresponding log segment val startOffset = 
offsetFromFilename(filename) - val indexFile = Log.indexFilename(dir, startOffset) - val timeIndexFile = Log.timeIndexFilename(dir, startOffset) + val indexFile = Log.offsetIndexFile(dir, startOffset) + val timeIndexFile = Log.timeIndexFile(dir, startOffset) + val txnIndexFile = Log.transactionIndexFile(dir, startOffset) val indexFileExists = indexFile.exists() val timeIndexFileExists = timeIndexFile.exists() val segment = new LogSegment(dir = dir, - startOffset = startOffset, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = true) + startOffset = startOffset, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = true) if (indexFileExists) { try { @@ -279,25 +284,43 @@ class Log(@volatile var dir: File, if (!timeIndexFileExists) segment.timeIndex.resize(0) segment.timeIndex.sanityCheck() + segment.txnIndex.sanityCheck() } catch { case e: java.lang.IllegalArgumentException => warn(s"Found a corrupted index file due to ${e.getMessage}}. 
deleting ${timeIndexFile.getAbsolutePath}, " + - s"${indexFile.getAbsolutePath} and rebuilding index...") + s"${indexFile.getAbsolutePath}, and ${txnIndexFile.getAbsolutePath} and rebuilding index...") Files.deleteIfExists(timeIndexFile.toPath) Files.delete(indexFile.toPath) - segment.recover(config.maxMessageSize) + segment.txnIndex.delete() + recoverSegment(segment) } } else { - error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) - segment.recover(config.maxMessageSize) + error("Could not find offset index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) + recoverSegment(segment) } segments.put(startOffset, segment) } } + } + + private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized { + val stateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs) + stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds) + logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment => + val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset) + val fetchDataInfo = segment.read(startOffset, None, Int.MaxValue) + if (fetchDataInfo != null) + loadProducersFromLog(stateManager, fetchDataInfo.records) + } + val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache) - // Finally, complete any interrupted swap operations. To be crash-safe, - // log files that are replaced by the swap segment should be renamed to .deleted - // before the swap file is restored as the new segment file. + // once we have recovered the segment's data, take a snapshot to ensure that we won't + // need to reload the same segment again while recovering another segment. 
+ stateManager.takeSnapshot() + bytesTruncated + } + + private def completeSwapOperations(swapFiles: Set[File]): Unit = { for (swapFile <- swapFiles) { val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) val filename = logFile.getName @@ -306,18 +329,36 @@ class Log(@volatile var dir: File, val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix) val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) + val txnIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TxnIndexFileSuffix) + SwapFileSuffix) + val txnIndex = new TransactionIndex(startOffset, txnIndexFile) val swapSegment = new LogSegment(FileRecords.open(swapFile), - index = index, - timeIndex = timeIndex, - baseOffset = startOffset, - indexIntervalBytes = config.indexInterval, - rollJitterMs = config.randomSegmentJitter, - time = time) + index = index, + timeIndex = timeIndex, + txnIndex = txnIndex, + baseOffset = startOffset, + indexIntervalBytes = config.indexInterval, + rollJitterMs = config.randomSegmentJitter, + time = time) info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath)) - swapSegment.recover(config.maxMessageSize) - val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset) + recoverSegment(swapSegment) + val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset()) replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true) } + } + + /* Load the log segments from the log files on disk */ + private def loadSegments() { + // first do a pass through the files in the log directory and remove any temporary files + // and find any interrupted swap operations + val swapFiles = removeTempFilesAndCollectSwapFiles() + + // now do a second 
pass and load all the log and index files + loadSegmentFiles() + + // Finally, complete any interrupted swap operations. To be crash-safe, + // log files that are replaced by the swap segment should be renamed to .deleted + // before the swap file is restored as the new segment file. + completeSwapOperations(swapFiles) if(logSegments.isEmpty) { // no existing segments, create a new mutable segment beginning at offset 0 @@ -330,13 +371,11 @@ class Log(@volatile var dir: File, fileAlreadyExists = false, initFileSize = this.initFileSize(), preallocate = config.preallocate)) - } else { - if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { - recoverLog() - // reset the index size of the currently active log segment to allow more entries - activeSegment.index.resize(config.maxIndexSize) - activeSegment.timeIndex.resize(config.maxIndexSize) - } + } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { + recoverLog() + // reset the index size of the currently active log segment to allow more entries + activeSegment.index.resize(config.maxIndexSize) + activeSegment.timeIndex.resize(config.maxIndexSize) } } @@ -347,66 +386,72 @@ class Log(@volatile var dir: File, private def recoverLog() { // if we have the clean shutdown marker, skip recovery if(hasCleanShutdownFile) { - this.recoveryPoint = activeSegment.nextOffset + this.recoveryPoint = activeSegment.nextOffset() return } // okay we need to actually recovery this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while(unflushed.hasNext) { - val curr = unflushed.next - info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name)) + val segment = unflushed.next + info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name)) val truncatedBytes = try { - curr.recover(config.maxMessageSize, Some(leaderEpochCache)) + recoverSegment(segment, Some(leaderEpochCache)) } catch { case _: InvalidOffsetException => - val startOffset = curr.baseOffset + 
val startOffset = segment.baseOffset warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " + "creating an empty one with starting offset " + startOffset) - curr.truncateTo(startOffset) + segment.truncateTo(startOffset) } if(truncatedBytes > 0) { // we had an invalid message, delete all remaining log - warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset)) + warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name, + segment.nextOffset())) unflushed.foreach(deleteSegment) } } } - /** - * Creates an instance of id map for this log and updates the mapping - * in the case it is missing some messages. Note that the id mapping - * starts from a snapshot that is taken strictly before the log end - * offset. Consequently, we need to process the tail of the log to update - * the mapping. - */ - private def buildAndRecoverPidMap(lastOffset: Long) { - lock synchronized { - info(s"Recovering PID mapping from offset $lastOffset for partition $topicPartition") - val currentTimeMs = time.milliseconds - pidMap.truncateAndReload(logStartOffset, lastOffset, currentTimeMs) - logSegments(pidMap.mapEndOffset, lastOffset).foreach { segment => - val startOffset = math.max(segment.baseOffset, pidMap.mapEndOffset) + private def loadProducerState(lastOffset: Long): Unit = lock synchronized { + info(s"Loading producer state from offset $lastOffset for partition $topicPartition") + val currentTimeMs = time.milliseconds + producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs) + + // only do the potentially expensive reloading if the last snapshot offset is lower than the + // log end offset (which would be the case on first startup) and there are active producers. 
+ // if there are no active producers, then truncating shouldn't change that fact (although it + // could cause a producerId to expire earlier than expected), so we can skip the loading. + // This is an optimization for users who are not yet using idempotent/transactional features. + if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) { + logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => + val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset) val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue) - if (fetchDataInfo != null) { - fetchDataInfo.records.batches.asScala.foreach { batch => - if (batch.hasProducerId) { - val pidEntry = ProducerIdEntry(batch.producerEpoch, batch.lastSequence, batch.lastOffset, - batch.lastSequence - batch.baseSequence, batch.maxTimestamp) - pidMap.load(batch.producerId, pidEntry, currentTimeMs) - } - } - } + if (fetchDataInfo != null) + loadProducersFromLog(producerStateManager, fetchDataInfo.records) } - pidMap.updateMapEndOffset(lastOffset) } + + producerStateManager.updateMapEndOffset(lastOffset) + updateFirstUnstableOffset() } - private[log] def activePids: Map[Long, ProducerIdEntry] = { - lock synchronized { - pidMap.activePids + private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = { + val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo] + val completedTxns = ListBuffer.empty[CompletedTxn] + records.batches.asScala.foreach { batch => + if (batch.hasProducerId) { + val lastEntry = producerStateManager.lastEntry(batch.producerId) + updateProducers(batch, loadedProducers, completedTxns, lastEntry, loadingFromLog = true) + } } + loadedProducers.values.foreach(producerStateManager.update) + completedTxns.foreach(producerStateManager.completeTxn) + } + + private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized { + 
producerStateManager.activeProducers } /** @@ -426,47 +471,50 @@ class Log(@volatile var dir: File, def close() { debug(s"Closing log $name") lock synchronized { + producerStateManager.takeSnapshot() logSegments.foreach(_.close()) } } /** - * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs - * @param records The records to append - * @throws KafkaStorageException If the append fails due to an I/O error. - * @return Information about the appended messages including the first and last offset. - */ - def appendAsLeader(records: MemoryRecords, leaderEpoch: Int): LogAppendInfo = { - append(records, assignOffsets = true, leaderEpoch) + * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs + * @param records The records to append + * @param isFromClient Whether or not this append is from a producer + * @throws KafkaStorageException If the append fails due to an I/O error. + * @return Information about the appended messages including the first and last offset. + */ + def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = { + append(records, isFromClient, assignOffsets = true, leaderEpoch) } /** - * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs - * @param records The records to append - * @throws KafkaStorageException If the append fails due to an I/O error. - * @return Information about the appended messages including the first and last offset. - */ + * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs + * @param records The records to append + * @throws KafkaStorageException If the append fails due to an I/O error. + * @return Information about the appended messages including the first and last offset. 
+ */ def appendAsFollower(records: MemoryRecords): LogAppendInfo = { - append(records, assignOffsets = false, leaderEpoch = -1) + append(records, isFromClient = false, assignOffsets = false, leaderEpoch = -1) } /** - * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary. - * - * This method will generally be responsible for assigning offsets to the messages, - * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid. - * - * @param records The log records to append - * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given - * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader - * @throws KafkaStorageException If the append fails due to an I/O error. - * @return Information about the appended messages including the first and last offset. - */ - private def append(records: MemoryRecords, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = { - val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets) + * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary. + * + * This method will generally be responsible for assigning offsets to the messages, + * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid. + * + * @param records The log records to append + * @param isFromClient Whether or not this append is from a producer + * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given + * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader + * @throws KafkaStorageException If the append fails due to an I/O error. + * @return Information about the appended messages including the first and last offset. 
+ */ + private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = { + val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) // return if we have no valid messages or if this is a duplicate of the last appended entry - if (appendInfo.shallowCount == 0 || appendInfo.isDuplicate) + if (appendInfo.shallowCount == 0) return appendInfo // trim any invalid bytes or partial messages before appending it to the on-disk log @@ -483,15 +531,16 @@ class Log(@volatile var dir: File, val now = time.milliseconds val validateAndOffsetAssignResult = try { LogValidator.validateMessagesAndAssignOffsets(validRecords, - offset, - now, - appendInfo.sourceCodec, - appendInfo.targetCodec, - config.compact, - config.messageFormatVersion.messageFormatVersion, - config.messageTimestampType, - config.messageTimestampDifferenceMaxMs, - leaderEpoch) + offset, + now, + appendInfo.sourceCodec, + appendInfo.targetCodec, + config.compact, + config.messageFormatVersion.messageFormatVersion, + config.messageTimestampType, + config.messageTimestampDifferenceMaxMs, + leaderEpoch, + isFromClient) } catch { case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) } @@ -534,34 +583,56 @@ class Log(@volatile var dir: File, .format(validRecords.sizeInBytes, config.segmentSize)) } + // now that we have valid records, offsets assigned, and timestamps updated, we need to + // validate the idempotent/transactional state of the producers and collect some metadata + val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient) + maybeDuplicate.foreach { duplicate => + appendInfo.firstOffset = duplicate.firstOffset + appendInfo.lastOffset = duplicate.lastOffset + appendInfo.logAppendTime = duplicate.timestamp + return appendInfo + } + // maybe roll the log if this segment is full val segment = 
maybeRoll(messagesSize = validRecords.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp, maxOffsetInMessages = appendInfo.lastOffset) - // now append to the log + val logOffsetMetadata = LogOffsetMetadata( + messageOffset = appendInfo.firstOffset, + segmentBaseOffset = segment.baseOffset, + relativePositionInSegment = segment.size) + segment.append(firstOffset = appendInfo.firstOffset, largestOffset = appendInfo.lastOffset, largestTimestamp = appendInfo.maxTimestamp, shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords) - // update the PID sequence mapping - for ((pid, producerAppendInfo) <- appendInfo.producerAppendInfos) { - trace(s"Updating pid with sequence: $pid -> ${producerAppendInfo.lastEntry}") + // update the producer state + for ((producerId, producerAppendInfo) <- updatedProducers) { + trace(s"Updating producer $producerId state: ${producerAppendInfo.lastEntry}") + producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata) + producerStateManager.update(producerAppendInfo) + } - if (assignOffsets) - producerAppendInfo.assignLastOffsetAndTimestamp(appendInfo.lastOffset, appendInfo.maxTimestamp) - pidMap.update(producerAppendInfo) + // update the transaction index with the true last stable offset. The last offset visible + // to consumers using READ_COMMITTED will be limited by this value and the high watermark. 
+ for (completedTxn <- completedTxns) { + val lastStableOffset = producerStateManager.completeTxn(completedTxn) + segment.updateTxnIndex(completedTxn, lastStableOffset) } // always update the last pid map offset so that the snapshot reflects the current offset // even if there isn't any idempotent data being written - pidMap.updateMapEndOffset(appendInfo.lastOffset + 1) + producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1) // increment the log end offset updateLogEndOffset(appendInfo.lastOffset + 1) + // update the first unstable offset (which is used to compute LSO) + updateFirstUnstableOffset() + trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords)) @@ -575,6 +646,24 @@ class Log(@volatile var dir: File, } } + def onHighWatermarkIncremented(highWatermark: Long): Unit = { + lock synchronized { + producerStateManager.onHighWatermarkUpdated(highWatermark) + updateFirstUnstableOffset() + } + } + + private def updateFirstUnstableOffset(): Unit = lock synchronized { + this.firstUnstableOffset = producerStateManager.firstUnstableOffset match { + case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly => + val offset = logOffsetMetadata.messageOffset + val segment = segments.floorEntry(offset).getValue + val position = segment.translateOffset(offset) + Some(LogOffsetMetadata(offset, segment.baseOffset, position.position)) + case other => other + } + } + /** * Increment the log start offset if the provided offset is larger. 
*/ @@ -589,6 +678,23 @@ class Log(@volatile var dir: File, } } + private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean): + (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[ProducerIdEntry]) = { + val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo] + val completedTxns = ListBuffer.empty[CompletedTxn] + for (batch <- records.batches.asScala if batch.hasProducerId) { + val maybeLastEntry = producerStateManager.lastEntry(batch.producerId) + + // if this is a client produce request, there will be only one batch. If that batch matches + // the last appended entry for that producer, then this request is a duplicate and we return + // the last appended entry to the client. + if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch))) + return (updatedProducers, completedTxns.toList, maybeLastEntry) + updateProducers(batch, updatedProducers, completedTxns, maybeLastEntry, loadingFromLog = false) + } + (updatedProducers, completedTxns.toList, None) + } + /** * Validate the following: * <ol> @@ -616,8 +722,6 @@ class Log(@volatile var dir: File, var monotonic = true var maxTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxTimestamp = -1L - var isDuplicate = false - val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]() for (batch <- records.batches.asScala) { // we only validate V2 and higher to avoid potential compatibility issues with older clients @@ -660,37 +764,23 @@ class Log(@volatile var dir: File, val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id) if (messageCodec != NoCompressionCodec) sourceCodec = messageCodec - - val pid = batch.producerId - if (pid != RecordBatch.NO_PRODUCER_ID) { - producerAppendInfos.get(pid) match { - case Some(appendInfo) => appendInfo.append(batch) - case None => - val lastEntry = pidMap.lastEntry(pid).getOrElse(ProducerIdEntry.Empty) - if (isFromClient && lastEntry.isDuplicate(batch)) { - // This request is a duplicate so 
return the information about the existing entry. Note that for requests - // coming from the client, there will only be one RecordBatch per request, so there will be only one iteration - // of the loop and the values below will not be updated more than once. - isDuplicate = true - firstOffset = lastEntry.firstOffset - lastOffset = lastEntry.lastOffset - maxTimestamp = lastEntry.timestamp - debug(s"Detected a duplicate for partition $topicPartition at (firstOffset, lastOffset): ($firstOffset, $lastOffset). " + - "Ignoring the incoming record.") - } else { - val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry) - producerAppendInfos.put(pid, producerAppendInfo) - producerAppendInfo.append(batch) - } - } - } } // Apply broker-side compression if any val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) - LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec, - targetCodec, shallowMessageCount, validBytesCount, monotonic, producerAppendInfos.toMap, isDuplicate) + targetCodec, shallowMessageCount, validBytesCount, monotonic) + } + + private def updateProducers(batch: RecordBatch, + producers: mutable.Map[Long, ProducerAppendInfo], + completedTxns: ListBuffer[CompletedTxn], + lastEntry: Option[ProducerIdEntry], + loadingFromLog: Boolean): Unit = { + val pid = batch.producerId + val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog)) + val maybeCompletedTxn = appendInfo.append(batch) + maybeCompletedTxn.foreach(completedTxns += _) } /** @@ -721,11 +811,19 @@ class Log(@volatile var dir: File, * @param maxLength The maximum number of bytes to read * @param maxOffset The offset to read up to, exclusive. (i.e. 
this offset NOT included in the resulting message set) * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists) + * @param isolationLevel The isolation level of the fetcher. The READ_UNCOMMITTED isolation level has the traditional + * read semantics (e.g. consumers are limited to fetching up to the high watermark). In + * READ_COMMITTED, consumers are limited to fetching up to the last stable offset. Additionally, + * in READ_COMMITTED, the transaction index is consulted after fetching to collect the list + * of aborted transactions in the fetch range which the consumer uses to filter the fetched + * records before they are returned to the user. Note that fetches from followers always use + * READ_UNCOMMITTED. * * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset * @return The fetch data information including fetch starting offset metadata and messages read. */ - def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = { + def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false, + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): FetchDataInfo = { trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) // Because we don't use lock for reading, the synchronization is a little bit tricky. 
@@ -735,38 +833,43 @@ class Log(@volatile var dir: File, if(startOffset == next) return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY) - var entry = segments.floorEntry(startOffset) + var segmentEntry = segments.floorEntry(startOffset) // return error on attempt to read beyond the log end offset or read below log start offset - if(startOffset > next || entry == null || startOffset < logStartOffset) + if(startOffset > next || segmentEntry == null || startOffset < logStartOffset) throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next)) // Do the read on the segment with a base offset less than the target offset // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log - while(entry != null) { + while(segmentEntry != null) { + val segment = segmentEntry.getValue + // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log // end of the active segment. val maxPosition = { - if (entry == segments.lastEntry) { + if (segmentEntry == segments.lastEntry) { val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong // Check the segment again in case a new segment has just rolled out. - if (entry != segments.lastEntry) + if (segmentEntry != segments.lastEntry) // New log segment has rolled out, we can read up to the file end. 
- entry.getValue.size + segment.size else exposedPos } else { - entry.getValue.size + segment.size } } - val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage) - if(fetchInfo == null) { - entry = segments.higherEntry(entry.getKey) + val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage) + if (fetchInfo == null) { + segmentEntry = segments.higherEntry(segmentEntry.getKey) } else { - return fetchInfo + return isolationLevel match { + case IsolationLevel.READ_UNCOMMITTED => fetchInfo + case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo) + } } } @@ -776,6 +879,41 @@ class Log(@volatile var dir: File, FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) } + private def addAbortedTransactions(startOffset: Long, segmentEntry: JEntry[JLong, LogSegment], + fetchInfo: FetchDataInfo): FetchDataInfo = { + val fetchSize = fetchInfo.records.sizeInBytes + val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, + fetchInfo.fetchOffsetMetadata.relativePositionInSegment) + val upperBoundOffset = segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { + val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey) + if (nextSegmentEntry != null) + nextSegmentEntry.getValue.baseOffset + else + logEndOffset + } + val abortedTransactions = collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry) + FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata, + records = fetchInfo.records, + firstEntryIncomplete = fetchInfo.firstEntryIncomplete, + abortedTransactions = Some(abortedTransactions)) + } + + private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long, + startingSegmentEntry: JEntry[JLong, LogSegment]): List[AbortedTransaction] = { + var segmentEntry = startingSegmentEntry + val abortedTransactions = ListBuffer.empty[AbortedTransaction] + + 
while (segmentEntry != null) { + val searchResult = segmentEntry.getValue.collectAbortedTxns(startOffset, upperBoundOffset) + abortedTransactions ++= searchResult.abortedTransactions + if (searchResult.isComplete) + return abortedTransactions.toList + + segmentEntry = segments.higherEntry(segmentEntry.getKey) + } + abortedTransactions.toList + } + /** * Get an offset based on the given timestamp * The offset returned is the offset of the first message whose timestamp is greater than or equals to the @@ -860,7 +998,8 @@ class Log(@volatile var dir: File, deletable.foreach(deleteSegment) logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset) leaderEpochCache.clearEarliest(logStartOffset) - pidMap.expirePids(logStartOffset) + producerStateManager.evictUnretainedProducers(logStartOffset) + updateFirstUnstableOffset() } } numToDelete @@ -934,7 +1073,7 @@ class Log(@volatile var dir: File, def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata /** - * The offset of the next message that will be appended to the log + * The offset of the next message that will be appended to the log */ def logEndOffset: Long = nextOffsetMetadata.messageOffset @@ -990,9 +1129,10 @@ class Log(@volatile var dir: File, lock synchronized { val newOffset = math.max(expectedNextOffset, logEndOffset) val logFile = logFilename(dir, newOffset) - val indexFile = indexFilename(dir, newOffset) - val timeIndexFile = timeIndexFilename(dir, newOffset) - for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) { + val offsetIdxFile = offsetIndexFile(dir, newOffset) + val timeIdxFile = timeIndexFile(dir, newOffset) + val txnIdxFile = transactionIndexFile(dir, newOffset) + for(file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") file.delete() } @@ -1007,6 +1147,15 @@ class Log(@volatile var dir: File, seg.log.trim() } } + + // take a snapshot 
of the producer state to facilitate recovery. It is useful to have the snapshot + // offset align with the new segment offset since this ensures we can recover the segment by beginning + // with the corresponding snapshot file and scanning the segment data. Because the segment base offset + // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), + // we manually override the state offset here prior to taking the snapshot. + producerStateManager.updateMapEndOffset(newOffset) + producerStateManager.takeSnapshot() + val segment = new LogSegment(dir, startOffset = newOffset, indexIntervalBytes = config.indexInterval, @@ -1053,6 +1202,12 @@ class Log(@volatile var dir: File, time.milliseconds + " unflushed = " + unflushedMessages) for(segment <- logSegments(this.recoveryPoint, offset)) segment.flush() + + // now that we have flushed, we can cleanup old producer snapshots. However, it is useful to retain + // the snapshots from the recent segments in case we need to truncate and rebuild the producer state. + // Otherwise, we would always need to rebuild from the earliest segment. + producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset)) + lock synchronized { if(offset > this.recoveryPoint) { this.recoveryPoint = offset @@ -1061,6 +1216,17 @@ class Log(@volatile var dir: File, } } + def minSnapshotOffsetToRetain(flushedOffset: Long) = { + // always retain the producer snapshot from the last two segments. This solves the common case + // of truncating to an offset within the active segment, and the rarer case of truncating to the + // previous segment just after rolling the new segment. 
+ var minSnapshotOffset = activeSegment.baseOffset + val previousSegment = segments.lowerEntry(activeSegment.baseOffset) + if (previousSegment != null) + minSnapshotOffset = previousSegment.getValue.baseOffset + math.min(flushedOffset, minSnapshotOffset) + } + /** * Completely delete this log directory and all contents from the file system with no delay */ @@ -1073,11 +1239,25 @@ class Log(@volatile var dir: File, } } - private[log] def maybeTakePidSnapshot(): Unit = pidMap.maybeTakeSnapshot() + // visible for testing + private[log] def takeProducerSnapshot(): Unit = lock synchronized { + producerStateManager.takeSnapshot() + } - private[log] def latestPidSnapshotOffset: Option[Long] = pidMap.latestSnapshotOffset + // visible for testing + private[log] def latestProducerSnapshotOffset: Option[Long] = lock synchronized { + producerStateManager.latestSnapshotOffset + } - private[log] def latestPidMapOffset: Long = pidMap.mapEndOffset + // visible for testing + private[log] def oldestProducerSnapshotOffset: Option[Long] = lock synchronized { + producerStateManager.oldestSnapshotOffset + } + + // visible for testing + private[log] def latestProducerStateEndOffset: Long = lock synchronized { + producerStateManager.mapEndOffset + } /** * Truncate this log so that it ends with the greatest offset < targetOffset. 
@@ -1103,7 +1283,7 @@ class Log(@volatile var dir: File, this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) this.logStartOffset = math.min(targetOffset, this.logStartOffset) leaderEpochCache.clearLatest(targetOffset) - buildAndRecoverPidMap(targetOffset) + loadProducerState(targetOffset) } } } @@ -1130,8 +1310,9 @@ class Log(@volatile var dir: File, updateLogEndOffset(newOffset) leaderEpochCache.clear() - pidMap.truncate() - pidMap.updateMapEndOffset(newOffset) + producerStateManager.truncate() + producerStateManager.updateMapEndOffset(newOffset) + updateFirstUnstableOffset() this.recoveryPoint = math.min(newOffset, this.recoveryPoint) this.logStartOffset = newOffset @@ -1282,6 +1463,9 @@ object Log { val PidSnapshotFileSuffix = ".snapshot" + /** an (aborted) txn index */ + val TxnIndexFileSuffix = ".txnindex" + /** a file that is scheduled to be deleted */ val DeletedFileSuffix = ".deleted" @@ -1331,7 +1515,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def indexFilename(dir: File, offset: Long) = + def offsetIndexFile(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) /** @@ -1340,7 +1524,7 @@ object Log { * @param dir The directory in which the log will reside * @param offset The base offset of the log file */ - def timeIndexFilename(dir: File, offset: Long) = + def timeIndexFile(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix) /** @@ -1349,9 +1533,12 @@ object Log { * @param dir The directory in which the log will reside * @param offset The last offset (exclusive) included in the snapshot */ - def pidSnapshotFilename(dir: File, offset: Long) = + def producerSnapshotFile(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + PidSnapshotFileSuffix) + def transactionIndexFile(dir: File, offset: Long) = + new File(dir, filenamePrefixFromOffset(offset) + 
TxnIndexFileSuffix) + def offsetFromFilename(filename: String): Long = filename.substring(0, filename.indexOf('.')).toLong @@ -1387,4 +1574,12 @@ object Log { new TopicPartition(topic, partition.toInt) } + private def isIndexFile(file: File): Boolean = { + val filename = file.getName + filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix) + } + + private def isLogFile(file: File): Boolean = + file.getPath.endsWith(LogFileSuffix) + }
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index d0e8ec4..282e049 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -388,12 +388,18 @@ private[log] class Cleaner(val id: Int, logFile.delete() val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix) val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix) + val txnIndexFile = new File(segments.head.txnIndex.file.getPath + Log.CleanedFileSuffix) indexFile.delete() timeIndexFile.delete() + txnIndexFile.delete() + + val startOffset = segments.head.baseOffset val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate) - val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) - val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize) - val cleaned = new LogSegment(records, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) + val index = new OffsetIndex(indexFile, startOffset, segments.head.index.maxIndexSize) + val timeIndex = new TimeIndex(timeIndexFile, startOffset, segments.head.timeIndex.maxIndexSize) + val txnIndex = new TransactionIndex(startOffset, txnIndexFile) + val cleaned = new LogSegment(records, index, timeIndex, txnIndex, startOffset, + segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) try { // clean segments into the new destination segment @@ -451,7 +457,8 @@ private[log] class Cleaner(val id: Int, activePids: Map[Long, ProducerIdEntry], stats: CleanerStats) { val logCleanerFilter = new RecordFilter { - def shouldRetain(recordBatch: 
RecordBatch, record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats, activePids, recordBatch.producerId) + def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = + shouldRetainMessage(source, map, retainDeletes, recordBatch, record, stats, activePids) } var position = 0 @@ -492,17 +499,20 @@ private[log] class Cleaner(val id: Int, private def shouldRetainMessage(source: kafka.log.LogSegment, map: kafka.log.OffsetMap, retainDeletes: Boolean, + batch: RecordBatch, record: Record, stats: CleanerStats, - activePids: Map[Long, ProducerIdEntry], - pid: Long): Boolean = { - if (record.isControlRecord) + activeProducers: Map[Long, ProducerIdEntry]): Boolean = { + if (batch.isControlBatch) return true // retain the record if it is the last one produced by an active idempotent producer to ensure that - // the PID is not removed from the log before it has been expired - if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset)) - return true + // the producerId is not removed from the log before it has been expired + if (batch.hasProducerId) { + val producerId = batch.producerId + if (RecordBatch.NO_PRODUCER_ID < producerId && activeProducers.get(producerId).exists(_.lastOffset == record.offset)) + return true + } val pastLatestOffset = record.offset > map.latestOffset if (pastLatestOffset) @@ -638,8 +648,8 @@ private[log] class Cleaner(val id: Int, throttler.maybeThrottle(records.sizeInBytes) val startPosition = position - for (record <- records.records.asScala) { - if (!record.isControlRecord && record.hasKey && record.offset >= start) { + for (batch <- records.batches.asScala; record <- batch.asScala) { + if (!batch.isControlBatch && record.hasKey && record.offset >= start) { if (map.size < maxDesiredMapSize) map.put(record.key, record.offset) else http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogManager.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index b89fc40..c621680 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -189,7 +189,7 @@ class LogManager(val logDirs: Array[File], } } - jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq + jobs(cleanShutdownFile) = jobsForDir.map(pool.submit) } @@ -282,7 +282,6 @@ class LogManager(val logDirs: Array[File], jobs(dir) = jobsForDir.map(pool.submit).toSeq } - try { for ((dir, dirJobs) <- jobs) { dirJobs.foreach(_.get) @@ -312,7 +311,6 @@ class LogManager(val logDirs: Array[File], info("Shutdown complete.") } - /** * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset * @@ -454,7 +452,7 @@ class LogManager(val logDirs: Array[File], case e: Throwable => error(s"Exception in kafka-delete-logs thread.", e) } -} + } /** * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index df3c372..d76b47a 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -27,14 +27,14 @@ import kafka.server.epoch.LeaderEpochCache import kafka.server.{FetchDataInfo, LogOffsetMetadata} import kafka.utils._ import org.apache.kafka.common.errors.CorruptRecordException -import org.apache.kafka.common.record.FileRecords.LogEntryPosition +import org.apache.kafka.common.record.FileRecords.LogOffsetPosition import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time import scala.collection.JavaConverters._ 
import scala.math._ - /** +/** * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in @@ -53,6 +53,7 @@ import scala.math._ class LogSegment(val log: FileRecords, val index: OffsetIndex, val timeIndex: TimeIndex, + val txnIndex: TransactionIndex, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, @@ -67,45 +68,49 @@ class LogSegment(val log: FileRecords, private var rollingBasedTimestamp: Option[Long] = None /* The maximum timestamp we see so far */ - @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp - @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset + @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp + @volatile private var offsetOfMaxTimestamp: Long = timeIndex.lastEntry.offset - def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = + def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, + fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate), - new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), - new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), + new OffsetIndex(Log.offsetIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), + new TimeIndex(Log.timeIndexFile(dir, startOffset), 
baseOffset = startOffset, maxIndexSize = maxIndexSize), + new TransactionIndex(startOffset, Log.transactionIndexFile(dir, startOffset)), startOffset, indexIntervalBytes, rollJitterMs, time) /* Return the size in bytes of this log segment */ - def size: Long = log.sizeInBytes() + def size: Int = log.sizeInBytes() - /** - * checks that the argument offset can be represented as an integer offset relative to the baseOffset. - */ - def canConvertToRelativeOffset(offset: Long): Boolean = { - (offset - baseOffset) <= Integer.MAX_VALUE - } + /** + * checks that the argument offset can be represented as an integer offset relative to the baseOffset. + */ + def canConvertToRelativeOffset(offset: Long): Boolean = { + (offset - baseOffset) <= Integer.MAX_VALUE + } - /** + /** * Append the given messages starting with the given offset. Add * an entry to the index if needed. * * It is assumed this method is being called from within a lock. * * @param firstOffset The first offset in the message set. + * @param largestOffset The last offset in the message set * @param largestTimestamp The largest timestamp in the message set. * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append. * @param records The log entries to append. 
+ * @return the physical position in the file of the appended records */ @nonthreadsafe def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, - records: MemoryRecords) { + records: MemoryRecords): Unit = { if (records.sizeInBytes > 0) { trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d" .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp)) @@ -131,6 +136,28 @@ class LogSegment(val log: FileRecords, } } + @nonthreadsafe + def updateTxnIndex(completedTxn: CompletedTxn, lastStableOffset: Long) { + if (completedTxn.isAborted) { + trace(s"Writing aborted transaction $completedTxn to transaction index, last stable offset is $lastStableOffset") + txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset)) + } + } + + private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = { + if (batch.hasProducerId) { + val producerId = batch.producerId + val lastEntry = producerStateManager.lastEntry(producerId) + val appendInfo = new ProducerAppendInfo(batch.producerId, lastEntry, loadingFromLog = true) + val maybeCompletedTxn = appendInfo.append(batch) + producerStateManager.update(appendInfo) + maybeCompletedTxn.foreach { completedTxn => + val lastStableOffset = producerStateManager.completeTxn(completedTxn) + updateTxnIndex(completedTxn, lastStableOffset) + } + } + } + /** * Find the physical file position for the first message with offset >= the requested offset. * @@ -144,7 +171,7 @@ class LogSegment(val log: FileRecords, * message or null if no message meets this criteria. 
*/ @threadsafe - private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = { + private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = { val mapping = index.lookup(offset) log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition)) } @@ -175,7 +202,7 @@ class LogSegment(val log: FileRecords, if (startOffsetAndSize == null) return null - val startPosition = startOffsetAndSize.position.toInt + val startPosition = startOffsetAndSize.position val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition) val adjustedMaxSize = @@ -187,7 +214,7 @@ class LogSegment(val log: FileRecords, return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY) // calculate the length of the message set to read based on whether or not they gave us a maxOffset - val length = maxOffset match { + val fetchSize: Int = maxOffset match { case None => // no max offset, just read until the max position min((maxPosition - startPosition).toInt, adjustedMaxSize) @@ -207,24 +234,32 @@ class LogSegment(val log: FileRecords, min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt } - FetchDataInfo(offsetMetadata, log.read(startPosition, length), + FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize), firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size) } + def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] = + index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset) + /** * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index. * * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this * is corrupt. - * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery. 
+ * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover + * the transaction index. + * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery. * @return The number of bytes truncated from the log */ @nonthreadsafe - def recover(maxMessageSize: Int, leaderEpochCache: Option[LeaderEpochCache] = None): Int = { + def recover(maxMessageSize: Int, + producerStateManager: ProducerStateManager, + leaderEpochCache: Option[LeaderEpochCache] = None): Int = { index.truncate() index.resize(index.maxIndexSize) timeIndex.truncate() timeIndex.resize(timeIndex.maxIndexSize) + txnIndex.truncate() var validBytes = 0 var lastIndexEntry = 0 maxTimestampSoFar = RecordBatch.NO_TIMESTAMP @@ -250,8 +285,9 @@ class LogSegment(val log: FileRecords, if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { leaderEpochCache.foreach { cache => if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign() - cache.assign(batch.partitionLeaderEpoch, batch.baseOffset()) + cache.assign(batch.partitionLeaderEpoch, batch.baseOffset) } + updateProducerState(producerStateManager, batch) } } } catch { @@ -268,22 +304,23 @@ class LogSegment(val log: FileRecords, truncated } - def loadLargestTimestamp(readToLogEnd: Boolean = false) { + private def loadLargestTimestamp() { // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset) val lastTimeIndexEntry = timeIndex.lastEntry maxTimestampSoFar = lastTimeIndexEntry.timestamp offsetOfMaxTimestamp = lastTimeIndexEntry.offset - if (readToLogEnd) { - val offsetPosition = index.lookup(lastTimeIndexEntry.offset) - // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry. 
- val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position) - if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) { - maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp - offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset - } + + val offsetPosition = index.lookup(lastTimeIndexEntry.offset) + // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry. + val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position) + if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) { + maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp + offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset } } + def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = + txnIndex.collectAbortedTxns(fetchOffset, upperBoundOffset) override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")" @@ -301,6 +338,7 @@ class LogSegment(val log: FileRecords, return 0 index.truncateTo(offset) timeIndex.truncateTo(offset) + txnIndex.truncateTo(offset) // after truncation, reset and allocate more space for the (new currently active) index index.resize(index.maxIndexSize) timeIndex.resize(timeIndex.maxIndexSize) @@ -310,9 +348,8 @@ class LogSegment(val log: FileRecords, rollingBasedTimestamp = None } bytesSinceLastIndexEntry = 0 - // We may need to reload the max timestamp after truncation. 
if (maxTimestampSoFar >= 0) - loadLargestTimestamp(readToLogEnd = true) + loadLargestTimestamp() bytesTruncated } @@ -323,14 +360,12 @@ class LogSegment(val log: FileRecords, @threadsafe def nextOffset(): Long = { val ms = read(index.lastOffset, None, log.sizeInBytes) - if (ms == null) { + if (ms == null) baseOffset - } else { - ms.records.batches.asScala.lastOption match { - case None => baseOffset - case Some(last) => last.nextOffset - } - } + else + ms.records.batches.asScala.lastOption + .map(_.nextOffset) + .getOrElse(baseOffset) } /** @@ -342,6 +377,7 @@ class LogSegment(val log: FileRecords, log.flush() index.flush() timeIndex.flush() + txnIndex.flush() } } @@ -365,6 +401,10 @@ class LogSegment(val log: FileRecords, catch { case e: IOException => throw kafkaStorageException("timeindex", e) } + try txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix))) + catch { + case e: IOException => throw kafkaStorageException("txnindex", e) + } } /** @@ -437,6 +477,7 @@ class LogSegment(val log: FileRecords, CoreUtils.swallow(index.close()) CoreUtils.swallow(timeIndex.close()) CoreUtils.swallow(log.close()) + CoreUtils.swallow(txnIndex.close()) } /** @@ -448,12 +489,15 @@ class LogSegment(val log: FileRecords, val deletedLog = log.delete() val deletedIndex = index.delete() val deletedTimeIndex = timeIndex.delete() - if(!deletedLog && log.file.exists) + val deletedTxnIndex = txnIndex.delete() + if (!deletedLog && log.file.exists) throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.") - if(!deletedIndex && index.file.exists) + if (!deletedIndex && index.file.exists) throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") - if(!deletedTimeIndex && timeIndex.file.exists) + if (!deletedTimeIndex && timeIndex.file.exists) throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.") + if (!deletedTxnIndex && txnIndex.file.exists) + throw new 
KafkaStorageException("Delete of transaction index " + txnIndex.file.getName + " failed.") } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogValidator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index c1777d5..33257fd 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -52,24 +52,32 @@ private[kafka] object LogValidator extends Logging { magic: Byte, timestampType: TimestampType, timestampDiffMaxMs: Long, - partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = { + partitionLeaderEpoch: Int, + isFromClient: Boolean): ValidationAndOffsetAssignResult = { if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // check the magic value if (!records.hasMatchingMagic(magic)) convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType, - timestampDiffMaxMs, magic, partitionLeaderEpoch) + timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient) else // Do in-place validation, offset assignment and maybe set timestamp assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs, - partitionLeaderEpoch) + partitionLeaderEpoch, isFromClient) } else { validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic, - magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch) + magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient) } } - private def validateBatch(batch: RecordBatch): Unit = { - ensureNonTransactional(batch) + private def validateBatch(batch: RecordBatch, isFromClient: Boolean): Unit = { + if (isFromClient) { + if (batch.hasProducerId && batch.baseSequence < 0) + throw new InvalidRecordException(s"Invalid sequence 
number ${batch.baseSequence} in record batch " + + s"with producerId ${batch.producerId}") + + if (batch.isControlBatch) + throw new InvalidRecordException("Clients are not allowed to write control records") + } } private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType, @@ -84,7 +92,6 @@ private[kafka] object LogValidator extends Logging { if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) record.ensureValid() - ensureNotControlRecord(record) validateKey(record, compactedTopic) validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs) } @@ -96,21 +103,22 @@ private[kafka] object LogValidator extends Logging { timestampType: TimestampType, timestampDiffMaxMs: Long, toMagicValue: Byte, - partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = { + partitionLeaderEpoch: Int, + isFromClient: Boolean): ValidationAndOffsetAssignResult = { val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value, CompressionType.NONE, records.records) - val (pid, epoch, sequence) = { + val (producerId, producerEpoch, sequence, isTransactional) = { val first = records.batches.asScala.head - (first.producerId, first.producerEpoch, first.baseSequence) + (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional) } val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion) val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType, - offsetCounter.value, now, pid, epoch, sequence, false, partitionLeaderEpoch) + offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch) for (batch <- records.batches.asScala) { - validateBatch(batch) + validateBatch(batch, isFromClient) for (record <- batch.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) @@ -133,21 +141,21 @@ private[kafka] object LogValidator extends Logging { 
compactedTopic: Boolean, timestampType: TimestampType, timestampDiffMaxMs: Long, - partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = { + partitionLeaderEpoch: Int, + isFromClient: Boolean): ValidationAndOffsetAssignResult = { var maxTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxTimestamp = -1L val initialOffset = offsetCounter.value var isMagicV2 = false for (batch <- records.batches.asScala) { - validateBatch(batch) + validateBatch(batch, isFromClient) var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L for (record <- batch.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) - val offset = offsetCounter.getAndIncrement() if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) { maxBatchTimestamp = record.timestamp @@ -206,7 +214,8 @@ private[kafka] object LogValidator extends Logging { magic: Byte, timestampType: TimestampType, timestampDiffMaxMs: Long, - partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = { + partitionLeaderEpoch: Int, + isFromClient: Boolean): ValidationAndOffsetAssignResult = { // No in place assignment situation 1 and 2 var inPlaceAssignment = sourceCodec == targetCodec && magic > RecordBatch.MAGIC_VALUE_V0 @@ -216,14 +225,17 @@ private[kafka] object LogValidator extends Logging { val validatedRecords = new mutable.ArrayBuffer[Record] for (batch <- records.batches.asScala) { - validateBatch(batch) + validateBatch(batch, isFromClient) + + // Do not compress control records unless they are written compressed + if (sourceCodec == NoCompressionCodec && batch.isControlBatch) + inPlaceAssignment = true for (record <- batch.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) if (sourceCodec != NoCompressionCodec && record.isCompressed) throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + s"compression attribute set: $record") 
- if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && magic > RecordBatch.MAGIC_VALUE_V0) { // Check if we need to overwrite offset // No in place assignment situation 3 @@ -242,15 +254,15 @@ private[kafka] object LogValidator extends Logging { } if (!inPlaceAssignment) { - val (pid, epoch, sequence) = { + val (producerId, producerEpoch, sequence, isTransactional) = { // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2, // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records // with older magic versions, there will never be a producer id, etc. val first = records.batches.asScala.head - (first.producerId, first.producerEpoch, first.baseSequence) + (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional) } buildRecordsAndAssignOffsets(magic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now, - validatedRecords, pid, epoch, sequence, partitionLeaderEpoch) + validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch) } else { // we can update the batch only and write the compressed payload as is val batch = records.batches.iterator.next() @@ -274,14 +286,22 @@ private[kafka] object LogValidator extends Logging { } } - private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType, - compressionType: CompressionType, logAppendTime: Long, + private def buildRecordsAndAssignOffsets(magic: Byte, + offsetCounter: LongRef, + timestampType: TimestampType, + compressionType: CompressionType, + logAppendTime: Long, validatedRecords: Seq[Record], - producerId: Long, epoch: Short, baseSequence: Int, partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = { - val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava) + producerId: Long, + producerEpoch: Short, + baseSequence: 
Int, + isTransactional: Boolean, + partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = { + val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, + validatedRecords.asJava) val buffer = ByteBuffer.allocate(estimatedSize) val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, - logAppendTime, producerId, epoch, baseSequence, false, partitionLeaderEpoch) + logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) validatedRecords.foreach { record => builder.appendWithOffset(offsetCounter.getAndIncrement(), record) @@ -297,17 +317,6 @@ private[kafka] object LogValidator extends Logging { messageSizeMaybeChanged = true) } - private def ensureNonTransactional(batch: RecordBatch) { - if (batch.isTransactional) - throw new InvalidRecordException("Transactional messages are not currently supported") - } - - private def ensureNotControlRecord(record: Record) { - // Until we have implemented transaction support, we do not permit control records to be written - if (record.isControlRecord) - throw new InvalidRecordException("Control messages are not currently supported") - } - private def validateKey(record: Record, compactedTopic: Boolean) { if (compactedTopic && !record.hasKey) throw new InvalidRecordException("Compacted topic cannot accept message without key.") http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index a54579f..e4939e8 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -85,7 +85,7 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable def lookup(targetOffset: Long): OffsetPosition = { 
maybeLock(lock) { val idx = mmap.duplicate - val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY) + val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY) if(slot == -1) OffsetPosition(baseOffset, 0) else @@ -93,6 +93,22 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable } } + /** + * Find an upper bound offset for the given fetch starting position and size. This is an offset which + * is guaranteed to be outside the fetched range, but note that it will not generally be the smallest + * such offset. + */ + def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = { + maybeLock(lock) { + val idx = mmap.duplicate + val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE) + if (slot == -1) + None + else + Some(parseEntry(idx, slot).asInstanceOf[OffsetPosition]) + } + } + private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize) private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4) @@ -140,7 +156,7 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable override def truncateTo(offset: Long) { inLock(lock) { val idx = mmap.duplicate - val slot = indexSlotFor(idx, offset, IndexSearchType.KEY) + val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY) /* There are 3 cases for choosing the new size * 1) if there is no entry in the index <= the offset, delete everything
