[ 
https://issues.apache.org/jira/browse/SPARK-13106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15125904#comment-15125904
 ] 

Luis Alves commented on SPARK-13106:
------------------------------------

Yes, but it requires me to pass the fromOffsets instead of a set with the 
partitions. It assumes you already know where the topic offsets start. What if 
you want to start from the earlier message in the topic, but you need messages 
to be ((mmd.topic, mmd.offset ,mmd.key), mmd.message)?

> KafkaUtils.createDirectStream method with messageHandler and topics
> -------------------------------------------------------------------
>
>                 Key: SPARK-13106
>                 URL: https://issues.apache.org/jira/browse/SPARK-13106
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Luis Alves
>              Labels: newbie
>
> Regarding the KafkaUtils.createDirectStream method here: 
> https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L473
> I purpose to also allow to pass the a messageHandler as argument instead of 
> using the one that is setted inside the method:
> {code}
>  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, 
> mmd.message)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to