Repository: kafka
Updated Branches:
  refs/heads/trunk 1cb39f757 -> a1c8e7d94


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 4b2cedb..d7b1c33 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -73,28 +73,28 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
   private var coordinatorEpoch = initialEntry.coordinatorEpoch
   private val transactions = ListBuffer.empty[TxnMetadata]
 
-  def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
-    this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
+  def this(producerId: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: Boolean) =
+    this(producerId, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
 
-  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
-    if (this.producerEpoch > epoch) {
+  private def validateAppend(producerEpoch: Short, firstSeq: Int, lastSeq: Int, shouldValidateSequenceNumbers: Boolean) = {
+    if (this.producerEpoch > producerEpoch) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
-        s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} (server epoch)")
+        s"with a newer epoch. $producerEpoch (request epoch), ${this.producerEpoch} (server epoch)")
     } else if (shouldValidateSequenceNumbers) {
-      if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < epoch) {
+      if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || this.producerEpoch < producerEpoch) {
         if (firstSeq != 0)
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " +
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
             s"(request epoch), $firstSeq (seq. number)")
       } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
         // the epoch was bumped by a control record, so we expect the sequence number to be reset
-        throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), found $firstSeq " +
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
           s"(incoming seq. number), but expected 0")
       } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
-        throw new DuplicateSequenceNumberException(s"Duplicate sequence number: pid: $producerId, (incomingBatch.firstSeq, " +
+        throw new DuplicateSequenceNumberException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
           s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, lastEntry.lastSeq): " +
           s"(${this.firstSeq}, ${this.lastSeq}).")
       } else if (firstSeq != this.lastSeq + 1L) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number: $producerId (pid), $firstSeq " +
+        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
           s"(incoming seq. number), ${this.lastSeq} (current end sequence number)")
       }
     }
@@ -202,25 +202,25 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
 }
 
 object ProducerStateManager {
-  private val PidSnapshotVersion: Short = 1
+  private val ProducerSnapshotVersion: Short = 1
   private val VersionField = "version"
   private val CrcField = "crc"
-  private val PidField = "pid"
+  private val ProducerIdField = "producer_id"
   private val LastSequenceField = "last_sequence"
   private val ProducerEpochField = "epoch"
   private val LastOffsetField = "last_offset"
   private val OffsetDeltaField = "offset_delta"
   private val TimestampField = "timestamp"
-  private val PidEntriesField = "pid_entries"
+  private val ProducerEntriesField = "producer_entries"
   private val CoordinatorEpochField = "coordinator_epoch"
   private val CurrentTxnFirstOffsetField = "current_txn_first_offset"
 
   private val VersionOffset = 0
   private val CrcOffset = VersionOffset + 2
-  private val PidEntriesOffset = CrcOffset + 4
+  private val ProducerEntriesOffset = CrcOffset + 4
 
-  val PidSnapshotEntrySchema = new Schema(
-    new Field(PidField, Type.INT64, "The producer ID"),
+  val ProducerSnapshotEntrySchema = new Schema(
+    new Field(ProducerIdField, Type.INT64, "The producer ID"),
     new Field(ProducerEpochField, Type.INT16, "Current epoch of the producer"),
     new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"),
     new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"),
@@ -231,33 +231,33 @@ object ProducerStateManager {
   val PidSnapshotMapSchema = new Schema(
     new Field(VersionField, Type.INT16, "Version of the snapshot file"),
     new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
-    new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table"))
+    new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table"))
 
   def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
     val buffer = Files.readAllBytes(file.toPath)
     val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
 
     val version = struct.getShort(VersionField)
-    if (version != PidSnapshotVersion)
+    if (version != ProducerSnapshotVersion)
       throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
 
     val crc = struct.getUnsignedInt(CrcField)
-    val computedCrc =  Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset)
+    val computedCrc =  Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
     if (crc != computedCrc)
       throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " +
         s"Stored crc: $crc. Computed crc: $computedCrc")
 
-    struct.getArray(PidEntriesField).map { pidEntryObj =>
-      val pidEntryStruct = pidEntryObj.asInstanceOf[Struct]
-      val pid: Long = pidEntryStruct.getLong(PidField)
-      val epoch = pidEntryStruct.getShort(ProducerEpochField)
-      val seq = pidEntryStruct.getInt(LastSequenceField)
-      val offset = pidEntryStruct.getLong(LastOffsetField)
-      val timestamp = pidEntryStruct.getLong(TimestampField)
-      val offsetDelta = pidEntryStruct.getInt(OffsetDeltaField)
-      val coordinatorEpoch = pidEntryStruct.getInt(CoordinatorEpochField)
-      val currentTxnFirstOffset = pidEntryStruct.getLong(CurrentTxnFirstOffsetField)
-      val newEntry = ProducerIdEntry(pid, epoch, seq, offset, offsetDelta, timestamp,
+    struct.getArray(ProducerEntriesField).map { producerEntryObj =>
+      val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
+      val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
+      val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
+      val seq = producerEntryStruct.getInt(LastSequenceField)
+      val offset = producerEntryStruct.getLong(LastOffsetField)
+      val timestamp = producerEntryStruct.getLong(TimestampField)
+      val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
+      val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
+      val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
+      val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
         coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
       newEntry
     }
@@ -265,12 +265,12 @@ object ProducerStateManager {
 
   private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) {
     val struct = new Struct(PidSnapshotMapSchema)
-    struct.set(VersionField, PidSnapshotVersion)
+    struct.set(VersionField, ProducerSnapshotVersion)
     struct.set(CrcField, 0L) // we'll fill this after writing the entries
     val entriesArray = entries.map {
-      case (pid, entry) =>
-        val pidEntryStruct = struct.instance(PidEntriesField)
-        pidEntryStruct.set(PidField, pid)
+      case (producerId, entry) =>
+        val producerEntryStruct = struct.instance(ProducerEntriesField)
+        producerEntryStruct.set(ProducerIdField, producerId)
           .set(ProducerEpochField, entry.producerEpoch)
           .set(LastSequenceField, entry.lastSeq)
           .set(LastOffsetField, entry.lastOffset)
@@ -278,16 +278,16 @@ object ProducerStateManager {
           .set(TimestampField, entry.timestamp)
           .set(CoordinatorEpochField, entry.coordinatorEpoch)
           .set(CurrentTxnFirstOffsetField, entry.currentTxnFirstOffset.getOrElse(-1L))
-        pidEntryStruct
+        producerEntryStruct
     }.toArray
-    struct.set(PidEntriesField, entriesArray)
+    struct.set(ProducerEntriesField, entriesArray)
 
     val buffer = ByteBuffer.allocate(struct.sizeOf)
     struct.writeTo(buffer)
     buffer.flip()
 
     // now fill in the CRC
-    val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset)
+    val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset)
     ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
 
     val fos = new FileOutputStream(file)
@@ -404,10 +404,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   // visible for testing
   private[log] def loadProducerEntry(entry: ProducerIdEntry): Unit = {
-    val pid = entry.producerId
-    producers.put(pid, entry)
+    val producerId = entry.producerId
+    producers.put(producerId, entry)
     entry.currentTxnFirstOffset.foreach { offset =>
-      ongoingTxns.put(offset, new TxnMetadata(pid, offset))
+      ongoingTxns.put(offset, new TxnMetadata(producerId, offset))
     }
   }
 
@@ -418,7 +418,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * Expire any PIDs which have been idle longer than the configured maximum expiration timeout.
    */
   def removeExpiredProducers(currentTimeMs: Long) {
-    producers.retain { case (pid, lastEntry) =>
+    producers.retain { case (producerId, lastEntry) =>
       !isExpired(currentTimeMs, lastEntry)
     }
   }
@@ -496,7 +496,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   /**
    * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
-   * the new start offset and removes all pids which have a smaller last written offset.
+   * the new start offset and removes all producerIds which have a smaller last written offset.
    */
   def evictUnretainedProducers(logStartOffset: Long) {
     val evictedProducerEntries = producers.filter(_._2.lastOffset < logStartOffset)
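
For context, the snapshot file that readSnapshot and writeSnapshot handle above is laid out as a 2-byte version, a 4-byte CRC, and then the serialized entry array; the CRC covers only the bytes from ProducerEntriesOffset onward, which is why the writer serializes first and fills the CRC in afterwards. A minimal standalone sketch of that check, using java.util.zip.CRC32C as a stand-in for Kafka's internal Crc32C helper (the object and method names here are illustrative, not Kafka's):

    import java.nio.ByteBuffer
    import java.util.zip.CRC32C

    object SnapshotCrcSketch {
      // Offsets mirrored from ProducerStateManager above.
      val VersionOffset = 0
      val CrcOffset = VersionOffset + 2
      val ProducerEntriesOffset = CrcOffset + 4

      // The CRC is computed over everything after the version and CRC fields.
      def computeCrc(buffer: Array[Byte]): Long = {
        val crc = new CRC32C()
        crc.update(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
        crc.getValue
      }

      // Compare the stored unsigned 32-bit CRC against the recomputed one.
      def isValid(buffer: Array[Byte]): Boolean = {
        val stored = ByteBuffer.wrap(buffer).getInt(CrcOffset) & 0xffffffffL
        stored == computeCrc(buffer)
      }
    }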

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index aaa2458..1f8bea5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -32,7 +32,7 @@ import kafka.common.Topic.{GroupMetadataTopicName, TransactionStateTopicName, is
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
-import kafka.coordinator.transaction.{InitPidResult, TransactionCoordinator}
+import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
 import kafka.network.RequestChannel.{Response, Session}
@@ -110,7 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
-        case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
+        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
         case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
         case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
@@ -1386,20 +1386,20 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleInitPidRequest(request: RequestChannel.Request): Unit = {
-    val initPidRequest = request.body[InitPidRequest]
-    val transactionalId = initPidRequest.transactionalId
+  def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
+    val initProducerIdRequest = request.body[InitProducerIdRequest]
+    val transactionalId = initProducerIdRequest.transactionalId
 
     // Send response callback
-    def sendResponseCallback(result: InitPidResult): Unit = {
+    def sendResponseCallback(result: InitProducerIdResult): Unit = {
       def createResponse(throttleTimeMs: Int): AbstractResponse = {
-        val responseBody: InitPidResponse = new InitPidResponse(throttleTimeMs, result.error, result.pid, result.epoch)
-        trace(s"InitPidRequest: Completed $transactionalId's InitPidRequest with result $result from client ${request.header.clientId}.")
+        val responseBody = new InitProducerIdResponse(throttleTimeMs, result.error, result.producerId, result.producerEpoch)
+        trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
     }
-    txnCoordinator.handleInitPid(transactionalId, initPidRequest.transactionTimeoutMs, sendResponseCallback)
+    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
@@ -1408,7 +1408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(error: Errors) {
       def createResponse(throttleTimeMs: Int): AbstractResponse = {
         val responseBody = new EndTxnResponse(throttleTimeMs, error)
-        trace(s"Completed ${endTxnRequest.transactionalId()}'s EndTxnRequest with command: ${endTxnRequest.command()}, errors: $error from client ${request.header.clientId}.")
+        trace(s"Completed ${endTxnRequest.transactionalId}'s EndTxnRequest with command: ${endTxnRequest.command}, errors: $error from client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
@@ -1433,23 +1433,22 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    def sendResponseCallback(pid: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
-      errors.put(pid, responseStatus.mapValues(_.error).asJava)
+    def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+      errors.put(producerId, responseStatus.mapValues(_.error).asJava)
 
       val successfulPartitions = responseStatus.filter { case (_, partitionResponse) =>
         partitionResponse.error == Errors.NONE
       }.keys.toSeq
 
       try {
-        groupCoordinator.handleTxnCompletion(producerId = pid, topicPartitions = successfulPartitions, transactionResult = result)
+        groupCoordinator.handleTxnCompletion(producerId = producerId, topicPartitions = successfulPartitions, transactionResult = result)
       } catch {
         case e: Exception =>
           error(s"Received an exception while trying to update the offsets cache on transaction completion: $e")
-          val producerIdErrors = errors.get(pid)
+          val producerIdErrors = errors.get(producerId)
           successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN))
       }
 
-
       if (numAppends.decrementAndGet() == 0)
         sendResponseExemptThrottle(request, new RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
     }
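
A note on the completion pattern in the WriteTxnMarkers handler above: it fans out one append per marker, and each curried sendResponseCallback records its errors and decrements a shared counter; only the callback that drops the counter to zero sends the WriteTxnMarkersResponse. A self-contained sketch of that countdown idiom (names like onAppendComplete and totalAppends are made up for illustration):

    import java.util.concurrent.atomic.AtomicInteger

    object MarkerCompletionSketch {
      def main(args: Array[String]): Unit = {
        val totalAppends = 3
        val numAppends = new AtomicInteger(totalAppends)

        // Stand-in for the per-producerId callback in the handler.
        def onAppendComplete(producerId: Long): Unit = {
          println(s"append for producerId $producerId completed")
          // Whichever callback decrements the counter to zero sends the response.
          if (numAppends.decrementAndGet() == 0)
            println("all appends done; sending WriteTxnMarkersResponse")
        }

        (1L to totalAppends.toLong).foreach(onAppendComplete)
      }
    }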

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 690d167..5ee4b12 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -590,7 +590,7 @@ object KafkaConfig {
   /** ********* Transaction management configuration ***********/
   val TransactionalIdExpirationMsDoc = "The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer's transactional id without receiving any transaction status updates from it."
   val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
-    "If a client’s requested transaction time exceed this, then the broker will return an error in InitPidRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
+    "If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
   val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + " config for the transaction topic."
   val TransactionsLoadBufferSizeDoc = "Batch size for reading from the transaction log segments when loading pid and transactions into the cache."
   val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4e2b11a..c12f774 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -64,7 +64,7 @@ object ZkUtils {
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
   val ConfigUsersPath = s"$ConfigPath/users"
-  val PidBlockPath = "/latest_pid_block"
+  val ProducerIdBlockPath = "/latest_pid_block"
   // Important: it is necessary to add any new top level Zookeeper path to the Seq
   val SecureZkRootPaths = Seq(AdminPath,
                               BrokersPath,
@@ -75,7 +75,7 @@ object ZkUtils {
                               IsrChangeNotificationPath,
                               KafkaAclPath,
                               KafkaAclChangesPath,
-                              PidBlockPath)
+                              ProducerIdBlockPath)
 
   // Important: it is necessary to add any new top level Zookeeper path that contains
   //            sensitive information that should not be world readable to the Seq
@@ -239,7 +239,7 @@ class ZkUtils(val zkClient: ZkClient,
                               DeleteTopicsPath,
                               BrokerSequenceIdPath,
                               IsrChangeNotificationPath,
-                              PidBlockPath)
+                              ProducerIdBlockPath)
 
   // Visible for testing
   val zkPath = new ZkPath(zkClient)
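
Note that only the Scala constant is renamed in this file: the ZooKeeper path itself stays "/latest_pid_block", presumably so that brokers running this change keep reading and writing the same znode as earlier versions.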

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 85c631c..b032f8d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -68,22 +68,22 @@ class ProducerIdManagerTest {
     val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
     val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
 
-    val pid1 = manager1.nextPid()
-    val pid2 = manager2.nextPid()
+    val pid1 = manager1.generateProducerId()
+    val pid2 = manager2.generateProducerId()
 
     assertEquals(0, pid1)
     assertEquals(ProducerIdManager.PidBlockSize, pid2)
 
     for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid1 + i, manager1.nextPid())
+      assertEquals(pid1 + i, manager1.generateProducerId())
     }
 
     for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid2 + i, manager2.nextPid())
+      assertEquals(pid2 + i, manager2.generateProducerId())
     }
 
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid())
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.generateProducerId())
+    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.generateProducerId())
   }
 
   @Test(expected = classOf[KafkaException])
@@ -91,7 +91,7 @@ class ProducerIdManagerTest {
     EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
       .andAnswer(new IAnswer[(Option[String], Int)] {
         override def answer(): (Option[String], Int) = {
-          (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0,
+          (Some(ProducerIdManager.generateProducerIdBlockJson(ProducerIdBlock(0,
             Long.MaxValue - ProducerIdManager.PidBlockSize,
             Long.MaxValue))), 0)
         }
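
The first test above exercises block-based allocation: each ProducerIdManager claims a contiguous block of PidBlockSize ids (coordinated through the latest_pid_block znode) and hands them out sequentially, claiming the next free block when its own is exhausted; that is why manager1's ids resume after manager2's block. A rough sketch of that behavior, with an AtomicLong standing in for the ZooKeeper-backed block counter (class and field names here are hypothetical):

    import java.util.concurrent.atomic.AtomicLong

    // sharedNextBlockStart stands in for the ZooKeeper-coordinated counter.
    class BlockAllocatorSketch(blockSize: Long, sharedNextBlockStart: AtomicLong) {
      private var next: Long = 0L
      private var blockEnd: Long = 0L
      claimBlock()

      private def claimBlock(): Unit = {
        next = sharedNextBlockStart.getAndAdd(blockSize)
        blockEnd = next + blockSize
      }

      def generateProducerId(): Long = {
        if (next == blockEnd) claimBlock() // current block exhausted, claim the next one
        val id = next
        next += 1
        id
      }
    }

    object BlockAllocatorDemo extends App {
      val shared = new AtomicLong(0L)
      val manager1 = new BlockAllocatorSketch(1000L, shared)
      val manager2 = new BlockAllocatorSketch(1000L, shared)
      assert(manager1.generateProducerId() == 0L)    // manager1 owns [0, 1000)
      assert(manager2.generateProducerId() == 1000L) // manager2 owns [1000, 2000)
    }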

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
index df23952..83cba71 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala
@@ -47,19 +47,19 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness {
 
     val tc = servers.head.transactionCoordinator
 
-    var initPidResult: InitPidResult = null
-    def callback(result: InitPidResult): Unit = {
-      initPidResult = result
+    var initProducerIdResult: InitProducerIdResult = null
+    def callback(result: InitProducerIdResult): Unit = {
+      initProducerIdResult = result
     }
 
     val txnId = "txn"
-    tc.handleInitPid(txnId, 900000, callback)
+    tc.handleInitProducerId(txnId, 900000, callback)
 
-    while(initPidResult == null) {
+    while(initProducerIdResult == null) {
       Utils.sleep(1)
     }
 
-    Assert.assertEquals(Errors.NONE, initPidResult.error)
+    Assert.assertEquals(Errors.NONE, initProducerIdResult.error)
 
     @volatile var addPartitionErrors: Errors = null
     def addPartitionsCallback(errors: Errors): Unit = {
@@ -67,8 +67,8 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness {
     }
 
     tc.handleAddPartitionsToTransaction(txnId,
-      initPidResult.pid,
-      initPidResult.epoch,
+      initProducerIdResult.producerId,
+      initProducerIdResult.producerEpoch,
       Set[TopicPartition](new TopicPartition(topic, 0)),
       addPartitionsCallback
     )

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 395bfb9..2f4f572 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -58,11 +58,11 @@ class TransactionCoordinatorTest {
     txnMarkerPurgatory,
     time)
 
-  var result: InitPidResult = _
+  var result: InitProducerIdResult = _
   var error: Errors = Errors.NONE
 
   private def mockPidManager(): Unit = {
-    EasyMock.expect(pidManager.nextPid())
+    EasyMock.expect(pidManager.generateProducerId())
       .andAnswer(new IAnswer[Long] {
         override def answer(): Long = {
           nextPid += 1
@@ -90,10 +90,10 @@ class TransactionCoordinatorTest {
     mockPidManager()
     EasyMock.replay(pidManager)
 
-    coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
-    coordinator.handleInitPid("", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
   }
 
   @Test
@@ -101,10 +101,10 @@ class TransactionCoordinatorTest {
     mockPidManager()
     EasyMock.replay(pidManager)
 
-    coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
-    coordinator.handleInitPid(null, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(null, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
   }
 
   @Test
@@ -143,16 +143,16 @@ class TransactionCoordinatorTest {
       .anyTimes()
     EasyMock.replay(pidManager, transactionManager)
 
-    coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(nextPid - 1, 0, Errors.NONE), result)
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(nextPid - 1, 0, Errors.NONE), result)
   }
 
   @Test
   def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinatorForId(): Unit = {
     mockPidManager()
     EasyMock.replay(pidManager)
-    coordinator.handleInitPid("some-pid", txnTimeoutMs, initPidMockCallback)
-    assertEquals(InitPidResult(-1, -1, Errors.NOT_COORDINATOR), result)
+    coordinator.handleInitProducerId("some-pid", txnTimeoutMs, initProducerIdMockCallback)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.NOT_COORDINATOR), result)
   }
 
   @Test
@@ -165,7 +165,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 1, partitions, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
   }
 
   @Test
@@ -299,7 +299,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
     EasyMock.verify(transactionManager)
   }
 
@@ -312,7 +312,7 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     coordinator.handleEndTransaction(transactionalId, 0, 0, TransactionResult.COMMIT, errorsCallback)
-    assertEquals(Errors.INVALID_PID_MAPPING, error)
+    assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, error)
     EasyMock.verify(transactionManager)
   }
 
@@ -513,9 +513,9 @@ class TransactionCoordinatorTest {
 
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
-    coordinator.handleInitPid(transactionalId, txnTimeoutMs, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
     EasyMock.verify(transactionManager)
   }
 
@@ -568,7 +568,7 @@ class TransactionCoordinatorTest {
 
     EasyMock.expect(transactionManager.transactionsToExpire())
       .andReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, pid, epoch)))
-    
+
     EasyMock.replay(transactionManager, transactionMarkerChannelManager)
 
     coordinator.startup(false)
@@ -589,9 +589,9 @@ class TransactionCoordinatorTest {
 
     EasyMock.replay(transactionManager)
 
-    coordinator.handleInitPid(transactionalId, 10, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, 10, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
   }
 
   private def validateIncrementEpochAndUpdateMetadata(state: TransactionState) = {
@@ -620,9 +620,9 @@ class TransactionCoordinatorTest {
     EasyMock.replay(transactionManager)
 
     val newTxnTimeoutMs = 10
-    coordinator.handleInitPid(transactionalId, newTxnTimeoutMs, initPidMockCallback)
+    coordinator.handleInitProducerId(transactionalId, newTxnTimeoutMs, initProducerIdMockCallback)
 
-    assertEquals(InitPidResult(pid, (epoch + 1).toShort, Errors.NONE), result)
+    assertEquals(InitProducerIdResult(pid, (epoch + 1).toShort, Errors.NONE), result)
     assertEquals(newTxnTimeoutMs, metadata.txnTimeoutMs)
     assertEquals(time.milliseconds(), metadata.txnLastUpdateTimestamp)
     assertEquals((epoch + 1).toShort, metadata.producerEpoch)
@@ -704,7 +704,7 @@ class TransactionCoordinatorTest {
     completedMetadata
   }
 
-  def initPidMockCallback(ret: InitPidResult): Unit = {
+  def initProducerIdMockCallback(ret: InitProducerIdResult): Unit = {
     result = ret
   }
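
The mockPidManager stub above answers generateProducerId() from an incrementing counter via EasyMock's IAnswer, which is what lets the handleInitProducerId tests assert ids 0, 1, and so on in order. A self-contained version of that stubbing pattern (the IdGenerator trait is invented for this sketch; only the EasyMock calls mirror the test):

    import org.easymock.{EasyMock, IAnswer}

    trait IdGenerator {
      def generateProducerId(): Long
    }

    object MockIdGeneratorSketch {
      def main(args: Array[String]): Unit = {
        var nextPid = 0L
        val gen = EasyMock.createMock(classOf[IdGenerator])
        // Answer each call with the next id, any number of times.
        EasyMock.expect(gen.generateProducerId())
          .andAnswer(new IAnswer[Long] {
            override def answer(): Long = {
              nextPid += 1
              nextPid - 1
            }
          })
          .anyTimes()
        EasyMock.replay(gen)

        assert(gen.generateProducerId() == 0L)
        assert(gen.generateProducerId() == 1L)
      }
    }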
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1c8e7d9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 9270544..425b9f1 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -252,7 +252,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.INIT_PRODUCER_ID =>
-          new InitPidRequest.Builder("abc")
+          new InitProducerIdRequest.Builder("abc")
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
           new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
@@ -353,7 +353,7 @@ class RequestQuotaTest extends BaseRequestTest {
       case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
       case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
-      case ApiKeys.INIT_PRODUCER_ID => new InitPidResponse(response).throttleTimeMs
+      case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
       case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
       case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
       case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs
