[ 
https://issues.apache.org/jira/browse/MAHOUT-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17065894#comment-17065894
 ] 

Tariq Jawed commented on MAHOUT-2099:
-------------------------------------

[~Andrew_Palumbo] I have added mahout-hdfs and removed the kryo dependency as you 
suggested, and I am now getting the error below: 

 
{code:java}
20/03/24 15:10:18 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 14.0 (TID 67) had a not serializable result: org.apache.mahout.math.DenseVector
Serialization stack:
  - object not serializable (class: org.apache.mahout.math.DenseVector, value: {22:1.0,23:1.0})
  - field (class: scala.Some, name: x, type: class java.lang.Object)
  - object (class scala.Some, Some({22:1.0,23:1.0}))
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1029)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:1011)
  at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:81)
  at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:36)
  at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:397)
  at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:77)
  at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:147)
  at xxx.MyDataTransformer.transform(MyDataTransformer.scala:49)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
{code}
Here is the code. It's still under development; I want to get rid of the error 
first, and then I can complete the flow.
{code:java}
import org.apache.mahout.math.Matrix
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.IndexedDataset
import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

def transform(dataFrame: DataFrame, sparkSession: SparkSession): DataFrame = {

  val transformed: DataFrame = dataFrame

  transformed.printSchema()
  transformed.show(10)

  // Wrap the session's existing SparkContext in a Mahout distributed context.
  implicit val sc: SparkContext = sparkSession.sparkContext
  implicit val msc: SparkDistributedContext = sc2sdc(sc)

  // (user, item) id pairs taken from the first two string columns.
  val rdd: RDD[(String, String)] =
    transformed.rdd.map(row => (row.getString(0), row.getString(1)))

  val newDoc = IndexedDatasetSpark(rdd)(msc)

  // Item-item co-occurrence (LLR) indicators; this is the call that fails.
  val llrDrmList: List[IndexedDataset] = SimilarityAnalysis.cooccurrencesIDSs(
    Array(newDoc),
    maxInterestingItemsPerThing = 25
  )

  val scores: Matrix = llrDrmList(0).matrix.collect

  transformed
}{code}
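One thing I am wondering: since I reuse the cluster's existing SparkSession, it is probably 
created without Mahout's Kryo settings (mahoutSparkContext() appears to set those itself), 
so the DenseVector result ends up going through Java serialization. A minimal sketch of what 
I plan to try next, assuming the registrator class in mahout-spark 0.13.0 is 
org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator (not verified on the cluster yet):
{code:java}
import org.apache.spark.sql.SparkSession

// Sketch only: register Kryo for Mahout types before the SparkSession/SparkContext
// is created, so Mahout vectors/matrices are not Java-serialized.
// The registrator class name is my assumption from the mahout-spark 0.13.0 sources.
val sparkSession: SparkSession = SparkSession.builder()
  .appName("CooccurrenceDriver")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator",
    "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
  .getOrCreate()
{code}
The same two properties could presumably also be passed as --conf options to spark-submit if 
the session is built elsewhere.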
 

> Using Mahout as a Library in Spark Cluster
> ------------------------------------------
>
>                 Key: MAHOUT-2099
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-2099
>             Project: Mahout
>          Issue Type: Question
>          Components: cooccurrence, Math
>         Environment: Spark version 2.3.0.2.6.5.10-2
>  
> [EDIT] AP
>            Reporter: Tariq Jawed
>            Priority: Major
>
> I have a Spark cluster already set up. The environment is not under my direct 
> control, but fat JARs with dependencies are allowed. I packaged my Spark 
> application with some Mahout code for SimilarityAnalysis, added the Mahout 
> libraries to the POM file, and everything packages successfully.
> The problem, however, is that I get the error below when using the existing 
> Spark context to build a distributed Spark context for Mahout.
> [EDIT] AP:
> {code:xml}
> pom.xml
> {...}
> <dependency>
>   <groupId>org.apache.mahout</groupId>
>   <artifactId>mahout-math</artifactId>
>   <version>0.13.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.mahout</groupId>
>   <artifactId>mahout-math-scala_2.10</artifactId>
>   <version>0.13.0</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.mahout</groupId>
>   <artifactId>mahout-spark_2.10</artifactId>
>   <version>0.13.0</version>
> </dependency>
> <dependency>
>   <groupId>com.esotericsoftware</groupId>
>   <artifactId>kryo</artifactId>
>   <version>5.0.0-RC5</version>
> </dependency>
>  {code}
>  
> Code:
> {code}
> implicit val sc: SparkContext = sparkSession.sparkContext
> implicit val msc: SparkDistributedContext = sc2sdc(sc)
> {code}
> Error:
> {code}
> ERROR TaskSetManager: Task 7.0 in stage 10.0 (TID 58) had a not serializable result: org.apache.mahout.math.DenseVector
> {code}
> And if I try to build the context using mahoutSparkContext() instead, it tells 
> me that MAHOUT_HOME is not set.
> Code:
> {code}
> implicit val msc = mahoutSparkContext(masterUrl = "local", appName = "CooccurrenceDriver")
> {code}
> Error:
> {code}
> MAHOUT_HOME is required to spawn mahout-based spark jobs
> {code}
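> Side note: from my reading of the 0.13.0 sparkbindings package object, mahoutSparkContext() 
> also takes an addMahoutJars flag, so something like the sketch below might skip the 
> MAHOUT_HOME jar lookup when everything already ships in the fat JAR (not tried yet; the 
> parameter name is my assumption from the source):
> {code}
> // Sketch: build the Mahout context without resolving jars from MAHOUT_HOME.
> implicit val msc = mahoutSparkContext(
>   masterUrl = "local",
>   appName = "CooccurrenceDriver",
>   addMahoutJars = false) // dependencies are expected to come from the fat JAR
> {code}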
> My question is: how do I proceed in this situation? Should I ask the 
> administrators of the Spark environment to install the Mahout library, or is 
> there any way I can proceed by packaging my application as a fat JAR?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
