One related question, is mllib jar independent from hadoop version (doesnt use hadoop api directly)? Can I use mllib jar compile for one version of hadoop and use it in another version of hadoop?
Sent from my Google Nexus 5 On Aug 6, 2014 8:29 AM, "Debasish Das" <debasish.da...@gmail.com> wrote: > Hi Xiangrui, > > Maintaining another file will be a pain later so I deployed spark 1.0.1 > without mllib and then my application jar bundles mllib 1.1.0-SNAPSHOT > along with the code changes for quadratic optimization... > > Later the plan is to patch the snapshot mllib with the deployed stable > mllib... > > There are 5 variants that I am experimenting with around 400M ratings > (daily data, monthly data I will update in few days)... > > 1. LS > 2. NNLS > 3. Quadratic with bounds > 4. Quadratic with L1 > 5. Quadratic with equality and positivity > > Now the ALS 1.1.0 snapshot runs fine but after completion on this step > ALS.scala:311 > > // Materialize usersOut and productsOut. > usersOut.count() > > I am getting from one of the executors: java.lang.ClassCastException: > scala.Tuple1 cannot be cast to scala.Product2 > > I am debugging it further but I was wondering if this is due to RDD > compatibility within 1.0.1 and 1.1.0-SNAPSHOT ? > > I have built the jars on my Mac which has Java 1.7.0_55 but the deployed > cluster has Java 1.7.0_45. > > The flow runs fine on my localhost spark 1.0.1 with 1 worker. Can that Java > version mismatch cause this ? > > Stack traces are below > > Thanks. > Deb > > > Executor stacktrace: > > > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:156) > > > > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$4.apply(CoGroupedRDD.scala:154) > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:154) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > > org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > > > org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > > org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > > > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:126) > > > > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:123) > > > > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) > > > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > > scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > > > > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) > > org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:123) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > > org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > > > org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) > > > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > > org.apache.spark.scheduler.Task.run(Task.scala:51) > > > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) > > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > > java.lang.Thread.run(Thread.java:744) > > Driver stacktrace: > > at org.apache.spark.scheduler.DAGScheduler.org > > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) > > at > > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) > > at > > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) > > at > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) > > at > > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) > > at > > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) > > at scala.Option.foreach(Option.scala:236) > > at > > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) > > at > > org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > > at > > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > at > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > at > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > On Tue, Aug 5, 2014 at 5:59 PM, Debasish Das <debasish.da...@gmail.com> > wrote: > > > Hi Xiangrui, > > > > I used your idea and kept a cherry picked version of ALS.scala in my > > application and call it ALSQp.scala...this is a OK workaround for now > till > > a version adds up to master for example... > > > > For the bug with userClassPathFirst, looks like Koert already found this > > issue in the following JIRA: > > > > https://issues.apache.org/jira/browse/SPARK-1863 > > > > By the way the userClassPathFirst feature is very useful since I am sure > > the deployed version of spark on a production cluster will always be the > > last stable (core at 1.0.1 in my case) and people would like to deploy > > SNAPSHOT versions of libraries that build on top of spark core (mllib, > > streaming etc)... > > > > Another way is to have a build option that deploys only the core and not > > the libraries built upon core... > > > > Do we have an option like that in make-distribution script ? > > > > Thanks. > > Deb > > > > > > On Tue, Aug 5, 2014 at 10:37 AM, Xiangrui Meng <men...@gmail.com> wrote: > > > >> If you cannot change the Spark jar deployed on the cluster, an easy > >> solution would be renaming ALS in your jar. If userClassPathFirst > >> doesn't work, could you create a JIRA and attach the log? Thanks! > >> -Xiangrui > >> > >> On Tue, Aug 5, 2014 at 9:10 AM, Debasish Das <debasish.da...@gmail.com> > >> wrote: > >> > I created the assembly file but still it wants to pick the mllib from > >> the > >> > cluster: > >> > > >> > jar tf ./target/ml-0.0.1-SNAPSHOT-jar-with-dependencies.jar | grep > >> > QuadraticMinimizer > >> > > >> > org/apache/spark/mllib/optimization/QuadraticMinimizer$$anon$1.class > >> > > >> > /Users/v606014/dist-1.0.1/bin/spark-submit --master > >> > spark://TUSCA09LMLVT00C.local:7077 --class ALSDriver > >> > ./target/ml-0.0.1-SNAPSHOT-jar-with-dependencies.jar inputPath > >> outputPath > >> > > >> > Exception in thread "main" java.lang.NoSuchMethodError: > >> > > >> > org.apache.spark.mllib.recommendation.ALS.setLambdaL1(D)Lorg/apache/spark/mllib/recommendation/ALS; > >> > > >> > Now if I force it to use the jar that I gave using > >> > spark.files.userClassPathFirst, then it fails on some serialization > >> > issues... > >> > > >> > A simple solution is to cherry pick the files I need from spark branch > >> to > >> > the application branch but I am not sure that's the right thing to > do... > >> > > >> > The way userClassPathFirst is behaving, there might be bugs in it... > >> > > >> > Any suggestions will be appreciated.... > >> > > >> > Thanks. > >> > Deb > >> > > >> > > >> > On Sat, Aug 2, 2014 at 11:12 AM, Xiangrui Meng <men...@gmail.com> > >> wrote: > >> >> > >> >> Yes, that should work. spark-mllib-1.1.0 should be compatible with > >> >> spark-core-1.0.1. > >> >> > >> >> On Sat, Aug 2, 2014 at 10:54 AM, Debasish Das < > >> debasish.da...@gmail.com> > >> >> wrote: > >> >> > Let me try it... > >> >> > > >> >> > Will this be fixed if I generate a assembly file with mllib-1.1.0 > >> >> > SNAPSHOT > >> >> > jar and other dependencies with the rest of the application code ? > >> >> > > >> >> > > >> >> > > >> >> > On Sat, Aug 2, 2014 at 10:46 AM, Xiangrui Meng <men...@gmail.com> > >> wrote: > >> >> >> > >> >> >> You can try enabling "spark.files.userClassPathFirst". But I'm not > >> >> >> sure whether it could solve your problem. -Xiangrui > >> >> >> > >> >> >> On Sat, Aug 2, 2014 at 10:13 AM, Debasish Das > >> >> >> <debasish.da...@gmail.com> > >> >> >> wrote: > >> >> >> > Hi, > >> >> >> > > >> >> >> > I have deployed spark stable 1.0.1 on the cluster but I have new > >> code > >> >> >> > that > >> >> >> > I added in mllib-1.1.0-SNAPSHOT. > >> >> >> > > >> >> >> > I am trying to access the new code using spark-submit as > follows: > >> >> >> > > >> >> >> > spark-job --class com.verizon.bda.mllib.recommendation.ALSDriver > >> >> >> > --executor-memory 16g --total-executor-cores 16 --jars > >> >> >> > spark-mllib_2.10-1.1.0-SNAPSHOT.jar,scopt_2.10-3.2.0.jar > >> >> >> > sag-core-0.0.1-SNAPSHOT.jar --rank 25 --numIterations 10 > --lambda > >> 1.0 > >> >> >> > --qpProblem 2 inputPath outputPath > >> >> >> > > >> >> >> > I can see the jars are getting added to httpServer as expected: > >> >> >> > > >> >> >> > 14/08/02 12:50:04 INFO SparkContext: Added JAR > >> >> >> > > >> file:/vzhome/v606014/spark-glm/spark-mllib_2.10-1.1.0-SNAPSHOT.jar at > >> >> >> > > >> http://10.145.84.20:37798/jars/spark-mllib_2.10-1.1.0-SNAPSHOT.jar > >> >> >> > with > >> >> >> > timestamp 1406998204236 > >> >> >> > > >> >> >> > 14/08/02 12:50:04 INFO SparkContext: Added JAR > >> >> >> > file:/vzhome/v606014/spark-glm/scopt_2.10-3.2.0.jar at > >> >> >> > http://10.145.84.20:37798/jars/scopt_2.10-3.2.0.jar with > >> timestamp > >> >> >> > 1406998204237 > >> >> >> > > >> >> >> > 14/08/02 12:50:04 INFO SparkContext: Added JAR > >> >> >> > file:/vzhome/v606014/spark-glm/sag-core-0.0.1-SNAPSHOT.jar at > >> >> >> > http://10.145.84.20:37798/jars/sag-core-0.0.1-SNAPSHOT.jar with > >> >> >> > timestamp > >> >> >> > 1406998204238 > >> >> >> > > >> >> >> > But the job still can't access code form mllib-1.1.0 > >> SNAPSHOT.jar...I > >> >> >> > think > >> >> >> > it's picking up the mllib from cluster which is at 1.0.1... > >> >> >> > > >> >> >> > Please help. I will ask for a PR tomorrow but internally we want > >> to > >> >> >> > generate results from the new code. > >> >> >> > > >> >> >> > Thanks. > >> >> >> > > >> >> >> > Deb > >> >> > > >> >> > > >> > > >> > > >> > > > > >