[ https://issues.apache.org/jira/browse/SPARK-17411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Milanowski closed SPARK-17411.
------------------------------------
    Resolution: Fixed

Duplicate of https://issues.apache.org/jira/browse/SPARK-16950

> Cannot set fromOffsets in createDirectStream function
> -----------------------------------------------------
>
>                 Key: SPARK-17411
>                 URL: https://issues.apache.org/jira/browse/SPARK-17411
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>    Affects Versions: 2.0.0
>            Reporter: Piotr Milanowski
>
> I am trying to create a Kafka direct stream with a custom starting offset:
> {code}
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
> streaming_ctx = StreamingContext(sc, 300)
> topic = TopicAndPartition("my-kafka-topic", 1)
> d_stream = KafkaUtils.createDirectStream(
>     streaming_ctx,
>     ["my-kafka-topic"],
>     {"metadata.broker.list": "kafka.server.com:9092"},
>     fromOffsets={topic: 123445})
> {code}
> This snippet, run in pyspark (with a real topic name and broker address),
> raises a casting error caused by the _fromOffsets_ argument.
> {code}
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "spark-2.0/python/pyspark/streaming/kafka.py", line 130, in createDirectStream
>     ssc._jssc, kafkaParams, set(topics), jfromOffsets)
>   File "spark-2.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>   File "spark-2.0/python/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "spark-2.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o57.createDirectStreamWithoutMessageHandler.
> : java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
>     at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper$$anonfun$17.apply(KafkaUtils.scala:717)
>     at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>     at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>     at scala.collection.TraversableOnce$class.copyToBuffer(TraversableOnce.scala:275)
>     at scala.collection.AbstractTraversable.copyToBuffer(Traversable.scala:104)
>     at scala.collection.MapLike$class.toBuffer(MapLike.scala:326)
>     at scala.collection.AbstractMap.toBuffer(Map.scala:59)
>     at scala.collection.MapLike$class.toSeq(MapLike.scala:323)
>     at scala.collection.AbstractMap.toSeq(Map.scala:59)
>     at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:717)
>     at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:280)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:211)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Either this is a bug, or the documentation of the Kafka streaming API
> should be more precise about what can be used as _fromOffsets_.
> What should be used as _fromOffsets_?
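
The ClassCastException in the trace suggests that the offset 123445 reached the JVM as a java.lang.Integer while the Scala helper expected a java.lang.Long. A minimal sketch of how one might spot the affected entries follows. It assumes that py4j sends a Python int as java.lang.Integer whenever the value fits in a signed 32-bit range. The helper name offsets_sent_as_integer is hypothetical, and plain (topic, partition) tuples stand in for TopicAndPartition so that the snippet runs without Spark.

```python
# Signed 32-bit bounds. Assumption: below this cut-off py4j sends a Python
# int to the JVM as java.lang.Integer rather than java.lang.Long.
JAVA_INT_MIN = -2**31
JAVA_INT_MAX = 2**31 - 1


def offsets_sent_as_integer(from_offsets):
    """Return the keys whose offsets fit in a Java int.

    Hypothetical helper: under the assumption above, these are the entries
    that could trigger 'Integer cannot be cast to Long' on the JVM side.
    """
    return [
        key
        for key, offset in from_offsets.items()
        if JAVA_INT_MIN <= offset <= JAVA_INT_MAX
    ]


# (topic, partition) tuples are a stand-in for TopicAndPartition objects.
from_offsets = {
    ("my-kafka-topic", 0): 123445,      # fits in 32 bits
    ("my-kafka-topic", 1): 5000000000,  # exceeds 32 bits
}

suspect = offsets_sent_as_integer(from_offsets)
print(suspect)
```

On Python 2, wrapping each offset in long(...) before passing fromOffsets may sidestep the cast, since a Python 2 long is presumably sent as a java.lang.Long. Python 3 has no separate long type, so there the fix tracked in SPARK-16950 is likely the only clean way out.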