[ 
https://issues.apache.org/jira/browse/SPARK-2492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14069795#comment-14069795
 ] 

Tobias Pfeiffer commented on SPARK-2492:
----------------------------------------

Maybe we should just add a helper function resetOffsets() to KafkaUtils? If we 
add this to createStream, we may run into all kinds of weird concurrency issues 
when multiple KafkaReceivers are started simultaneously. I think something like

{code:java}
  // blocks until the Zookeeper offset metadata is cleared, i.e., does what
  // KafkaReceiver.tryZookeeperConsumerGroupCleanup() is doing right now
  KafkaUtils.resetOffsets()
  val streams = (1 to 4).map { _ =>
    KafkaUtils.createStream(...)
  }
{code}

would be a good way to avoid concurrency issues while still letting all 
Kafka options behave as they are supposed to.
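For illustration, such a helper could be a thin, blocking wrapper around the existing cleanup logic. The following is only a rough sketch of the idea, not a proposed final API: the zkQuorum/groupId parameters, the /consumers/<group> path layout, and the use of ZkUtils.deletePathRecursive are assumptions based on how Kafka 0.8 lays out consumer metadata in Zookeeper.

{code:java}
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, ZKStringSerializer}

object KafkaUtilsSketch {
  /** Hypothetical helper: synchronously deletes the consumer group's
    * Zookeeper metadata so that "auto.offset.reset" takes effect when
    * the receivers are started afterwards. */
  def resetOffsets(zkQuorum: String, groupId: String): Unit = {
    val zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer)
    try {
      // Kafka 0.8 keeps consumed offsets and ownership info under /consumers/<group>
      ZkUtils.deletePathRecursive(zkClient, "/consumers/" + groupId)
    } finally {
      zkClient.close()
    }
  }
}
{code}

Because the deletion completes before createStream() is ever called, the receivers cannot race against the cleanup, which is the concurrency problem described above.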

> KafkaReceiver minor changes to align with Kafka 0.8 
> ----------------------------------------------------
>
>                 Key: SPARK-2492
>                 URL: https://issues.apache.org/jira/browse/SPARK-2492
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Saisai Shao
>            Assignee: Saisai Shao
>            Priority: Minor
>             Fix For: 1.1.0
>
>
> Update to delete Zookeeper metadata when Kafka's parameter 
> "auto.offset.reset" is set to "smallest", which is aligned with Kafka 0.8's 
> ConsoleConsumer.
> Also use the APIs Kafka offers instead of using zkClient directly.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
