For your second question,

bq. Class is not registered: scala.Tuple3[]

The IllegalArgumentException above names the exact runtime class that Kryo
expects to be registered: the array class scala.Tuple3[]. Because of JVM
type erasure, the component types inside the tuple are not part of that
runtime class, so they are irrelevant to the registration.
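A quick way to see this outside Spark (a minimal sketch, independent of any Spark or Kryo API): the wildcard classOf and the fully specified classOf resolve to the very same runtime Class object, so they are interchangeable for registration purposes.

```scala
object Tuple3ErasureDemo extends App {
  // On the JVM, generic type arguments are erased: every Array[Tuple3[...]]
  // has the same runtime class, named "[Lscala.Tuple3;"
  val wildcard = classOf[Array[Tuple3[_, _, _]]]
  val specific = classOf[Array[Tuple3[Int, Int, Array[Long]]]]
  println(wildcard.getName)      // prints [Lscala.Tuple3;
  println(wildcard == specific)  // prints true: one and the same Class object
}
```

Since both spellings denote the same runtime class, the "more thorough" registration cannot produce a smaller serialized result; Kryo records the same class ID for scala.Tuple3[] either way.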

BTW, which Spark release are you using?
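For your first question, I haven't reproduced this, but note that Kryo registrations are keyed on the exact runtime class, so registering the ClassTag trait via classOf[scala.reflect.ClassTag[_]] does not cover the anonymous implementation class named in your trace. An untested sketch of a registration that targets that anonymous class directly (the class name is taken verbatim from your stack trace; sparkConf is the SparkConf from your test):

```scala
// Untested sketch: Kryo matches registrations by exact runtime class, so
// register the anonymous ClassTag implementation named in the trace rather
// than the ClassTag trait. Class.forName is needed because $$anon$1 cannot
// be referenced with a classOf literal.
sparkConf.registerKryoClasses(Array(
  Class.forName("scala.reflect.ClassTag$$anon$1")
))
```

Whether that also clears the 'Task not serializable' from the RangePartitioner path, I can't say without trying it.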

Cheers

On Thu, Dec 31, 2015 at 9:49 AM, Russ <russ.br...@yahoo.com.invalid> wrote:

> The ScalaTest code that is enclosed at the end of this email message
> demonstrates what appears to be a bug in the KryoSerializer.  This code was
> executed from IntelliJ IDEA (community edition) under Mac OS X 10.11.2.
>
> The KryoSerializer is enabled by updating the original SparkContext (that
> is supplied by the ScalaTest) via:
>
> 1. reading the SparkConf from the SparkContext,
> 2. updating the SparkConf to enable the KryoSerializer,
> 3. stopping the original SparkContext, and
> 4. creating a new SparkContext from the updated SparkConf.
>
> After the KryoSerializer was enabled, execution of the following line
> (line 56):
>
> val rddPartitionsSizes: Array[Int] =
>   rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
>
> threw the following three instances of IllegalArgumentException:
>
> java.lang.IllegalArgumentException: Class is not registered:
> scala.collection.mutable.WrappedArray$ofInt
> java.lang.IllegalArgumentException: Class is not registered: int[]
> java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
>
> which prompted registration of the following three classes with the
> KryoSerializer via the SparkConf.registerKryoClasses() method:
>
>         classOf[scala.collection.mutable.WrappedArray.ofInt],
>         classOf[Array[Int]],
>         classOf[Array[Tuple3[_, _, _]]]
>
> Following registration of these three classes with the KryoSerializer, the
> above-indicated 'val rddPartitionsSizes...' line (line 56) executed without
> throwing an IllegalArgumentException.
>
> However, execution of the following line (line 59):
>
> val sortedRddPartitionsSizes: Array[Int] =
>   sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
>
> threw the following SparkException:
>
> Task not serializable
> org.apache.spark.SparkException: Task not serializable
>     at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>     at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>     at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1847)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
>     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
>     at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>     at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
>     at
> kryo.KryoSerializerTest$$anonfun$1.apply$mcV$sp(KryoSerializerTest.scala:59)
>     at
> kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
>     at
> kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
>     at
> org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>     at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>     at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>     at org.scalatest.Transformer.apply(Transformer.scala:22)
>     at org.scalatest.Transformer.apply(Transformer.scala:20)
>     at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
>     at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
>     at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
>     at
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
>     at
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>     at
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>     at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>     at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
>     at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
>     at
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>     at
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>     at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
>     at
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>     at
> org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
>     at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
>     at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
>     at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
>     at org.scalatest.Suite$class.run(Suite.scala:1424)
>     at
> org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
>     at
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>     at
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>     at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
>     at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
>     at
> kryo.KryoSerializerTest.org$scalatest$BeforeAndAfterAll$$super$run(KryoSerializerTest.scala:37)
>     at
> org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
>     at
> org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
>     at kryo.KryoSerializerTest.run(KryoSerializerTest.scala:37)
>     at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
>     at
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
>     at
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
>     at
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
>     at
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
>     at
> org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
>     at
> org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
>     at org.scalatest.tools.Runner$.run(Runner.scala:883)
>     at org.scalatest.tools.Runner.run(Runner.scala)
>     at
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
>     at
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.io.IOException: java.lang.IllegalArgumentException: Class
> is not registered: scala.reflect.ClassTag$$anon$1
> Note: To register this class use:
> kryo.register(scala.reflect.ClassTag$$anon$1.class);
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>     at org.apache.spark.RangePartitioner.writeObject(Partitioner.scala:209)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at
> scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>     at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at
> scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>     at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>     at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>     at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>     ... 65 more
> Caused by: java.lang.IllegalArgumentException: Class is not registered:
> scala.reflect.ClassTag$$anon$1
> Note: To register this class use:
> kryo.register(scala.reflect.ClassTag$$anon$1.class);
>     at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
>     at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>     at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
>     at
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
>     at
> org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:220)
>     at
> org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:219)
>     at
> org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:128)
>     at
> org.apache.spark.RangePartitioner$$anonfun$writeObject$1.apply$mcV$sp(Partitioner.scala:219)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
>     ... 124 more
>
>
> Moreover, registering the following class did not eliminate the
> SparkException:
>
>     classOf[scala.reflect.ClassTag[_]]
>
> My guess is that the IllegalArgumentException associated with
> scala.reflect.ClassTag$$anon$1 has nothing to do with the SparkException
> but I'm not certain.
>
> Note that none of the exceptions that are discussed above occur if the
> KryoSerializer is not enabled.  Also, none of the exceptions occur if
> spark.kryo.registrationRequired is not set to true on line 44.  So, even
> the SparkException, which does not complain of an unregistered class,
> appears to be related to the requirement for class registration that is
> specified on line 44.
>
> Note also that the SparkException occurs only for the sorted RDD.  In
> addition, no complaint of an unregistered class accompanies this
> SparkException, other than the complaint about the
> scala.reflect.ClassTag$$anon$1 class that I think isn't relevant (although
> I'm not certain).
>
> So, I have two questions:
>
> First, why does line 59 give rise to the SparkException for the sorted
> RDD, and in particular, in the context of class registration that
> eliminated complaints about unregistered classes for the unsorted RDD?  How
> might this SparkException be eliminated?
>
> Second, why does:
>
>         classOf[Array[Tuple3[_, _, _]]]
>
> eliminate the 'java.lang.IllegalArgumentException: Class is not
> registered: scala.Tuple3[]' when in fact drilling down into the debugger
> suggests that a more thorough class registration would be:
>
>         classOf[Array[Tuple3[Int, Int, Array[Long]]]]
>
> That is, why does the wildcard specification '_' suffice?  And would the
> more thorough specification be preferred, that is, would it result in a
> smaller Kryo serialized result?
>
> Thanks in advance for any insight that you can provide into this problem.
>
>
> package kryo
>
> import context.SharedSparkContext
> import org.apache.spark.{SparkContext, SparkConf}
> import org.scalatest.FunSuite
>
> class KryoSerializerTest extends FunSuite with SharedSparkContext
>     with Serializable {
>
>   test("kryo serializer") {
>
>     // Update the SparkContext to specify the KryoSerializer
>     val sparkConf: SparkConf = sc.getConf
>     sparkConf.set(s"spark.serializer",
>       s"org.apache.spark.serializer.KryoSerializer")
>     sparkConf.set(s"spark.kryo.registrationRequired", s"true")
>     sparkConf.registerKryoClasses(
>       Array(
>         classOf[scala.collection.mutable.WrappedArray.ofInt],
>         classOf[Array[Int]],
>         classOf[Array[Tuple3[_, _, _]]]
>       )
>     )
>     sc.stop
>     val sparkContext = new SparkContext(sparkConf)
>
>     val rdd = sparkContext.makeRDD(
>       Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), 4)
>     val rddPartitionsSizes: Array[Int] =
>       rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
>     rddPartitionsSizes.foreach(ps => println(ps))
>     val sortedRdd = rdd.sortBy(e => e, true)
>     val sortedRddPartitionsSizes: Array[Int] =
>       sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
>     sortedRddPartitionsSizes.foreach(ps => println(ps))
>   }
>
> }
>
>
