Repository: kafka Updated Branches: refs/heads/trunk 1cb39f757 -> a1c8e7d94
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 4b2cedb..d7b1c33 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -73,28 +73,28 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc private var coordinatorEpoch = initialEntry.coordinatorEpoch private val transactions = ListBuffer.empty[TxnMetadata] - def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) = - this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog) + def this(producerId: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) = + this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog) - private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = { - if (this.producerEpoch > epoch) { + private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = { + if (this.producerEpoch > producerEpoch) { throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " + - s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} (server epoch)") + s"with a newer epoch. 
$producerEpoch (request epoch), ${this.producerEpoch} (server epoch)") } else if (shouldValidateSequenceNumbers) { - if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < epoch) { + if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < producerEpoch) { if (firstSeq != 0) - throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " + + throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " + s"(request epoch), $firstSeq (seq. number)") } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) { // the epoch was bumped by a control record, so we expect the sequence number to be reset - throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), found $firstSeq " + + throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " + s"(incoming seq. number), but expected 0") } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) { - throw new DuplicateSequenceNumberException(s"Duplicate sequence number: pid: $producerId, (incomingBatch.firstSeq, " + + throw new DuplicateSequenceNumberException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " + s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " + s"(${this.firstSeq}, ${this.lastSeq}).") } else if (firstSeq != this.lastSeq + 1L) { - throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), $firstSeq " + + throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " + s"(incoming seq. 
number), ${this.lastSeq} (current end sequence number)") } } @@ -202,25 +202,25 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc } object ProducerStateManager { - private val PidSnapshotVersion: Short = 1 + private val ProducerSnapshotVersion: Short = 1 private val VersionField = "version" private val CrcField = "crc" - private val PidField = "pid" + private val ProducerIdField = "producer_id" private val LastSequenceField = "last_sequence" private val ProducerEpochField = "epoch" private val LastOffsetField = "last_offset" private val OffsetDeltaField = "offset_delta" private val TimestampField = "timestamp" - private val PidEntriesField = "pid_entries" + private val ProducerEntriesField = "producer_entries" private val CoordinatorEpochField = "coordinator_epoch" private val CurrentTxnFirstOffsetField = "current_txn_first_offset" private val VersionOffset = 0 private val CrcOffset = VersionOffset + 2 - private val PidEntriesOffset = CrcOffset + 4 + private val ProducerEntriesOffset = CrcOffset + 4 - val PidSnapshotEntrySchema = new Schema( - new Field(PidField, Type.INT64, "The producer ID"), + val ProducerSnapshotEntrySchema = new Schema( + new Field(ProducerIdField, Type.INT64, "The producer ID"), new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"), new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"), new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"), @@ -231,33 +231,33 @@ object ProducerStateManager { val PidSnapshotMapSchema = new Schema( new Field(VersionField, Type.INT16, "Version of the snapshot file"), new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"), - new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table")) + new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table")) def readSnapshot(file: File): Iterable[ProducerIdEntry] = { val 
buffer = Files.readAllBytes(file.toPath) val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) val version = struct.getShort(VersionField) - if (version != PidSnapshotVersion) + if (version != ProducerSnapshotVersion) throw new IllegalArgumentException(s"Unhandled snapshot file version $version") val crc = struct.getUnsignedInt(CrcField) - val computedCrc = Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset) + val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset) if (crc != computedCrc) throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " + s"Stored crc: $crc. Computed crc: $computedCrc") - struct.getArray(PidEntriesField).map { pidEntryObj => - val pidEntryStruct = pidEntryObj.asInstanceOf[Struct] - val pid: Long = pidEntryStruct.getLong(PidField) - val epoch = pidEntryStruct.getShort(ProducerEpochField) - val seq = pidEntryStruct.getInt(LastSequenceField) - val offset = pidEntryStruct.getLong(LastOffsetField) - val timestamp = pidEntryStruct.getLong(TimestampField) - val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField) - val coordinatorEpoch = pidEntryStruct.getInt(CoordinatorEpochField) - val currentTxnFirstOffset = pidEntryStruct.getLong(CurrentTxnFirstOffsetField) - val newEntry = ProducerIdEntry(pid, epoch, seq, offset, offsetDelta, timestamp, + struct.getArray(ProducerEntriesField).map { producerEntryObj => + val producerEntryStruct = producerEntryObj.asInstanceOf[Struct] + val producerId: Long = producerEntryStruct.getLong(ProducerIdField) + val producerEpoch = producerEntryStruct.getShort(ProducerEpochField) + val seq = producerEntryStruct.getInt(LastSequenceField) + val offset = producerEntryStruct.getLong(LastOffsetField) + val timestamp = producerEntryStruct.getLong(TimestampField) + val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) + val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) + 
val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) + val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp, coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None) newEntry } @@ -265,12 +265,12 @@ object ProducerStateManager { private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) { val struct = new Struct(PidSnapshotMapSchema) - struct.set(VersionField, PidSnapshotVersion) + struct.set(VersionField, ProducerSnapshotVersion) struct.set(CrcField, 0L) // we'll fill this after writing the entries val entriesArray = entries.map { - case (pid, entry) => - val pidEntryStruct = struct.instance(PidEntriesField) - pidEntryStruct.set(PidField, pid) + case (producerId, entry) => + val producerEntryStruct = struct.instance(ProducerEntriesField) + producerEntryStruct.set(ProducerIdField, producerId) .set(ProducerEpochField, entry.producerEpoch) .set(LastSequenceField, entry.lastSeq) .set(LastOffsetField, entry.lastOffset) @@ -278,16 +278,16 @@ object ProducerStateManager { .set(TimestampField, entry.timestamp) .set(CoordinatorEpochField, entry.coordinatorEpoch) .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L)) - pidEntryStruct + producerEntryStruct }.toArray - struct.set(PidEntriesField, entriesArray) + struct.set(ProducerEntriesField, entriesArray) val buffer = ByteBuffer.allocate(struct.sizeOf) struct.writeTo(buffer) buffer.flip() // now fill in the CRC - val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset) + val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset) ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc) val fos = new FileOutputStream(file) @@ -404,10 +404,10 @@ class ProducerStateManager(val topicPartition: TopicPartition, // visible for testing private[log] def loadProducerEntry(entry: ProducerIdEntry): Unit = { - val pid = 
entry.producerId - producers.put(pid, entry) + val producerId = entry.producerId + producers.put(producerId, entry) entry.currentTxnFirstOffset.foreach { offset => - ongoingTxns.put(offset, new TxnMetadata(pid, offset)) + ongoingTxns.put(offset, new TxnMetadata(producerId, offset)) } } @@ -418,7 +418,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, * Expire any PIDs which have been idle longer than the configured maximum expiration timeout. */ def removeExpiredProducers(currentTimeMs: Long) { - producers.retain { case (pid, lastEntry) => + producers.retain { case (producerId, lastEntry) => !isExpired(currentTimeMs, lastEntry) } } @@ -496,7 +496,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * When we remove the head of the log due to retention, we need to clean up the id map. This method takes - * the new start offset and removes all pids which have a smaller last written offset. + * the new start offset and removes all producerIds which have a smaller last written offset. 
*/ def evictUnretainedProducers(logStartOffset: Long) { val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset) http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index aaa2458..1f8bea5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -32,7 +32,7 @@ import kafka.common.Topic.{GroupMetadataTopicName, TransactionStateTopicName, is import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.controller.KafkaController import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} -import kafka.coordinator.transaction.{InitPidResult, TransactionCoordinator} +import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.{RequestChannel, RequestOrResponseSend} import kafka.network.RequestChannel.{Response, Session} @@ -110,7 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request) case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request) - case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request) + case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request) case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request) case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request) case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request) @@ -1386,20 +1386,20 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleInitPidRequest(request: RequestChannel.Request): Unit = { - val initPidRequest = 
request.body[InitPidRequest] - val transactionalId = initPidRequest.transactionalId + def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = { + val initProducerIdRequest = request.body[InitProducerIdRequest] + val transactionalId = initProducerIdRequest.transactionalId // Send response callback - def sendResponseCallback(result: InitPidResult): Unit = { + def sendResponseCallback(result: InitProducerIdResult): Unit = { def createResponse(throttleTimeMs: Int): AbstractResponse = { - val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch) - trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.") + val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch) + trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.") responseBody } sendResponseMaybeThrottle(request, createResponse) } - txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback) + txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback) } def handleEndTxnRequest(request: RequestChannel.Request): Unit = { @@ -1408,7 +1408,7 @@ class KafkaApis(val requestChannel: RequestChannel, def sendResponseCallback(error: Errors) { def createResponse(throttleTimeMs: Int): AbstractResponse = { val responseBody = new EndTxnResponse(throttleTimeMs, error) - trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.") + trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.") responseBody } sendResponseMaybeThrottle(request, createResponse) 
@@ -1433,23 +1433,22 @@ class KafkaApis(val requestChannel: RequestChannel, return } - def sendResponseCallback(pid: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { - errors.put(pid, responseStatus.mapValues(_.error).asJava) + def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { + errors.put(producerId, responseStatus.mapValues(_.error).asJava) val successfulPartitions = responseStatus.filter { case (_, partitionResponse) => partitionResponse.error == Errors.NONE }.keys.toSeq try { - groupCoordinator.handleTxnCompletion(producerId = pid, topicPartitions = successfulPartitions, transactionResult = result) + groupCoordinator.handleTxnCompletion(producerId = producerId, topicPartitions = successfulPartitions, transactionResult = result) } catch { case e: Exception => error(s"Received an exception while trying to update the offsets cache on transaction completion: $e") - val producerIdErrors = errors.get(pid) + val producerIdErrors = errors.get(producerId) successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN)) } - if (numAppends.decrementAndGet() == 0) sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors))) } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 690d167..5ee4b12 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -590,7 +590,7 @@ object KafkaConfig { /** ********* Transaction management configuration ***********/ val TransactionalIdExpirationMsDoc = "The maximum amount of time in ms that the transaction coordinator will wait before 
proactively expire a producer's transactional id without receiving any transaction status updates from it." val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " + - "If a client’s requested transaction time exceed this, then the broker will return an error in InitPidRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction." + "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction." val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic." val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading pid and transactions into the cache." val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). 
" + http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 4e2b11a..c12f774 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -64,7 +64,7 @@ object ZkUtils { val BrokerSequenceIdPath = s"$BrokersPath/seqid" val ConfigChangesPath = s"$ConfigPath/changes" val ConfigUsersPath = s"$ConfigPath/users" - val PidBlockPath = "/latest_pid_block" + val ProducerIdBlockPath = "/latest_pid_block" // Important: it is necessary to add any new top level Zookeeper path to the Seq val SecureZkRootPaths = Seq(AdminPath, BrokersPath, @@ -75,7 +75,7 @@ object ZkUtils { IsrChangeNotificationPath, KafkaAclPath, KafkaAclChangesPath, - PidBlockPath) + ProducerIdBlockPath) // Important: it is necessary to add any new top level Zookeeper path that contains // sensitive information that should not be world readable to the Seq @@ -239,7 +239,7 @@ class ZkUtils(val zkClient: ZkClient, DeleteTopicsPath, BrokerSequenceIdPath, IsrChangeNotificationPath, - PidBlockPath) + ProducerIdBlockPath) // Visible for testing val zkPath = new ZkPath(zkClient) http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala index 85c631c..b032f8d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala @@ -68,22 +68,22 @@ class ProducerIdManagerTest { val manager1: 
ProducerIdManager = new ProducerIdManager(0, zkUtils) val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils) - val pid1 = manager1.nextPid() - val pid2 = manager2.nextPid() + val pid1 = manager1.generateProducerId() + val pid2 = manager2.generateProducerId() assertEquals(0, pid1) assertEquals(ProducerIdManager.PidBlockSize, pid2) for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) { - assertEquals(pid1 + i, manager1.nextPid()) + assertEquals(pid1 + i, manager1.generateProducerId()) } for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) { - assertEquals(pid2 + i, manager2.nextPid()) + assertEquals(pid2 + i, manager2.generateProducerId()) } - assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid()) - assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid()) + assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.generateProducerId()) + assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.generateProducerId()) } @Test(expected = classOf[KafkaException]) @@ -91,7 +91,7 @@ class ProducerIdManagerTest { EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString())) .andAnswer(new IAnswer[(Option[String], Int)] { override def answer(): (Option[String], Int) = { - (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0, + (Some(ProducerIdManager.generateProducerIdBlockJson(ProducerIdBlock(0, Long.MaxValue - ProducerIdManager.PidBlockSize, Long.MaxValue))), 0) } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala index df23952..83cba71 100644 --- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala @@ -47,19 +47,19 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness { val tc = servers.head.transactionCoordinator - var initPidResult: InitPidResult = null - def callback(result: InitPidResult): Unit = { - initPidResult = result + var initProducerIdResult: InitProducerIdResult = null + def callback(result: InitProducerIdResult): Unit = { + initProducerIdResult = result } val txnId = "txn" - tc.handleInitPid(txnId, 900000, callback) + tc.handleInitProducerId(txnId, 900000, callback) - while(initPidResult == null) { + while(initProducerIdResult == null) { Utils.sleep(1) } - Assert.assertEquals(Errors.NONE, initPidResult.error) + Assert.assertEquals(Errors.NONE, initProducerIdResult.error) @volatile var addPartitionErrors: Errors = null def addPartitionsCallback(errors: Errors): Unit = { @@ -67,8 +67,8 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness { } tc.handleAddPartitionsToTransaction(txnId, - initPidResult.pid, - initPidResult.epoch, + initProducerIdResult.producerId, + initProducerIdResult.producerEpoch, Set[TopicPartition](new TopicPartition(topic, 0)), addPartitionsCallback ) http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index 395bfb9..2f4f572 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -58,11 
+58,11 @@ class TransactionCoordinatorTest { txnMarkerPurgatory, time) - var result: InitPidResult = _ + var result: InitProducerIdResult = _ var error: Errors = Errors.NONE private def mockPidManager(): Unit = { - EasyMock.expect(pidManager.nextPid()) + EasyMock.expect(pidManager.generateProducerId()) .andAnswer(new IAnswer[Long] { override def answer(): Long = { nextPid += 1 @@ -90,10 +90,10 @@ class TransactionCoordinatorTest { mockPidManager() EasyMock.replay(pidManager) - coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback) - assertEquals(InitPidResult(0L, 0, Errors.NONE), result) - coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback) - assertEquals(InitPidResult(1L, 0, Errors.NONE), result) + coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback) + assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result) + coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback) + assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result) } @Test @@ -101,10 +101,10 @@ class TransactionCoordinatorTest { mockPidManager() EasyMock.replay(pidManager) - coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback) - assertEquals(InitPidResult(0L, 0, Errors.NONE), result) - coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback) - assertEquals(InitPidResult(1L, 0, Errors.NONE), result) + coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback) + assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result) + coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback) + assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result) } @Test @@ -143,16 +143,16 @@ class TransactionCoordinatorTest { .anyTimes() EasyMock.replay(pidManager, transactionManager) - coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback) - assertEquals(InitPidResult(nextPid - 1, 0, Errors.NONE), result) + 
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback) + assertEquals(InitProducerIdResult(nextPid - 1, 0, Errors.NONE), result) } @Test def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinatorForId(): Unit = { mockPidManager() EasyMock.replay(pidManager) - coordinator.handleInitPid("some-pid", txnTimeoutMs, initPidMockCallback) - assertEquals(InitPidResult(-1, -1, Errors.NOT_COORDINATOR), result) + coordinator.handleInitProducerId("some-pid", txnTimeoutMs, initProducerIdMockCallback) + assertEquals(InitProducerIdResult(-1, -1, Errors.NOT_COORDINATOR), result) } @Test @@ -165,7 +165,7 @@ class TransactionCoordinatorTest { EasyMock.replay(transactionManager) coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback) - assertEquals(Errors.INVALID_PID_MAPPING, error) + assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error) } @Test @@ -299,7 +299,7 @@ class TransactionCoordinatorTest { EasyMock.replay(transactionManager) coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback) - assertEquals(Errors.INVALID_PID_MAPPING, error) + assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error) EasyMock.verify(transactionManager) } @@ -312,7 +312,7 @@ class TransactionCoordinatorTest { EasyMock.replay(transactionManager) coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback) - assertEquals(Errors.INVALID_PID_MAPPING, error) + assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error) EasyMock.verify(transactionManager) } @@ -513,9 +513,9 @@ class TransactionCoordinatorTest { EasyMock.replay(transactionManager, transactionMarkerChannelManager) - coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback) + coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback) - assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result) + 
assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result) EasyMock.verify(transactionManager) } @@ -568,7 +568,7 @@ class TransactionCoordinatorTest { EasyMock.expect(transactionManager.transactionsToExpire()) .andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, pid, epoch))) - + EasyMock.replay(transactionManager, transactionMarkerChannelManager) coordinator.startup(false) @@ -589,9 +589,9 @@ class TransactionCoordinatorTest { EasyMock.replay(transactionManager) - coordinator.handleInitPid(transactionalId, 10, initPidMockCallback) + coordinator.handleInitProducerId(transactionalId, 10, initProducerIdMockCallback) - assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result) + assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result) } private def validateIncrementEpochAndUpdateMetadata(state: TransactionState) = { @@ -620,9 +620,9 @@ class TransactionCoordinatorTest { EasyMock.replay(transactionManager) val newTxnTimeoutMs = 10 - coordinator.handleInitPid(transactionalId, newTxnTimeoutMs, initPidMockCallback) + coordinator.handleInitProducerId(transactionalId, newTxnTimeoutMs, initProducerIdMockCallback) - assertEquals(InitPidResult(pid, (epoch + 1).toShort, Errors.NONE), result) + assertEquals(InitProducerIdResult(pid, (epoch + 1).toShort, Errors.NONE), result) assertEquals(newTxnTimeoutMs, metadata.txnTimeoutMs) assertEquals(time.milliseconds(), metadata.txnLastUpdateTimestamp) assertEquals((epoch + 1).toShort, metadata.producerEpoch) @@ -704,7 +704,7 @@ class TransactionCoordinatorTest { completedMetadata } - def initPidMockCallback(ret: InitPidResult): Unit = { + def initProducerIdMockCallback(ret: InitProducerIdResult): Unit = { result = ret } http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 9270544..425b9f1 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -252,7 +252,7 @@ class RequestQuotaTest extends BaseRequestTest { new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava) case ApiKeys.INIT_PRODUCER_ID => - new InitPidRequest.Builder("abc") + new InitProducerIdRequest.Builder("abc") case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochRequest.Builder().add(tp, 0) @@ -353,7 +353,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs - case ApiKeys.INIT_PRODUCER_ID => new InitPidResponse(response).throttleTimeMs + case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs
