Thanks so much! It works! Is this the standard way for MLlib models to be serialized?
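
For reference, here is roughly what the working round trip looks like end to end (a minimal sketch following Xiangrui's suggestion; the model output path is just a placeholder):

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "hdfs://myserver:9000/data/mnist.scale")
val model = NaiveBayes.train(data)

// save: wrap the model in a one-element RDD and write it as an object file
sc.parallelize(Seq(model), 1).saveAsObjectFile("hdfs://myserver:9000/models/nb")

// load: read it back (in the same or a freshly started Spark context) and use it in RDD closures
val sameModel = sc.objectFile[NaiveBayesModel]("hdfs://myserver:9000/models/nb").first()
val predictionAndLabels = data.map(lp => (sameModel.predict(lp.features), lp.label))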
Btw. The example I pasted below works if one implements a TestSuite with MLlibTestSparkContext (a rough sketch of such a suite is at the bottom of this message).

-----Original Message-----
From: Xiangrui Meng [mailto:men...@gmail.com]
Sent: Monday, March 09, 2015 12:10 PM
To: Ulanov, Alexander
Cc: Akhil Das; dev
Subject: Re: Loading previously serialized object to Spark

Could you try `sc.objectFile` instead?

sc.parallelize(Seq(model), 1).saveAsObjectFile("path")
val sameModel = sc.objectFile[NaiveBayesModel]("path").first()

-Xiangrui

On Mon, Mar 9, 2015 at 11:52 AM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> Just tried, the same happens if I use the internal Spark serializer:
> val serializer = SparkEnv.get.closureSerializer.newInstance
>
> -----Original Message-----
> From: Ulanov, Alexander
> Sent: Monday, March 09, 2015 10:37 AM
> To: Akhil Das
> Cc: dev
> Subject: RE: Loading previously serialized object to Spark
>
> Below is the code with the standard MLlib class. Apparently this issue can happen within the same Spark instance.
>
> import java.io._
> import org.apache.spark.mllib.classification.NaiveBayes
> import org.apache.spark.mllib.classification.NaiveBayesModel
> import org.apache.spark.mllib.util.MLUtils
>
> val data = MLUtils.loadLibSVMFile(sc, "hdfs://myserver:9000/data/mnist.scale")
> val nb = NaiveBayes.train(data)
> // RDD map works fine
> val predictionAndLabels = data.map(lp => (nb.predict(lp.features), lp.label))
>
> // serialize the model to a file and immediately load it
> val oos = new ObjectOutputStream(new FileOutputStream("/home/myuser/nb.bin"))
> oos.writeObject(nb)
> oos.close()
> val ois = new ObjectInputStream(new FileInputStream("/home/myuser/nb.bin"))
> val nbSerialized = ois.readObject.asInstanceOf[NaiveBayesModel]
> ois.close()
> // RDD map fails
> val predictionAndLabels = data.map(lp => (nbSerialized.predict(lp.features), lp.label))
>
> org.apache.spark.SparkException: Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1453)
>         at org.apache.spark.rdd.RDD.map(RDD.scala:273)
>
> From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
> Sent: Sunday, March 08, 2015 3:17 AM
> To: Ulanov, Alexander
> Cc: dev
> Subject: Re: Loading previously serialized object to Spark
>
> Can you paste the complete code?
>
> Thanks
> Best Regards
>
> On Sat, Mar 7, 2015 at 2:25 AM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> Hi,
>
> I've implemented a class MyClass in MLlib that does some operation on LabeledPoint. MyClass extends Serializable, so I can map this operation over an RDD[LabeledPoint], such as data.map(lp => MyClass.operate(lp)). I write this object to a file with ObjectOutputStream.writeObject. Then I stop and restart Spark. I load the object from the file with ObjectInputStream.readObject.asInstanceOf[MyClass]. When I try to map the same operation of this class over an RDD, Spark throws a not-serializable exception:
>
> org.apache.spark.SparkException: Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1453)
>         at org.apache.spark.rdd.RDD.map(RDD.scala:273)
>
> Could you suggest why it throws this exception while MyClass is serializable by definition?
>
> Best regards,
> Alexander
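
PS. Here is a rough sketch of the TestSuite variant mentioned at the top. It assumes the spark-mllib test jar is on the test classpath so that MLlibTestSparkContext (which provides the `sc` field) is available; the class name, test name, tiny in-memory dataset, and output path are just placeholders:

import org.scalatest.FunSuite
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLlibTestSparkContext

class ModelSerializationSuite extends FunSuite with MLlibTestSparkContext {

  test("NaiveBayesModel survives an objectFile round trip") {
    // tiny in-memory dataset instead of the HDFS file, just for the test
    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 1.0))))
    val model = NaiveBayes.train(data)

    // save and reload the model through a one-element RDD
    val path = "/tmp/nb-model"
    sc.parallelize(Seq(model), 1).saveAsObjectFile(path)
    val sameModel = sc.objectFile[NaiveBayesModel](path).first()

    // the reloaded model can be used inside RDD closures without
    // the "Task not serializable" error
    val predictionAndLabels = data.map(lp => (sameModel.predict(lp.features), lp.label))
    assert(predictionAndLabels.count() === data.count())
  }
}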