[
https://issues.apache.org/jira/browse/MAHOUT-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17065894#comment-17065894
]
Tariq Jawed commented on MAHOUT-2099:
-------------------------------------
[~Andrew_Palumbo] so I have added mahout-hdfs and removed kryo as you
suggested, but I am now getting the error below:
{code:java}
20/03/24 15:10:18 ERROR ApplicationMaster: User class threw exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 14.0 (TID 67) had a not serializable result: org.apache.mahout.math.DenseVector
Serialization stack:
	- object not serializable (class: org.apache.mahout.math.DenseVector, value: {22:1.0,23:1.0})
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some({22:1.0,23:1.0}))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1029)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1011)
	at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:81)
	at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:36)
	at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:397)
	at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:77)
	at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:147)
	at xxx.MyDataTransformer.transform(MyDataTransformer.scala:49)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
{code}
Here is the code. It is still under development; I want to get rid of the error
first, then complete the flow.
{code:scala}
def transform(dataFrame: DataFrame, sparkSession: SparkSession): DataFrame = {
  val transformed: DataFrame = dataFrame
  transformed.printSchema()
  transformed.show(10)

  implicit val sc: SparkContext = sparkSession.sparkContext
  implicit val msc: SparkDistributedContext = sc2sdc(sc)

  // (user, item) pairs from the first two string columns
  val rdd: RDD[(String, String)] =
    transformed.rdd.map(row => (row.getString(0), row.getString(1)))
  val newDoc = IndexedDatasetSpark(rdd)(msc)

  val llrDrmList: List[IndexedDataset] = SimilarityAnalysis.cooccurrencesIDSs(
    Array(newDoc),
    maxInterestingItemsPerThing = 25
  )
  val scores: Matrix = llrDrmList(0).matrix.collect
  transformed
}
{code}
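For what it's worth, the failing call in the trace is an {{RDD.reduce}} whose result is a Mahout {{DenseVector}}, and Spark's default Java serializer cannot serialize Mahout vectors. The usual remedy is to enable Kryo with Mahout's own registrator before the context is created. A minimal sketch, assuming the mahout-spark artifact is on both driver and executor classpaths (the registrator class name is the one shipped in Mahout 0.13's {{org.apache.mahout.sparkbindings.io}} package; verify it against the version you package):
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Serializer settings must be in place before the context starts;
// they cannot be changed on a live SparkContext.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
       "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")

val sparkSession = SparkSession.builder()
  .appName("CooccurrenceDriver")
  .config(conf)
  .getOrCreate()
{code}
If the session is created by the platform rather than your code, the same two settings can be passed to spark-submit as {{--conf}} flags instead.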
> Using Mahout as a Library in Spark Cluster
> ------------------------------------------
>
> Key: MAHOUT-2099
> URL: https://issues.apache.org/jira/browse/MAHOUT-2099
> Project: Mahout
> Issue Type: Question
> Components: cooccurrence, Math
> Environment: Spark version 2.3.0.2.6.5.10-2
>
> [EDIT] AP
> Reporter: Tariq Jawed
> Priority: Major
>
> I have a Spark cluster already set up. The environment is not under my
> direct control, but fat JARs with bundled dependencies are allowed. I
> packaged my Spark application with some Mahout code for SimilarityAnalysis,
> added the Mahout libraries to the POM file, and it packaged successfully.
> The problem, however, is that I am getting this error while using the
> existing Spark context to build a distributed Spark context for
> Mahout
> [EDIT]AP:
> {code:xml}
> pom.xml
> {...}
> <dependency>
> <groupId>org.apache.mahout</groupId>
> <artifactId>mahout-math</artifactId>
> <version>0.13.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.mahout</groupId>
> <artifactId>mahout-math-scala_2.10</artifactId>
> <version>0.13.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.mahout</groupId>
> <artifactId>mahout-spark_2.10</artifactId>
> <version>0.13.0</version>
> </dependency>
> <dependency>
> <groupId>com.esotericsoftware</groupId>
> <artifactId>kryo</artifactId>
> <version>5.0.0-RC5</version>
> </dependency>
> {code}
>
> Code:
> {code}
> implicit val sc: SparkContext = sparkSession.sparkContext
> implicit val msc: SparkDistributedContext = sc2sdc(sc)
> {code}
> Error:
> {code}
> ERROR TaskSetManager: Task 7.0 in stage 10.0 (TID 58) had a not serializable
> result: org.apache.mahout.math.DenseVector
> {code}
> And if I try to build the context using mahoutSparkContext() instead, it
> gives me an error that MAHOUT_HOME is not found.
> Code:
> {code}
> implicit val msc = mahoutSparkContext(masterUrl = "local", appName =
> "CooccurrenceDriver")
> {code}
> Error:
> {code}
> MAHOUT_HOME is required to spawn mahout-based spark jobs
> {code}
> My question is: how do I proceed in this situation? Should I ask the
> administrators of the Spark environment to install the Mahout library, or is
> there any way I can proceed by packaging my application as a fat JAR?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)