[ https://issues.apache.org/jira/browse/SPARK-18560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu resolved SPARK-18560.
----------------------------------
Resolution: Duplicate
> Receiver data can not be deserialized properly.
> -----------------------------------------------
>
> Key: SPARK-18560
> URL: https://issues.apache.org/jira/browse/SPARK-18560
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Reporter: Genmao Yu
> Priority: Critical
> Fix For: 2.0.3, 2.1.0
>
>
> My Spark Streaming job runs correctly on Spark 1.6.1, but it fails on Spark
> 2.0.1 with the following exception:
> {code}
> 16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 (TID 87)
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243)
>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Digging into the relevant implementation, I found that the type of the data
> received by a {{Receiver}} is erased. In Spark 2.x, the framework chooses an
> appropriate {{Serializer}} from {{JavaSerializer}} and {{KryoSerializer}}
> based on the type of the data.
> On the {{Receiver}} side, the type of the data is erased to {{Object}}, so
> the framework chooses {{JavaSerializer}}, per the following code:
> {code}
> def canUseKryo(ct: ClassTag[_]): Boolean = {
>   primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
> }
>
> def getSerializer(ct: ClassTag[_]): Serializer = {
>   if (canUseKryo(ct)) {
>     kryoSerializer
>   } else {
>     defaultSerializer
>   }
> }
> {code}
> On the task side, the correct data type is known, so the framework chooses
> {{KryoSerializer}} when it can; the supported types are:
> {code}
> private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
>
> private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
>   val primitiveClassTags = Set[ClassTag[_]](
>     ClassTag.Boolean,
>     ClassTag.Byte,
>     ClassTag.Char,
>     ClassTag.Double,
>     ClassTag.Float,
>     ClassTag.Int,
>     ClassTag.Long,
>     ClassTag.Null,
>     ClassTag.Short
>   )
>   val arrayClassTags = primitiveClassTags.map(_.wrap)
>   primitiveClassTags ++ arrayClassTags
> }
> {code}
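> For concreteness, here is a minimal, self-contained sketch of the selection
> mismatch (the object name {{CanUseKryoDemo}} is mine; the logic is copied
> from the two snippets above). The erased tag on the {{Receiver}} side fails
> {{canUseKryo}}, while the concrete tag on the task side passes it:
> {code}
> import scala.reflect.{ClassTag, classTag}
>
> object CanUseKryoDemo {
>   private val stringClassTag: ClassTag[String] = classTag[String]
>
>   // The Kryo-eligible tags, as in the block above.
>   private val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
>     val primitiveClassTags = Set[ClassTag[_]](
>       ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double,
>       ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null,
>       ClassTag.Short)
>     primitiveClassTags ++ primitiveClassTags.map(_.wrap)
>   }
>
>   def canUseKryo(ct: ClassTag[_]): Boolean =
>     primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
>
>   def main(args: Array[String]): Unit = {
>     // Receiver side: the element type is erased, so blocks are written
>     // with JavaSerializer.
>     println(canUseKryo(classTag[Any]))          // false
>     // Task side: the concrete element type is known, so blocks are read
>     // with KryoSerializer -- a write/read mismatch.
>     println(canUseKryo(classTag[Array[Byte]]))  // true
>   }
> }
> {code}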
> In my case, the data type is a byte array ({{Array[Byte]}}), which is
> Kryo-eligible: blocks are written with {{JavaSerializer}} on the
> {{Receiver}} side but read back with {{KryoSerializer}} on the task side,
> hence the exception above.
> This problem stems from SPARK-13990, a patch that makes Spark automatically
> pick the "best" serializer when caching RDDs.