[
https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14604633#comment-14604633
]
Juan Rodríguez Hortalá commented on SPARK-8337:
-----------------------------------------------
Hi,
I have worked a bit on the OffsetRange approach; you can see the code at
https://github.com/juanrh/spark/commit/56fbd5c38bd30b825a7818f1c56abb1f8b2beaff.
I have added the following method to pyspark's KafkaUtils:
@staticmethod
def getOffsetRanges(rdd):
    scalaRdd = rdd._jrdd.rdd()
    offsetRangesArray = scalaRdd.offsetRanges()
    return [OffsetRange(topic=offsetRange.topic(),
                        partition=offsetRange.partition(),
                        fromOffset=offsetRange.fromOffset(),
                        untilOffset=offsetRange.untilOffset())
            for offsetRange in offsetRangesArray]
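For context, this is roughly how I would like the helper to be callable from the
driver, for example from a foreachRDD callback on the stream returned by
createDirectStream. This is only an untested sketch: it assumes that the rdd
passed to the callback still wraps the Scala KafkaRDD, i.e. that no other
transformation has been applied to the stream yet, which is exactly the kind of
thing the pyspark wrappers might not preserve.

def printOffsetRanges(rdd):
    # assumption: rdd._jrdd.rdd() is still the Scala KafkaRDD at this point
    for offsetRange in KafkaUtils.getOffsetRanges(rdd):
        print offsetRange

# kafkaStream is a direct Kafka stream, created as in the example further below
kafkaStream.foreachRDD(printOffsetRanges)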
This method is used in KafkaUtils.createDirectStreamJB, which is based on the
original KafkaUtilsPythonHelper.createDirectStream. The main problem I have is
that I don't know where to store the OffsetRange objects. The naive trick of
adding them to the __dict__ of each python RDD object doesn't work: the new
field is lost in the pyspark wrappers. So the new method createDirectStreamJB
takes two additional options, one for performing an action on the OffsetRange
list and another for adding it to each record of the DStream:
def createDirectStreamJB(ssc, topics, kafkaParams, fromOffsets={},
                         keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                         offsetRangeForeach=None, addOffsetRange=False):
    """
    FIXME: temporary working placeholder

    :param offsetRangeForeach: if not None, this should be a function that
        takes a list of OffsetRange and returns None; it is applied to the
        OffsetRange list of each rdd
    :param addOffsetRange: if False (default), output records have the shape
        (kafkaKey, kafkaValue); if True, output records have the shape
        (offsetRange, (kafkaKey, kafkaValue)), where offsetRange is the
        OffsetRange of the Spark partition containing the record
    """
This is an example of using createDirectStreamJB:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
topics = ["test"]
kafkaParams = {"metadata.broker.list": "localhost:9092"}

def offsetRangeForeach(offsetRangeList):
    print
    print
    for offsetRange in offsetRangeList:
        print offsetRange
    print
    print

kafkaStream = KafkaUtils.createDirectStreamJB(ssc, topics, kafkaParams,
                                              offsetRangeForeach=offsetRangeForeach,
                                              addOffsetRange=True)
# OffsetRange printed as <pyspark.streaming.kafka.OffsetRange object at 0x7f2fdc045950>,
# I guess due to some kind of pyspark proxy
kafkaStrStream = kafkaStream.map(lambda (offRan, (k, v)):
                                 str(offRan._fromOffset) + " " + str(offRan._untilOffset)
                                 + " " + str(k) + " " + str(v))
# kafkaStream.pprint()
kafkaStrStream.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
which produces the following output:
15/06/28 12:36:03 INFO InputInfoTracker: remove old batch metadata:
1435487761000 ms
OffsetRange(topic=test, partition=0, fromOffset=178, untilOffset=179)
15/06/28 12:36:04 INFO JobScheduler: Added jobs for time 1435487764000 ms
...
15/06/28 12:36:04 INFO DAGScheduler: Job 4 finished: runJob at
PythonRDD.scala:366, took 0,075387 s
-------------------------------------------
Time: 2015-06-28 12:36:04
-------------------------------------------
178 179 None hola
()
15/06/28 12:36:04 INFO JobScheduler: Finished job streaming job 1435487764000
ms.0 from job set of time 1435487764000 ms
...
15/06/28 12:36:05 INFO BlockManager: Removing RDD 12
OffsetRange(topic=test, partition=0, fromOffset=179, untilOffset=180)
15/06/28 12:36:06 INFO JobScheduler: Starting job streaming job 1435487766000
ms.0 from job set of time 1435487766000 ms
....
15/06/28 12:36:06 INFO DAGScheduler: Job 6 finished: start at
NativeMethodAccessorImpl.java:-2, took 0,077993 s
-------------------------------------------
Time: 2015-06-28 12:36:06
-------------------------------------------
179 180 None caracola
()
15/06/28 12:36:06 INFO JobScheduler: Finished job streaming job 1435487766000
ms.0 from job set of time 1435487766000 ms
Any thoughts on this would be appreciated, in particular about a suitable place
to store the list of OffsetRange objects.
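Just to make the question more concrete, the kind of user-facing pattern I would
like to end up supporting is keeping the ranges of the latest batches in a
driver-side variable, for example through transform(). This is only an untested
sketch, and again assumes that getOffsetRanges still sees the Scala KafkaRDD
when it is called as the first transformation on the stream:

offsetRangesByBatch = {}  # driver-side storage for the ranges of each batch

def storeOffsetRanges(time, rdd):
    # transform() callbacks run on the driver for every batch, so the ranges
    # can be stashed here before any decoding map() hides the KafkaRDD
    offsetRangesByBatch[time] = KafkaUtils.getOffsetRanges(rdd)
    return rdd

kafkaStream.transform(storeOffsetRanges).pprint()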
Greetings,
Juan
> KafkaUtils.createDirectStream for python is lacking API/feature parity with
> the Scala/Java version
> --------------------------------------------------------------------------------------------------
>
> Key: SPARK-8337
> URL: https://issues.apache.org/jira/browse/SPARK-8337
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Streaming
> Affects Versions: 1.4.0
> Reporter: Amit Ramesh
> Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html