Repository: kafka Updated Branches: refs/heads/trunk 602d572f6 -> 1769642bb
kafka-1870; Cannot commit with simpleConsumer on Zookeeper only with Java API; patched by Jun Rao; reviewed by Jeol Koshy and Sriharsha Chintalapani Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1769642b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1769642b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1769642b Branch: refs/heads/trunk Commit: 1769642bb779921267bd57d3d338591dbdf33842 Parents: 602d572 Author: Jun Rao <jun...@gmail.com> Authored: Fri Jan 16 18:34:39 2015 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Fri Jan 16 18:34:39 2015 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/consumer/SimpleConsumer.scala | 2 ++ core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala | 1 + core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala | 2 +- core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala | 4 ++-- 4 files changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1769642b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index e53ee51..cbef84a 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -128,6 +128,7 @@ class SimpleConsumer(val host: String, /** * Commit offsets for a topic + * Version 0 of the request will commit offsets to Zookeeper and version 1 and above will commit offsets to Kafka. * @param request a [[kafka.api.OffsetCommitRequest]] object. * @return a [[kafka.api.OffsetCommitResponse]] object. */ @@ -139,6 +140,7 @@ class SimpleConsumer(val host: String, /** * Fetch offsets for a topic + * Version 0 of the request will fetch offsets from Zookeeper and version 1 and above will fetch offsets from Kafka. * @param request a [[kafka.api.OffsetFetchRequest]] object. * @return a [[kafka.api.OffsetFetchResponse]] object. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/1769642b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala index 27fc1eb..873f575 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala @@ -32,6 +32,7 @@ class OffsetCommitRequest(groupId: String, kafka.api.OffsetCommitRequest( groupId = groupId, requestInfo = scalaMap, + versionId = 0, // binds to version 0 so that it commits to Zookeeper correlationId = correlationId, clientId = clientId ) http://git-wip-us.apache.org/repos/asf/kafka/blob/1769642b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala index 5b4f4bb..1c25aa3 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala @@ -36,7 +36,7 @@ class OffsetFetchRequest(groupId: String, kafka.api.OffsetFetchRequest( groupId = groupId, requestInfo = scalaSeq, - versionId = versionId, + versionId = 0, // binds to version 0 so that it commits to Zookeeper correlationId = correlationId, clientId = clientId ) http://git-wip-us.apache.org/repos/asf/kafka/blob/1769642b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index 0ab0195..abf6069 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -80,7 +80,7 @@ class SimpleConsumer(val host: String, } /** - * Commit offsets for a topic + * Commit offsets for a topic to Zookeeper * @param request a [[kafka.javaapi.OffsetCommitRequest]] object. * @return a [[kafka.javaapi.OffsetCommitResponse]] object. */ @@ -90,7 +90,7 @@ class SimpleConsumer(val host: String, } /** - * Fetch offsets for a topic + * Fetch offsets for a topic from Zookeeper * @param request a [[kafka.javaapi.OffsetFetchRequest]] object. * @return a [[kafka.javaapi.OffsetFetchResponse]] object. */