[ https://issues.apache.org/jira/browse/SPARK-23767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16420291#comment-16420291 ]
Saisai Shao commented on SPARK-23767:
-------------------------------------

This does not seem to be a Spark issue; it looks more like a Microsoft Event Hubs issue, so I am resolving it as Not a Problem. Please feel free to reopen it if it turns out to be a Spark issue.

> DirectStream is producing the incorrect type of message
> -------------------------------------------------------
>
>                 Key: SPARK-23767
>                 URL: https://issues.apache.org/jira/browse/SPARK-23767
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, PySpark
>    Affects Versions: 2.3.0
>         Environment: * Spark version: 2.3.0 with pyspark3
>                      * spark-eventhubs artifactId and version: com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.0
>            Reporter: Eduard Biceri Matei
>            Priority: Major
>
> h1. EventHubsUtils DirectStream is producing the incorrect type of message
>
> (see also https://github.com/Azure/azure-event-hubs-spark/issues/282)
>
> Example code:
> {quote}
> from pyspark.serializers import PairDeserializer, NoOpSerializer
> from pyspark.streaming import DStream
>
> def foreach(rdd):
>     rdd.foreach(lambda row: print(row))
>
> ssc = ...  # sparkStreamingContext
>
> # deserializer to convert the Java DStream to a Python DStream
> ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
>
> # Java interop
> jvm = sc._gateway.jvm
> ehConf = jvm.org.apache.spark.eventhubs.EventHubsConf(config['ConnectionString'])
>
> # use Java to instantiate the EventHubs connector
> directStream = jvm.org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc._jssc, ehConf)
>
> # convert the Java DStream to a Python DStream
> stream = DStream(directStream, ssc, ser)
> stream.foreachRDD(lambda rdd: foreach(rdd))
>
> ssc.start...
> {quote}
>
> Detailed traceback:
> {quote}
> 2018-03-20 20:29:10 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 214, in main
>     eval_type = read_int(infile)
>   File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 685, in read_int
>     raise EOFError
> EOFError
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
>     at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>     at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>     at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>     at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>     at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
>     at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Unexpected element type class com.microsoft.azure.eventhubs.impl.EventDataImpl
>     at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:201)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:204)
>     at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:204)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.foreach(EventHubsRDD.scala:101)
>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:204)
>     at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
>     at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
>     at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
> {quote}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
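Note on the traceback: the Python-side `EOFError` is a symptom, not the cause. The `Caused by` section shows the JVM writer thread dying with `Unexpected element type class com.microsoft.azure.eventhubs.impl.EventDataImpl`, because Spark can only frame byte arrays, strings, and pairs of those when shipping elements to the Python worker; the Event Hubs connector hands it raw `EventDataImpl` objects. When the writer dies mid-stream, the Python worker's next length-prefix read comes up empty and raises the bare `EOFError` seen at the top. A minimal pure-Python sketch of that length-prefixed framing (the helper names `read_int` and `write_with_length` mirror helpers in `pyspark.serializers`, but this is an illustration, not Spark's actual code):

```python
import io
import struct

def read_int(stream):
    # Big-endian 4-byte length prefix, as the PySpark worker protocol uses.
    # An empty read means the JVM writer died before sending the frame.
    data = stream.read(4)
    if not data:
        raise EOFError
    return struct.unpack("!i", data)[0]

def write_with_length(payload: bytes, stream):
    # A healthy JVM writer frames each element as length + raw bytes.
    stream.write(struct.pack("!i", len(payload)))
    stream.write(payload)

# A healthy frame round-trips:
buf = io.BytesIO()
write_with_length(b"event-payload", buf)
buf.seek(0)
n = read_int(buf)
assert buf.read(n) == b"event-payload"

# A writer that crashed before emitting anything (as happens when
# PythonRDD hits an unexpected element type) leaves the reader with
# a bare EOFError, exactly as in the traceback:
try:
    read_int(io.BytesIO(b""))
except EOFError:
    eof_raised = True
assert eof_raised
```

This is why the fix belongs on the connector side: the elements must be converted to byte arrays (or strings) on the JVM before the stream is handed to Python, which matches the resolution above that this is an Event Hubs connector issue rather than a Spark one.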