[ https://issues.apache.org/jira/browse/SPARK-2492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14063063#comment-14063063 ]
Saisai Shao commented on SPARK-2492:
------------------------------------
Hi TD,
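The parameter "auto.offset.reset" takes effect when the consumer group has no
committed offset in ZooKeeper, or when the current consumer offset is out of
range (for example, when the consumer has not consumed for a while, data keeps
flowing into the brokers, and retention has since deleted the data at the
stored offset). "auto.offset.reset" can take two values: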
1. "smallest", which resets the offset to the beginning of the partition log,
i.e. the earliest message still retained (in case we want to read all the data).
2. "largest", which resets the offset to the end of the partition log, so only
newly arriving messages are consumed (in case we want to abandon old, unneeded data).
The default value is "largest" in Kafka 0.8, while it is "smallest" by default
in Kafka 0.7.
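To make the two cases concrete, here is a small Scala model of how the 0.8 high-level consumer chooses where to start. It is a simplification for illustration, not Kafka's code: the object and method names are hypothetical, and it assumes the reset policy applies both when the group has no committed offset and when the committed offset lies outside the range the broker still retains. Values other than "smallest" and "largest" are modeled as an error.

```scala
// Hypothetical, simplified model of "auto.offset.reset" (not Kafka's code).
object OffsetResetModel {
  /** @param committed offset stored in ZooKeeper for this group/partition, if any
    * @param earliest  smallest offset still retained on the broker
    * @param latest    log end offset (where the next produced message will go)
    * @param policy    value of "auto.offset.reset"
    */
  def startOffset(committed: Option[Long], earliest: Long, latest: Long, policy: String): Long = {
    // A committed offset is used as-is only if the broker still has it.
    val inRange = committed.filter(o => o >= earliest && o <= latest)
    inRange.getOrElse {
      // No usable committed offset: fall back to the reset policy.
      policy match {
        case "smallest" => earliest // re-read everything still retained
        case "largest"  => latest   // skip to newly arriving data only
        case other =>
          throw new IllegalArgumentException(s"unsupported auto.offset.reset: $other")
      }
    }
  }
}
```

For example, with retained offsets 100 to 500, a group with no committed offset starts at 100 under "smallest" and at 500 under "largest", while a committed offset of 300 is used unchanged under either policy. The no-committed-offset case is why deleting a group's ZooKeeper metadata together with "smallest" makes the consumer re-read everything still on the broker.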
In the previous code, we cleaned the ZooKeeper metadata whenever
"auto.offset.reset" was set in the configuration, regardless of whether it was
"smallest" or "largest":
{code}
if (kafkaParams.contains("auto.offset.reset")) {
  tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}
{code}
In Kafka's ConsoleConsumer, however, the code is:
{code}
if (options.has(resetBeginningOpt))
  ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt),
    "/consumers/" + options.valueOf(groupIdOpt))
{code}
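As I read the 0.8 ConsoleConsumer, the deletion above is tied to its --from-beginning option (resetBeginningOpt), and the same option also selects "smallest" for "auto.offset.reset", otherwise "largest". The hypothetical model below captures that pairing without depending on Kafka or ZooKeeper; the object and case class names are invented for illustration and are not part of either code base.

```scala
// Hypothetical model of ConsoleConsumer's coupling between --from-beginning,
// "auto.offset.reset", and deleting the group's ZooKeeper node.
object ConsoleConsumerModel {
  final case class Plan(autoOffsetReset: String, zkPathToDelete: Option[String])

  def plan(fromBeginning: Boolean, groupId: String): Plan = {
    if (fromBeginning) {
      // Drop the group's stored offsets so "smallest" applies on startup.
      Plan("smallest", Some("/consumers/" + groupId))
    } else {
      // Keep stored offsets; only missing or out-of-range ones jump to the log end.
      Plan("largest", None)
    }
  }
}
```

In this model, metadata is only ever deleted in the "smallest" branch, which is the behavior the change below aligns with.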
The difference is that ConsoleConsumer deletes the ZooKeeper metadata only when
resetting to the beginning; there is no need to clean it when the value is
"largest". So I changed our code to align with Kafka's.
Since the default in Kafka 0.8 is "largest", this code path is never triggered
when the parameter is left unset, so the default behavior is unchanged.
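The change itself reduces to a different predicate over kafkaParams. The sketch below contrasts the two conditions as plain functions; the object and method names are illustrative, the actual cleanup call (tryZookeeperConsumerGroupCleanup in the old code) is left out, and it is not the committed Spark patch.

```scala
// Illustrative contrast of when the consumer group's ZooKeeper metadata is wiped.
object CleanupCondition {
  // Old: any explicit "auto.offset.reset" triggered the cleanup.
  def oldShouldCleanup(kafkaParams: Map[String, String]): Boolean =
    kafkaParams.contains("auto.offset.reset")

  // New: only "smallest" does, matching ConsoleConsumer's reset-to-beginning case.
  def newShouldCleanup(kafkaParams: Map[String, String]): Boolean =
    kafkaParams.get("auto.offset.reset").exists(_ == "smallest")
}
```

With "largest" set explicitly, the old condition wipes the group's offsets while the new one keeps them; with "smallest" both clean up, and with the parameter unset neither does, which is why the default behavior is unchanged.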
> KafkaReceiver minor changes to align with Kafka 0.8
> ----------------------------------------------------
>
> Key: SPARK-2492
> URL: https://issues.apache.org/jira/browse/SPARK-2492
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.0.0
> Reporter: Saisai Shao
> Assignee: Saisai Shao
> Priority: Minor
> Fix For: 1.1.0
>
>
> Update to delete ZooKeeper metadata only when Kafka's parameter
> "auto.offset.reset" is set to "smallest", which aligns with Kafka 0.8's
> ConsoleConsumer.
> Also use the API Kafka provides instead of using zkClient directly.
--
This message was sent by Atlassian JIRA
(v6.2#6252)