Spark SQL doesn't handle DISTINCT well at the moment; in particular, in the 
case you described it pulls all of the data onto a single node and keeps it 
in memory only.
The dev community already has solutions for this, so it will probably be 
fixed after the release of Spark 1.2.
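In the meantime you can work around it at the RDD level: RDD.distinct() is 
implemented as a distributed reduceByKey across partitions, so nothing gets 
pulled onto a single node. A minimal sketch, assuming the id is the first 
tab-separated field as in your Play mapping:

     // Workaround sketch: compute distinct ids with the RDD API instead of
     // SQL DISTINCT (distributed; reduceByKey under the hood).
     val uniqueIds = sc.textFile("shared_dir/*.dat")
                       .map(_.split("\t")(0).trim)  // keep only the id column
                       .distinct()
     println("Number of unique account IDs = %d".format(uniqueIds.count()))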

-----Original Message-----
From: SK [mailto:skrishna...@gmail.com] 
Sent: Wednesday, November 26, 2014 4:17 PM
To: u...@spark.incubator.apache.org
Subject: Spark SQL performance and data size constraints

Hi,

I use the following code to read in data and extract the unique users with 
Spark SQL. The data is 1.2 TB and I am running this on a cluster with 3 TB 
of memory. There appears to be enough memory, but the program just freezes 
after some time, at the step where it maps the RDD to the case class Play. 
(If I don't use the Spark SQL portion, i.e., don't map to the case class, 
register the table, etc., and merely load the data via the val data = ... 
step below, then the program completes.)

I tried spark.storage.memoryFraction=0.5 as well as the 0.6 default, as 
suggested in the Tuning guide, but that did not help.
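(For reference, one way to set this is via SparkConf before creating the 
context; a sketch with a placeholder app name:)

     val conf = new org.apache.spark.SparkConf()
       .setAppName("UniqueUsers")                    // placeholder name
       .set("spark.storage.memoryFraction", "0.5")   // also tried the 0.6 default
     val sc = new org.apache.spark.SparkContext(conf)
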
According to the logs, the total number of partitions/tasks is 38688, and 
the size of each RDD partition for the mapping to the case class is around 
31 MB. So the total RDD size is 38688 * 31 MB, or about 1.2 TB, which is 
less than the 3 TB of memory on the cluster. At the time the program stops, 
the total number of tasks completed is a little under 38688, with some of 
them appearing as failed. There are no details on why the tasks failed.

Are there any maximum data size constraints in Spark SQL or in table 
creation that might be causing the program to hang? Are there any 
performance optimizations I could try with Spark SQL that would allow the 
job to complete?


     import org.apache.spark.storage.StorageLevel
     import sql_cxt.createSchemaRDD  // implicit RDD -> SchemaRDD, needed for registerTempTable

     // field names other than `id` are placeholders; only `id` is queried below
     case class Play(id: String, f1: String, f2: String, f3: String)

     val data = sc.textFile("shared_dir/*.dat")
                  .map(_.split("\t"))
                  .persist(StorageLevel.MEMORY_AND_DISK_SER)

     val play = data.map(f => Play(f(0).trim, f(1).trim, f(2).trim, f(3).trim))
                    .persist(StorageLevel.MEMORY_AND_DISK_SER)

     // register the RDD as a table
     play.registerTempTable("play")

     val ids = sql_cxt.sql("SELECT DISTINCT id FROM play")

     println("Number of unique account IDs = %d".format(ids.count()))
     println("Number of records = %d".format(play.count()))

thanks




