Ramanjaneya Naidu Nalla created SPARK-28300:
-----------------------------------------------
Summary: KMeans fails when run in parallel by passing an RDD
Key: SPARK-28300
URL: https://issues.apache.org/jira/browse/SPARK-28300
Project: Spark
Issue Type: Task
Components: ML
Affects Versions: 2.2.0
Reporter: Ramanjaneya Naidu Nalla
Hi,
I am facing an issue when running the Spark KMeans algorithm in parallel by
passing a sample RDD into the closure of another RDD.
The KMeans run fails on the executor because the sample RDD passed into the
closure is not available on the executor side.
Is there a way to pass an RDD to the executors so that KMeans runs in parallel?
Please find the code snippet and the error logs below.
Regards,
Raman.
+Code snippet+
Driver side code::
val kmeansCluster = sc.parallelize(List.range(kStart, kEnd + 1)).map(k => {
  val sharedContext = SharedClusteringData[linalg.Vector, KMeansModel](job,
    spark, sampleId, Some(k),
    ClusteringType.KMEANS.name() + "clustering processes for:" + k)
  //val sharedContextLoadSamplesCount = sharedContextLoadSample.clusterSample.get
  //log.info(s"cluster sample count is ${sharedContextLoadSamplesCount.count()}")
  sharedContext.selectedFeatureIdx = Some(loadSample.value.selectedFeatureIdx.get)
  sharedContext.dropColIdx = Some(loadSample.value.dropColIdx.get)
  sharedContext.dataset = loadSample.value.dataset
  sharedContext.clusterSample = loadSample.value.clusterSample
  println("In Driver program :::")
  sharedContext.clusterSample.foreach(x => println(x))
  println("In Driver program END :::")
  RunClustering.runKMean(sharedContext) match {
    case Success(true) =>
      log.info(s"${ClusteringType.KMEANS.name()} is completed for k = $k")
    case Success(false) =>
      log.error(s"${ClusteringType.KMEANS.name()} is failed for k = $k")
    case Failure(ex) =>
      log.error(s"${ClusteringType.KMEANS.name()} clustering failed for k = $k")
      log.error(ex.getStackTrace.mkString("\n"))
  }
  (k, sharedContext.isSuccessful, sharedContext.message)
})
+Executor side+
def buildCluster[S, M](k: Int, clusterSample: RDD[S], maxIteration: Int): Try[M] = {
  // clusterSample is an RDD referenced inside a task that is already running
  // on an executor; the count() it triggers is what fails in the logs below
  Try(KMeans.train(clusterSample.asInstanceOf[RDD[Vector]], k, maxIteration).asInstanceOf[M])
}
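As far as I can tell, the first frames of the trace below (RDD.scala:89 reached
from RDD.count) are the standard nested-RDD restriction: the map over the k
values runs on executors, and the captured clusterSample RDD cannot be used
there, because RDD transformations and actions can only be invoked by the
driver. A minimal sketch of the same failure mode (names here are illustrative,
not from our job):

import org.apache.spark.SparkContext

def reproduce(sc: SparkContext): Unit = {
  val inner = sc.parallelize(1 to 100) // RDD captured by the closure below
  sc.parallelize(1 to 3).map { k =>
    // executes on an executor; fails because an RDD action is being
    // invoked inside another transformation, not by the driver
    inner.count()
  }.collect()
}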
Logs::
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
org.apache.spark.rdd.RDD.count(RDD.scala:1158)
com.mplatform.consumer.clustering.buildcluster.BuildKMeansCluster.getClustering(BuildKMeansCluster.scala:33)
com.mplatform.consumer.clustering.buildcluster.BuildCluster.run(BuildCluster.scala:14)
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply$mcZ$sp(RunClustering.scala:14)
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$$anonfun$runKMean$1.apply(RunClustering.scala:11)
scala.util.Try$.apply(Try.scala:192)
com.mplatform.consumer.clustering.clusteringprocessor.RunClustering$.runKMean(RunClustering.scala:11)
com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:81)
com.mplatform.consumer.clustering.clusteringprocessor.ClusterProcessor$$anonfun$1.apply(ClusterProcessor.scala:69)
scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
scala.collection.AbstractIterator.to(Iterator.scala:1336)
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:108)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
2019-07-05 12:10:24,862 ERROR [Executor task launch worker for task 449] clusteringprocessor.ClusterProcessor:
[stack trace identical to the one above]
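If it helps, a possible driver-side alternative (just a sketch, assuming the
spark.mllib KMeans API; trainPerK is a hypothetical helper, not part of our
code): keep the loop over k on the driver and use a Scala parallel collection,
so every KMeans.train call is submitted from the driver while the training
itself still runs distributed on the cluster:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import scala.util.Try

// Train one model per k from the driver; .par runs several values of k
// concurrently, each as its own distributed Spark job.
def trainPerK(sample: RDD[Vector], kStart: Int, kEnd: Int,
              maxIterations: Int): List[(Int, Try[KMeansModel])] = {
  sample.cache() // reused by every run, so keep it in memory
  (kStart to kEnd).par
    .map(k => (k, Try(KMeans.train(sample, k, maxIterations))))
    .toList
}

Spark's scheduler accepts jobs submitted concurrently from multiple driver
threads, so the k values still run in parallel without shipping the RDD to
the executors.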