Peter Mead created SPARK-20695:
----------------------------------
Summary: Running multiple TCP socket streams in Spark Shell causes driver error
Key: SPARK-20695
URL: https://issues.apache.org/jira/browse/SPARK-20695
Project: Spark
Issue Type: Bug
Components: DStreams, Spark Core, Spark Shell, Structured Streaming
Affects Versions: 2.0.2
Environment: DataStax DSE 3-node Apache Cassandra cluster with Analytics enabled,
running on RHEL 7.3 under Hyper-V on a Windows 10 laptop.
Reporter: Peter Mead
Priority: Blocker
Whenever I include a second socket stream (lines02) the script errors, even though I am
not processing any data from that stream. If I remove the lines02 block the script runs fine!
script:
val s_server01="192.168.1.10"
val s_port01 = 8801
val s_port02 = 8802
import org.apache.spark.streaming._, org.apache.spark.streaming.StreamingContext._
import scala.util.Random
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import sys.process._
import org.apache.spark.streaming.dstream.ConstantInputDStream
sc.setLogLevel("ERROR")
val df2 = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")
var processed:Long = 0
var pdate=""
case class t_row (card_number: String, event_date: Int, event_time: Int,
processed: Long, transport_type: String, card_credit: java.lang.Float,
transport_location: String, journey_type: Int, journey_value: java.lang.Float)
var type2tot = 0
var type5tot = 0
var numb=0
var total_secs:Double = 0
val red = "\033[0;31m"
val green = "\033[0;32m"
val cyan = "\033[0;36m"
val yellow = "\033[0;33m"
val nocolour = "\033[0;0m"
var color = ""
val t_int = 5
var init = 0
var tot_cnt:Long = 0
val ssc = new StreamingContext(sc, Seconds(t_int))
val lines01 = ssc.socketTextStream(s_server01, s_port01)
val lines02 = ssc.socketTextStream(s_server01, s_port02)
// val lines = lines01.union(lines02)
val line01 = lines01.foreachRDD( rdd => {
println("\n------------line 01")
if (init == 0) {"clear".!;init = 1}
val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
val processed = System.currentTimeMillis
val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt,
System.currentTimeMillis, line(3), line(4).toFloat, line(5),
line(6).toInt, line(7).toFloat ))
val cnt:Long = bb.count
bb.saveToCassandra("transport", "card_data_input")
})
//val line02 = lines02.foreachRDD( rdd => {
//println("------------line 02")
//if (init == 0) {"clear".!;init = 1}
//val xx=rdd.map(bb => (bb.substring(0,bb.length).split(",")))
//xx.collect.foreach(println)
//val processed = System.currentTimeMillis
//val bb = xx.map(line => t_row(line(0), line(1).toInt, line(2).toInt,
//  System.currentTimeMillis, line(3), line(4).toFloat, line(5),
//  line(6).toInt, line(7).toFloat ))
//val cnt:Long = bb.count
//bb.saveToCassandra("transport", "card_data_input")
//})
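For reference, a stripped-down sketch of the two-stream setup that appears to be the
trigger, with the Cassandra write, case class, and formatting removed (names ssc2,
stream01, stream02 and the println bodies are only for this sketch and are not part of
the full script above; I have not run it in isolation):
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc2 = new StreamingContext(sc, Seconds(5))
val stream01 = ssc2.socketTextStream("192.168.1.10", 8801)
val stream02 = ssc2.socketTextStream("192.168.1.10", 8802)
// trivial actions only: no Cassandra writes and no user-defined classes in the closures
stream01.foreachRDD(rdd => println(s"stream01 batch count: ${rdd.count}"))
stream02.foreachRDD(rdd => println(s"stream02 batch count: ${rdd.count}"))
ssc2.start()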
ERROR:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
...etc
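For context, the failure is inside Spark's Kryo deserialization path, so the session
presumably has Kryo configured as the serializer (I assume this comes from DSE defaults
rather than anything in the script). In a standalone app the equivalent explicit setup
would look roughly like the sketch below; the property names are standard Spark config,
but their use here is only my assumption about the environment:
// sketch of the serializer settings assumed to be in play; not part of the failing script
val kryoConf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // t_row is the case class defined in the script above
  .registerKryoClasses(Array(classOf[t_row]))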