Thank you for your help. After restructuring my code according to Sean's input, it worked without changing the Spark context. I then took the same file format, just a bigger file (2.7 GB), from S3 to my cluster with 4 c3.xlarge instances and Spark 1.0.2. Unfortunately my task freezes again after a short time. I tried it with cached and uncached RDDs. Are there any configuration settings that should be made for such big files and MLlib?
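To make the question concrete: the kind of settings I have in mind would be something like the following when building the context for a standalone job. The property names exist in Spark 1.0.x, but the values are only rough guesses for a c3.xlarge with 7.5 GB of RAM, not tested recommendations:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only -- values are guesses, not verified settings
val conf = new SparkConf()
  .setAppName("KMeansLargeFile")
  .set("spark.executor.memory", "6g")          // leave some headroom on a 7.5 GB c3.xlarge node
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.storage.memoryFraction", "0.5")  // reserve more room for shuffle/task memory
val sc = new SparkContext(conf)

My actual session (in spark-shell, so with the default context) is below.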
scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.clustering.KMeansModel

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> // Load and parse the data

scala> val data = sc.textFile("s3n://ampcamp-arigge/large_file.new.txt")
14/08/09 14:58:31 INFO storage.MemoryStore: ensureFreeSpace(35666) called with curMem=0, maxMem=309225062
14/08/09 14:58:31 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 34.8 KB, free 294.9 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:15

scala> val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at <console>:17

scala> val train = parsedData.repartition(20).cache()
train: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[6] at repartition at <console>:19

scala> // Set model and run it

scala> val model = new KMeans().
     |   setInitializationMode("k-means||").
     |   setK(2).setMaxIterations(2).
     |   setEpsilon(1e-4).
     |   setRuns(1).
     |   run(parsedData)
14/08/09 14:58:33 WARN snappy.LoadSnappy: Snappy native library is available
14/08/09 14:58:33 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/08/09 14:58:33 INFO snappy.LoadSnappy: Snappy native library loaded
14/08/09 14:58:34 INFO mapred.FileInputFormat: Total input paths to process : 1
14/08/09 14:58:34 INFO spark.SparkContext: Starting job: takeSample at KMeans.scala:260
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Got job 0 (takeSample at KMeans.scala:260) with 2 output partitions (allowLocal=false)
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Final stage: Stage 0 (takeSample at KMeans.scala:260)
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Missing parents: List()
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[10] at map at KMeans.scala:123), which has no missing parents
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[10] at map at KMeans.scala:123)
14/08/09 14:58:34 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 5: ip-172-31-16-25.ec2.internal (PROCESS_LOCAL)
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2215 bytes in 2 ms
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 4: ip-172-31-16-24.ec2.internal (PROCESS_LOCAL)
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 2215 bytes in 1 ms
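One thing I notice re-reading the session: I repartition and cache into `train`, but then call run on `parsedData`, so that particular run never touches the cached RDD. The cached variant I also tried is the same call, just with the other input:

// k-means trained on the repartitioned, cached RDD instead of the raw parsed one
val model = new KMeans().
  setInitializationMode("k-means||").
  setK(2).
  setMaxIterations(2).
  setEpsilon(1e-4).
  setRuns(1).
  run(train)   // run on `train`, not `parsedData`

Both variants hang the same way once the job starts.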