http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/ProducerIdMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala deleted file mode 100644 index bcadce5..0000000 --- a/core/src/main/scala/kafka/log/ProducerIdMapping.scala +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log - -import java.io._ -import java.nio.ByteBuffer -import java.nio.file.Files - -import kafka.common.KafkaException -import kafka.utils.{Logging, nonthreadsafe} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException} -import org.apache.kafka.common.protocol.types._ -import org.apache.kafka.common.record.RecordBatch -import org.apache.kafka.common.utils.{ByteUtils, Crc32C} - -import scala.collection.{immutable, mutable} - -private[log] object ProducerIdEntry { - val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - -1, 0, RecordBatch.NO_TIMESTAMP) -} - -private[log] case class ProducerIdEntry(epoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long) { - def firstSeq: Int = lastSeq - offsetDelta - def firstOffset: Long = lastOffset - offsetDelta - - def isDuplicate(batch: RecordBatch): Boolean = { - batch.producerEpoch == epoch && - batch.baseSequence == firstSeq && - batch.lastSequence == lastSeq - } -} - -private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEntry) { - // the initialEntry here is the last successful appended batch. we validate incoming entries transitively, starting - // with the last appended entry. - private var epoch = initialEntry.epoch - private var firstSeq = initialEntry.firstSeq - private var lastSeq = initialEntry.lastSeq - private var lastOffset = initialEntry.lastOffset - private var maxTimestamp = initialEntry.timestamp - - private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = { - if (this.epoch > epoch) { - throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer with a newer epoch. $epoch (request epoch), ${this.epoch} (server epoch)") - } else if (this.epoch == RecordBatch.NO_PRODUCER_EPOCH || this.epoch < epoch) { - if (firstSeq != 0) - throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " + - s"(request epoch), $firstSeq (seq. 
number)") - } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) { - throw new DuplicateSequenceNumberException(s"Duplicate sequence number: $pid (pid), $firstSeq " + - s"(seq. number), ${this.firstSeq} (expected seq. number)") - } else if (firstSeq != this.lastSeq + 1L) { - throw new OutOfOrderSequenceException(s"Invalid sequence number: $pid (pid), $firstSeq " + - s"(seq. number), ${this.lastSeq} (expected seq. number)") - } - } - - def assignLastOffsetAndTimestamp(lastOffset: Long, lastTimestamp: Long): Unit = { - this.lastOffset = lastOffset - this.maxTimestamp = lastTimestamp - } - - private def append(epoch: Short, firstSeq: Int, lastSeq: Int, lastTimestamp: Long, lastOffset: Long) { - validateAppend(epoch, firstSeq, lastSeq) - this.epoch = epoch - this.firstSeq = firstSeq - this.lastSeq = lastSeq - this.maxTimestamp = lastTimestamp - this.lastOffset = lastOffset - } - - def append(batch: RecordBatch): Unit = - append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset) - - def append(entry: ProducerIdEntry): Unit = - append(entry.epoch, entry.firstSeq, entry.lastSeq, entry.timestamp, entry.lastOffset) - - def lastEntry: ProducerIdEntry = - ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp) -} - -class CorruptSnapshotException(msg: String) extends KafkaException(msg) - -object ProducerIdMapping { - private val PidSnapshotVersion: Short = 1 - private val VersionField = "version" - private val CrcField = "crc" - private val PidField = "pid" - private val LastSequenceField = "last_sequence" - private val EpochField = "epoch" - private val LastOffsetField = "last_offset" - private val OffsetDeltaField = "offset_delta" - private val TimestampField = "timestamp" - private val PidEntriesField = "pid_entries" - - private val VersionOffset = 0 - private val CrcOffset = VersionOffset + 2 - private val PidEntriesOffset = CrcOffset + 4 - - private val maxPidSnapshotsToRetain = 2 - - val PidSnapshotEntrySchema = new Schema( - new Field(PidField, Type.INT64, "The producer ID"), - new Field(EpochField, Type.INT16, "Current epoch of the producer"), - new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"), - new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"), - new Field(OffsetDeltaField, Type.INT32, "The difference of the last sequence and first sequence in the last written batch"), - new Field(TimestampField, Type.INT64, "Max timestamp from the last written entry")) - val PidSnapshotMapSchema = new Schema( - new Field(VersionField, Type.INT16, "Version of the snapshot file"), - new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"), - new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table")) - - def readSnapshot(file: File): Iterable[(Long, ProducerIdEntry)] = { - val buffer = Files.readAllBytes(file.toPath) - val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) - - val version = struct.getShort(VersionField) - if (version != PidSnapshotVersion) - throw new IllegalArgumentException(s"Unhandled snapshot file version $version") - - val crc = struct.getUnsignedInt(CrcField) - val computedCrc = Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset) - if (crc != computedCrc) - throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). Stored crc: $crc. 
Computed crc: $computedCrc") - - struct.getArray(PidEntriesField).map { pidEntryObj => - val pidEntryStruct = pidEntryObj.asInstanceOf[Struct] - val pid: Long = pidEntryStruct.getLong(PidField) - val epoch = pidEntryStruct.getShort(EpochField) - val seq = pidEntryStruct.getInt(LastSequenceField) - val offset = pidEntryStruct.getLong(LastOffsetField) - val timestamp = pidEntryStruct.getLong(TimestampField) - val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField) - val newEntry = ProducerIdEntry(epoch, seq, offset, offsetDelta, timestamp) - pid -> newEntry - } - } - - private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) { - val struct = new Struct(PidSnapshotMapSchema) - struct.set(VersionField, PidSnapshotVersion) - struct.set(CrcField, 0L) // we'll fill this after writing the entries - val entriesArray = entries.map { - case (pid, entry) => - val pidEntryStruct = struct.instance(PidEntriesField) - pidEntryStruct.set(PidField, pid) - .set(EpochField, entry.epoch) - .set(LastSequenceField, entry.lastSeq) - .set(LastOffsetField, entry.lastOffset) - .set(OffsetDeltaField, entry.offsetDelta) - .set(TimestampField, entry.timestamp) - pidEntryStruct - }.toArray - struct.set(PidEntriesField, entriesArray) - - val buffer = ByteBuffer.allocate(struct.sizeOf) - struct.writeTo(buffer) - buffer.flip() - - // now fill in the CRC - val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset) - ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc) - - val fos = new FileOutputStream(file) - try { - fos.write(buffer.array, buffer.arrayOffset, buffer.limit) - } finally { - fos.close() - } - } - - private def isSnapshotFile(name: String): Boolean = name.endsWith(Log.PidSnapshotFileSuffix) - -} - -/** - * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g. - * epoch, sequence number, last offset, etc.) - * - * The sequence number is the last number successfully appended to the partition for the given identifier. - * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message - * appended to the partition. - * - * As long as a PID is contained in the map, the corresponding producer can continue to write data. - * However, PIDs can be expired due to lack of recent use or if the last written entry has been deleted from - * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure - * that the most recent entry from a given PID is retained in the log provided it hasn't expired due to - * age. This ensures that PIDs will not be expired until either the max expiration time has been reached, - * or if the topic also is configured for deletion, the segment containing the last written offset has - * been deleted. 
- */ -@nonthreadsafe -class ProducerIdMapping(val config: LogConfig, - val topicPartition: TopicPartition, - val logDir: File, - val maxPidExpirationMs: Int) extends Logging { - import ProducerIdMapping._ - - private val pidMap = mutable.Map[Long, ProducerIdEntry]() - private var lastMapOffset = 0L - private var lastSnapOffset = 0L - - /** - * Returns the last offset of this map - */ - def mapEndOffset = lastMapOffset - - /** - * Get a copy of the active producers - */ - def activePids: immutable.Map[Long, ProducerIdEntry] = pidMap.toMap - - private def loadFromSnapshot(logStartOffset: Long, currentTime: Long) { - pidMap.clear() - - while (true) { - latestSnapshotFile match { - case Some(file) => - try { - info(s"Loading PID mapping from snapshot file ${file.getName} for partition $topicPartition") - readSnapshot(file).foreach { case (pid, entry) => - if (!isExpired(currentTime, entry)) - pidMap.put(pid, entry) - } - - lastSnapOffset = Log.offsetFromFilename(file.getName) - lastMapOffset = lastSnapOffset - return - } catch { - case e: CorruptSnapshotException => - error(s"Snapshot file at ${file.getPath} is corrupt: ${e.getMessage}") - Files.deleteIfExists(file.toPath) - } - case None => - lastSnapOffset = logStartOffset - lastMapOffset = logStartOffset - return - } - } - } - - private def isExpired(currentTimeMs: Long, producerIdEntry: ProducerIdEntry) : Boolean = - currentTimeMs - producerIdEntry.timestamp >= maxPidExpirationMs - - - def removeExpiredPids(currentTimeMs: Long) { - pidMap.retain { case (pid, lastEntry) => - !isExpired(currentTimeMs, lastEntry) - } - } - - /** - * Truncate the PID mapping to the given offset range and reload the entries from the most recent - * snapshot in range (if there is one). - */ - def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) { - if (logEndOffset != mapEndOffset) { - deleteSnapshotFiles { file => - val offset = Log.offsetFromFilename(file.getName) - offset > logEndOffset || offset <= logStartOffset - } - loadFromSnapshot(logStartOffset, currentTimeMs) - } else { - expirePids(logStartOffset) - } - } - - /** - * Update the mapping with the given append information - */ - def update(appendInfo: ProducerAppendInfo): Unit = { - if (appendInfo.pid == RecordBatch.NO_PRODUCER_ID) - throw new IllegalArgumentException("Invalid PID passed to update") - val entry = appendInfo.lastEntry - pidMap.put(appendInfo.pid, entry) - } - - def updateMapEndOffset(lastOffset: Long): Unit = { - lastMapOffset = lastOffset - } - - /** - * Load a previously stored PID entry into the cache. Ignore the entry if the timestamp is older - * than the current time minus the PID expiration time (i.e. if the PID has expired). - */ - def load(pid: Long, entry: ProducerIdEntry, currentTimeMs: Long) { - if (pid != RecordBatch.NO_PRODUCER_ID && !isExpired(currentTimeMs, entry)) - pidMap.put(pid, entry) - } - - /** - * Get the last written entry for the given PID. - */ - def lastEntry(pid: Long): Option[ProducerIdEntry] = pidMap.get(pid) - - /** - * Write a new snapshot if there have been updates since the last one. 
- */ - def maybeTakeSnapshot() { - // If not a new offset, then it is not worth taking another snapshot - if (lastMapOffset > lastSnapOffset) { - val snapshotFile = Log.pidSnapshotFilename(logDir, lastMapOffset) - debug(s"Writing producer snapshot for partition $topicPartition at offset $lastMapOffset") - writeSnapshot(snapshotFile, pidMap) - - // Update the last snap offset according to the serialized map - lastSnapOffset = lastMapOffset - - maybeRemoveOldestSnapshot() - } - } - - /** - * Get the last offset (exclusive) of the latest snapshot file. - */ - def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => Log.offsetFromFilename(file.getName)) - - /** - * When we remove the head of the log due to retention, we need to clean up the id map. This method takes - * the new start offset and expires all pids which have a smaller last written offset. - */ - def expirePids(logStartOffset: Long) { - pidMap.retain((pid, entry) => entry.lastOffset >= logStartOffset) - deleteSnapshotFiles(file => Log.offsetFromFilename(file.getName) <= logStartOffset) - if (lastMapOffset < logStartOffset) - lastMapOffset = logStartOffset - lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset) - } - - /** - * Truncate the PID mapping and remove all snapshots. This resets the state of the mapping. - */ - def truncate() { - pidMap.clear() - deleteSnapshotFiles() - lastSnapOffset = 0L - lastMapOffset = 0L - } - - private def maybeRemoveOldestSnapshot() { - val list = listSnapshotFiles - if (list.size > maxPidSnapshotsToRetain) { - val toDelete = list.minBy(file => Log.offsetFromFilename(file.getName)) - Files.deleteIfExists(toDelete.toPath) - } - } - - private def listSnapshotFiles: List[File] = { - if (logDir.exists && logDir.isDirectory) - logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList - else - List.empty[File] - } - - private def latestSnapshotFile: Option[File] = { - val files = listSnapshotFiles - if (files.nonEmpty) - Some(files.maxBy(file => Log.offsetFromFilename(file.getName))) - else - None - } - - private def deleteSnapshotFiles(predicate: File => Boolean = _ => true) { - listSnapshotFiles.filter(predicate).foreach(file => Files.deleteIfExists(file.toPath)) - } - -}
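The deleted ProducerIdMapping above is superseded by the new ProducerStateManager below, which keeps the PID-to-last-entry map but additionally tracks ongoing and not-yet-replicated transactions. As rough orientation only — a sketch based on the public methods introduced in this diff, with a made-up partition, directory and offsets, not code from the commit:

    import java.io.File
    import org.apache.kafka.common.TopicPartition

    val topicPartition = new TopicPartition("my-topic", 0)      // placeholder partition
    val logDir = new File("/tmp/kafka-logs/my-topic-0")         // placeholder log directory
    val stateManager = new ProducerStateManager(topicPartition, logDir)

    // On recovery: drop state outside [logStartOffset, logEndOffset] and reload from
    // the most recent snapshot in that range (the offsets here are placeholders).
    stateManager.truncateAndReload(logStartOffset = 0L, logEndOffset = 5000L, currentTimeMs = System.currentTimeMillis)

    // After validating an append (see ProducerAppendInfo below), fold the result back in,
    // advance the map end offset, and persist a snapshot at the new end offset.
    // stateManager.update(appendInfo)
    stateManager.updateMapEndOffset(5000L)
    stateManager.takeSnapshot()

    // The last stable offset is bounded by the first undecided or unreplicated transaction.
    val firstUnstable = stateManager.firstUnstableOffset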
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala new file mode 100644 index 0000000..b1a43d2 --- /dev/null +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io._ +import java.nio.ByteBuffer +import java.nio.file.Files + +import kafka.common.KafkaException +import kafka.log.Log.offsetFromFilename +import kafka.server.LogOffsetMetadata +import kafka.utils.{Logging, nonthreadsafe, threadsafe} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors._ +import org.apache.kafka.common.protocol.types._ +import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch} +import org.apache.kafka.common.utils.{ByteUtils, Crc32C} + +import scala.collection.mutable.ListBuffer +import scala.collection.{immutable, mutable} + +class CorruptSnapshotException(msg: String) extends KafkaException(msg) + +private[log] case class TxnMetadata(producerId: Long, var firstOffset: LogOffsetMetadata, var lastOffset: Option[Long] = None) { + def this(producerId: Long, firstOffset: Long) = this(producerId, LogOffsetMetadata(firstOffset)) +} + +private[log] object ProducerIdEntry { + val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + -1, 0, RecordBatch.NO_TIMESTAMP, -1, None) +} + +private[log] case class ProducerIdEntry(producerId: Long, producerEpoch: Short, lastSeq: Int, lastOffset: Long, + offsetDelta: Int, timestamp: Long, coordinatorEpoch: Int, + currentTxnFirstOffset: Option[Long]) { + def firstSeq: Int = lastSeq - offsetDelta + def firstOffset: Long = lastOffset - offsetDelta + + def isDuplicate(batch: RecordBatch): Boolean = { + batch.producerEpoch == producerEpoch && + batch.baseSequence == firstSeq && + batch.lastSequence == lastSeq + } +} + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. 
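A minimal sketch (not part of the diff) of the expected call pattern around this class, using only the methods defined below; producerId and batch stand in for a real producer id and an incoming RecordBatch:

    // Start from the producer's last known state (Empty if this is a new producer).
    val appendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty)

    // Validates the epoch and sequence numbers of the batch; for a control batch this
    // returns the CompletedTxn described by the COMMIT/ABORT marker.
    val maybeCompletedTxn = appendInfo.append(batch)

    // State to fold back into ProducerStateManager, plus any transactions started here.
    val updatedEntry = appendInfo.lastEntry
    val startedTxns = appendInfo.startedTransactions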
+ */ +private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: ProducerIdEntry, loadingFromLog: Boolean = false) { + private var producerEpoch = initialEntry.producerEpoch + private var firstSeq = initialEntry.firstSeq + private var lastSeq = initialEntry.lastSeq + private var lastOffset = initialEntry.lastOffset + private var maxTimestamp = initialEntry.timestamp + private var currentTxnFirstOffset = initialEntry.currentTxnFirstOffset + private var coordinatorEpoch = initialEntry.coordinatorEpoch + private val transactions = ListBuffer.empty[TxnMetadata] + + def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) = + this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog) + + private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = { + if (this.producerEpoch > epoch) { + throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " + + s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} (server epoch)") + } else if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < epoch) { + if (firstSeq != 0) + throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " + + s"(request epoch), $firstSeq (seq. number)") + } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) { + // the epoch was bumped by a control record, so we expect the sequence number to be reset + throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), found $firstSeq " + + s"(incoming seq. number), but expected 0") + } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) { + throw new DuplicateSequenceNumberException(s"Duplicate sequence number: pid: $producerId, (incomingBatch.firstSeq, " + + s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " + + s"(${this.firstSeq}, ${this.lastSeq}).") + } else if (firstSeq != this.lastSeq + 1L) { + throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), $firstSeq " + + s"(incoming seq. number), ${this.lastSeq} (current end sequence number)") + } + } + + def append(batch: RecordBatch): Option[CompletedTxn] = { + if (batch.isControlBatch) { + val record = batch.iterator.next() + val endTxnMarker = EndTransactionMarker.deserialize(record) + val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset, record.timestamp) + Some(completedTxn) + } else { + append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset, + batch.isTransactional) + None + } + } + + def append(epoch: Short, + firstSeq: Int, + lastSeq: Int, + lastTimestamp: Long, + lastOffset: Long, + isTransactional: Boolean): Unit = { + if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog) + // skip validation if this is the first entry when loading from the log. 
Log retention + // will generally have removed the beginning entries from each PID + validateAppend(epoch, firstSeq, lastSeq) + + this.producerEpoch = epoch + this.firstSeq = firstSeq + this.lastSeq = lastSeq + this.maxTimestamp = lastTimestamp + this.lastOffset = lastOffset + + if (currentTxnFirstOffset.isDefined && !isTransactional) + throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId") + + if (isTransactional && currentTxnFirstOffset.isEmpty) { + val firstOffset = lastOffset - (lastSeq - firstSeq) + currentTxnFirstOffset = Some(firstOffset) + transactions += new TxnMetadata(producerId, firstOffset) + } + } + + def appendEndTxnMarker(endTxnMarker: EndTransactionMarker, + producerEpoch: Short, + offset: Long, + timestamp: Long): CompletedTxn = { + if (this.producerEpoch > producerEpoch) + throw new ProducerFencedException(s"Invalid producer epoch: $producerEpoch (zombie): ${this.producerEpoch} (current)") + + if (this.coordinatorEpoch > endTxnMarker.coordinatorEpoch) + throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch} " + + s"(zombie), $coordinatorEpoch (current)") + + if (producerEpoch > this.producerEpoch) { + // it is possible that this control record is the first record seen from a new epoch (the producer + // may fail before sending to the partition or the request itself could fail for some reason). In this + // case, we bump the epoch and reset the sequence numbers + this.producerEpoch = producerEpoch + this.firstSeq = RecordBatch.NO_SEQUENCE + this.lastSeq = RecordBatch.NO_SEQUENCE + } else { + // the control record is the last append to the log, so the last offset will be updated to point to it. + // However, the sequence numbers still point to the previous batch, so the duplicate check would no longer + // be correct: it would return the wrong offset. To fix this, we treat the control record as a batch + // of size 1 which uses the last appended sequence number. + this.firstSeq = this.lastSeq + } + + val firstOffset = currentTxnFirstOffset match { + case Some(firstOffset) => firstOffset + case None => + transactions += new TxnMetadata(producerId, offset) + offset + } + + this.lastOffset = offset + this.currentTxnFirstOffset = None + this.maxTimestamp = timestamp + this.coordinatorEpoch = endTxnMarker.coordinatorEpoch + CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT) + } + + def lastEntry: ProducerIdEntry = + ProducerIdEntry(producerId, producerEpoch, lastSeq, lastOffset, lastSeq - firstSeq, maxTimestamp, + coordinatorEpoch, currentTxnFirstOffset) + + def startedTransactions: List[TxnMetadata] = transactions.toList + + def maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata: LogOffsetMetadata): Unit = { + // we will cache the log offset metadata if it corresponds to the starting offset of + // the last transaction that was started. 
This is optimized for leader appends where it + // is only possible to have one transaction started for each log append, and the log + // offset metadata will always match in that case since no data from other producers + // is mixed into the append + transactions.headOption.foreach { txn => + if (txn.firstOffset.messageOffset == logOffsetMetadata.messageOffset) + txn.firstOffset = logOffsetMetadata + } + } + +} + +object ProducerStateManager { + private val PidSnapshotVersion: Short = 1 + private val VersionField = "version" + private val CrcField = "crc" + private val PidField = "pid" + private val LastSequenceField = "last_sequence" + private val ProducerEpochField = "epoch" + private val LastOffsetField = "last_offset" + private val OffsetDeltaField = "offset_delta" + private val TimestampField = "timestamp" + private val PidEntriesField = "pid_entries" + private val CoordinatorEpochField = "coordinator_epoch" + private val CurrentTxnFirstOffsetField = "current_txn_first_offset" + + private val VersionOffset = 0 + private val CrcOffset = VersionOffset + 2 + private val PidEntriesOffset = CrcOffset + 4 + + val PidSnapshotEntrySchema = new Schema( + new Field(PidField, Type.INT64, "The producer ID"), + new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"), + new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"), + new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"), + new Field(OffsetDeltaField, Type.INT32, "The difference of the last sequence and first sequence in the last written batch"), + new Field(TimestampField, Type.INT64, "Max timestamp from the last written entry"), + new Field(CoordinatorEpochField, Type.INT32, "The epoch of the last transaction coordinator to send an end transaction marker"), + new Field(CurrentTxnFirstOffsetField, Type.INT64, "The first offset of the on-going transaction (-1 if there is none)")) + val PidSnapshotMapSchema = new Schema( + new Field(VersionField, Type.INT16, "Version of the snapshot file"), + new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"), + new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table")) + + def readSnapshot(file: File): Iterable[ProducerIdEntry] = { + val buffer = Files.readAllBytes(file.toPath) + val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) + + val version = struct.getShort(VersionField) + if (version != PidSnapshotVersion) + throw new IllegalArgumentException(s"Unhandled snapshot file version $version") + + val crc = struct.getUnsignedInt(CrcField) + val computedCrc = Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset) + if (crc != computedCrc) + throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " + + s"Stored crc: $crc. 
Computed crc: $computedCrc") + + struct.getArray(PidEntriesField).map { pidEntryObj => + val pidEntryStruct = pidEntryObj.asInstanceOf[Struct] + val pid: Long = pidEntryStruct.getLong(PidField) + val epoch = pidEntryStruct.getShort(ProducerEpochField) + val seq = pidEntryStruct.getInt(LastSequenceField) + val offset = pidEntryStruct.getLong(LastOffsetField) + val timestamp = pidEntryStruct.getLong(TimestampField) + val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField) + val coordinatorEpoch = pidEntryStruct.getInt(CoordinatorEpochField) + val currentTxnFirstOffset = pidEntryStruct.getLong(CurrentTxnFirstOffsetField) + val newEntry = ProducerIdEntry(pid, epoch, seq, offset, offsetDelta, timestamp, + coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None) + newEntry + } + } + + private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) { + val struct = new Struct(PidSnapshotMapSchema) + struct.set(VersionField, PidSnapshotVersion) + struct.set(CrcField, 0L) // we'll fill this after writing the entries + val entriesArray = entries.map { + case (pid, entry) => + val pidEntryStruct = struct.instance(PidEntriesField) + pidEntryStruct.set(PidField, pid) + .set(ProducerEpochField, entry.producerEpoch) + .set(LastSequenceField, entry.lastSeq) + .set(LastOffsetField, entry.lastOffset) + .set(OffsetDeltaField, entry.offsetDelta) + .set(TimestampField, entry.timestamp) + .set(CoordinatorEpochField, entry.coordinatorEpoch) + .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L)) + pidEntryStruct + }.toArray + struct.set(PidEntriesField, entriesArray) + + val buffer = ByteBuffer.allocate(struct.sizeOf) + struct.writeTo(buffer) + buffer.flip() + + // now fill in the CRC + val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset) + ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc) + + val fos = new FileOutputStream(file) + try { + fos.write(buffer.array, buffer.arrayOffset, buffer.limit) + } finally { + fos.close() + } + } + + private def isSnapshotFile(name: String): Boolean = name.endsWith(Log.PidSnapshotFileSuffix) + +} + +/** + * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g. + * epoch, sequence number, last offset, etc.) + * + * The sequence number is the last number successfully appended to the partition for the given identifier. + * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message + * appended to the partition. + * + * As long as a PID is contained in the map, the corresponding producer can continue to write data. + * However, PIDs can be expired due to lack of recent use or if the last written entry has been deleted from + * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure + * that the most recent entry from a given PID is retained in the log provided it hasn't expired due to + * age. This ensures that PIDs will not be expired until either the max expiration time has been reached, + * or if the topic also is configured for deletion, the segment containing the last written offset has + * been deleted. 
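In addition to the PID map described here, the class below tracks transaction state. As a sketch only (stateManager, completedTxn and newHighWatermark are placeholders): completing a transaction moves it from the ongoing set to the unreplicated set, and it stops holding back the last stable offset only once the high watermark passes its marker:

    // completedTxn would come from ProducerAppendInfo.append on a COMMIT/ABORT control batch.
    val lastStableOffset = stateManager.completeTxn(completedTxn)

    // The marker itself may still be above the high watermark, so the transaction remains
    // "unreplicated" and continues to bound firstUnstableOffset ...
    val stillUnstable = stateManager.firstUnstableOffset

    // ... until the high watermark advances past the marker's offset.
    stateManager.onHighWatermarkUpdated(newHighWatermark)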
+ */ +@nonthreadsafe +class ProducerStateManager(val topicPartition: TopicPartition, + val logDir: File, + val maxPidExpirationMs: Int = 60 * 60 * 1000) extends Logging { + import ProducerStateManager._ + import java.util + + private val producers = mutable.Map.empty[Long, ProducerIdEntry] + private var lastMapOffset = 0L + private var lastSnapOffset = 0L + + // ongoing transactions sorted by the first offset of the transaction + private val ongoingTxns = new util.TreeMap[Long, TxnMetadata] + + // completed transactions whose markers are at offsets above the high watermark + private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata] + + /** + * An unstable offset is one which is either undecided (i.e. its ultimate outcome is not yet known), + * or one that is decided, but may not have been replicated (i.e. any transaction which has a COMMIT/ABORT + * marker written at a higher offset than the current high watermark). + */ + def firstUnstableOffset: Option[LogOffsetMetadata] = { + val unreplicatedFirstOffset = Option(unreplicatedTxns.firstEntry).map(_.getValue.firstOffset) + val undecidedFirstOffset = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset) + if (unreplicatedFirstOffset.isEmpty) + undecidedFirstOffset + else if (undecidedFirstOffset.isEmpty) + unreplicatedFirstOffset + else if (undecidedFirstOffset.get.messageOffset < unreplicatedFirstOffset.get.messageOffset) + undecidedFirstOffset + else + unreplicatedFirstOffset + } + + /** + * Acknowledge all transactions which have been completed before a given offset. This allows the LSO + * to advance to the next unstable offset. + */ + def onHighWatermarkUpdated(highWatermark: Long): Unit = { + removeUnreplicatedTransactions(highWatermark) + } + + /** + * The first undecided offset is the earliest transactional message which has not yet been committed + * or aborted. + */ + def firstUndecidedOffset: Option[Long] = Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset.messageOffset) + + /** + * Returns the last offset of this map + */ + def mapEndOffset = lastMapOffset + + /** + * Get a copy of the active producers + */ + def activeProducers: immutable.Map[Long, ProducerIdEntry] = producers.toMap + + def isEmpty: Boolean = producers.isEmpty && unreplicatedTxns.isEmpty + + private def loadFromSnapshot(logStartOffset: Long, currentTime: Long) { + while (true) { + latestSnapshotFile match { + case Some(file) => + try { + info(s"Loading producer state from snapshot file ${file.getName} for partition $topicPartition") + readSnapshot(file).filter(!isExpired(currentTime, _)).foreach(loadProducerEntry) + lastSnapOffset = offsetFromFilename(file.getName) + lastMapOffset = lastSnapOffset + return + } catch { + case e: CorruptSnapshotException => + error(s"Snapshot file at ${file.getPath} is corrupt: ${e.getMessage}") + Files.deleteIfExists(file.toPath) + } + case None => + lastSnapOffset = logStartOffset + lastMapOffset = logStartOffset + return + } + } + } + + // visible for testing + private[log] def loadProducerEntry(entry: ProducerIdEntry): Unit = { + val pid = entry.producerId + producers.put(pid, entry) + entry.currentTxnFirstOffset.foreach { offset => + ongoingTxns.put(offset, new TxnMetadata(pid, offset)) + } + } + + private def isExpired(currentTimeMs: Long, producerIdEntry: ProducerIdEntry): Boolean = + producerIdEntry.currentTxnFirstOffset.isEmpty && currentTimeMs - producerIdEntry.timestamp >= maxPidExpirationMs + + /** + * Expire any PIDs which have been idle longer than the configured maximum expiration timeout. 
+ */ + def removeExpiredProducers(currentTimeMs: Long) { + producers.retain { case (pid, lastEntry) => + !isExpired(currentTimeMs, lastEntry) + } + } + + /** + * Truncate the PID mapping to the given offset range and reload the entries from the most recent + * snapshot in range (if there is one). Note that the log end offset is assumed to be less than + * or equal to the high watermark. + */ + def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) { + if (logEndOffset != mapEndOffset) { + producers.clear() + ongoingTxns.clear() + + // since we assume that the offset is less than or equal to the high watermark, it is + // safe to clear the unreplicated transactions + unreplicatedTxns.clear() + deleteSnapshotFiles { file => + val offset = offsetFromFilename(file.getName) + offset > logEndOffset || offset <= logStartOffset + } + loadFromSnapshot(logStartOffset, currentTimeMs) + } else { + evictUnretainedProducers(logStartOffset) + } + } + + /** + * Update the mapping with the given append information + */ + def update(appendInfo: ProducerAppendInfo): Unit = { + if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID) + throw new IllegalArgumentException("Invalid PID passed to update") + + val entry = appendInfo.lastEntry + producers.put(appendInfo.producerId, entry) + appendInfo.startedTransactions.foreach { txn => + ongoingTxns.put(txn.firstOffset.messageOffset, txn) + } + } + + def updateMapEndOffset(lastOffset: Long): Unit = { + lastMapOffset = lastOffset + } + + /** + * Get the last written entry for the given PID. + */ + def lastEntry(producerId: Long): Option[ProducerIdEntry] = producers.get(producerId) + + /** + * Take a snapshot at the current end offset if one does not already exist. + */ + def takeSnapshot(): Unit = { + // If not a new offset, then it is not worth taking another snapshot + if (lastMapOffset > lastSnapOffset) { + val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset) + debug(s"Writing producer snapshot for partition $topicPartition at offset $lastMapOffset") + writeSnapshot(snapshotFile, producers) + + // Update the last snap offset according to the serialized map + lastSnapOffset = lastMapOffset + } + } + + /** + * Get the last offset (exclusive) of the latest snapshot file. + */ + def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFilename(file.getName)) + + /** + * Get the last offset (exclusive) of the oldest snapshot file. + */ + def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFilename(file.getName)) + + /** + * When we remove the head of the log due to retention, we need to clean up the id map. This method takes + * the new start offset and removes all pids which have a smaller last written offset. 
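For example (placeholder offset, calling the method defined next): after retention advances the log start offset to 100, producers whose last written offset is below 100 are dropped, their ongoing-transaction entries are removed, and snapshot files at or below offset 100 are deleted:

    stateManager.evictUnretainedProducers(logStartOffset = 100L)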
+ */ + def evictUnretainedProducers(logStartOffset: Long) { + val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset) + val evictedProducerIds = evictedProducerEntries.keySet + + producers --= evictedProducerIds + removeEvictedOngoingTransactions(evictedProducerIds) + removeUnreplicatedTransactions(logStartOffset) + + deleteSnapshotFiles(file => offsetFromFilename(file.getName) <= logStartOffset) + if (lastMapOffset < logStartOffset) + lastMapOffset = logStartOffset + lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset) + } + + private def removeEvictedOngoingTransactions(expiredProducerIds: collection.Set[Long]): Unit = { + val iterator = ongoingTxns.entrySet.iterator + while (iterator.hasNext) { + val txnEntry = iterator.next() + if (expiredProducerIds.contains(txnEntry.getValue.producerId)) + iterator.remove() + } + } + + private def removeUnreplicatedTransactions(offset: Long): Unit = { + val iterator = unreplicatedTxns.entrySet.iterator + while (iterator.hasNext) { + val txnEntry = iterator.next() + val lastOffset = txnEntry.getValue.lastOffset + if (lastOffset.exists(_ < offset)) + iterator.remove() + } + } + + /** + * Truncate the PID mapping and remove all snapshots. This resets the state of the mapping. + */ + def truncate() { + producers.clear() + ongoingTxns.clear() + unreplicatedTxns.clear() + deleteSnapshotFiles() + lastSnapOffset = 0L + lastMapOffset = 0L + } + + /** + * Complete the transaction and return the last stable offset. + */ + def completeTxn(completedTxn: CompletedTxn): Long = { + val txnMetdata = ongoingTxns.remove(completedTxn.firstOffset) + if (txnMetdata == null) + throw new IllegalArgumentException("Attempted to complete a transaction which was not started") + + txnMetdata.lastOffset = Some(completedTxn.lastOffset) + unreplicatedTxns.put(completedTxn.firstOffset, txnMetdata) + + val lastStableOffset = firstUndecidedOffset.getOrElse(completedTxn.lastOffset + 1) + lastStableOffset + } + + @threadsafe + def deleteSnapshotsBefore(offset: Long): Unit = { + deleteSnapshotFiles(file => offsetFromFilename(file.getName) < offset) + } + + private def listSnapshotFiles: List[File] = { + if (logDir.exists && logDir.isDirectory) + logDir.listFiles.filter(f => f.isFile && isSnapshotFile(f.getName)).toList + else + List.empty[File] + } + + private def oldestSnapshotFile: Option[File] = { + val files = listSnapshotFiles + if (files.nonEmpty) + Some(files.minBy(file => offsetFromFilename(file.getName))) + else + None + } + + private def latestSnapshotFile: Option[File] = { + val files = listSnapshotFiles + if (files.nonEmpty) + Some(files.maxBy(file => offsetFromFilename(file.getName))) + else + None + } + + private def deleteSnapshotFiles(predicate: File => Boolean = _ => true) { + listSnapshotFiles.filter(predicate).foreach(file => Files.deleteIfExists(file.toPath)) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/TimeIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index 731b173..19ab71a 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -144,7 +144,7 @@ class TimeIndex(file: File, def lookup(targetTimestamp: Long): TimestampOffset = { maybeLock(lock) { val idx = mmap.duplicate - val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY) + val slot = largestLowerBoundSlotFor(idx, 
targetTimestamp, IndexSearchType.KEY) if (slot == -1) TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset) else { @@ -163,7 +163,7 @@ class TimeIndex(file: File, override def truncateTo(offset: Long) { inLock(lock) { val idx = mmap.duplicate - val slot = indexSlotFor(idx, offset, IndexSearchType.VALUE) + val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.VALUE) /* There are 3 cases for choosing the new size * 1) if there is no entry in the index <= the offset, delete everything @@ -206,4 +206,5 @@ class TimeIndex(file: File, "Time index file " + file.getAbsolutePath + " is corrupt, found " + len + " bytes which is not positive or not a multiple of 12.") } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/TransactionIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala new file mode 100644 index 0000000..bf6a6d4 --- /dev/null +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io.{File, IOException} +import java.nio.ByteBuffer +import java.nio.channels.FileChannel +import java.nio.file.StandardOpenOption + +import kafka.utils.{Logging, nonthreadsafe} +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction +import org.apache.kafka.common.utils.Utils + +import scala.collection.mutable.ListBuffer + +private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTransaction], isComplete: Boolean) + +/** + * The transaction index maintains metadata about the aborted transactions for each segment. This includes + * the start and end offsets for the aborted transactions and the last stable offset (LSO) at the time of + * the abort. This index is used to find the aborted transactions in the range of a given fetch request at + * the READ_COMMITTED isolation level. + * + * There is at most one transaction index for each log segment. The entries correspond to the transactions + * whose commit markers were written in the corresponding log segment. Note, however, that individual transactions + * may span multiple segments. Recovering the index therefore requires scanning the earlier segments in + * order to find the start of the transactions. 
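A small usage sketch (illustrative only, based on the API introduced below; the file path and all offsets are made up):

    import java.io.File

    val txnIndex = new TransactionIndex(0L, new File("/tmp/kafka-logs/my-topic-0/txn-index-file"))

    // Entries are fixed-size records: version, producerId, firstOffset, lastOffset, lastStableOffset.
    txnIndex.append(new AbortedTxn(producerId = 1L, firstOffset = 10L, lastOffset = 40L, lastStableOffset = 10L))

    // Find aborted transactions overlapping a READ_COMMITTED fetch of the range [25, 100).
    val result = txnIndex.collectAbortedTxns(fetchOffset = 25L, upperBoundOffset = 100L)
    // result.abortedTransactions contains AbortedTransaction(producerId = 1, firstOffset = 10);
    // result.isComplete is false, so the search would continue into the next log segment.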
+ */ +@nonthreadsafe +class TransactionIndex(val startOffset: Long, @volatile var file: File) extends Logging { + // note that the file is not created until we need it + @volatile private var maybeChannel: Option[FileChannel] = None + private var lastOffset: Option[Long] = None + + if (file.exists) + openChannel() + + def append(abortedTxn: AbortedTxn): Unit = { + lastOffset.foreach { offset => + if (offset >= abortedTxn.lastOffset) + throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially") + } + lastOffset = Some(abortedTxn.lastOffset) + Utils.writeFully(channel, abortedTxn.buffer.duplicate()) + } + + def flush(): Unit = maybeChannel.foreach(_.force(true)) + + def delete(): Boolean = { + maybeChannel.forall { channel => + channel.force(true) + close() + file.delete() + } + } + + private def channel: FileChannel = { + maybeChannel match { + case Some(channel) => channel + case None => openChannel() + } + } + + private def openChannel(): FileChannel = { + val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE, + StandardOpenOption.CREATE) + maybeChannel = Some(channel) + channel.position(channel.size) + channel + } + + def truncate() = { + maybeChannel.foreach(_.truncate(0)) + lastOffset = None + } + + def close(): Unit = { + maybeChannel.foreach(_.close()) + maybeChannel = None + } + + def renameTo(f: File): Unit = { + try { + if (file.exists) + Utils.atomicMoveWithFallback(file.toPath, f.toPath) + } finally file = f + } + + def truncateTo(offset: Long): Unit = { + val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize) + var newLastOffset: Option[Long] = None + for ((abortedTxn, position) <- iterator(() => buffer)) { + if (abortedTxn.lastOffset >= offset) { + channel.truncate(position) + lastOffset = newLastOffset + return + } + newLastOffset = Some(abortedTxn.lastOffset) + } + } + + private def iterator(allocate: () => ByteBuffer): Iterator[(AbortedTxn, Int)] = { + maybeChannel match { + case None => Iterator.empty + case Some(channel) => + var position = 0 + + new Iterator[(AbortedTxn, Int)] { + override def hasNext: Boolean = channel.position - position >= AbortedTxn.TotalSize + + override def next(): (AbortedTxn, Int) = { + try { + val buffer = allocate() + Utils.readFully(channel, buffer, position) + buffer.flip() + + val abortedTxn = new AbortedTxn(buffer) + if (abortedTxn.version > AbortedTxn.CurrentVersion) + throw new KafkaException(s"Unexpected aborted transaction version ${abortedTxn.version}, " + + s"current version is ${AbortedTxn.CurrentVersion}") + val nextEntry = (abortedTxn, position) + position += AbortedTxn.TotalSize + nextEntry + } catch { + case e: IOException => + // We received an unexpected error reading from the index file. We propagate this as an + // UNKNOWN error to the consumer, which will cause it to retry the fetch. + throw new KafkaException(s"Failed to read from the transaction index $file", e) + } + } + } + } + } + + def allAbortedTxns: List[AbortedTxn] = { + iterator(() => ByteBuffer.allocate(AbortedTxn.TotalSize)).map(_._1).toList + } + + /** + * Collect all aborted transactions which overlap with a given fetch range. + * + * @param fetchOffset Inclusive first offset of the fetch range + * @param upperBoundOffset Exclusive last offset in the fetch range + * @return An object containing the aborted transactions and whether the search needs to continue + * into the next log segment. 
+ */ + def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = { + val abortedTransactions = ListBuffer.empty[AbortedTransaction] + val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize) + for ((abortedTxn, _) <- iterator(() => buffer)) { + if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset) + abortedTransactions += abortedTxn.asAbortedTransaction + + if (abortedTxn.lastStableOffset >= upperBoundOffset) + return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true) + } + TxnIndexSearchResult(abortedTransactions.toList, isComplete = false) + } + + def sanityCheck(): Unit = { + val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize) + for ((abortedTxn, _) <- iterator(() => buffer)) { + require(abortedTxn.lastOffset >= startOffset) + } + } + +} + +private[log] object AbortedTxn { + val VersionOffset = 0 + val VersionSize = 2 + val ProducerIdOffset = VersionOffset + VersionSize + val ProducerIdSize = 8 + val FirstOffsetOffset = ProducerIdOffset + ProducerIdSize + val FirstOffsetSize = 8 + val LastOffsetOffset = FirstOffsetOffset + FirstOffsetSize + val LastOffsetSize = 8 + val LastStableOffsetOffset = LastOffsetOffset + LastOffsetSize + val LastStableOffsetSize = 8 + val TotalSize = LastStableOffsetOffset + LastStableOffsetSize + + val CurrentVersion: Short = 0 +} + +private[log] class AbortedTxn(val buffer: ByteBuffer) { + import AbortedTxn._ + + def this(producerId: Long, + firstOffset: Long, + lastOffset: Long, + lastStableOffset: Long) = { + this(ByteBuffer.allocate(AbortedTxn.TotalSize)) + buffer.putShort(CurrentVersion) + buffer.putLong(producerId) + buffer.putLong(firstOffset) + buffer.putLong(lastOffset) + buffer.putLong(lastStableOffset) + buffer.flip() + } + + def this(completedTxn: CompletedTxn, lastStableOffset: Long) = + this(completedTxn.producerId, completedTxn.firstOffset, completedTxn.lastOffset, lastStableOffset) + + def version: Short = buffer.get(VersionOffset) + + def producerId: Long = buffer.getLong(ProducerIdOffset) + + def firstOffset: Long = buffer.getLong(FirstOffsetOffset) + + def lastOffset: Long = buffer.getLong(LastOffsetOffset) + + def lastStableOffset: Long = buffer.getLong(LastStableOffsetOffset) + + def asAbortedTransaction: AbortedTransaction = new AbortedTransaction(producerId, firstOffset) + + override def toString: String = + s"AbortedTxn(version=$version, producerId=$producerId, firstOffset=$firstOffset, " + + s"lastOffset=$lastOffset, lastStableOffset=$lastStableOffset)" + + override def equals(any: Any): Boolean = { + any match { + case that: AbortedTxn => this.buffer.equals(that.buffer) + case _ => false + } + } + + override def hashCode(): Int = buffer.hashCode +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/DelayedFetch.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index cbee78a..8c4731a 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -23,6 +23,7 @@ import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException} import org.apache.kafka.common.requests.FetchRequest.PartitionData +import org.apache.kafka.common.requests.IsolationLevel import scala.collection._ @@ -45,9 +46,11 
@@ case class FetchMetadata(fetchMinBytes: Int, fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)]) { override def toString = "[minBytes: " + fetchMinBytes + ", " + - "onlyLeader:" + fetchOnlyLeader + ", " - "onlyCommitted: " + fetchOnlyCommitted + ", " - "partitionStatus: " + fetchPartitionStatus + "]" + "maxBytes:" + fetchMaxBytes + ", " + + "onlyLeader:" + fetchOnlyLeader + ", " + + "onlyCommitted: " + fetchOnlyCommitted + ", " + + "replicaId: " + replicaId + ", " + + "partitionStatus: " + fetchPartitionStatus + "]" } /** * A delayed fetch operation that can be created by the replica manager and watched @@ -57,6 +60,7 @@ class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, quota: ReplicaQuota, + isolationLevel: IsolationLevel, responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) extends DelayedOperation(delayMs) { @@ -80,7 +84,9 @@ class DelayedFetch(delayMs: Long, if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val replica = replicaManager.getLeaderReplicaIfLocal(topicPartition) val endOffset = - if (fetchMetadata.fetchOnlyCommitted) + if (isolationLevel == IsolationLevel.READ_COMMITTED) + replica.lastStableOffset + else if (fetchMetadata.fetchOnlyCommitted) replica.highWatermark else replica.logEndOffset http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/FetchDataInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala index acfb5b0..cbd54c0 100644 --- a/core/src/main/scala/kafka/server/FetchDataInfo.scala +++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala @@ -18,7 +18,9 @@ package kafka.server import org.apache.kafka.common.record.Records +import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, records: Records, - firstEntryIncomplete: Boolean = false) + firstEntryIncomplete: Boolean = false, + abortedTransactions: Option[List[AbortedTransaction]] = None) http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3d821f7..fbd74ac 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -21,6 +21,8 @@ import java.nio.ByteBuffer import java.lang.{Long => JLong} import java.util.{Collections, Properties} import java.util +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger import kafka.admin.{AdminUtils, RackAwareMode} import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse} @@ -41,7 +43,7 @@ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} -import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, TimestampType} +import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} @@ -436,6 
+438,7 @@ class KafkaApis(val requestChannel: RequestChannel, produceRequest.timeout.toLong, produceRequest.acks, internalTopicsAllowed, + isFromClient = true, authorizedRequestInfo, sendResponseCallback) @@ -495,8 +498,9 @@ class KafkaApis(val requestChannel: RequestChannel, case _ => data } + val abortedTransactions = convertedData.abortedTransactions.map(_.asJava).orNull tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, - convertedData.logStartOffset, null, convertedData.records) + convertedData.logStartOffset, abortedTransactions, convertedData.records) } } @@ -560,7 +564,8 @@ class KafkaApis(val requestChannel: RequestChannel, versionId <= 2, authorizedRequestInfo, replicationQuota(fetchRequest), - sendResponseCallback) + sendResponseCallback, + fetchRequest.isolationLevel) } } @@ -589,7 +594,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (version == 0) handleListOffsetRequestV0(request) else - handleListOffsetRequestV1(request) + handleListOffsetRequestV1AndAbove(request) def createResponse(throttleTimeMs: Int): AbstractResponse = new ListOffsetResponse(throttleTimeMs, mergedResponseMap.asJava) sendResponseMaybeThrottle(request, createResponse) @@ -646,7 +651,7 @@ class KafkaApis(val requestChannel: RequestChannel, responseMap ++ unauthorizedResponseStatus } - private def handleListOffsetRequestV1(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] @@ -679,9 +684,13 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.getReplicaOrException(topicPartition) val found = { - if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) - TimestampOffset(RecordBatch.NO_TIMESTAMP, localReplica.highWatermark.messageOffset) - else { + if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP) { + val lastFetchableOffset = offsetRequest.isolationLevel match { + case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset + case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset + } + TimestampOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset) + } else { def allowed(timestampOffset: TimestampOffset): Boolean = !fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset @@ -1415,9 +1424,45 @@ class KafkaApis(val requestChannel: RequestChannel, def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = { authorizeClusterAction(request) - val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]() - val responseBody = new WriteTxnMarkersResponse(emptyResponse) - sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody)) + val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest] + val errors = new ConcurrentHashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]() + val markers = writeTxnMarkersRequest.markers + val numAppends = new AtomicInteger(markers.size) + + if (numAppends.get == 0) { + sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) + return + } + + def sendResponseCallback(pid: Long)(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = { + errors.put(pid, responseStatus.mapValues(_.error).asJava) + if (numAppends.decrementAndGet() == 0) + sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) + } + + // TODO: The current append API makes doing separate writes per producerId a little easier, but it would + // be nice to have only one append to the log. This requires pushing the building of the control records + // into Log so that we only append those having a valid producer epoch, and exposing a new appendControlRecord + // API in ReplicaManager. For now, we've done the simpler approach + for (marker <- markers.asScala) { + val producerId = marker.producerId + val controlRecords = marker.partitions.asScala.map { partition => + val controlRecordType = marker.transactionResult match { + case TransactionResult.COMMIT => ControlRecordType.COMMIT + case TransactionResult.ABORT => ControlRecordType.ABORT + } + val endTxnMarker = new EndTransactionMarker(controlRecordType, marker.coordinatorEpoch) + partition -> MemoryRecords.withEndTransactionMarker(producerId, marker.producerEpoch, endTxnMarker) + }.toMap + + replicaManager.appendRecords( + timeout = config.requestTimeoutMs.toLong, + requiredAcks = -1, + internalTopicsAllowed = true, + isFromClient = false, + entriesPerPartition = controlRecords, + sendResponseCallback(producerId)) + } } def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/LogOffsetMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala index 05e9842..edc010e 100644 --- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -44,7 +44,7 @@ case class LogOffsetMetadata(messageOffset: Long, // check if this offset is already on an older segment compared with the given offset def onOlderSegment(that: LogOffsetMetadata): Boolean = { - if (messageOffsetOnly()) + if (messageOffsetOnly) throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset < that.segmentBaseOffset @@ -52,7 +52,7 @@ case class LogOffsetMetadata(messageOffset: Long, // check if this offset is on the same segment with the given offset def onSameSegment(that: LogOffsetMetadata): Boolean = { - if (messageOffsetOnly()) + if (messageOffsetOnly) throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset == that.segmentBaseOffset @@ -68,14 +68,14 @@ case class LogOffsetMetadata(messageOffset: Long, def positionDiff(that: LogOffsetMetadata): Int = { if(!onSameSegment(that)) throw new KafkaException(s"$this cannot compare its segment position with $that since they are not on the same segment") - if(messageOffsetOnly()) + if(messageOffsetOnly) throw new KafkaException(s"$this cannot compare its segment position with $that since it only has message offset info") this.relativePositionInSegment - that.relativePositionInSegment } // decide if the offset metadata only contains message offset info - def messageOffsetOnly(): Boolean = { + def messageOffsetOnly: Boolean = { segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment == 
LogOffsetMetadata.UnknownFilePosition } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index de670e8..663ab1e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -40,6 +40,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Time import org.apache.kafka.common.requests.FetchRequest.PartitionData +import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import scala.collection._ import scala.collection.JavaConverters._ @@ -95,7 +96,8 @@ case class LogReadResult(info: FetchDataInfo, } -case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records) +case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records, + abortedTransactions: Option[List[AbortedTransaction]] = None) object LogReadResult { val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), @@ -334,12 +336,14 @@ class ReplicaManager(val config: KafkaConfig, def appendRecords(timeout: Long, requiredAcks: Short, internalTopicsAllowed: Boolean, + isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], responseCallback: Map[TopicPartition, PartitionResponse] => Unit) { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds - val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks) + val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, + isFromClient = isFromClient, entriesPerPartition, requiredAcks) debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) val produceStatus = localProduceResults.map { case (topicPartition, result) => @@ -493,6 +497,7 @@ class ReplicaManager(val config: KafkaConfig, * Append the messages to the local replica logs */ private def appendToLocalLog(internalTopicsAllowed: Boolean, + isFromClient: Boolean, entriesPerPartition: Map[TopicPartition, MemoryRecords], requiredAcks: Short): Map[TopicPartition, LogAppendResult] = { trace("Append [%s] to local log ".format(entriesPerPartition)) @@ -510,7 +515,7 @@ class ReplicaManager(val config: KafkaConfig, val partitionOpt = getPartition(topicPartition) val info = partitionOpt match { case Some(partition) => - partition.appendRecordsToLeader(records, requiredAcks) + partition.appendRecordsToLeader(records, isFromClient, requiredAcks) case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" .format(topicPartition, localBrokerId)) @@ -566,7 +571,8 @@ class ReplicaManager(val config: KafkaConfig, hardMaxBytesLimit: Boolean, fetchInfos: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota = UnboundedQuota, - responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) { + responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, + isolationLevel: IsolationLevel) { val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! 
Request.isValidBrokerId(replicaId) @@ -579,7 +585,8 @@ class ReplicaManager(val config: KafkaConfig, fetchMaxBytes = fetchMaxBytes, hardMaxBytesLimit = hardMaxBytesLimit, readPartitionInfo = fetchInfos, - quota = quota) + quota = quota, + isolationLevel = isolationLevel) // if the fetch comes from the follower, // update its corresponding log end offset @@ -598,7 +605,8 @@ class ReplicaManager(val config: KafkaConfig, // 4) some error happens while reading data if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.map { case (tp, result) => - tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records) + tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records, + result.info.abortedTransactions) } responseCallback(fetchPartitionData) } else { @@ -611,7 +619,7 @@ class ReplicaManager(val config: KafkaConfig, } val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus) - val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback) + val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, isolationLevel, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) } @@ -632,7 +640,8 @@ class ReplicaManager(val config: KafkaConfig, fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, readPartitionInfo: Seq[(TopicPartition, PartitionData)], - quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = { + quota: ReplicaQuota, + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Seq[(TopicPartition, LogReadResult)] = { def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = { val offset = fetchInfo.fetchOffset @@ -654,7 +663,9 @@ class ReplicaManager(val config: KafkaConfig, getReplicaOrException(tp) // decide whether to only fetch committed data (i.e. messages below high watermark) - val maxOffsetOpt = if (readOnlyCommitted) + val maxOffsetOpt = if (isolationLevel == IsolationLevel.READ_COMMITTED) + Some(localReplica.lastStableOffset.messageOffset) + else if (readOnlyCommitted) Some(localReplica.highWatermark.messageOffset) else None @@ -674,7 +685,7 @@ class ReplicaManager(val config: KafkaConfig, val adjustedFetchSize = math.min(partitionFetchSize, limitBytes) // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition - val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage) + val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel) // If the partition is being throttled, simply return an empty set. 
if (shouldLeaderThrottle(quota, tp, replicaId)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 7a5f671..0b0ad7b 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -105,6 +105,8 @@ object DumpLogSegments { dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize) case Log.PidSnapshotFileSuffix => dumpPidSnapshot(file) + case Log.TxnIndexFileSuffix => + dumpTxnIndex(file) case _ => System.err.println(s"Ignoring unknown file $file") } @@ -131,11 +133,20 @@ object DumpLogSegments { } } + private def dumpTxnIndex(file: File): Unit = { + val index = new TransactionIndex(Log.offsetFromFilename(file.getName), file) + for (abortedTxn <- index.allAbortedTxns) { + println(s"version: ${abortedTxn.version} pid: ${abortedTxn.producerId} firstOffset: ${abortedTxn.firstOffset} " + + s"lastOffset: ${abortedTxn.lastOffset} lastStableOffset: ${abortedTxn.lastStableOffset}") + } + } + private def dumpPidSnapshot(file: File): Unit = { try { - ProducerIdMapping.readSnapshot(file).foreach { case (pid, entry) => - println(s"pid: $pid epoch: ${entry.epoch} lastSequence: ${entry.lastSeq} lastOffset: ${entry.lastOffset} " + - s"offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp}") + ProducerStateManager.readSnapshot(file).foreach { entry=> + println(s"producerId: ${entry.producerId} producerEpoch: ${entry.producerEpoch} lastSequence: ${entry.lastSeq} " + + s"lastOffset: ${entry.lastOffset} offsetDelta: ${entry.offsetDelta} lastTimestamp: ${entry.timestamp} " + + s"coordinatorEpoch: ${entry.coordinatorEpoch} currentTxnFirstOffset: ${entry.currentTxnFirstOffset}") } } catch { case e: CorruptSnapshotException => @@ -349,9 +360,15 @@ object DumpLogSegments { " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]")) } - if (record.isControlRecord) { - val controlType = ControlRecordType.parse(record.key) - print(s" controlType: $controlType") + if (batch.isControlBatch) { + val controlTypeId = ControlRecordType.parseTypeId(record.key) + ControlRecordType.fromTypeId(controlTypeId) match { + case ControlRecordType.ABORT | ControlRecordType.COMMIT => + val endTxnMarker = EndTransactionMarker.deserialize(record) + print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}") + case controlType => + print(s" controlType: $controlType($controlTypeId)") + } } else if (printContents) { val (key, payload) = parser.parse(record) key.foreach(key => print(s" key: $key")) http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 757e216..4277d26 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -199,7 +199,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def createListOffsetsRequest = { - 
requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes( + requests.ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes( Map(tp -> (0L: java.lang.Long)).asJava). build() } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala index b4aa56f..2dfbf48 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala @@ -1124,7 +1124,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), - EasyMock.anyBoolean(), + internalTopicsAllowed = EasyMock.eq(true), + isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( @@ -1205,7 +1206,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), - EasyMock.anyBoolean(), + internalTopicsAllowed = EasyMock.eq(true), + isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 387d4b3..9053e0a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -28,7 +28,7 @@ import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.OffsetFetchResponse +import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.Assert.{assertEquals, assertFalse, assertTrue} @@ -509,7 +509,8 @@ class GroupMetadataManagerTest { time.sleep(2) EasyMock.reset(partition) - EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), + isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(partition) @@ -541,7 +542,8 @@ class GroupMetadataManagerTest { EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) 
EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) - EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), + isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(replicaManager, partition) @@ -588,7 +590,8 @@ class GroupMetadataManagerTest { EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) - EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), + isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(replicaManager, partition) @@ -664,7 +667,8 @@ class GroupMetadataManagerTest { EasyMock.reset(partition) val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() - EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), + isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(partition) @@ -738,7 +742,8 @@ class GroupMetadataManagerTest { // expect the offset tombstone EasyMock.reset(partition) - EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt())) + EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), + isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) EasyMock.replay(partition) @@ -758,7 +763,8 @@ class GroupMetadataManagerTest { val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), - EasyMock.anyBoolean(), + internalTopicsAllowed = EasyMock.eq(true), + isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( @@ -794,7 +800,7 @@ class GroupMetadataManagerTest { EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock)) EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset)) - EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true))) + EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock)) EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt())) .andReturn(records.buffer) 
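The thread running through the server-side changes above is the new isolation level: a READ_COMMITTED fetch (and a v2+ ListOffsets request from a consumer) is bounded by the replica's last stable offset rather than the high watermark, while follower fetches continue to read up to the log end offset. The following is a minimal, self-contained Scala sketch of that bound selection only; ReplicaView, maxFetchOffset and the literal offsets are hypothetical stand-ins for illustration, not the broker's code.

// Minimal sketch, assuming hypothetical stand-ins (ReplicaView, maxFetchOffset):
// how a fetch's upper offset bound can be chosen per isolation level.
object IsolationBoundExample {
  sealed trait IsolationLevel
  case object ReadUncommitted extends IsolationLevel
  case object ReadCommitted extends IsolationLevel

  // Hypothetical snapshot of a partition leader's offsets.
  case class ReplicaView(logEndOffset: Long, highWatermark: Long, lastStableOffset: Long)

  // Followers read up to the log end offset; consumers are bounded by the high
  // watermark; READ_COMMITTED consumers are bounded by the last stable offset,
  // which cannot pass the first offset of a still-open transaction.
  def maxFetchOffset(replica: ReplicaView, fromFollower: Boolean, isolation: IsolationLevel): Long =
    if (fromFollower) replica.logEndOffset
    else isolation match {
      case ReadCommitted   => replica.lastStableOffset
      case ReadUncommitted => replica.highWatermark
    }

  def main(args: Array[String]): Unit = {
    val replica = ReplicaView(logEndOffset = 120L, highWatermark = 100L, lastStableOffset = 90L)
    println(maxFetchOffset(replica, fromFollower = false, ReadCommitted))   // prints 90
    println(maxFetchOffset(replica, fromFollower = false, ReadUncommitted)) // prints 100
    println(maxFetchOffset(replica, fromFollower = true,  ReadUncommitted)) // prints 120
  }
}

With an open transaction the last stable offset trails the high watermark, so a READ_COMMITTED consumer simply stops earlier; the aborted-transaction metadata added to FetchDataInfo and FetchPartitionData is what lets it also discard aborted batches below that bound.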
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 94dc12b..09a89dd 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -27,6 +27,7 @@ import kafka.utils.TestUtils.fail import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ +import org.apache.kafka.common.requests.IsolationLevel import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.MockTime import org.junit.Assert.{assertEquals, assertFalse, assertTrue} @@ -349,7 +350,8 @@ class TransactionStateManagerTest { EasyMock.expect(replicaManager.getLogEndOffset(topicPartition)).andStubReturn(Some(endOffset)) EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) - EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true))) + EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), + EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock)) EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt())) .andReturn(records.buffer) @@ -363,7 +365,8 @@ class TransactionStateManagerTest { val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), - EasyMock.anyBoolean(), + internalTopicsAllowed = EasyMock.eq(true), + isFromClient = EasyMock.eq(false), EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))) .andAnswer(new IAnswer[Unit] { http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index cfd66de..a42ae22 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -57,7 +57,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin /* append two messages */ log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0, - new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) + new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) def readBatch(offset: Int) = log.read(offset, 4096).records.batches.iterator.next() http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 19a97bc..8a119c2 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -259,7 +259,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle for(_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield { val value = counter.toString val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.toString.getBytes, codec = codec, - key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0) + key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0) counter += 1 (key, value, appendInfo.firstOffset) }
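The rewritten handleWriteTxnMarkersRequest earlier in this commit issues one append per producerId and aggregates the per-partition errors into a single WriteTxnMarkersResponse, using an AtomicInteger so the response is sent exactly once, by whichever append callback finishes last. Below is a rough, self-contained Scala sketch of that countdown pattern only; writeMarkers, the String statuses and the synchronous "append" are hypothetical illustrations, not the broker's API.

// Rough sketch, assuming hypothetical names (writeMarkers, the status strings):
// send one aggregated response only after the last per-producer append completes.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._

object MarkerAggregationExample {
  def writeMarkers(producerIds: Seq[Long], sendResponse: Map[Long, String] => Unit): Unit = {
    val results = new ConcurrentHashMap[Long, String]()
    val outstanding = new AtomicInteger(producerIds.size)

    // Nothing to append: respond immediately, mirroring the numAppends == 0 short-circuit.
    if (outstanding.get == 0) {
      sendResponse(Map.empty)
      return
    }

    def onAppendComplete(producerId: Long)(status: String): Unit = {
      results.put(producerId, status)
      // Whichever append finishes last sends the response, exactly once.
      if (outstanding.decrementAndGet() == 0)
        sendResponse(results.asScala.toMap)
    }

    // In the broker this would be an asynchronous replicaManager.appendRecords call
    // per producerId; here each "append" completes synchronously for illustration.
    for (producerId <- producerIds)
      onAppendComplete(producerId)("NONE")
  }

  def main(args: Array[String]): Unit =
    writeMarkers(Seq(1L, 2L, 3L), response => println(s"aggregated response: $response"))
}

The per-producer split matches the TODO in the commit: building the COMMIT/ABORT control batches per partition is done in KafkaApis for now, rather than pushing control-record construction into Log behind a single append.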
