Well spotted, Sab. You are correct; that was an oversight on my part. Both
runs should use "sales".

The results are now comparable.

The following statement:

"On the other hand using SQL the query 1 takes 19 seconds compared to
just under 4 minutes for functional programming

The second query using SQL takes 28 seconds. Using FP it takes around 4
minutes."

should be amended to:

"Using SQL query 1 takes 3 min, 39 sec compared to 3 min, 44 sec using
FP 

Using SQL query 2 takes 3 min, 36 sec compared to 3 min, 53 sec using
FP" 

FP lags slightly behind SQL (by 5 seconds on query 1 and 17 seconds on
query 2), but not by any significant margin.
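
One way to take such timings directly, rather than subtracting the printed
timestamps by hand, is to wrap each action in a small timing helper. This is
only a sketch: `timed` is a hypothetical helper, not part of Spark, and the
stand-in workload is there just so the snippet runs on its own in a Scala REPL.

```scala
// Hypothetical helper (not a Spark API): evaluate a block, print the
// wall-clock time it took, and return the block's result.
def timed[A](label: String)(block: => A): A = {
  val start = System.nanoTime()
  val result = block                                  // the block runs here
  val elapsedSec = (System.nanoTime() - start) / 1e9  // nanoseconds to seconds
  println(f"$label took $elapsedSec%.1f s")
  result
}

// Stand-in workload so the example is self-contained; in spark-shell this
// would be an action such as take(5) or collect on a DataFrame.
val total = timed("stand-in work") { (1 to 1000000).map(_.toLong).sum }
```

In spark-shell the same wrapper could go around each action, for example
`timed("query 1") { rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println) }`,
assuming `rs` is defined as in the listings quoted below.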

Thanks 

On 24/02/2016 18:20, Sabarish Sasidharan wrote: 

> One more, you are referring to 2 different sales tables. That might account 
> for the difference in numbers. 
> 
> Regards
> Sab 
> On 24-Feb-2016 9:50 pm, "Mich Talebzadeh" 
> <mich.talebza...@cloudtechnologypartners.co.uk> wrote:
> 
> HI, 
> 
> TOOLS 
> 
> SPARK 1.5.2, HADOOP 2.6, HIVE 2.0, SPARK-SHELL, HIVE DATABASE 
> 
> OBJECTIVES: TIMING DIFFERENCES BETWEEN RUNNING SPARK USING SQL AND RUNNING 
> SPARK USING FUNCTIONAL PROGRAMMING (FP) (FUNCTIONAL CALLS) ON HIVE TABLES 
> 
> UNDERLYING TABLES: THREE TABLES IN HIVE DATABASE USING ORC FORMAT 
> 
> The main differences in timings come from running the queries and fetching 
> data. If you look at the transformation part, that is 
> 
> val rs = 
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>  
> 
> Takes 1 second. On the other hand, using SQL, query 1 takes 19 seconds 
> compared to just under 4 minutes for functional programming. 
> 
> The second query using SQL takes 28 seconds. Using FP it takes around 4 
> minutes. 
> 
> These are my assumptions. 
> 
> * Running SQL the full query is executed in Hive which means that Hive can 
> take advantage of ORC optimization/storage index etc?
> * Running FP requires that data is fetched from the underlying tables in Hive 
> and brought back to Spark cluster (standalone here) and the joins etc are 
> done there
> 
> The next step for me would be to: 
> 
> * Look at the query plans in Spark
> * Run the same code on Hive alone and compare results
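> 
> For the first of these steps, a minimal sketch, assuming a spark-shell 
> session on Spark 1.5.x in which `rs` is the DataFrame built in either 
> listing below. `explain` is the standard DataFrame method; nothing here 
> has been run against these tables. 
> 
> ```scala
> // Assumption: `rs` already exists in the session (SQL or FP version).
> // explain(true) prints the parsed, analyzed and optimized logical plans
> // followed by the physical plan, without executing the query.
> rs.explain(true)
> 
> // The plan of a downstream query can be inspected the same way.
> rs.orderBy("calendar_month_desc", "channel_desc").limit(5).explain(true)
> ```
> 
> Comparing the output for the SQL and FP versions of `rs` should show 
> whether the two are planned differently. 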
> 
> Any other suggestions are welcome. 
> 
> STANDARD SQL CODE 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> println ("\ncreating data set at "); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc
> , c.channel_desc
> , SUM(s.amount_sold) AS TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> sys.exit 
> 
> RESULTS 
> 
> Started at [24/02/2016 09:00:50.50]
> res1: org.apache.spark.sql.DataFrame = [result: string] 
> 
> creating data set at [24/02/2016 09:00:53.53]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, 
> channel_desc: string, TotalSales: decimal(20,0)] 
> 
> First query at [24/02/2016 09:00:54.54]
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193] 
> 
> second query at [24/02/2016 09:01:13.13]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760] 
> 
> Finished at [24/02/2016 09:01:31.31] 
> 
> CODE USING FUNCTIONAL PROGRAMMING 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> val s = HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t = HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> println ("\ncreating data set at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs = s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs1 = rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> sys.exit 
> 
> RESULTS 
> 
> Started at [24/02/2016 08:52:27.27]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID: 
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC: string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp, CALENDAR_MONTH_DESC: 
> string] 
> 
> creating data set at [24/02/2016 08:52:30.30]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string, 
> channel_desc: string, TotalSales: decimal(20,0)] 
> 
> first query at [24/02/2016 08:52:31.31]
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = () 
> 
> second query at [24/02/2016 08:56:17.17]
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = () 
> 
> Finished at
> [24/02/2016 09:00:14.14] 
> 
> On 24/02/2016 06:27, Sabarish Sasidharan wrote: 
> 
> When using SQL, your full query, including the joins, was executed in Hive (or 
> RDBMS) and only the results were brought into the Spark cluster. In the FP 
> case, the data for the 3 tables is first pulled into the Spark cluster and 
> then the join is executed. 
> 
> Thus the time difference. 
> 
> It's not immediately obvious why the results are different. 
> 
> Regards
> Sab

-- 

Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
