Repository: kafka
Updated Branches:
  refs/heads/0.8.2 2a1e3d451 -> 96ce96dc9
KAFKA-1729; Add constructor to javaapi to allow constructing explicitly versioned offset commit requests; patched by Joel Koshy; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ce96dc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ce96dc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ce96dc Branch: refs/heads/0.8.2 Commit: 96ce96dc959dd827ae1f70a17e89e05b91b6ba58 Parents: 2a1e3d4 Author: Joel Koshy <jjko...@gmail.com> Authored: Wed Jan 28 19:16:43 2015 -0600 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Jan 28 19:16:43 2015 -0600 ---------------------------------------------------------------------- .../main/scala/kafka/api/OffsetCommitResponse.scala | 4 +++- .../kafka/javaapi/ConsumerMetadataResponse.scala | 6 ++++++ .../scala/kafka/javaapi/OffsetCommitRequest.scala | 14 ++++++++++++-- .../scala/kafka/javaapi/OffsetCommitResponse.scala | 9 +++++++++ .../scala/kafka/javaapi/OffsetFetchResponse.scala | 5 +++++ 5 files changed, 35 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/api/OffsetCommitResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala index 03dd736..abe67a5 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -20,7 +20,7 @@ package kafka.api import java.nio.ByteBuffer import kafka.utils.Logging -import kafka.common.TopicAndPartition +import kafka.common.{ErrorMapping, TopicAndPartition} object OffsetCommitResponse extends Logging { val CurrentVersion: Short = 1 @@ -50,6 +50,8 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short], lazy 
val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic) + def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != ErrorMapping.NoError } + def writeTo(buffer: ByteBuffer) { buffer.putInt(correlationId) buffer.putInt(commitStatusGroupedByTopic.size) http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala index 1b28861..d281bb3 100644 --- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala +++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala @@ -17,6 +17,8 @@ package kafka.javaapi +import java.nio.ByteBuffer + import kafka.cluster.Broker class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) { @@ -40,3 +42,7 @@ class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadat override def toString = underlying.toString } + +object ConsumerMetadataResponse { + def readFrom(buffer: ByteBuffer) = new ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(buffer)) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala index 873f575..456c3c4 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala @@ -22,7 +22,8 @@ import kafka.common.{OffsetAndMetadata, TopicAndPartition} class OffsetCommitRequest(groupId: String, requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata], correlationId: Int, - 
clientId: String) { + clientId: String, + versionId: Short) { val underlying = { val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = { import collection.JavaConversions._ @@ -32,12 +33,21 @@ class OffsetCommitRequest(groupId: String, kafka.api.OffsetCommitRequest( groupId = groupId, requestInfo = scalaMap, - versionId = 0, // binds to version 0 so that it commits to Zookeeper + versionId = versionId, correlationId = correlationId, clientId = clientId ) } + def this(groupId: String, + requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata], + correlationId: Int, + clientId: String) { + + // by default bind to version 0 so that it commits to Zookeeper + this(groupId, requestInfo, correlationId, clientId, 0) + } + override def toString = underlying.toString http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala index c2d3d11..b222329 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala @@ -17,6 +17,8 @@ package kafka.javaapi +import java.nio.ByteBuffer + import kafka.common.TopicAndPartition import collection.JavaConversions @@ -27,5 +29,12 @@ class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitRespons underlying.commitStatus } + def hasError = underlying.hasError + + def errorCode(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition) + } +object OffsetCommitResponse { + def readFrom(buffer: ByteBuffer) = new OffsetCommitResponse(kafka.api.OffsetCommitResponse.readFrom(buffer)) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala index 60924d2..c4bdb12 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala @@ -17,6 +17,8 @@ package kafka.javaapi +import java.nio.ByteBuffer + import kafka.common.{TopicAndPartition, OffsetMetadataAndError} import collection.JavaConversions @@ -29,3 +31,6 @@ class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) } +object OffsetFetchResponse { + def readFrom(buffer: ByteBuffer) = new OffsetFetchResponse(kafka.api.OffsetFetchResponse.readFrom(buffer)) +}