[
https://issues.apache.org/jira/browse/SPARK-28300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon resolved SPARK-28300.
----------------------------------
Resolution: Invalid
Looks like a question. Let's interact with the mailing list first before filing it
as an issue. Check out [https://spark.apache.org/community.html].
Also, if possible, test it on at least 2.3.x or above. Lower versions are EOL
releases.
> Kmeans is failing when we run parallely passing an RDD
> ------------------------------------------------------
>
> Key: SPARK-28300
> URL: https://issues.apache.org/jira/browse/SPARK-28300
> Project: Spark
> Issue Type: Task
> Components: ML
> Affects Versions: 2.2.0
> Reporter: Ramanjaneya Naidu Nalla
> Priority: Major
>
> Hi,
> I am facing an issue when we run the Spark KMeans algorithm in parallel by
> sending a sample RDD.
> The KMeans algorithm run is failing on the executor when we pass the cluster
> sample as an RDD type (RDD[linalg.Vector]) to the executors. It is failing
> because RDD[linalg.Vector] is unavailable on the executor side.
> Can we pass an RDD to the executors to make KMeans run in parallel?
> Please suggest how we can achieve running KMeans in parallel on the
> executors.
> Please find below the code snippet and the error from the logs.
> Regards,
> Raman.
> +Code snippet+
> Driver side code ::
> val kmeansCluster = sc.parallelize(List.range(kStart, kEnd + 1)).map(k => {
> val sharedContext = SharedClusteringData[linalg.Vector,KMeansModel](job,
> spark, sampleId, Some(k),
> ClusteringType.KMEANS.name() + "clustering processes for:" + k)
> //val sharedContextLoadSamplesCount =
> sharedContextLoadSample.clusterSample.get
> //log.info(s"cluster sample count is
> ${sharedContextLoadSamplesCount.count()}")
> sharedContext.selectedFeatureIdx =
> Some(loadSample.value.selectedFeatureIdx.get)
> sharedContext.dropColIdx = Some(loadSample.value.dropColIdx.get)
> sharedContext.dataset = loadSample.value.dataset)
> sharedContext.clusterSample= loadSample.value.clusterSample
> println("In Driver program :::")
> sharedContext.clusterSample.foreach(x=>println(x))
> println("In Driver program END :::")
> RunClustering.runKMean(sharedContext) match {
> case Success(true) =>
> log.info(s"${ClusteringType.KMEANS.name()} is completed for k =$k ")
> case Success(false) =>
> log.error(s"${ClusteringType.KMEANS.name()} is failed for k = $k")
> case Failure(ex) =>
> log.error(s"${ClusteringType.KMEANS.name} clustering failed for $k")
> log.error(ex.getStackTrace.mkString("\n"))
> }
> (k, sharedContext.isSuccessful, sharedContext.message)
> })
> +Executor side+
> def buildCluster[S, M](k: Int, clusterSample: RDD[S], maxIteration: Int):
> Try[M] =
> { Try(KMeans.train(kmeanSample, k, maxIteration).asInstanceOf[M]) }
> Logs::
>
> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
> org.apache.spark.rdd.RDD.count(RDD.scala:1158)
> com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33)
>
> com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14)
>
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14)
>
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
>
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
> scala.util.Try$.apply(Try.scala:192)
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11)
>
> com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81)
>
> com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> scala.collection.AbstractIterator.to(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:108)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748) 2019-07-05 12:10:24,862 ERROR
> [Executor task launch worker for task 449]
> clusteringprocessor.ClusterProcessor:
> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
> org.apache.spark.rdd.RDD.count(RDD.scala:1158)
> com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33)
>
> com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14)
>
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14)
>
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
>
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
> scala.util.Try$.apply(Try.scala:192)
> com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11)
>
> com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81)
>
> com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> scala.collection.AbstractIterator.to(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:108)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]