Eduard Biceri Matei created SPARK-23767:
-------------------------------------------
Summary: DirectStream is producing the incorrect type of message
Key: SPARK-23767
URL: https://issues.apache.org/jira/browse/SPARK-23767
Project: Spark
Issue Type: Bug
Components: DStreams, PySpark
Affects Versions: 2.3.0
Environment:
* Spark version 2.3.0 with pyspark3
* spark-eventhubs artifactId and version com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.0
Reporter: Eduard Biceri Matei

h1. EventHubsUtils DirectStream is producing the incorrect type of message

(see also https://github.com/Azure/azure-event-hubs-spark/issues/282)

Example code:
{quote}
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming import DStream

def foreach(rdd):
    rdd.foreach(lambda row: print(row))

ssc = ...  # sparkStreamingContext

# deserializer to convert the Java DStream to a Python DStream
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())

# Java interop
jvm = sc._gateway.jvm
ehConf = jvm.org.apache.spark.eventhubs.EventHubsConf(config['ConnectionString'])

# use Java to instantiate the Event Hubs connector
directStream = jvm.org.apache.spark.eventhubs.EventHubsUtils.createDirectStream(ssc._jssc, ehConf)

# convert the Java DStream to Python
stream = DStream(directStream, ssc, ser)
stream.foreachRDD(lambda rdd: foreach(rdd))

ssc.start...
{quote}

Detailed traceback:
{quote}
2018-03-20 20:29:10 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 214, in main
    eval_type = read_int(infile)
  File "/usr/local/Cellar/apache-spark/2.3.0/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unexpected element type class com.microsoft.azure.eventhubs.impl.EventDataImpl
	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:201)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:204)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:204)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.foreach(EventHubsRDD.scala:101)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:204)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
{quote}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
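For context on the two errors in the traceback: Spark's Python runner exchanges records with the JVM as length-prefixed byte frames, and the JVM-side writer (`PythonRDD.writeIteratorToStream`) only knows how to frame raw byte arrays, strings, and pairs of those. The Event Hubs connector yields `EventDataImpl` objects, which match none of these cases, so the writer thread throws `SparkException: Unexpected element type` and dies; the Python worker then hits the end of its input stream mid-read and raises `EOFError`. The sketch below is a pure-Python illustration of that mechanism, not Spark code; `write_element` and `read_bytes` are hypothetical names mimicking (in simplified form) the JVM writer's dispatch and pyspark's framed read.

```python
import io
import struct

def write_element(stream, obj):
    """Illustrative mimic of the JVM writer's dispatch: only raw bytes
    (and 2-tuples of them) can be framed for the Python worker."""
    if isinstance(obj, (bytes, bytearray)):
        stream.write(struct.pack(">i", len(obj)))  # 4-byte big-endian length
        stream.write(bytes(obj))                   # then the raw payload
    elif isinstance(obj, tuple) and len(obj) == 2:
        write_element(stream, obj[0])  # key frame
        write_element(stream, obj[1])  # value frame
    else:
        # EventDataImpl falls through to this branch on the JVM side
        raise TypeError(f"Unexpected element type {type(obj).__name__}")

def read_bytes(stream):
    """Illustrative mimic of the worker's framed read: if the writer
    died mid-stream, the length header comes up short -> EOFError,
    exactly the error surfaced in the Python traceback above."""
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError
    (length,) = struct.unpack(">i", header)
    return stream.read(length)

# a (key, value) pair of raw bytes round-trips cleanly
buf = io.BytesIO()
write_element(buf, (b"partition-key", b"event-body"))
buf.seek(0)
print(read_bytes(buf), read_bytes(buf))  # b'partition-key' b'event-body'

# a non-byte payload is rejected before anything reaches the Python side
class EventDataImpl:  # stand-in for the connector's message class
    pass

try:
    write_element(io.BytesIO(), EventDataImpl())
except TypeError as e:
    print(e)  # Unexpected element type EventDataImpl
```

This is why `PairDeserializer(NoOpSerializer(), NoOpSerializer())` works for Kafka's direct stream (whose records are already byte-array pairs) but not here: the fix has to happen on the JVM side, by mapping each event to its byte payload before the stream is handed to Python.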