Genmao Yu created SPARK-18560:
---------------------------------

             Summary: Receiver data cannot be deserialized properly.
                 Key: SPARK-18560
                 URL: https://issues.apache.org/jira/browse/SPARK-18560
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Structured Streaming
    Affects Versions: 2.0.2
            Reporter: Genmao Yu
            Priority: Critical


My Spark Streaming job runs correctly on Spark 1.6.1, but it fails on 
Spark 2.0.1 with the following exception:
{code}
16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 (TID 87)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

Digging into the relevant implementation, I found that the type of the data 
received by {{Receiver}} is erased. In Spark 2.x, the framework chooses an 
appropriate {{Serializer}}, either {{JavaSerializer}} or {{KryoSerializer}}, 
based on the type of the data.

On the {{Receiver}} side, the type of the data is erased to {{Object}}, so the 
framework chooses {{JavaSerializer}}, per the following code:

{code}
def canUseKryo(ct: ClassTag[_]): Boolean = {
  primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}

def getSerializer(ct: ClassTag[_]): Serializer = {
  if (canUseKryo(ct)) {
    kryoSerializer
  } else {
    defaultSerializer
  }
}
{code}
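
To illustrate how the element type can be lost before {{getSerializer}} is 
called, here is a minimal standalone sketch. It is not Spark code: {{tagOf}} 
and {{storeUntyped}} are hypothetical names, and it only assumes that the 
receiver path hands the data on as an untyped iterator.

```scala
import scala.reflect.ClassTag

object ErasureSketch {
  // Hypothetical stand-in for a storage method that needs the element type.
  def tagOf[T: ClassTag](data: Iterator[T]): ClassTag[T] = implicitly[ClassTag[T]]

  // Hypothetical receiver-style entry point: it only sees Iterator[Any],
  // so the compiler can supply nothing more specific than ClassTag[Any].
  def storeUntyped(data: Iterator[Any]): ClassTag[_] = tagOf(data)

  def main(args: Array[String]): Unit = {
    // Static type is known here, so the tag describes Array[Byte].
    println(tagOf(Iterator(Array[Byte](1, 2, 3))).runtimeClass)
    // Same data through the untyped path: the tag now describes Object.
    println(storeUntyped(Iterator(Array[Byte](1, 2, 3))).runtimeClass)
  }
}
```

In this sketch the untyped path reports {{java.lang.Object}} as the runtime 
class even though every element is a byte array, which is the situation 
described above for the {{Receiver}} side.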

On the task side, the correct data type is available, and the framework 
chooses {{KryoSerializer}} whenever possible. The supported types are:

{code}
private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
  val primitiveClassTags = Set[ClassTag[_]](
    ClassTag.Boolean,
    ClassTag.Byte,
    ClassTag.Char,
    ClassTag.Double,
    ClassTag.Float,
    ClassTag.Int,
    ClassTag.Long,
    ClassTag.Null,
    ClassTag.Short
  )
  val arrayClassTags = primitiveClassTags.map(_.wrap)
  primitiveClassTags ++ arrayClassTags
}
{code}

In my case, the data type is a byte array ({{Array[Byte]}}).
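
The resulting mismatch can be reproduced without Spark. The sketch below 
copies the selection logic quoted in this report into a standalone object; 
{{SerializerChoice}}, {{choose}}, and the {{Kryo}}/{{Java}} markers are 
hypothetical stand-ins for the real serializer instances, and the two class 
tags are my assumption about what each side sees.

```scala
import scala.reflect.ClassTag

// Hypothetical markers standing in for kryoSerializer / defaultSerializer.
sealed trait SerializerKind
case object Kryo extends SerializerKind
case object Java extends SerializerKind

object SerializerChoice {
  private val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]

  // Same set as in the code quoted above: primitives plus their arrays.
  private val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
    val primitiveClassTags = Set[ClassTag[_]](
      ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double,
      ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null, ClassTag.Short)
    val arrayClassTags = primitiveClassTags.map(_.wrap)
    primitiveClassTags ++ arrayClassTags
  }

  def canUseKryo(ct: ClassTag[_]): Boolean =
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag

  // Mirrors getSerializer, but returns a marker instead of a Serializer.
  def choose(ct: ClassTag[_]): SerializerKind =
    if (canUseKryo(ct)) Kryo else Java
}

object MismatchDemo {
  def main(args: Array[String]): Unit = {
    // Receiver side (assumed): element type erased to Object.
    val writeSide = SerializerChoice.choose(ClassTag.Object)
    // Task side (assumed): the real element type, Array[Byte], is known.
    val readSide = SerializerChoice.choose(implicitly[ClassTag[Array[Byte]]])
    println(s"written with: $writeSide, read with: $readSide")
  }
}
```

Under these assumptions the block is written with the Java (default) path 
and read back with Kryo, which would be consistent with the 
{{KryoException}} in the stack trace above.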

This problem stems from SPARK-13990, a patch that makes Spark automatically 
pick the "best" serializer when caching RDDs.


