Hi,

If you want, I would be happy to work on this. I have worked with KafkaUtils.createDirectStream before, in a pull request that wasn't accepted (https://github.com/apache/spark/pull/5367). I'm fluent in Python and I'm starting to feel comfortable with Scala, so if someone opens a JIRA I can take it.

Greetings,

Juan Rodriguez

2015-06-12 15:59 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> The scala api has 2 ways of calling createDirectStream. One of them
> allows you to pass a message handler that gets full access to the kafka
> MessageAndMetadata, including offset.
>
> I don't know why the python api was developed with only one way to call
> createDirectStream, but the first thing I'd look at would be adding that
> functionality back in. If someone wants help creating a patch for that,
> just let me know.
>
> Dealing with offsets on a per-message basis may not be as efficient as
> dealing with them on a batch basis using the HasOffsetRanges interface...
> but if efficiency was a primary concern, you probably wouldn't be using
> Python anyway.
>
> On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Scala KafkaRDD uses a trait to handle this problem, but it is not so easy
>> and straightforward in Python, where we need to have a specific API to
>> handle this, I'm not sure is there any simple workaround to fix this, maybe
>> we should think carefully about it.
>>
>> 2015-06-12 13:59 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>
>>>
>>> Thanks, Jerry. That's what I suspected based on the code I looked at.
>>> Any pointers on what is needed to build in this support would be great.
>>> This is critical to the project we are currently working on.
>>>
>>> Thanks!
>>>
>>>
>>> On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> OK, I get it, I think currently Python based Kafka direct API do not
>>>> provide such equivalence like Scala, maybe we should figure out to add this
>>>> into Python API also.
>>>>
>>>> 2015-06-12 13:48 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>>
>>>>>
>>>>> Hi Jerry,
>>>>>
>>>>> Take a look at this example:
>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>>>>>
>>>>> The offsets are needed because as RDDs get generated within spark the
>>>>> offsets move further along. With direct Kafka mode the current offsets are
>>>>> no more persisted in Zookeeper but rather within Spark itself. If you want
>>>>> to be able to use zookeeper based monitoring tools to keep track of
>>>>> progress, then this is needed.
>>>>>
>>>>> In my specific case we need to persist Kafka offsets externally so
>>>>> that we can continue from where we left off after a code deployment. In
>>>>> other words, we need exactly-once processing guarantees across code
>>>>> deployments. Spark does not support any state persistence across
>>>>> deployments so this is something we need to handle on our own.
>>>>>
>>>>> Hope that helps. Let me know if not.
>>>>>
>>>>> Thanks!
>>>>> Amit
>>>>>
>>>>>
>>>>> On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> What is your meaning of getting the offsets from the RDD, from my
>>>>>> understanding, the offsetRange is a parameter you offered to KafkaRDD,
>>>>>> why do you still want to get the one previous you set into?
>>>>>>
>>>>>> Thanks
>>>>>> Jerry
>>>>>>
>>>>>> 2015-06-12 12:36 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>>>>
>>>>>>>
>>>>>>> Congratulations on the release of 1.4!
>>>>>>>
>>>>>>> I have been trying out the direct Kafka support in python but
>>>>>>> haven't been able to figure out how to get the offsets from the RDD.
>>>>>>> Looks like the documentation is yet to be updated to include Python
>>>>>>> examples (
>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>>>>>> I am specifically looking for the equivalent of
>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
>>>>>>> I tried digging through the python code but could not find anything
>>>>>>> related. Any pointers would be greatly appreciated.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Amit
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
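
For anyone following the thread, here is a rough sketch of the external offset store Amit describes: record where each batch ended so that a redeployed job can continue from there. It is plain Python with no Spark dependency. The OffsetRange tuple is a hypothetical stand-in whose fields are modeled on the Scala OffsetRange (topic, partition, from/until offset), and save_offsets, load_offsets, and the JSON file layout are all made up for illustration. How the ranges are obtained from the stream, and how the loaded offsets are handed back to it on restart, is exactly the missing Python piece discussed above and is left out.

```python
import json
import os
import tempfile
from collections import namedtuple

# Hypothetical stand-in for a per-partition offset range; not a Spark class.
OffsetRange = namedtuple(
    "OffsetRange", ["topic", "partition", "from_offset", "until_offset"]
)


def load_offsets(path):
    """Return {(topic, partition): next_offset}, or {} if nothing is stored."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        state = json.load(f)
    offsets = {}
    for key, offset in state.items():
        # Keys are stored as "topic:partition"; split on the last colon.
        topic, partition = key.rsplit(":", 1)
        offsets[(topic, int(partition))] = offset
    return offsets


def save_offsets(path, offset_ranges):
    """Merge the end offsets of a finished batch into the stored state."""
    offsets = load_offsets(path)
    for r in offset_ranges:
        # until_offset is where the next batch for this partition starts.
        offsets[(r.topic, r.partition)] = r.until_offset
    state = {"%s:%d" % key: value for key, value in offsets.items()}
    # Write to a temp file in the same directory, then rename it into
    # place, so a reader never sees a half-written file.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as tmp:
        json.dump(state, tmp)
    os.replace(tmp_path, path)
```

Calling save_offsets only after a batch's output has been written gives at-least-once behaviour across deployments. The exactly-once guarantee Amit asks for would additionally need the output to be idempotent, or the offsets to be committed in the same transaction as the results.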