Thank you for your help. After restructuring my code according to Sean's
input, it worked without changing the Spark context. I then took the same
file format, just a bigger file (2.7 GB), from S3 to my cluster with 4
c3.xlarge instances and Spark 1.0.2. Unfortunately, my task freezes again
after a short time. I tried it with both cached and uncached RDDs. Are
there any configurations I should make for such big files and MLlib?
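For context, the only tuning I can think of would be along these lines.
This is just a sketch of a standalone setup with settings I assume might
matter (spark.executor.memory, spark.storage.memoryFraction and the Kryo
serializer are standard Spark 1.0 properties, but the values here are
guesses, not verified fixes):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: a standalone context with more executor memory and Kryo.
// The values are assumptions for a 2.7 GB input, not tested settings.
val conf = new SparkConf()
  .setAppName("KMeansLargeFile")
  .set("spark.executor.memory", "4g")         // more heap per executor
  .set("spark.storage.memoryFraction", "0.5") // leave room for shuffle
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)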

scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.clustering.KMeansModel

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> 

scala> // Load and parse the data

scala> val data = sc.textFile("s3n://ampcamp-arigge/large_file.new.txt")
14/08/09 14:58:31 INFO storage.MemoryStore: ensureFreeSpace(35666) called
with curMem=0, maxMem=309225062
14/08/09 14:58:31 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 34.8 KB, free 294.9 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:15

scala> val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] =
MappedRDD[2] at map at <console>:17
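As an aside, with a file this size I assume a blank or malformed line
could also kill the job with a NumberFormatException, so a more defensive
parse might look like this (the filter is my addition, not part of the
session above):

import org.apache.spark.mllib.linalg.Vectors

// Sketch: skip empty lines before parsing so a stray blank line in the
// 2.7 GB file doesn't fail the whole job with NumberFormatException.
val parsedData = data
  .filter(_.trim.nonEmpty)
  .map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))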

scala> val train = parsedData.repartition(20).cache() 
train: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] =
MappedRDD[6] at repartition at <console>:19
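Given that the log above reports only about 295 MB of storage memory
(maxMem=309225062), I assume the 2.7 GB dataset can never be fully cached
in memory. A sketch of persisting with spill-to-disk instead
(StorageLevel.MEMORY_AND_DISK is the standard Spark storage level):

import org.apache.spark.storage.StorageLevel

// Sketch: keep what fits in memory and spill the remaining partitions
// to local disk instead of recomputing them on every iteration.
val train = parsedData
  .repartition(20)
  .persist(StorageLevel.MEMORY_AND_DISK)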

scala> 

scala> // Set model and run it

scala> val model = new KMeans().
     | setInitializationMode("k-means||").
     | setK(2).setMaxIterations(2).
     | setEpsilon(1e-4).
     | setRuns(1).
     | run(parsedData)
14/08/09 14:58:33 WARN snappy.LoadSnappy: Snappy native library is available
14/08/09 14:58:33 INFO util.NativeCodeLoader: Loaded the native-hadoop
library
14/08/09 14:58:33 INFO snappy.LoadSnappy: Snappy native library loaded
14/08/09 14:58:34 INFO mapred.FileInputFormat: Total input paths to process
: 1
14/08/09 14:58:34 INFO spark.SparkContext: Starting job: takeSample at
KMeans.scala:260
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Got job 0 (takeSample at
KMeans.scala:260) with 2 output partitions (allowLocal=false)
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Final stage: Stage
0(takeSample at KMeans.scala:260)
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Parents of final stage:
List()
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Missing parents: List()
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Submitting Stage 0
(MappedRDD[10] at map at KMeans.scala:123), which has no missing parents
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from Stage 0 (MappedRDD[10] at map at KMeans.scala:123)
14/08/09 14:58:34 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
2 tasks
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
0 on executor 5: ip-172-31-16-25.ec2.internal (PROCESS_LOCAL)
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
2215 bytes in 2 ms
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
1 on executor 4: ip-172-31-16-24.ec2.internal (PROCESS_LOCAL)
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
2215 bytes in 1 ms
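
Re-reading the transcript, I also notice that the model is run on
parsedData, so the repartitioned and cached train RDD is never actually
used. The cached variant I assume was intended looks like this (same
parameters, just pointed at train):

// Sketch: same KMeans configuration, but run on the cached,
// repartitioned RDD so the 20 partitions and the cache take effect.
val model = new KMeans().
  setInitializationMode("k-means||").
  setK(2).
  setMaxIterations(2).
  setEpsilon(1e-4).
  setRuns(1).
  run(train)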




