For your second question:

bq. Class is not registered: scala.Tuple3[]
The above IllegalArgumentException names the class that Kryo expects to be registered, meaning the types of the tuple's components are insignificant.

BTW, what Spark release are you using?

Cheers

On Thu, Dec 31, 2015 at 9:49 AM, Russ <russ.br...@yahoo.com.invalid> wrote:

> The ScalaTest code that is enclosed at the end of this email message
> demonstrates what appears to be a bug in the KryoSerializer. This code was
> executed from IntelliJ IDEA (community edition) under Mac OS X 10.11.2.
>
> The KryoSerializer is enabled by updating the original SparkContext (that
> is supplied by the ScalaTest) via:
>
> 1. reading the SparkConf from the SparkContext,
> 2. updating the SparkConf to enable the KryoSerializer,
> 3. stopping the original SparkContext, and
> 4. creating a new SparkContext from the updated SparkConf.
>
> Following enabling of the KryoSerializer, execution of the following line
> (line 56):
>
> val rddPartitionsSizes: Array[Int] = rdd.mapPartitions(iter =>
>   Array(iter.size).iterator, true).collect
>
> threw the following three instances of IllegalArgumentException:
>
> java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofInt
> java.lang.IllegalArgumentException: Class is not registered: int[]
> java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
>
> which prompted registration of the following three classes with the
> KryoSerializer via the SparkConf.registerKryoClasses() method:
>
> classOf[scala.collection.mutable.WrappedArray.ofInt],
> classOf[Array[Int]],
> classOf[Array[Tuple3[_, _, _]]]
>
> Following registration of these three classes with the KryoSerializer, the
> above-indicated 'val rddPartitionsSizes...' line (line 56) executed without
> throwing an IllegalArgumentException.
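[Editorial note: the four enabling steps quoted above can be sketched as follows. This is a minimal sketch against the Spark 1.x-era API, assuming `sc` is the SparkContext supplied by the SharedSparkContext test fixture, as in the original message; it is not a verified fix for the problem being discussed.]

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Steps 1-2: read the SparkConf from the existing SparkContext and update it
// to enable Kryo, require registration, and register the three classes.
val conf: SparkConf = sc.getConf
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(
  classOf[scala.collection.mutable.WrappedArray.ofInt],
  classOf[Array[Int]],
  classOf[Array[(_, _, _)]]
))

// Steps 3-4: stop the original SparkContext and create a new one
// from the updated SparkConf.
sc.stop()
val sparkContext = new SparkContext(conf)
```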
> However, execution of the following line (line 59):
>
> val sortedRddPartitionsSizes: Array[Int] = sortedRdd.mapPartitions(iter =>
>   Array(iter.size).iterator, true).collect
>
> threw the following SparkException:
>
> Task not serializable
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1847)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
>   at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
>   at kryo.KryoSerializerTest$$anonfun$1.apply$mcV$sp(KryoSerializerTest.scala:59)
>   at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
>   at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
>   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
>   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
>   at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
>   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
>   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
>   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
>   at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
>   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
>   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
>   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
>   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
>   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
>   at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
>   at org.scalatest.Suite$class.run(Suite.scala:1424)
>   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
>   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
>   at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
>   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
>   at kryo.KryoSerializerTest.org$scalatest$BeforeAndAfterAll$$super$run(KryoSerializerTest.scala:37)
>   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
>   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
>   at kryo.KryoSerializerTest.run(KryoSerializerTest.scala:37)
>   at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
>   at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
>   at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
>   at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
>   at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
>   at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
>   at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
>   at org.scalatest.tools.Runner$.run(Runner.scala:883)
>   at org.scalatest.tools.Runner.run(Runner.scala)
>   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
>   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
> Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>   at org.apache.spark.RangePartitioner.writeObject(Partitioner.scala:209)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>   at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>   at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>   at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
>   at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>   ... 65 more
> Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
> Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
>   at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
>   at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>   at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
>   at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
>   at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:220)
>   at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:219)
>   at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:128)
>   at org.apache.spark.RangePartitioner$$anonfun$writeObject$1.apply$mcV$sp(Partitioner.scala:219)
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
>   ... 124 more
>
> Moreover, registering the following class did not eliminate the
> SparkException:
>
> classOf[scala.reflect.ClassTag[_]]
>
> My guess is that the IllegalArgumentException associated with
> scala.reflect.ClassTag$$anon$1 has nothing to do with the SparkException,
> but I'm not certain.
>
> Note that none of the exceptions that are discussed above occur if the
> KryoSerializer is not enabled.
> Also, none of the exceptions occur if spark.kryo.registrationRequired is
> not set to true on line 44. So, even the SparkException, which does not
> complain of an unregistered class, appears to be related to the requirement
> for class registration that is specified on line 44.
>
> Note also that the SparkException occurs only for the sorted RDD. In
> addition, no complaint of an unregistered class accompanies this
> SparkException, other than the complaint about the
> scala.reflect.ClassTag$$anon$1 class that I think isn't relevant (although
> I'm not certain).
>
> So, I have two questions:
>
> First, why does line 59 give rise to the SparkException for the sorted
> RDD, and in particular, in the context of class registration that
> eliminated complaints about unregistered classes for the unsorted RDD? How
> might this SparkException be eliminated?
>
> Second, why does:
>
> classOf[Array[Tuple3[_, _, _]]]
>
> eliminate the 'java.lang.IllegalArgumentException: Class is not
> registered: scala.Tuple3[]' when in fact drilling down into the debugger
> suggests that a more thorough class registration would be:
>
> classOf[Array[Tuple3[Int, Int, Array[Long]]]]
>
> That is, why does the wildcard specification '_' suffice? And would the
> more thorough specification be preferred; that is, would it result in a
> smaller Kryo serialized result?
>
> Thanks in advance for any insight that you can provide into this problem.
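[Editorial note: the second question is explained by JVM type erasure. At runtime an `Array[Tuple3[A, B, C]]` has the single class `scala.Tuple3[]` regardless of its type parameters, so the wildcard and the fully specified `classOf` expressions denote the same `java.lang.Class` object; and since Kryo registration keys on the runtime `Class`, both spellings register the identical class and should produce identical serialized output. A small plain-Scala sketch, no Spark required:]

```scala
// Due to JVM type erasure, the array's element class is scala.Tuple3
// regardless of the tuple's type parameters, so both classOf expressions
// yield the same java.lang.Class instance.
val specific = classOf[Array[(Int, Int, Array[Long])]]
val wildcard = classOf[Array[(_, _, _)]]

println(specific == wildcard)  // true: one and the same Class object
println(specific.getName)      // the erased runtime name, [Lscala.Tuple3;
```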
>
> package kryo
>
> import context.SharedSparkContext
> import org.apache.spark.{SparkContext, SparkConf}
> import org.scalatest.FunSuite
>
> class KryoSerializerTest extends FunSuite with SharedSparkContext with Serializable {
>
>   test("kryo serializer") {
>
>     // Update the SparkContext to specify the KryoSerializer
>     val sparkConf: SparkConf = sc.getConf
>     sparkConf.set(s"spark.serializer", s"org.apache.spark.serializer.KryoSerializer")
>     sparkConf.set(s"spark.kryo.registrationRequired", s"true")
>     sparkConf.registerKryoClasses(
>       Array(
>         classOf[scala.collection.mutable.WrappedArray.ofInt],
>         classOf[Array[Int]],
>         classOf[Array[Tuple3[_, _, _]]]
>       )
>     )
>     sc.stop
>     val sparkContext = new SparkContext(sparkConf)
>
>     val rdd = sparkContext.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), 4)
>     val rddPartitionsSizes: Array[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
>     rddPartitionsSizes.foreach(ps => println(ps))
>     val sortedRdd = rdd.sortBy(e => e, true)
>     val sortedRddPartitionsSizes: Array[Int] = sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
>     sortedRddPartitionsSizes.foreach(ps => println(ps))
>   }
> }
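[Editorial note: one hedged suggestion for the scala.reflect.ClassTag$$anon$1 complaint, not verified in this thread. `classOf[scala.reflect.ClassTag[_]]` registers the `ClassTag` trait's own class, not the anonymous implementation class the exception names; an anonymous class cannot be written with `classOf`, but `Class.forName` can load it by its runtime name, which is what the exception's "kryo.register(...)" hint points at:]

```scala
// Hypothetical workaround sketch: register the exact anonymous class named
// in the exception, loaded by its runtime name. classOf[...] cannot express
// scala.reflect.ClassTag$$anon$1, but Class.forName can.
sparkConf.registerKryoClasses(Array(
  Class.forName("scala.reflect.ClassTag$$anon$1")
))
```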