Repository: kafka
Updated Branches:
  refs/heads/trunk 6fb25f080 -> d04b0998c


http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f72df9a..2c843e8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,10 +30,7 @@ import kafka.message.{ByteBufferMessageSet, 
InvalidMessageException, Message, Me
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, 
CorruptRecordException, InvalidTimestampException,
-                                        InvalidTopicException, 
NotLeaderForPartitionException, OffsetOutOfRangeException,
-                                        RecordBatchTooLargeException, 
RecordTooLargeException, ReplicaNotAvailableException,
-                                        UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ControllerMovedException, 
CorruptRecordException, InvalidTimestampException, InvalidTopicException, 
NotLeaderForPartitionException, OffsetOutOfRangeException, 
RecordBatchTooLargeException, RecordTooLargeException, 
ReplicaNotAvailableException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -110,8 +107,7 @@ class ReplicaManager(val config: KafkaConfig,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
                      quotaManager: ReplicationQuotaManager,
-                     threadNamePrefix: Option[String] = None
-                    ) extends Logging with KafkaMetricsGroup {
+                     threadNamePrefix: Option[String] = None) extends Logging 
with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch 
- 1
   private val localBrokerId = config.brokerId
@@ -266,8 +262,8 @@ class ReplicaManager(val config: KafkaConfig,
         val partitions = stopReplicaRequest.partitions.asScala
         controllerEpoch = stopReplicaRequest.controllerEpoch
         // First stop fetchers for all partitions, then stop the corresponding 
replicas
-        replicaFetcherManager.removeFetcherForPartitions(partitions.map(r => 
TopicAndPartition(r.topic, r.partition)))
-        for(topicPartition <- partitions){
+        replicaFetcherManager.removeFetcherForPartitions(partitions)
+        for (topicPartition <- partitions){
           val errorCode = stopReplica(topicPartition.topic, 
topicPartition.partition, stopReplicaRequest.deletePartitions)
           responseMap.put(topicPartition, errorCode)
         }
@@ -460,15 +456,18 @@ class ReplicaManager(val config: KafkaConfig,
   def fetchMessages(timeout: Long,
                     replicaId: Int,
                     fetchMinBytes: Int,
-                    fetchInfo: immutable.Map[TopicAndPartition, 
PartitionFetchInfo],
+                    fetchMaxBytes: Int,
+                    hardMaxBytesLimit: Boolean,
+                    fetchInfos: Seq[(TopicAndPartition, PartitionFetchInfo)],
                     quota: ReplicaQuota = UnboundedQuota,
-                    responseCallback: Map[TopicAndPartition, 
FetchResponsePartitionData] => Unit) {
+                    responseCallback: Seq[(TopicAndPartition, 
FetchResponsePartitionData)] => Unit) {
     val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
 
     // read from local logs
-    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, 
fetchOnlyCommitted, fetchInfo, quota)
+    val logReadResults = readFromLocalLog(fetchOnlyFromLeader, 
fetchOnlyCommitted, fetchMaxBytes, hardMaxBytesLimit,
+      fetchInfos, quota)
 
     // if the fetch comes from the follower,
     // update its corresponding log end offset
@@ -476,29 +475,34 @@ class ReplicaManager(val config: KafkaConfig,
       updateFollowerLogReadResults(replicaId, logReadResults)
 
     // check if this fetch request can be satisfied right away
-    val bytesReadable = 
logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
-    val errorReadingData = logReadResults.values.foldLeft(false) 
((errorIncurred, readResult) =>
+    val logReadResultValues = logReadResults.map { case (_, v) => v }
+    val bytesReadable = 
logReadResultValues.map(_.info.messageSet.sizeInBytes).sum
+    val errorReadingData = logReadResultValues.foldLeft(false) 
((errorIncurred, readResult) =>
       errorIncurred || (readResult.errorCode != Errors.NONE.code))
 
     // respond immediately if 1) fetch request does not want to wait
     //                        2) fetch request does not require any data
     //                        3) has enough data to respond
     //                        4) some error happens while reading data
-    if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes 
|| errorReadingData) {
-      val fetchPartitionData = logReadResults.mapValues(result =>
-        FetchResponsePartitionData(result.errorCode, result.hw, 
result.info.messageSet))
+    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes 
|| errorReadingData) {
+      val fetchPartitionData = logReadResults.map { case (tp, result) =>
+        tp -> FetchResponsePartitionData(result.errorCode, result.hw, 
result.info.messageSet)
+      }
       responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
       val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, 
result) =>
-        (topicAndPartition, 
FetchPartitionStatus(result.info.fetchOffsetMetadata, 
fetchInfo.get(topicAndPartition).get))
+        val fetchInfo = fetchInfos.collectFirst {
+          case (tp, v) if tp == topicAndPartition => v
+        }.getOrElse(sys.error(s"Partition $topicAndPartition not found in 
fetchInfos"))
+        (topicAndPartition, 
FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
       }
-
-      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, 
fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
-      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota 
,responseCallback)
+      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, 
hardMaxBytesLimit, fetchOnlyFromLeader,
+        fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
+      val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, 
responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.keys.map(new 
TopicPartitionOperationKey(_)).toSeq
+      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new 
TopicPartitionOperationKey(tp) }
 
       // try to complete the request immediately, otherwise put it into the 
purgatory;
       // this is because while the delayed fetch operation is being created, 
new requests
@@ -508,78 +512,99 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Read from a single topic/partition at the given offset upto maxSize bytes
+   * Read from multiple topic partitions at the given offset up to maxSize 
bytes
    */
   def readFromLocalLog(fetchOnlyFromLeader: Boolean,
                        readOnlyCommitted: Boolean,
-                       readPartitionInfo: Map[TopicAndPartition, 
PartitionFetchInfo],
-                       quota: ReplicaQuota): Map[TopicAndPartition, 
LogReadResult] = {
-    readPartitionInfo.map { case (TopicAndPartition(topic, partition), 
PartitionFetchInfo(offset, fetchSize)) =>
+                       fetchMaxBytes: Int,
+                       hardMaxBytesLimit: Boolean,
+                       readPartitionInfo: Seq[(TopicAndPartition, 
PartitionFetchInfo)],
+                       quota: ReplicaQuota): Seq[(TopicAndPartition, 
LogReadResult)] = {
+
+    def read(tp: TopicAndPartition, fetchInfo: PartitionFetchInfo, limitBytes: 
Int, minOneMessage: Boolean): LogReadResult = {
+      val TopicAndPartition(topic, partition) = tp
+      val PartitionFetchInfo(offset, fetchSize) = fetchInfo
+
       BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
       BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
 
-      val partitionDataAndOffsetInfo =
-        try {
-          trace("Fetching log segment for topic %s, partition %d, offset %d, 
size %d".format(topic, partition, offset, fetchSize))
-
-          // decide whether to only fetch from leader
-          val localReplica = if (fetchOnlyFromLeader)
-            getLeaderReplicaIfLocal(topic, partition)
-          else
-            getReplicaOrException(topic, partition)
-
-          // decide whether to only fetch committed data (i.e. messages below 
high watermark)
-          val maxOffsetOpt = if (readOnlyCommitted)
-            Some(localReplica.highWatermark.messageOffset)
-          else
-            None
-
-          /* Read the LogOffsetMetadata prior to performing the read from the 
log.
-           * We use the LogOffsetMetadata to determine if a particular replica 
is in-sync or not.
-           * Using the log end offset after performing the read can lead to a 
race condition
-           * where data gets appended to the log immediately after the replica 
has consumed from it
-           * This can cause a replica to always be out of sync.
-           */
-          val initialLogEndOffset = localReplica.logEndOffset
-
-          val logReadInfo = localReplica.log match {
-            case Some(log) =>
-              val adjustedFetchSize = if (Topic.isInternal(topic) && 
!readOnlyCommitted) Math.max(fetchSize, log.config.maxMessageSize) else 
fetchSize
-
-              //Try the read first, this tells us whether we need all of 
adjustedFetchSize for this partition
-              var fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt)
-
-              //If the partition is marked as throttled, and we are over-quota 
then exclude it
-              if (quota.isThrottled(TopicAndPartition(topic, partition)) && 
quota.isQuotaExceeded)
-                fetch = FetchDataInfo(fetch.fetchOffsetMetadata, 
MessageSet.Empty)
-              fetch
-            case None =>
-              error("Leader for partition [%s,%d] does not have a local 
log".format(topic, partition))
-              FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty)
-          }
+      try {
+        trace(s"Fetching log segment for partition $tp, offset ${offset}, 
partition fetch size ${fetchSize}, " +
+          s"remaining response limit ${limitBytes}" +
+          (if (minOneMessage) s", ignoring response/partition size limits" 
else ""))
 
-          val readToEndOfLog = initialLogEndOffset.messageOffset - 
logReadInfo.fetchOffsetMetadata.messageOffset <= 0
+        // decide whether to only fetch from leader
+        val localReplica = if (fetchOnlyFromLeader)
+          getLeaderReplicaIfLocal(topic, partition)
+        else
+          getReplicaOrException(topic, partition)
 
-          LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, 
fetchSize, readToEndOfLog, None)
-        } catch {
-          // NOTE: Failed fetch requests metric is not incremented for known 
exceptions since it
-          // is supposed to indicate un-expected failure of a broker in 
handling a fetch request
-          case utpe: UnknownTopicOrPartitionException =>
-            
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, false, Some(utpe))
-          case nle: NotLeaderForPartitionException =>
-            
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, false, Some(nle))
-          case rnae: ReplicaNotAvailableException =>
-            
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, false, Some(rnae))
-          case oor : OffsetOutOfRangeException =>
-            
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, false, Some(oor))
-          case e: Throwable =>
-            
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
-            
BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
-            error("Error processing fetch operation on partition [%s,%d] 
offset %d".format(topic, partition, offset), e)
-            
LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, false, Some(e))
+        // decide whether to only fetch committed data (i.e. messages below 
high watermark)
+        val maxOffsetOpt = if (readOnlyCommitted)
+          Some(localReplica.highWatermark.messageOffset)
+        else
+          None
+
+        /* Read the LogOffsetMetadata prior to performing the read from the 
log.
+         * We use the LogOffsetMetadata to determine if a particular replica 
is in-sync or not.
+         * Using the log end offset after performing the read can lead to a 
race condition
+         * where data gets appended to the log immediately after the replica 
has consumed from it
+         * This can cause a replica to always be out of sync.
+         */
+        val initialLogEndOffset = localReplica.logEndOffset
+        val logReadInfo = localReplica.log match {
+          case Some(log) =>
+            val adjustedFetchSize = math.min(fetchSize, limitBytes)
+
+            // Try the read first, this tells us whether we need all of 
adjustedFetchSize for this partition
+            val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, 
minOneMessage)
+
+            // If the partition is marked as throttled, and we are over-quota 
then exclude it
+            if (quota.isThrottled(tp) && quota.isQuotaExceeded)
+              FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
+            // For FetchRequest version 3, we replace incomplete message sets 
with an empty one as consumers can make
+            // progress in such cases and don't need to report a 
`RecordTooLargeException`
+            else if (!hardMaxBytesLimit && fetch.firstMessageSetIncomplete)
+              FetchDataInfo(fetch.fetchOffsetMetadata, MessageSet.Empty)
+            else fetch
+
+          case None =>
+            error(s"Leader for partition $tp does not have a local log")
+            FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty)
         }
-      (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
+
+        val readToEndOfLog = initialLogEndOffset.messageOffset - 
logReadInfo.fetchOffsetMetadata.messageOffset <= 0
+
+        LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, 
fetchSize, readToEndOfLog, None)
+      } catch {
+        // NOTE: Failed fetch requests metric is not incremented for known 
exceptions since it
+        // is supposed to indicate unexpected failure of a broker in handling 
a fetch request
+        case e@ (_: UnknownTopicOrPartitionException |
+                 _: NotLeaderForPartitionException |
+                 _: ReplicaNotAvailableException |
+                 _: OffsetOutOfRangeException) =>
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, false, Some(e))
+        case e: Throwable =>
+          
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+          
BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
+          error(s"Error processing fetch operation on partition ${tp}, offset 
$offset", e)
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, false, Some(e))
+      }
+    }
+
+    var limitBytes = fetchMaxBytes
+    val result = new mutable.ArrayBuffer[(TopicAndPartition, LogReadResult)]
+    var minOneMessage = !hardMaxBytesLimit
+    readPartitionInfo.foreach { case (tp, fetchInfo) =>
+      val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
+      val messageSetSize = readResult.info.messageSet.sizeInBytes
+      // Once we read from a non-empty partition, we stop ignoring request and 
partition level size limits
+      if (messageSetSize > 0)
+        minOneMessage = false
+      limitBytes = math.max(0, limitBytes - messageSetSize)
+      result += (tp -> readResult)
     }
+    result
   }
 
   def getMessageFormatVersion(topicAndPartition: TopicAndPartition): 
Option[Byte] =
@@ -709,7 +734,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     try {
       // First stop fetchers for all the partitions
-      
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new 
TopicAndPartition(_)))
+      
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(p => 
new TopicPartition(p.topic, p.partitionId)))
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         if (partition.makeLeader(controllerId, partitionStateInfo, 
correlationId))
@@ -808,7 +833,7 @@ class ReplicaManager(val config: KafkaConfig,
         }
       }
 
-      
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new
 TopicAndPartition(_)))
+      
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(p 
=> new TopicPartition(p.topic, p.partitionId)))
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(("Broker %d stopped fetchers as part of 
become-follower request from controller " +
           "%d epoch %d with correlation id %d for partition %s")
@@ -838,7 +863,7 @@ class ReplicaManager(val config: KafkaConfig,
       else {
         // we do not need to check if the leader exists again since this has 
been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = 
partitionsToMakeFollower.map(partition =>
-          new TopicAndPartition(partition) -> BrokerAndInitialOffset(
+          new TopicPartition(partition.topic, partition.partitionId) -> 
BrokerAndInitialOffset(
             metadataCache.getAliveBrokers.find(_.id == 
partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
             partition.getReplica().get.logEndOffset.messageOffset)).toMap
         
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
@@ -872,7 +897,7 @@ class ReplicaManager(val config: KafkaConfig,
     allPartitions.values.foreach(partition => 
partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }
 
-  private def updateFollowerLogReadResults(replicaId: Int, readResults: 
Map[TopicAndPartition, LogReadResult]) {
+  private def updateFollowerLogReadResults(replicaId: Int, readResults: 
Seq[(TopicAndPartition, LogReadResult)]) {
     debug("Recording follower broker %d log read results: %s 
".format(replicaId, readResults))
     readResults.foreach { case (topicAndPartition, readResult) =>
       getPartition(topicAndPartition.topic, topicAndPartition.partition) match 
{
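
The core of this change is the response-level budget that readFromLocalLog now tracks while it walks the partitions in request order: `limitBytes` starts at fetchMaxBytes and shrinks by the size of every message set returned, and `minOneMessage` lets only the first non-empty read ignore the request and partition size limits (unless hardMaxBytesLimit is set), so a single oversized message can still be delivered. Below is a minimal, self-contained sketch of just that accounting loop; `PartitionRead` and `readAll` are made-up names standing in for LogReadResult and the broker's read path, not Kafka APIs.

    object FetchBudgetSketch {
      // Stand-in for LogReadResult: how many bytes a partition contributed to the response.
      case class PartitionRead(partition: String, bytes: Int)

      // partitions: (partition name, bytes available at the fetch offset)
      def readAll(partitions: Seq[(String, Int)], fetchMaxBytes: Int, hardMaxBytesLimit: Boolean): Seq[PartitionRead] = {
        var limitBytes = fetchMaxBytes
        // the first non-empty read may ignore the limits so one oversized message still goes out
        var minOneMessage = !hardMaxBytesLimit
        val result = Seq.newBuilder[PartitionRead]
        partitions.foreach { case (tp, available) =>
          val returned =
            if (minOneMessage && available > 0) available  // first message returned in full
            else math.min(available, limitBytes)           // otherwise respect the remaining budget
          if (returned > 0) minOneMessage = false          // stop ignoring limits after the first non-empty read
          limitBytes = math.max(0, limitBytes - returned)
          result += PartitionRead(tp, returned)
        }
        result.result()
      }

      def main(args: Array[String]): Unit =
        // With a 100-byte budget only the first partition may exceed it:
        // List(PartitionRead(t-0,150), PartitionRead(t-1,0))
        println(readAll(Seq("t-0" -> 150, "t-1" -> 150), fetchMaxBytes = 100, hardMaxBytesLimit = false))
    }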

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1a5f187..6d3b098 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -13,6 +13,7 @@
 package kafka.api
 
 import java.nio.ByteBuffer
+import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{ArrayList, Collections, Properties}
 
@@ -176,7 +177,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createFetchRequest = {
-    new requests.FetchRequest(5000, 100, Map(tp -> new 
requests.FetchRequest.PartitionData(0, 100)).asJava)
+    val partitionMap = new util.LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
+    partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 100))
+    new requests.FetchRequest(5000, 100, Int.MaxValue, partitionMap)
   }
 
   private def createListOffsetsRequest = {
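
The new constructor takes the response-level maximum bytes (here Int.MaxValue) and a LinkedHashMap rather than an unordered Map, because with KIP-74 the partition order in the request determines which partition may return a record that exceeds the limits. A small sketch of building such a request, using only the calls shown in the test above (the 5000/100/Int.MaxValue values are the test's own):

    import java.util
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.FetchRequest

    object FetchRequestSketch {
      def build(tp: TopicPartition): FetchRequest = {
        // LinkedHashMap preserves insertion order, which the broker honours when filling the response
        val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
        partitionMap.put(tp, new FetchRequest.PartitionData(0L, 100))
        new FetchRequest(5000, 100, Int.MaxValue, partitionMap)
      }
    }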

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index c13bf58..102b7cf 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,16 +19,15 @@ import org.apache.kafka.clients.producer.{ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-import kafka.utils.{TestUtils, Logging, ShutdownableThread}
+import kafka.utils.{Logging, ShutdownableThread, TestUtils}
 import kafka.common.Topic
 import kafka.server.KafkaConfig
-import java.util.ArrayList
 
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.Buffer
+import scala.collection.mutable.{ArrayBuffer, Buffer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.errors.WakeupException
 
@@ -134,19 +133,22 @@ abstract class BaseConsumerTest extends 
IntegrationTestHarness with Logging {
     }
   }
 
-  protected def sendRecords(numRecords: Int): Unit = {
+  protected def sendRecords(numRecords: Int): Seq[ProducerRecord[Array[Byte], 
Array[Byte]]] =
     sendRecords(numRecords, tp)
-  }
 
-  protected def sendRecords(numRecords: Int, tp: TopicPartition) {
+  protected def sendRecords(numRecords: Int, tp: TopicPartition): 
Seq[ProducerRecord[Array[Byte], Array[Byte]]] =
     sendRecords(this.producers.head, numRecords, tp)
-  }
 
-  protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], 
numRecords: Int, tp: TopicPartition) {
-    (0 until numRecords).foreach { i =>
-      producer.send(new ProducerRecord(tp.topic(), tp.partition(), i.toLong, 
s"key $i".getBytes, s"value $i".getBytes))
+  protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], 
numRecords: Int,
+                            tp: TopicPartition): 
Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
+    val records = (0 until numRecords).map { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, 
s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+      record
     }
     producer.flush()
+
+    records
   }
 
   protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], 
Array[Byte]],
@@ -160,7 +162,7 @@ abstract class BaseConsumerTest extends 
IntegrationTestHarness with Logging {
     val records = consumeRecords(consumer, numRecords, maxPollRecords = 
maxPollRecords)
     val now = System.currentTimeMillis()
     for (i <- 0 until numRecords) {
-      val record = records.get(i)
+      val record = records(i)
       val offset = startingOffset + i
       assertEquals(tp.topic, record.topic)
       assertEquals(tp.partition, record.partition)
@@ -183,15 +185,15 @@ abstract class BaseConsumerTest extends 
IntegrationTestHarness with Logging {
 
   protected def consumeRecords[K, V](consumer: Consumer[K, V],
                                      numRecords: Int,
-                                     maxPollRecords: Int = Int.MaxValue): 
ArrayList[ConsumerRecord[K, V]] = {
-    val records = new ArrayList[ConsumerRecord[K, V]]
+                                     maxPollRecords: Int = Int.MaxValue): 
ArrayBuffer[ConsumerRecord[K, V]] = {
+    val records = new ArrayBuffer[ConsumerRecord[K, V]]
     val maxIters = numRecords * 300
     var iters = 0
     while (records.size < numRecords) {
       val polledRecords = consumer.poll(50).asScala
       assertTrue(polledRecords.size <= maxPollRecords)
       for (record <- polledRecords)
-        records.add(record)
+        records += record
       if (iters > maxIters)
         throw new IllegalStateException("Failed to consume the expected 
records after " + iters + " iterations.")
       iters += 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6c10632..b449a69 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,7 +15,6 @@ package kafka.api
 
 import java.util
 import java.util.Properties
-
 import java.util.regex.Pattern
 
 import kafka.log.LogConfig
@@ -23,11 +22,11 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer, ByteArraySerializer}
-import org.apache.kafka.test.{MockProducerInterceptor, MockConsumerInterceptor}
+import org.apache.kafka.common.serialization.{ByteArraySerializer, 
StringDeserializer, StringSerializer}
+import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidTopicException, 
RecordTooLargeException}
+import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.Assert._
@@ -539,26 +538,124 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testFetchRecordTooLarge() {
+  def testFetchRecordLargerThanFetchMaxBytes() {
     val maxFetchBytes = 10 * 1024
-    
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 maxFetchBytes.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
maxFetchBytes.toString)
+    checkLargeRecord(maxFetchBytes + 1)
+  }
+
+  private def checkLargeRecord(producerRecordSize: Int): Unit = {
     val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
     consumers += consumer0
 
     // produce a record that is larger than the configured fetch size
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), 
tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
+    val record = new ProducerRecord(tp.topic(), tp.partition(), "key".getBytes,
+      new Array[Byte](producerRecordSize))
     this.producers.head.send(record)
 
-    // consuming a too-large record should fail
+    // consuming a record that is too large should succeed since KIP-74
     consumer0.assign(List(tp).asJava)
-    val e = intercept[RecordTooLargeException] {
-      consumer0.poll(20000)
+    val records = consumer0.poll(20000)
+    assertEquals(1, records.count)
+    val consumerRecord = records.iterator().next()
+    assertEquals(0L, consumerRecord.offset)
+    assertEquals(tp.topic(), consumerRecord.topic())
+    assertEquals(tp.partition(), consumerRecord.partition())
+    assertArrayEquals(record.key(), consumerRecord.key())
+    assertArrayEquals(record.value(), consumerRecord.value())
+  }
+
+  /** We should only return a large record if it's the first record in the 
first non-empty partition of the fetch request */
+  @Test
+  def testFetchHonoursFetchSizeIfLargeRecordNotFirst(): Unit = {
+    val maxFetchBytes = 10 * 1024
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
maxFetchBytes.toString)
+    checkFetchHonoursSizeIfLargeRecordNotFirst(maxFetchBytes)
+  }
+
+  private def 
checkFetchHonoursSizeIfLargeRecordNotFirst(largeProducerRecordSize: Int): Unit 
= {
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
+
+    val smallRecord = new ProducerRecord(tp.topic(), tp.partition(), 
"small".getBytes,
+      "value".getBytes)
+    val largeRecord = new ProducerRecord(tp.topic(), tp.partition(), 
"large".getBytes,
+      new Array[Byte](largeProducerRecordSize))
+    this.producers.head.send(smallRecord)
+    this.producers.head.send(largeRecord)
+
+    // we should only get the small record in the first `poll`
+    consumer0.assign(List(tp).asJava)
+    val records = consumer0.poll(20000)
+    assertEquals(1, records.count)
+    val consumerRecord = records.iterator().next()
+    assertEquals(0L, consumerRecord.offset)
+    assertEquals(tp.topic(), consumerRecord.topic())
+    assertEquals(tp.partition(), consumerRecord.partition())
+    assertArrayEquals(smallRecord.key(), consumerRecord.key())
+    assertArrayEquals(smallRecord.value(), consumerRecord.value())
+  }
+
+  /** We should only return a large record if it's the first record in the 
first partition of the fetch request */
+  @Test
+  def testFetchHonoursMaxPartitionFetchBytesIfLargeRecordNotFirst(): Unit = {
+    val maxPartitionFetchBytes = 10 * 1024
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 maxPartitionFetchBytes.toString)
+    checkFetchHonoursSizeIfLargeRecordNotFirst(maxPartitionFetchBytes)
+  }
+
+  @Test
+  def testFetchRecordLargerThanMaxPartitionFetchBytes() {
+    val maxPartitionFetchBytes = 10 * 1024
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 maxPartitionFetchBytes.toString)
+    checkLargeRecord(maxPartitionFetchBytes + 1)
+  }
+
+  /** Test that we consume all partitions if fetch max bytes and 
max.partition.fetch.bytes are low */
+  @Test
+  def testLowMaxFetchSizeForRequestAndPartition(): Unit = {
+    // one of the effects of this is that there will be some log reads where 
`0 < remaining limit bytes < message size`
+    // and we don't return the message because it's not the first message in 
the first non-empty partition of the fetch
+    // this behaves a little different than when remaining limit bytes is 0 
and it's important to test it
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
"500")
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 "100")
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new 
ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
+
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    val topic3 = "topic3"
+    val partitionCount = 30
+    val topics = Seq(topic1, topic2, topic3)
+    topics.foreach { topicName =>
+      TestUtils.createTopic(zkUtils, topicName, partitionCount, serverCount, 
servers)
     }
-    val oversizedPartitions = e.recordTooLargePartitions()
-    assertNotNull(oversizedPartitions)
-    assertEquals(1, oversizedPartitions.size)
-    // the oversized message is at offset 0
-    assertEquals(0L, oversizedPartitions.get(tp))
+
+    val partitions = topics.flatMap { topic =>
+      (0 until partitionCount).map(new TopicPartition(topic, _))
+    }
+
+    assertEquals(0, consumer0.assignment().size)
+
+    consumer0.subscribe(List(topic1, topic2, topic3).asJava)
+
+    TestUtils.waitUntilTrue(() => {
+      consumer0.poll(50)
+      consumer0.assignment() == partitions.toSet.asJava
+    }, s"Expected partitions ${partitions.asJava} but actually got 
${consumer0.assignment}")
+
+    val producerRecords = partitions.flatMap(sendRecords(partitionCount, _))
+    val consumerRecords = consumeRecords(consumer0, producerRecords.size)
+
+    val expected = producerRecords.map { record =>
+      (record.topic, record.partition, new String(record.key), new 
String(record.value), record.timestamp)
+    }.toSet
+
+    val actual = consumerRecords.map { record =>
+      (record.topic, record.partition, new String(record.key), new 
String(record.value), record.timestamp)
+    }.toSet
+
+    assertEquals(expected, actual)
   }
 
   @Test
@@ -712,7 +809,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // consume and verify that values are modified by interceptors
     val records = consumeRecords(testConsumer, numRecords)
     for (i <- 0 until numRecords) {
-      val record = records.get(i)
+      val record = records(i)
       assertEquals(s"key $i", new String(record.key()))
       assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new 
String(record.value()))
     }
@@ -796,7 +893,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
"org.apache.kafka.test.MockProducerInterceptor")
     producerProps.put("mock.interceptor.append", appendStr)
-    val testProducer = new KafkaProducer[Array[Byte], 
Array[Byte]](producerProps, new ByteArraySerializer(), new 
ByteArraySerializer())
+    val testProducer = new KafkaProducer(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer())
     producers += testProducer
 
     // producing records should succeed
@@ -812,7 +909,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // consume and verify that values are not modified by interceptors -- 
their exceptions are caught and logged, but not propagated
     val records = consumeRecords(testConsumer, 1)
-    val record = records.get(0)
+    val record = records.head
     assertEquals(s"value will not be modified", new String(record.value()))
   }
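
Taken together, these tests pin down the KIP-74 consumer contract: fetch.max.bytes caps the size of the whole fetch response, max.partition.fetch.bytes caps each partition's share, and a record larger than either limit is still delivered as long as it is the first record of the first non-empty partition, so polling no longer fails with RecordTooLargeException. A minimal sketch of the client-side configuration, assuming a broker at localhost:9092 and using the same ConsumerConfig keys as the tests above:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object Kip74ConsumerSketch {
      def createConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")             // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip74-example")                       // placeholder group
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (10 * 1024).toString)           // whole response limit
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (10 * 1024).toString) // per-partition limit
        new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
      }
    }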
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index c8fcba6..457a909 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -18,14 +18,14 @@
 package kafka.api
 
 import java.util.concurrent.{ExecutionException, TimeoutException}
-import java.util.{Properties}
+import java.util.Properties
+
 import kafka.common.Topic
-import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig
 import kafka.server.KafkaConfig
-import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.errors.{InvalidTopicException, 
NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -33,12 +33,16 @@ import org.junit.{After, Before, Test}
 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
   private val serverMessageMaxBytes =  producerBufferSize/2
+  private val replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 200
+  private val replicaFetchMaxResponseBytes = replicaFetchMaxPartitionBytes + 
200
 
   val numServers = 2
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
   overridingProps.put(KafkaConfig.MessageMaxBytesProp, 
serverMessageMaxBytes.toString)
+  overridingProps.put(KafkaConfig.ReplicaFetchMaxBytesProp, 
replicaFetchMaxPartitionBytes.toString)
+  overridingProps.put(KafkaConfig.ReplicaFetchResponseMaxBytesProp, 
replicaFetchMaxResponseBytes.toString)
   // Set a smaller value for the number of partitions for the offset commit 
topic (__consumer_offset topic)
   // so that the creation of that topic/partition(s) and subsequent leader 
assignment doesn't take relatively long
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
@@ -46,13 +50,10 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   def generateConfigs() =
     TestUtils.createBrokerConfigs(numServers, zkConnect, 
false).map(KafkaConfig.fromProps(_, overridingProps))
 
-  private var consumer1: SimpleConsumer = null
-  private var consumer2: SimpleConsumer = null
-
-  private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
-  private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = null
+  private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = null
+  private var producer3: KafkaProducer[Array[Byte], Array[Byte]] = null
+  private var producer4: KafkaProducer[Array[Byte], Array[Byte]] = null
 
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"
@@ -88,7 +89,7 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     // send a too-large record
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, 
"key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord(topic1, null, "key".getBytes, new 
Array[Byte](serverMessageMaxBytes + 1))
     assertEquals("Returned metadata should have offset -1", 
producer1.send(record).get.offset, -1L)
   }
 
@@ -101,19 +102,48 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     // send a too-large record
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, 
"key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord(topic1, null, "key".getBytes, new 
Array[Byte](serverMessageMaxBytes + 1))
     intercept[ExecutionException] {
       producer2.send(record).get
     }
   }
 
+  private def checkTooLargeRecordForReplicationWithAckAll(maxFetchSize: Int) {
+    val maxMessageSize = maxFetchSize + 100
+    val topicConfig = new Properties
+    topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 
numServers.toString)
+    topicConfig.setProperty(LogConfig.MaxMessageBytesProp, 
maxMessageSize.toString)
+
+    // create topic
+    val topic10 = "topic10"
+    TestUtils.createTopic(zkUtils, topic10, servers.size, numServers, servers, 
topicConfig)
+
+    // send a record that is too large for replication, but within the broker 
max message limit
+    val record = new ProducerRecord(topic10, null, "key".getBytes, new 
Array[Byte](maxMessageSize - 50))
+    val recordMetadata = producer3.send(record).get
+
+    assertEquals(topic10, recordMetadata.topic())
+  }
+
+  /** This should succeed as the replica fetcher thread can handle oversized 
messages since KIP-74 */
+  @Test
+  def testPartitionTooLargeForReplicationWithAckAll() {
+    checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxPartitionBytes)
+  }
+
+  /** This should succeed as the replica fetcher thread can handle oversized 
messages since KIP-74 */
+  @Test
+  def testResponseTooLargeForReplicationWithAckAll() {
+    checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxResponseBytes)
+  }
+
   /**
    * With non-exist-topic the future metadata should return ExecutionException 
caused by TimeoutException
    */
   @Test
   def testNonExistentTopic() {
     // send a record with non-exist topic
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic2, null, 
"key".getBytes, "value".getBytes)
+    val record = new ProducerRecord(topic2, null, "key".getBytes, 
"value".getBytes)
     intercept[ExecutionException] {
       producer1.send(record).get
     }
@@ -138,7 +168,7 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", 
acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
 
     // send a record with incorrect broker list
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, 
"key".getBytes, "value".getBytes)
+    val record = new ProducerRecord(topic1, null, "key".getBytes, 
"value".getBytes)
     intercept[ExecutionException] {
       producer4.send(record).get
     }
@@ -212,7 +242,7 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
 
     TestUtils.createTopic(zkUtils, topicName, 1, numServers, servers, 
topicProps)
 
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, 
"key".getBytes, "value".getBytes)
+    val record = new ProducerRecord(topicName, null, "key".getBytes, 
"value".getBytes)
     try {
       producer3.send(record).get
       fail("Expected exception when producing to topic with fewer brokers than 
min.insync.replicas")
@@ -228,11 +258,11 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   def testNotEnoughReplicasAfterBrokerShutdown() {
     val topicName = "minisrtest2"
     val topicProps = new Properties()
-    topicProps.put("min.insync.replicas",numServers.toString)
+    topicProps.put("min.insync.replicas", numServers.toString)
 
     TestUtils.createTopic(zkUtils, topicName, 1, numServers, 
servers,topicProps)
 
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, 
"key".getBytes, "value".getBytes)
+    val record = new ProducerRecord(topicName, null, "key".getBytes, 
"value".getBytes)
     // this should work with all brokers up and running
     producer3.send(record).get
 
@@ -256,32 +286,4 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     servers.head.startup()
   }
 
-  private class ProducerScheduler extends 
ShutdownableThread("daemon-producer", false)
-  {
-    val numRecords = 1000
-    var sent = 0
-    var failed = false
-
-    val producer = TestUtils.createNewProducer(brokerList, bufferSize = 
producerBufferSize, retries = 10)
-
-    override def doWork(): Unit = {
-      val responses =
-        for (i <- sent+1 to sent+numRecords)
-        yield producer.send(new 
ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, 
i.toString.getBytes),
-                            new ErrorLoggingCallback(topic1, null, null, true))
-      val futures = responses.toList
-
-      try {
-        futures.map(_.get)
-        sent += numRecords
-      } catch {
-        case e : Exception => failed = true
-      }
-    }
-
-    override def shutdown(){
-      super.shutdown()
-      producer.close()
-    }
-  }
 }
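
The two new broker overrides are the replication-side counterparts of the consumer settings: replica.fetch.max.bytes bounds what the replica fetcher requests per partition and replica.fetch.response.max.bytes bounds the whole replication response. The test deliberately produces a record that is under message.max.bytes but over those fetch limits; since KIP-74 the follower still replicates it, so an acks=all produce completes. A hedged sketch of the equivalent broker property overrides (the numeric values are illustrative, not the test's exact figures):

    import java.util.Properties

    object ReplicationLimitsSketch {
      def brokerOverrides(): Properties = {
        val props = new Properties()
        props.put("message.max.bytes", "15000")                 // largest record the broker accepts
        props.put("replica.fetch.max.bytes", "15200")           // per-partition replication fetch limit
        props.put("replica.fetch.response.max.bytes", "15400")  // whole replication response limit
        props
      }
    }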

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index ca9dac4..f8fbae7 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -127,11 +127,11 @@ object SerializationTestUtils {
     ), ProducerRequest.CurrentVersion, 100)
 
   def createTestFetchRequest: FetchRequest = {
-    new FetchRequest(requestInfo = requestInfos)
+    new FetchRequest(requestInfo = requestInfos.toVector)
   }
 
   def createTestFetchResponse: FetchResponse = {
-    FetchResponse(1, topicDataFetchResponse)
+    FetchResponse(1, topicDataFetchResponse.toVector)
   }
 
   def createTestOffsetRequest = new OffsetRequest(
@@ -267,11 +267,11 @@ class RequestResponseSerializationTest extends JUnitSuite 
{
   def testFetchResponseVersion() {
     val oldClientResponse = FetchResponse(1, Map(
       TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = 
new ByteBufferMessageSet(new Message("first message".getBytes)))
-    ), 0)
+    ).toVector, 0)
 
     val newClientResponse = FetchResponse(1, Map(
       TopicAndPartition("t1", 0) -> new FetchResponsePartitionData(messages = 
new ByteBufferMessageSet(new Message("first message".getBytes)))
-    ), 1, 100)
+    ).toVector, 1, 100)
 
     // new response should have 4 bytes more than the old response since 
delayTime is an INT32
     assertEquals(oldClientResponse.sizeInBytes + 4, 
newClientResponse.sizeInBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 140f615..3998a21 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -60,7 +60,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
   @Test
   def testEmptyFetchRequest() {
     val partitionRequests = immutable.Map[TopicAndPartition, 
PartitionFetchInfo]()
-    val request = new FetchRequest(requestInfo = partitionRequests)
+    val request = new FetchRequest(requestInfo = partitionRequests.toVector)
     val fetched = consumer.fetch(request)
     assertTrue(!fetched.hasError && fetched.data.isEmpty)
   }
@@ -149,7 +149,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => 
ErrorMapping.maybeThrowException(pdata.error))
+        response.data.foreach(pdata => 
ErrorMapping.maybeThrowException(pdata._2.error))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => // This is good.
@@ -165,7 +165,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        response.data.values.foreach(pdata => 
ErrorMapping.maybeThrowException(pdata.error))
+        response.data.foreach(pdata => 
ErrorMapping.maybeThrowException(pdata._2.error))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => // This is good.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala 
b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index 82496f2..8702474 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -108,24 +108,28 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
     // append a new message with a high offset
     val lastMessage = new Message("test".getBytes)
     messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new 
LongRef(50), lastMessage))
+    val messages = messageSet.toSeq
     var position = 0
-    assertEquals("Should be able to find the first message by its offset", 
-                 OffsetPosition(0L, position), 
-                 messageSet.searchForOffset(0, 0))
-    position += MessageSet.entrySize(messageSet.head.message)
+    val message1Size = MessageSet.entrySize(messages.head.message)
+    assertEquals("Should be able to find the first message by its offset",
+                 (OffsetPosition(0L, position), message1Size),
+                 messageSet.searchForOffsetWithSize(0, 0))
+    position += message1Size
+    val message2Size = MessageSet.entrySize(messages(1).message)
     assertEquals("Should be able to find second message when starting from 0", 
-                 OffsetPosition(1L, position), 
-                 messageSet.searchForOffset(1, 0))
+                 (OffsetPosition(1L, position), message2Size),
+                 messageSet.searchForOffsetWithSize(1, 0))
     assertEquals("Should be able to find second message starting from its 
offset", 
-                 OffsetPosition(1L, position), 
-                 messageSet.searchForOffset(1, position))
-    position += MessageSet.entrySize(messageSet.tail.head.message) + 
MessageSet.entrySize(messageSet.tail.tail.head.message)
+                 (OffsetPosition(1L, position), message2Size),
+                 messageSet.searchForOffsetWithSize(1, position))
+    position += message2Size + MessageSet.entrySize(messages(2).message)
+    val message4Size = MessageSet.entrySize(messages(3).message)
     assertEquals("Should be able to find fourth message from a non-existant 
offset", 
-                 OffsetPosition(50L, position), 
-                 messageSet.searchForOffset(3, position))
-    assertEquals("Should be able to find fourth message by correct offset", 
-                 OffsetPosition(50L, position), 
-                 messageSet.searchForOffset(50,  position))
+                 (OffsetPosition(50L, position), message4Size),
+                 messageSet.searchForOffsetWithSize(3, position))
+    assertEquals("Should be able to find fourth message by correct offset",
+                 (OffsetPosition(50L, position), message4Size),
+                 messageSet.searchForOffsetWithSize(50,  position))
   }
   
   /**
@@ -134,7 +138,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testIteratorWithLimits() {
     val message = messageSet.toList(1)
-    val start = messageSet.searchForOffset(1, 0).position
+    val start = messageSet.searchForOffsetWithSize(1, 0)._1.position
     val size = message.message.size + 12
     val slice = messageSet.read(start, size)
     assertEquals(List(message), slice.toList)
@@ -148,7 +152,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testTruncate() {
     val message = messageSet.toList.head
-    val end = messageSet.searchForOffset(1, 0).position
+    val end = messageSet.searchForOffsetWithSize(1, 0)._1.position
     messageSet.truncateTo(end)
     assertEquals(List(message), messageSet.toList)
     assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
@@ -272,7 +276,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testFormatConversionWithPartialMessage() {
     val message = messageSet.toList(1)
-    val start = messageSet.searchForOffset(1, 0).position
+    val start = messageSet.searchForOffsetWithSize(1, 0)._1.position
     val size = message.message.size + 12
     val slice = messageSet.read(start, size - 1)
     val messageV0 = slice.toMessageFormat(Message.MagicValue_V0)
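
The renamed index helper now reports the size of the located message alongside its file position, which is why the assertions above compare against (OffsetPosition, size) pairs and the other callers take `._1.position`. A small usage fragment, assuming an existing FileMessageSet named `fileMessageSet` (not a complete program):

    // Locate the entry for offset 1, starting the scan at byte 0 of the file.
    val (offsetPosition, messageSize) = fileMessageSet.searchForOffsetWithSize(1, 0)
    println(s"offset ${offsetPosition.offset} starts at byte ${offsetPosition.position}, entry is $messageSize bytes")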

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 64140e8..49feebd 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -284,7 +284,7 @@ class LogSegmentTest {
         seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the 
way to the end
-      val position = seg.log.searchForOffset(offsetToBeginCorruption, 
0).position + TestUtils.random.nextInt(15)
+      val position = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 
0)._1.position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, 
seg.log.file.length.toInt - position)
       seg.recover(64*1024)
       assertEquals("Should have truncated off bad messages.", (0 until 
offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala 
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4935aae..e496853 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -246,6 +246,64 @@ class LogTest extends JUnitSuite {
     assertEquals("A read should now return the last message in the log", 
log.logEndOffset - 1, log.read(1, 200, None).messageSet.head.offset)
   }
 
+  @Test
+  def testReadWithMinMessage() {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, 
time.scheduler, time = time)
+    val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
+    val messages = messageIds.map(id => new Message(id.toString.getBytes))
+
+    // now test the case that we give the offsets and use non-sequential 
offsets
+    for (i <- 0 until messages.length)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, new 
LongRef(messageIds(i)), messages = messages(i)),
+        assignOffsets = false)
+
+    for (i <- 50 until messageIds.max) {
+      val idx = messageIds.indexWhere(_ >= i)
+      val reads = Seq(
+        log.read(i, 1, minOneMessage = true),
+        log.read(i, 100, minOneMessage = true),
+        log.read(i, 100, Some(10000), minOneMessage = true)
+      ).map(_.messageSet.head)
+      reads.foreach { read =>
+        assertEquals("Offset read should match message id.", messageIds(idx), 
read.offset)
+        assertEquals("Message should match appended.", messages(idx), 
read.message)
+      }
+
+      assertEquals(Seq.empty, log.read(i, 1, Some(1), minOneMessage = 
true).messageSet.toIndexedSeq)
+    }
+
+  }
+
+  @Test
+  def testReadWithTooSmallMaxLength() {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, 
time.scheduler, time = time)
+    val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
+    val messages = messageIds.map(id => new Message(id.toString.getBytes))
+
+    // now test the case that we give the offsets and use non-sequential 
offsets
+    for (i <- 0 until messages.length)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, new 
LongRef(messageIds(i)), messages = messages(i)),
+        assignOffsets = false)
+
+    for (i <- 50 until messageIds.max) {
+      assertEquals(MessageSet.Empty, log.read(i, 0).messageSet)
+
+      // we return an incomplete message instead of an empty one for the case 
below
+      // we use this mechanism to tell consumers of the fetch request version 
2 and below that the message size is
+      // larger than the fetch size
+      // in fetch request version 3, we no longer need this as we return 
oversized messages from the first non-empty
+      // partition
+      val fetchInfo = log.read(i, 1)
+      assertTrue(fetchInfo.firstMessageSetIncomplete)
+      assertTrue(fetchInfo.messageSet.isInstanceOf[FileMessageSet])
+      assertEquals(1, fetchInfo.messageSet.sizeInBytes)
+    }
+  }
+
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set

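The two new tests above exercise the minOneMessage flag added to Log.read in this patch. A minimal sketch of the intended behaviour, reusing only calls that appear in the tests and assuming a `log: Log` whose first message sits at offset 0 (not part of the patch):

    // maxLength smaller than the first message, minOneMessage left at its default:
    // the read comes back incomplete, which is how fetch request versions 2 and
    // below learn that the message is larger than the fetch size.
    val strict = log.read(0L, 1)
    // expected per the tests: strict.firstMessageSetIncomplete == true, sizeInBytes == 1

    // With minOneMessage = true, at least one full message is returned even though
    // it exceeds maxLength, which is what fetch request version 3 relies on.
    val lenient = log.read(0L, 1, minOneMessage = true)
    // expected per the tests: lenient.messageSet.head is the complete first message
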
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 9edd3c0..1cd2496 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -19,10 +19,10 @@ package kafka.server
 
 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
-import kafka.common.TopicAndPartition
 import kafka.message.ByteBufferMessageSet
 import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
@@ -40,7 +40,7 @@ class AbstractFetcherThreadTest {
 
   @Test
   def testMetricsRemovedOnShutdown() {
-    val partition = new TopicAndPartition("topic", 0)
+    val partition = new TopicPartition("topic", 0)
     val fetcherThread = new DummyFetcherThread("dummy", "client", new 
BrokerEndPoint(0, "localhost", 9092))
 
     fetcherThread.start()
@@ -61,7 +61,7 @@ class AbstractFetcherThreadTest {
 
   @Test
   def testConsumerLagRemovedWithPartition() {
-    val partition = new TopicAndPartition("topic", 0)
+    val partition = new TopicPartition("topic", 0)
     val fetcherThread = new DummyFetcherThread("dummy", "client", new 
BrokerEndPoint(0, "localhost", 9092))
 
     fetcherThread.start()
@@ -84,10 +84,10 @@ class AbstractFetcherThreadTest {
 
   private def allMetricsNames = 
Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
 
-  class DummyFetchRequest(val offsets: collection.Map[TopicAndPartition, 
Long]) extends FetchRequest {
+  class DummyFetchRequest(val offsets: collection.Map[TopicPartition, Long]) 
extends FetchRequest {
     override def isEmpty: Boolean = offsets.isEmpty
 
-    override def offset(topicAndPartition: TopicAndPartition): Long = 
offsets(topicAndPartition)
+    override def offset(topicAndPartition: TopicPartition): Long = 
offsets(topicAndPartition)
   }
 
   class DummyPartitionData extends PartitionData {
@@ -108,21 +108,19 @@ class AbstractFetcherThreadTest {
     type REQ = DummyFetchRequest
     type PD = PartitionData
 
-    override def processPartitionData(topicAndPartition: TopicAndPartition,
+    override def processPartitionData(topicAndPartition: TopicPartition,
                                       fetchOffset: Long,
                                       partitionData: PartitionData): Unit = {}
 
-    override def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): 
Long = 0L
+    override def handleOffsetOutOfRange(topicAndPartition: TopicPartition): 
Long = 0L
 
-    override def handlePartitionsWithErrors(partitions: 
Iterable[TopicAndPartition]): Unit = {}
+    override def handlePartitionsWithErrors(partitions: 
Iterable[TopicPartition]): Unit = {}
 
-    override protected def fetch(fetchRequest: DummyFetchRequest): 
collection.Map[TopicAndPartition, DummyPartitionData] = {
-      fetchRequest.offsets.mapValues(_ => new DummyPartitionData)
-    }
+    override protected def fetch(fetchRequest: DummyFetchRequest): 
Seq[(TopicPartition, DummyPartitionData)] =
+      fetchRequest.offsets.mapValues(_ => new DummyPartitionData).toSeq
 
-    override protected def buildFetchRequest(partitionMap: 
collection.Map[TopicAndPartition, PartitionFetchState]): DummyFetchRequest = {
-      new DummyFetchRequest(partitionMap.mapValues(_.offset))
-    }
+    override protected def buildFetchRequest(partitionMap: 
collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
+      new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) 
}.toMap)
   }
 
 }

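The change above is mostly mechanical (kafka.common.TopicAndPartition becomes org.apache.kafka.common.TopicPartition, and the Map-based fetcher signatures become Seq-based), but the Seq is what lets the fetcher hand partitions to the broker in a defined order. A small illustration of the difference, not taken from the patch:

    import org.apache.kafka.common.TopicPartition

    // A Seq of pairs preserves whatever order the fetcher chose ...
    val ordered: Seq[(TopicPartition, Long)] = Seq(
      new TopicPartition("topic", 2) -> 42L,
      new TopicPartition("topic", 0) -> 7L)
    ordered.map(_._1)   // always List(topic-2, topic-0)

    // ... while a Map gives no ordering guarantee, which is why the fetch path
    // in this patch moves away from Map-based signatures.
    ordered.toMap.keys  // iteration order unspecified
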
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
new file mode 100644
index 0000000..bb0e060
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -0,0 +1,233 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.log.LogConfig
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.record.{LogEntry, MemoryRecords}
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+/**
+  * Subclasses of `BaseConsumerTest` exercise the consumer and fetch 
request/response. This class
+  * complements those classes with tests that require lower-level access to 
the protocol.
+  */
+class FetchRequestTest extends BaseRequestTest {
+
+  private var producer: KafkaProducer[String, String] = null
+
+  override def setUp() {
+    super.setUp()
+    producer = 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new 
StringSerializer)
+  }
+
+  override def tearDown() {
+    producer.close()
+    super.tearDown()
+  }
+
+
+  private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: 
Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long] = 
Map.empty): FetchRequest =
+    new FetchRequest(Int.MaxValue, 0, maxResponseBytes, 
createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap))
+
+  private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: 
Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long] = 
Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
+    val partitionMap = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+    topicPartitions.foreach { tp =>
+      partitionMap.put(tp, new 
FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), maxPartitionBytes))
+    }
+    partitionMap
+  }
+
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest, version: 
Option[Short] = None): FetchResponse = {
+    val response = send(request, ApiKeys.FETCH, version, destination = 
brokerSocketServer(leaderId))
+    FetchResponse.parse(response)
+  }
+
+  @Test
+  def testBrokerRespectsPartitionsOrderAndSizeLimits(): Unit = {
+    val messagesPerPartition = 9
+    val maxResponseBytes = 800
+    val maxPartitionBytes = 190
+
+    def createFetchRequest(topicPartitions: Seq[TopicPartition], offsetMap: 
Map[TopicPartition, Long] = Map.empty): FetchRequest =
+      this.createFetchRequest(maxResponseBytes, maxPartitionBytes, 
topicPartitions, offsetMap)
+
+    val topicPartitionToLeader = createTopics(numTopics = 5, numPartitions = 6)
+    val random = new Random(0)
+    val topicPartitions = topicPartitionToLeader.keySet
+    produceData(topicPartitions, messagesPerPartition)
+
+    val leaderId = servers.head.config.brokerId
+    val partitionsForLeader = topicPartitionToLeader.toVector.collect {
+      case (tp, partitionLeaderId) if partitionLeaderId == leaderId => tp
+    }
+
+    val partitionsWithLargeMessages = partitionsForLeader.takeRight(2)
+    val partitionWithLargeMessage1 = partitionsWithLargeMessages.head
+    val partitionWithLargeMessage2 = partitionsWithLargeMessages(1)
+    producer.send(new ProducerRecord(partitionWithLargeMessage1.topic, 
partitionWithLargeMessage1.partition,
+      "larger than partition limit", new String(new 
Array[Byte](maxPartitionBytes + 1)))).get
+    producer.send(new ProducerRecord(partitionWithLargeMessage2.topic, 
partitionWithLargeMessage2.partition,
+      "larger than response limit", new String(new 
Array[Byte](maxResponseBytes + 1)))).get
+
+    val partitionsWithoutLargeMessages = 
partitionsForLeader.filterNot(partitionsWithLargeMessages.contains)
+
+    // 1. Partitions with large messages at the end
+    val shuffledTopicPartitions1 = 
random.shuffle(partitionsWithoutLargeMessages) ++ partitionsWithLargeMessages
+    val fetchRequest1 = createFetchRequest(shuffledTopicPartitions1)
+    val fetchResponse1 = sendFetchRequest(leaderId, fetchRequest1)
+    checkFetchResponse(shuffledTopicPartitions1, fetchResponse1, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+
+    // 2. Same as 1, but shuffled again
+    val shuffledTopicPartitions2 = 
random.shuffle(partitionsWithoutLargeMessages) ++ partitionsWithLargeMessages
+    val fetchRequest2 = createFetchRequest(shuffledTopicPartitions2)
+    val fetchResponse2 = sendFetchRequest(leaderId, fetchRequest2)
+    checkFetchResponse(shuffledTopicPartitions2, fetchResponse2, 
maxPartitionBytes, maxResponseBytes, messagesPerPartition)
+
+    // 3. Partition with message larger than the partition limit at the start 
of the list
+    val shuffledTopicPartitions3 = Seq(partitionWithLargeMessage1, 
partitionWithLargeMessage2) ++
+      random.shuffle(partitionsWithoutLargeMessages)
+    val fetchRequest3 = createFetchRequest(shuffledTopicPartitions3, 
Map(partitionWithLargeMessage1 -> messagesPerPartition))
+    val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3)
+    assertEquals(shuffledTopicPartitions3, 
fetchResponse3.responseData.keySet.asScala.toSeq)
+    val responseSize3 = fetchResponse3.responseData.asScala.values.map { 
partitionData =>
+      logEntries(partitionData).map(_.size).sum
+    }.sum
+    assertTrue(responseSize3 <= maxResponseBytes)
+    val partitionData3 = 
fetchResponse3.responseData.get(partitionWithLargeMessage1)
+    assertEquals(Errors.NONE.code, partitionData3.errorCode)
+    assertTrue(partitionData3.highWatermark > 0)
+    val size3 = logEntries(partitionData3).map(_.size).sum
+    assertTrue(s"Expected $size3 to be smaller than $maxResponseBytes", size3 
<= maxResponseBytes)
+    assertTrue(s"Expected $size3 to be larger than $maxPartitionBytes", size3 
> maxPartitionBytes)
+    assertTrue(maxPartitionBytes < 
MemoryRecords.readableRecords(partitionData3.recordSet).sizeInBytes)
+
+    // 4. Partition with message larger than the response limit at the start 
of the list
+    val shuffledTopicPartitions4 = Seq(partitionWithLargeMessage2, 
partitionWithLargeMessage1) ++
+      random.shuffle(partitionsWithoutLargeMessages)
+    val fetchRequest4 = createFetchRequest(shuffledTopicPartitions4, 
Map(partitionWithLargeMessage2 -> messagesPerPartition))
+    val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4)
+    assertEquals(shuffledTopicPartitions4, 
fetchResponse4.responseData.keySet.asScala.toSeq)
+    val nonEmptyPartitions4 = 
fetchResponse4.responseData.asScala.toSeq.collect {
+      case (tp, partitionData) if logEntries(partitionData).map(_.size).sum > 
0 => tp
+    }
+    assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
+    val partitionData4 = 
fetchResponse4.responseData.get(partitionWithLargeMessage2)
+    assertEquals(Errors.NONE.code, partitionData4.errorCode)
+    assertTrue(partitionData4.highWatermark > 0)
+    val size4 = logEntries(partitionData4).map(_.size).sum
+    assertTrue(s"Expected $size4 to be larger than $maxResponseBytes", size4 > 
maxResponseBytes)
+    assertTrue(maxResponseBytes < 
MemoryRecords.readableRecords(partitionData4.recordSet).sizeInBytes)
+  }
+
+  @Test
+  def testFetchRequestV2WithOversizedMessage(): Unit = {
+    val maxPartitionBytes = 200
+    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1).head
+    producer.send(new ProducerRecord(topicPartition.topic, 
topicPartition.partition,
+      "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
+    val fetchRequest = new FetchRequest(Int.MaxValue, 0, 
createPartitionMap(maxPartitionBytes, Seq(topicPartition)))
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest, Some(2))
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NONE.code, partitionData.errorCode)
+    assertTrue(partitionData.highWatermark > 0)
+    assertEquals(maxPartitionBytes, 
MemoryRecords.readableRecords(partitionData.recordSet).sizeInBytes)
+    assertEquals(0, logEntries(partitionData).map(_.size).sum)
+  }
+
+  private def logEntries(partitionData: FetchResponse.PartitionData): 
Seq[LogEntry] = {
+    val memoryRecords = MemoryRecords.readableRecords(partitionData.recordSet)
+    memoryRecords.iterator.asScala.toIndexedSeq
+  }
+
+  private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], 
fetchResponse: FetchResponse,
+                                 maxPartitionBytes: Int, maxResponseBytes: 
Int, numMessagesPerPartition: Int): Unit = {
+    assertEquals(expectedPartitions, 
fetchResponse.responseData.keySet.asScala.toSeq)
+    var emptyResponseSeen = false
+    var responseSize = 0
+    var responseBufferSize = 0
+
+    expectedPartitions.foreach { tp =>
+      val partitionData = fetchResponse.responseData.get(tp)
+      assertEquals(Errors.NONE.code, partitionData.errorCode)
+      assertTrue(partitionData.highWatermark > 0)
+
+      val memoryRecords = 
MemoryRecords.readableRecords(partitionData.recordSet)
+      responseBufferSize += memoryRecords.sizeInBytes
+
+      val messages = memoryRecords.iterator.asScala.toIndexedSeq
+      assertTrue(messages.size < numMessagesPerPartition)
+      val messagesSize = messages.map(_.size).sum
+      responseSize += messagesSize
+      if (messagesSize == 0 && !emptyResponseSeen) {
+        assertEquals(0, memoryRecords.sizeInBytes)
+        emptyResponseSeen = true
+      }
+      else if (messagesSize != 0 && !emptyResponseSeen) {
+        assertTrue(messagesSize <= maxPartitionBytes)
+        assertEquals(maxPartitionBytes, memoryRecords.sizeInBytes)
+      }
+      else if (messagesSize != 0 && emptyResponseSeen)
+        fail(s"Expected partition with size 0, but found $tp with size 
$messagesSize")
+      else if (memoryRecords.sizeInBytes != 0 && emptyResponseSeen)
+        fail(s"Expected partition buffer with size 0, but found $tp with size 
${memoryRecords.sizeInBytes}")
+
+    }
+
+    assertEquals(maxResponseBytes - maxResponseBytes % maxPartitionBytes, 
responseBufferSize)
+    assertTrue(responseSize <= maxResponseBytes)
+  }
+
+  private def createTopics(numTopics: Int, numPartitions: Int): 
Map[TopicPartition, Int] = {
+    val topics = (0 until numTopics).map(t => s"topic$t")
+    val topicConfig = new Properties
+    topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
+    topics.flatMap { topic =>
+      val partitionToLeader = createTopic(zkUtils, topic, numPartitions = 
numPartitions, replicationFactor = 2,
+        servers = servers, topicConfig = topicConfig)
+      partitionToLeader.map { case (partition, leader) => new 
TopicPartition(topic, partition) -> leader.get }
+    }.toMap
+  }
+
+  private def produceData(topicPartitions: Iterable[TopicPartition], 
numMessagesPerPartition: Int): Seq[ProducerRecord[String, String]] = {
+    val records = for {
+      tp <- topicPartitions.toSeq
+      messageIndex <- 0 until numMessagesPerPartition
+    } yield {
+      val suffix = s"${tp}-${messageIndex}"
+      new ProducerRecord(tp.topic, tp.partition, s"key $suffix", s"value 
$suffix")
+    }
+    records.map(producer.send).foreach(_.get)
+    records
+  }
+
+}

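For readers of the new test: a size-bounded fetch combines the per-partition limit in FetchRequest.PartitionData with the response-level limit passed to the FetchRequest constructor, and partition order is carried by a LinkedHashMap. A compact sketch that reuses only the constructors already used by the test above (imports as in the test file):

    // Ordered partition map: the broker fills the response in this order until the
    // response-level limit (800 bytes here) is reached; each partition contributes
    // at most its own limit (190 bytes here).
    val partitions = new java.util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
    partitions.put(new TopicPartition("topic0", 0), new FetchRequest.PartitionData(0L, 190))
    partitions.put(new TopicPartition("topic1", 0), new FetchRequest.PartitionData(0L, 190))

    // maxWait = Int.MaxValue, minBytes = 0, maxBytes = 800 (the new response-level limit)
    val sizeBoundedFetch = new FetchRequest(Int.MaxValue, 0, 800, partitions)
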
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9ee3d32..042dabf 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -498,6 +498,7 @@ class KafkaConfigTest {
         case KafkaConfig.ReplicaFetchMaxBytesProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ReplicaFetchWaitMaxMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ReplicaFetchMinBytesProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaFetchResponseMaxBytesProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.NumReplicaFetchersProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")

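The added case registers "not_a_number" validation for the new KafkaConfig.ReplicaFetchResponseMaxBytesProp. A hedged example of setting the corresponding broker property; the name replica.fetch.response.max.bytes is inferred from the constant and the existing replica.fetch.* naming, so confirm it against KafkaConfig before relying on it:

    val brokerProps = new java.util.Properties()
    brokerProps.put("replica.fetch.max.bytes", "1048576")            // existing per-partition cap
    brokerProps.put("replica.fetch.response.max.bytes", "10485760")  // new whole-response cap (assumed name)
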
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 69365aa..f8f8dda 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -42,7 +42,7 @@ class ReplicaManagerQuotasTest {
   val message = new Message("some-data-in-a-message".getBytes())
   val topicAndPartition1 = TopicAndPartition("test-topic", 1)
   val topicAndPartition2 = TopicAndPartition("test-topic", 2)
-  val fetchInfo = Map(topicAndPartition1 -> PartitionFetchInfo(0, 100), 
topicAndPartition2 -> PartitionFetchInfo(0, 100))
+  val fetchInfo = Seq(topicAndPartition1 -> PartitionFetchInfo(0, 100), 
topicAndPartition2 -> PartitionFetchInfo(0, 100))
   var replicaManager: ReplicaManager = null
 
   @Test
@@ -54,12 +54,12 @@ class ReplicaManagerQuotasTest {
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, 
false, fetchInfo, quota)
     assertEquals("Given two partitions, with only one throttled, we should get 
the first", 1,
-      fetch.get(topicAndPartition1).get.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
 
     assertEquals("But we shouldn't get the second", 0,
-      fetch.get(topicAndPartition2).get.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
   }
 
   @Test
@@ -71,11 +71,11 @@ class ReplicaManagerQuotasTest {
     expect(quota.isQuotaExceeded()).andReturn(true).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, 
false, fetchInfo, quota)
     assertEquals("Given two partitions, with both throttled, we should get no 
messages", 0,
-      fetch.get(topicAndPartition1).get.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both throttled, we should get no 
messages", 0,
-      fetch.get(topicAndPartition2).get.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
   }
 
   @Test
@@ -87,14 +87,14 @@ class ReplicaManagerQuotasTest {
     expect(quota.isQuotaExceeded()).andReturn(false).once()
     replay(quota)
 
-    val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota)
+    val fetch = replicaManager.readFromLocalLog(true, true, Int.MaxValue, 
false, fetchInfo, quota)
     assertEquals("Given two partitions, with both non-throttled, we should get 
both messages", 1,
-      fetch.get(topicAndPartition1).get.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
     assertEquals("Given two partitions, with both non-throttled, we should get 
both messages", 1,
-      fetch.get(topicAndPartition2).get.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
   }
 
-  def setUpMocks(fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], 
message: Message = this.message) {
+  def setUpMocks(fetchInfo: Seq[(TopicAndPartition, PartitionFetchInfo)], 
message: Message = this.message) {
     val zkUtils = createNiceMock(classOf[ZkUtils])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -104,14 +104,14 @@ class ReplicaManagerQuotasTest {
     expect(log.logEndOffsetMetadata).andReturn(new 
LogOffsetMetadata(20L)).anyTimes()
 
     //if we ask for len 1 return a message
-    expect(log.read(anyObject(), geq(1), anyObject())).andReturn(
+    expect(log.read(anyObject(), geq(1), anyObject(), anyObject())).andReturn(
       new FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         new ByteBufferMessageSet(message)
       )).anyTimes()
 
     //if we ask for len = 0, return 0 messages
-    expect(log.read(anyObject(), EasyMock.eq(0), anyObject())).andReturn(
+    expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), 
anyObject())).andReturn(
       new FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         new ByteBufferMessageSet()
@@ -129,7 +129,7 @@ class ReplicaManagerQuotasTest {
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, 
metrics, time).follower)
 
     //create the two replicas
-    for (p <- fetchInfo.keySet) {
+    for ((p, _) <- fetchInfo) {
       val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
       val replica = new Replica(configs.head.brokerId, partition, time, 0, 
Some(log))
       replica.highWatermark = new LogOffsetMetadata(5)
@@ -151,4 +151,4 @@ class ReplicaManagerQuotasTest {
     expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes()
     quota
   }
-}
\ No newline at end of file
+}

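To summarise the test changes above: readFromLocalLog now takes the response-level byte limit plus a hard-limit flag, and returns per-partition results as an ordered Seq instead of a Map, so lookups use find. A sketch mirroring the calls in the test; only the positional argument order is taken from the test code, the parameter comments are indicative, and replicaManager, fetchInfo, topicAndPartition1 and UnboundedQuota are assumed to be in scope as in the tests:

    val results = replicaManager.readFromLocalLog(
      true,          // fetch only from the leader
      true,          // read only committed data (up to the high watermark)
      Int.MaxValue,  // response-level byte limit (fetchMaxBytes)
      false,         // hard limit flag; when false an oversized first message may still be returned whole
      fetchInfo,     // Seq[(TopicAndPartition, PartitionFetchInfo)], order preserved
      UnboundedQuota)

    // Map-style lookups become find on the ordered result
    val messages = results.find(_._1 == topicAndPartition1).map(_._2.info.messageSet)
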
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 47e5461..ca8d712 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -136,8 +136,8 @@ class ReplicaManagerTest {
       }
 
       var fetchCallbackFired = false
-      def fetchCallback(responseStatus: Map[TopicAndPartition, 
FetchResponsePartitionData]) = {
-        assertEquals("Should give NotLeaderForPartitionException", 
Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.error)
+      def fetchCallback(responseStatus: Seq[(TopicAndPartition, 
FetchResponsePartitionData)]) = {
+        assertEquals("Should give NotLeaderForPartitionException", 
Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.map(_._2).head.error)
         fetchCallbackFired = true
       }
 
@@ -171,7 +171,9 @@ class ReplicaManagerTest {
         timeout = 1000,
         replicaId = -1,
         fetchMinBytes = 100000,
-        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) 
-> new PartitionFetchInfo(0, 100000)),
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(new TopicAndPartition(topic, 0) -> new 
PartitionFetchInfo(0, 100000)),
         responseCallback = fetchCallback)
 
       // Make this replica the follower
@@ -202,8 +204,8 @@ class ReplicaManagerTest {
       
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
       EasyMock.replay(metadataCache)
       
-      val brokerList : java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
-      val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
+      val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
+      val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
       
       val partition = rm.getOrCreatePartition(topic, 0)
       partition.getOrCreateReplica(0)
@@ -229,9 +231,9 @@ class ReplicaManagerTest {
       var fetchCallbackFired = false
       var fetchError = 0
       var fetchedMessages: MessageSet = null
-      def fetchCallback(responseStatus: Map[TopicAndPartition, 
FetchResponsePartitionData]) = {
-        fetchError = responseStatus.values.head.error
-        fetchedMessages = responseStatus.values.head.messages
+      def fetchCallback(responseStatus: Seq[(TopicAndPartition, 
FetchResponsePartitionData)]) = {
+        fetchError = responseStatus.map(_._2).head.error
+        fetchedMessages = responseStatus.map(_._2).head.messages
         fetchCallbackFired = true
       }
       
@@ -240,7 +242,9 @@ class ReplicaManagerTest {
         timeout = 1000,
         replicaId = 1,
         fetchMinBytes = 0,
-        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) 
-> new PartitionFetchInfo(1, 100000)),
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(new TopicAndPartition(topic, 0) -> new 
PartitionFetchInfo(1, 100000)),
         responseCallback = fetchCallback)
         
       
@@ -254,7 +258,9 @@ class ReplicaManagerTest {
         timeout = 1000,
         replicaId = -1,
         fetchMinBytes = 0,
-        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) 
-> new PartitionFetchInfo(1, 100000)),
+        fetchMaxBytes = Int.MaxValue,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(new TopicAndPartition(topic, 0) -> new 
PartitionFetchInfo(1, 100000)),
         responseCallback = fetchCallback)
           
         assertTrue(fetchCallbackFired)

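The same Seq-based shape applies to fetchMessages: callers now pass fetchMaxBytes, hardMaxBytesLimit and an ordered fetchInfos, and the response callback receives a Seq of (partition, data) pairs. A minimal sketch following the named arguments used in the test above; rm stands for a ReplicaManager instance and the imports are those of the test file:

    def onFetch(results: Seq[(TopicAndPartition, FetchResponsePartitionData)]): Unit =
      results.foreach { case (tp, data) => println(s"$tp -> error=${data.error}") }

    rm.fetchMessages(
      timeout = 1000,
      replicaId = 1,                 // follower fetch; the tests use -1 for a consumer fetch
      fetchMinBytes = 0,
      fetchMaxBytes = Int.MaxValue,  // new: response-level limit
      hardMaxBytesLimit = false,     // new flag; the tests above pass false
      fetchInfos = Seq(new TopicAndPartition("test-topic", 0) -> new PartitionFetchInfo(0L, 100000)),
      responseCallback = onFetch)
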
http://git-wip-us.apache.org/repos/asf/kafka/blob/d04b0998/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1052be5..71c2b41 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -16,23 +16,24 @@
  */
 package kafka.server
 
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.{Collections, Properties}
-
 import kafka.api._
+import kafka.utils._
 import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.utils._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
+
+import org.junit.{Test, After, Before}
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+import collection.JavaConversions._
+
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-import scala.collection.JavaConversions._
 
 class SimpleFetchTest {
 
@@ -62,7 +63,7 @@ class SimpleFetchTest {
   val partitionId = 0
   val topicAndPartition = TopicAndPartition(topic, partitionId)
 
-  val fetchInfo = Collections.singletonMap(topicAndPartition, 
PartitionFetchInfo(0, fetchSize)).toMap
+  val fetchInfo = Seq(topicAndPartition -> PartitionFetchInfo(0, fetchSize))
 
   var replicaManager: ReplicaManager = null
 
@@ -80,12 +81,12 @@ class SimpleFetchTest {
     val log = EasyMock.createMock(classOf[Log])
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(new 
LogOffsetMetadata(leaderLEO)).anyTimes()
-    EasyMock.expect(log.read(0, fetchSize, Some(partitionHW))).andReturn(
+    EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
       new FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         new ByteBufferMessageSet(messagesToHW)
       )).anyTimes()
-    EasyMock.expect(log.read(0, fetchSize, None)).andReturn(
+    EasyMock.expect(log.read(0, fetchSize, None, true)).andReturn(
       new FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
         new ByteBufferMessageSet(messagesToLEO)
@@ -148,9 +149,9 @@ class SimpleFetchTest {
     val initialAllTopicsCount = 
BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
     assertEquals("Reading committed data should return messages only up to 
high watermark", messagesToHW,
-      replicaManager.readFromLocalLog(true, true, fetchInfo, 
UnboundedQuota).get(topicAndPartition).get.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(true, true, Int.MaxValue, false, 
fetchInfo, UnboundedQuota).find(_._1 == 
topicAndPartition).get._2.info.messageSet.head.message)
     assertEquals("Reading any data can return messages up to the end of the 
log", messagesToLEO,
-      replicaManager.readFromLocalLog(true, false, fetchInfo, 
UnboundedQuota).get(topicAndPartition).get.info.messageSet.head.message)
+      replicaManager.readFromLocalLog(true, false, Int.MaxValue, false, 
fetchInfo, UnboundedQuota).find(_._1 == 
topicAndPartition).get._2.info.messageSet.head.message)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, 
BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", 
initialAllTopicsCount+2, 
BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
