Updated Branches: refs/heads/0.8 de1a4d727 -> 777f66220
Disallow clients to set replicaId in FetchRequest; kafka-699; patched by Jun Rao; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/777f6622 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/777f6622 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/777f6622 Branch: refs/heads/0.8 Commit: 777f66220153a64cd33cd5484a64de556f4fa3a8 Parents: de1a4d7 Author: Jun Rao <jun...@gmail.com> Authored: Tue Jan 15 21:26:45 2013 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Jan 15 21:26:45 2013 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/api/FetchRequest.scala | 36 +++++++++++--- .../main/scala/kafka/javaapi/FetchRequest.scala | 6 +-- 2 files changed, 30 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/777f6622/core/src/main/scala/kafka/api/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index b4fb874..7968747 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -58,13 +58,13 @@ object FetchRequest { } } -case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, - correlationId: Int = FetchRequest.DefaultCorrelationId, - clientId: String = ConsumerConfig.DefaultClientId, - replicaId: Int = Request.OrdinaryConsumerId, - maxWait: Int = FetchRequest.DefaultMaxWait, - minBytes: Int = FetchRequest.DefaultMinBytes, - requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) +case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion, + correlationId: Int = FetchRequest.DefaultCorrelationId, + clientId: String = ConsumerConfig.DefaultClientId, + replicaId: Int = Request.OrdinaryConsumerId, + maxWait: Int = FetchRequest.DefaultMaxWait, + minBytes: Int = FetchRequest.DefaultMinBytes, + requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { /** @@ -72,6 +72,23 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, */ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) + /** + * Public constructor for the clients + */ + def this(correlationId: Int, + clientId: String, + maxWait: Int, + minBytes: Int, + requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) { + this(versionId = FetchRequest.CurrentVersion, + correlationId = correlationId, + clientId = clientId, + replicaId = Request.OrdinaryConsumerId, + maxWait = maxWait, + minBytes= minBytes, + requestInfo = requestInfo) + } + def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) @@ -144,7 +161,10 @@ class FetchRequestBuilder() { this } - def replicaId(replicaId: Int): FetchRequestBuilder = { + /** + * Only for internal use. Clients shouldn't set replicaId. + */ + private[kafka] def replicaId(replicaId: Int): FetchRequestBuilder = { this.replicaId = replicaId this } http://git-wip-us.apache.org/repos/asf/kafka/blob/777f6622/core/src/main/scala/kafka/javaapi/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala index 44d148e..b475240 100644 --- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala @@ -18,14 +18,12 @@ package kafka.javaapi import scala.collection.JavaConversions -import kafka.api.PartitionFetchInfo import java.nio.ByteBuffer import kafka.common.TopicAndPartition - +import kafka.api.{Request, PartitionFetchInfo} class FetchRequest(correlationId: Int, clientId: String, - replicaId: Int, maxWait: Int, minBytes: Int, requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { @@ -35,7 +33,7 @@ class FetchRequest(correlationId: Int, kafka.api.FetchRequest( correlationId = correlationId, clientId = clientId, - replicaId = replicaId, + replicaId = Request.OrdinaryConsumerId, maxWait = maxWait, minBytes = minBytes, requestInfo = scalaMap