[ 
https://issues.apache.org/jira/browse/SPARK-2492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14063063#comment-14063063
 ] 

Saisai Shao commented on SPARK-2492:
------------------------------------

Hi TD,

The parameter "auto.offset.reset" takes effect when the current consumer offset 
is out of range (for example, when the consumer has not consumed data for a 
while but data is still being fed into the brokers). "auto.offset.reset" can 
have two values:

1. "smallest", which means seeking the current offset to the beginning of the 
queue (in case we want to read all the data).
2. "largest", which means seeking the current offset to the end of the queue 
(in case we want to abandon some old unneeded data).

The default value is "largest" in Kafka 0.8, while it is "smallest" by default 
in Kafka 0.7.
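
To make the two values concrete, here is a rough sketch of the reset decision. It is a simplified model and not Kafka's actual implementation; the object name, the method name and the earliest/latest arguments are made up for illustration.

```scala
object ResetPolicySketch {
  // Simplified model (hypothetical helper, not Kafka code): decide which
  // offset the consumer continues from.
  def resolveOffset(current: Long, earliest: Long, latest: Long,
                    kafkaParams: Map[String, String]): Long = {
    // Kafka 0.8 falls back to "largest" when the parameter is not set.
    val reset = kafkaParams.getOrElse("auto.offset.reset", "largest")
    if (current >= earliest && current <= latest) {
      current // offset is still in range, the reset policy is not consulted
    } else reset match {
      case "smallest" => earliest // re-read everything still on the broker
      case "largest"  => latest   // skip the old data
      case other =>
        throw new IllegalArgumentException(s"unsupported auto.offset.reset: $other")
    }
  }
}
```

For example, with a stale offset of 5 and a broker range of 10 to 100, "smallest" would continue from 10, while "largest" (or no setting at all in 0.8) would continue from 100.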

In the previous code, we cleaned the ZooKeeper metadata whenever 
"auto.offset.reset" was set in the configuration, no matter whether it was 
"smallest" or "largest":

{code}
    if (kafkaParams.contains("auto.offset.reset")) {
      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    }
{code}

In Kafka's ConsoleConsumer, however, the code is:

{code}
    if(options.has(resetBeginningOpt))
      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt),
        "/consumers/" + options.valueOf(groupIdOpt))
{code}

The difference is that there is no need to clean the ZooKeeper metadata when 
the parameter is set to "largest". So I changed our code to align with Kafka's.
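
The aligned check could look roughly like the sketch below. It is only an illustration and not the exact patch: cleanupConsumerGroup is a hypothetical stand-in for the real cleanup helper and merely records the ZooKeeper path that would be deleted, so the snippet runs without Kafka or ZooKeeper.

```scala
object OffsetResetSketch {
  // Paths that would have been deleted (stand-in for real ZooKeeper calls).
  val cleaned = scala.collection.mutable.ListBuffer.empty[String]

  // Hypothetical helper: records the consumer group path instead of deleting it.
  def cleanupConsumerGroup(zkConnect: String, groupId: String): Unit =
    cleaned += s"/consumers/$groupId"

  // Clean up only when "auto.offset.reset" is explicitly set to "smallest".
  // An unset parameter or "largest" leaves the metadata alone.
  def maybeCleanup(zkConnect: String, kafkaParams: Map[String, String]): Unit =
    if (kafkaParams.get("auto.offset.reset").exists(_ == "smallest")) {
      cleanupConsumerGroup(zkConnect, kafkaParams("group.id"))
    }
}
```

With this shape, a configuration without "auto.offset.reset" and one with "largest" both skip the cleanup, and only "smallest" removes the group's metadata.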

Also, since the default in Kafka 0.8 is "largest", this code path is never 
triggered when the parameter is not set, so the default behavior stays the 
same.



> KafkaReceiver minor changes to align with Kafka 0.8 
> ----------------------------------------------------
>
>                 Key: SPARK-2492
>                 URL: https://issues.apache.org/jira/browse/SPARK-2492
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Saisai Shao
>            Assignee: Saisai Shao
>            Priority: Minor
>             Fix For: 1.1.0
>
>
> Update to delete Zookeeper metadata when Kafka's parameter 
> "auto.offset.reset" is set to "smallest", which is aligned with Kafka 0.8's 
> ConsoleConsumer.
> Also use the API offered by Kafka instead of using zkClient directly.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
