Ramanjaneya Naidu Nalla created SPARK-28300:
-----------------------------------------------

             Summary: KMeans is failing when we run it in parallel, passing an RDD
                 Key: SPARK-28300
                 URL: https://issues.apache.org/jira/browse/SPARK-28300
             Project: Spark
          Issue Type: Task
          Components: ML
    Affects Versions: 2.2.0
            Reporter: Ramanjaneya Naidu Nalla


Hi,

I am facing an issue when we run the Spark KMeans algorithm in parallel by 
sending a sample RDD.

The KMeans algorithm run is failing on the executor because the sample RDD 
passed to the executors becomes unavailable on the executor side.

Can we pass an RDD to the executors to make KMeans run in parallel?

Please find the code snippet and the error from the logs below.

Regards,

Raman.

+Code snippet+

Driver side  code ::

val kmeansCluster = sc.parallelize(List.range(kStart, kEnd + 1)).map(k => {
 val sharedContext = SharedClusteringData[linalg.Vector,KMeansModel](job, 
spark, sampleId, Some(k),
 ClusteringType.KMEANS.name() + "clustering processes for:" + k)
 //val sharedContextLoadSamplesCount = sharedContextLoadSample.clusterSample.get
 //log.info(s"cluster sample count is ${sharedContextLoadSamplesCount.count()}")
 sharedContext.selectedFeatureIdx = 
Some(loadSample.value.selectedFeatureIdx.get)
 sharedContext.dropColIdx = Some(loadSample.value.dropColIdx.get)
 sharedContext.dataset = loadSample.value.dataset)
 sharedContext.clusterSample= loadSample.value.clusterSample
 println("In Driver program :::")
 sharedContext.clusterSample.foreach(x=>println(x))
 println("In Driver program END :::")
 RunClustering.runKMean(sharedContext) match {
 case Success(true) =>
 log.info(s"${ClusteringType.KMEANS.name()} is completed for k =$k ")
 case Success(false) =>
 log.error(s"${ClusteringType.KMEANS.name()} is failed for k = $k")
 case Failure(ex) =>
 log.error(s"${ClusteringType.KMEANS.name} clustering failed for $k")
 log.error(ex.getStackTrace.mkString("\n"))
 }
 (k, sharedContext.isSuccessful, sharedContext.message)
})

+Executor side+ 

 def buildCluster[S, M](k: Int, clusterSample: RDD[S], maxIteration: Int): 
Try[M] = {

Try(KMeans.train(kmeanSample, k, maxIteration).asInstanceOf[M])

}

Logs::

 

org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89) 
org.apache.spark.rdd.RDD.count(RDD.scala:1158) 
com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33)
 
com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14)
 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14)
 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
 scala.util.Try$.apply(Try.scala:192) 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11)
 
com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81)
 
com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
scala.collection.Iterator$class.foreach(Iterator.scala:893) 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
scala.collection.AbstractIterator.to(Iterator.scala:1336) 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
org.apache.spark.scheduler.Task.run(Task.scala:108) 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
java.lang.Thread.run(Thread.java:748) 2019-07-05 12:10:24,862 ERROR [Executor 
task launch worker for task 449] clusteringprocessor.ClusterProcessor: 
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89) 
org.apache.spark.rdd.RDD.count(RDD.scala:1158) 
com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33)
 
com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14)
 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14)
 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
 scala.util.Try$.apply(Try.scala:192) 
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11)
 
com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81)
 
com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
scala.collection.Iterator$class.foreach(Iterator.scala:893) 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
scala.collection.AbstractIterator.to(Iterator.scala:1336) 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936) 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069) 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
org.apache.spark.scheduler.Task.run(Task.scala:108) 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to