My assumption, which is apparently incorrect, was that the SQL gets translated into a Catalyst plan that is executed in Spark. The DataFrame operations (referred to by Mich as the FP results) also get translated into a Catalyst plan that is executed on the exact same Spark platform. So unless the SQL gets translated into a much better plan (perhaps thanks to some pushdown into ORC?), I don't see why it would be much faster.
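One way to check this, assuming the same spark-shell session as in Mich's listings below (the HiveContext val and the oraclehadoop tables are taken from the thread; this is a sketch, not verified against his cluster), is to build the query both ways and compare the plans Catalyst produces:

import org.apache.spark.sql.functions.sum

HiveContext.sql("use oraclehadoop")

// The SQL formulation of the join + aggregate.
val sqlVersion = HiveContext.sql("""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
FROM smallsales s
INNER JOIN times t ON s.time_id = t.time_id
INNER JOIN channels c ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")

// The same query expressed through the DataFrame API.
val dfVersion = HiveContext.table("smallsales")
  .join(HiveContext.table("times"), "time_id")
  .join(HiveContext.table("channels"), "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

// explain(true) prints the parsed, analyzed, optimized and physical plans.
sqlVersion.explain(true)
dfVersion.explain(true)

If the two optimized plans come out identical, the timing gap has to come from something other than the API used, e.g. from reading different tables or from work done outside the plan.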
On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers <ko...@tresata.com> wrote:
> I am still missing something. If it is executed in the source database,
> which is Hive in this case, then it does need Hive, no? How can you
> execute in Hive without needing Hive?
>
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan
> <sabarish.sasidha...@manthan.com> wrote:
>
>> I never said it needs one. All I said is that when calling context.sql()
>> the SQL is executed in the source database (assuming the data source is
>> Hive or some RDBMS).
>>
>> Regards
>> Sab
>>
>> On 24-Feb-2016 11:49 pm, "Mohannad Ali" <man...@gmail.com> wrote:
>>
>>> That is incorrect. HiveContext does not need a Hive instance to run.
>>> On Feb 24, 2016 19:15, "Sabarish Sasidharan"
>>> <sabarish.sasidha...@manthan.com> wrote:
>>>
>>>> Yes
>>>>
>>>> Regards
>>>> Sab
>>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>>>>
>>>>> Are you saying that HiveContext.sql(...) runs on Hive, and not on
>>>>> Spark SQL?
>>>>>
>>>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan
>>>>> <sabarish.sasidha...@manthan.com> wrote:
>>>>>
>>>>>> When using SQL, your full query, including the joins, was executed
>>>>>> in Hive (or the RDBMS) and only the results were brought into the
>>>>>> Spark cluster. In the FP case, the data for the 3 tables is first
>>>>>> pulled into the Spark cluster and then the join is executed there.
>>>>>>
>>>>>> Thus the time difference.
>>>>>>
>>>>>> It's not immediately obvious why the results are different.
>>>>>>
>>>>>> Regards
>>>>>> Sab
>>>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh"
>>>>>> <mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> First, thanks everyone for their suggestions. Much appreciated.
>>>>>>>
>>>>>>> These were the original queries, written in SQL and run against
>>>>>>> spark-shell:
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>>
>>>>>>> val rs = HiveContext.sql(
>>>>>>> """
>>>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>>> FROM smallsales s
>>>>>>> INNER JOIN times t
>>>>>>> ON s.time_id = t.time_id
>>>>>>> INNER JOIN channels c
>>>>>>> ON s.channel_id = c.channel_id
>>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>>> """)
>>>>>>> rs.registerTempTable("tmp")
>>>>>>> println ("\nfirst query")
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>>>>>> FROM tmp
>>>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>>>> """).collect.foreach(println)
>>>>>>> println ("\nsecond query")
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
>>>>>>> FROM tmp
>>>>>>> GROUP BY channel_desc
>>>>>>> ORDER BY SALES DESC LIMIT 5
>>>>>>> """).collect.foreach(println)
>>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> sys.exit
>>>>>>>
>>>>>>> The same queries were then written in FP style, as much as I could,
>>>>>>> as below:
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
>>>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>>>>>> val rs = s.join(t, "time_id").join(c, "channel_id")
>>>>>>>   .groupBy("calendar_month_desc", "channel_desc")
>>>>>>>   .agg(sum("amount_sold").as("TotalSales"))
>>>>>>> println ("\nfirst query")
>>>>>>> val rs1 = rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)
>>>>>>> println ("\nsecond query")
>>>>>>> val rs2 = rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES"))
>>>>>>>   .orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
>>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> sys.exit
>>>>>>>
>>>>>>> However, the first query's results are slightly different between
>>>>>>> SQL and FP (maybe the first query's code in FP is not exactly
>>>>>>> correct?), and more importantly the FP version takes an order of
>>>>>>> magnitude longer than the SQL one (8 minutes compared to less than
>>>>>>> a minute). I am not surprised, as I expected the functional
>>>>>>> programming approach would have to flatten out all those method
>>>>>>> calls and convert them to SQL?
>>>>>>>
>>>>>>> *The standard SQL results*
>>>>>>>
>>>>>>> Started at
>>>>>>> [23/02/2016 23:55:30.30]
>>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>>
>>>>>>> first query
>>>>>>> [1998-01,Direct Sales,9161730]
>>>>>>> [1998-01,Internet,1248581]
>>>>>>> [1998-01,Partners,2409776]
>>>>>>> [1998-02,Direct Sales,9161840]
>>>>>>> [1998-02,Internet,1533193]
>>>>>>>
>>>>>>> second query
>>>>>>> [Direct Sales,9161840]
>>>>>>> [Internet,3977374]
>>>>>>> [Partners,3976291]
>>>>>>> [Tele Sales,328760]
>>>>>>>
>>>>>>> Finished at
>>>>>>> [23/02/2016 23:56:11.11]
>>>>>>>
>>>>>>> *The FP results*
>>>>>>>
>>>>>>> Started at
>>>>>>> [23/02/2016 23:45:58.58]
>>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double,
>>>>>>> CHANNEL_DESC: string]
>>>>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>>>>> CALENDAR_MONTH_DESC: string]
>>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>>
>>>>>>> first query
>>>>>>> [1998-01,Direct Sales,9086830]
>>>>>>> [1998-01,Internet,1247641]
>>>>>>> [1998-01,Partners,2393567]
>>>>>>> [1998-02,Direct Sales,9161840]
>>>>>>> [1998-02,Internet,1533193]
>>>>>>> rs1: Unit = ()
>>>>>>>
>>>>>>> second query
>>>>>>> [Direct Sales,9161840]
>>>>>>> [Internet,3977374]
>>>>>>> [Partners,3976291]
>>>>>>> [Tele Sales,328760]
>>>>>>> rs2: Unit = ()
>>>>>>>
>>>>>>> Finished at
>>>>>>> [23/02/2016 23:53:42.42]
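One detail visible in the two listings above: the SQL version reads smallsales while the FP version reads sales, which by itself could explain both the differing first-query rows and much of the timing gap. Also, rs1 and rs2 print as Unit = () because take(5).foreach(println) returns Unit, not a DataFrame. A minimal corrected sketch of the FP version, assuming the same oraclehadoop schema as in the thread (illustrative, not a tested fix):

import org.apache.spark.sql.functions.{sum, max, desc}

HiveContext.sql("use oraclehadoop")
// Read the same table as the SQL version (smallsales, not sales).
val s = HiveContext.table("smallsales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")

val rs = s.join(t, "time_id").join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

println("\nfirst query")
rs.orderBy("calendar_month_desc", "channel_desc").take(5).foreach(println)

println("\nsecond query")
// sort(desc("SALES")) alone is enough; the orderBy("SALES") before it
// in the original listing is redundant and immediately overridden.
rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES"))
  .sort(desc("SALES")).take(5).foreach(println)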
>>>>>>>
>>>>>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have data stored in Hive tables that I want to do simple
>>>>>>> manipulation on.
>>>>>>>
>>>>>>> Currently in Spark I perform the following, getting the result set
>>>>>>> using SQL from Hive tables and registering it as a temporary table
>>>>>>> in Spark.
>>>>>>>
>>>>>>> Ideally I would get the result set into a DF and work on the DF to
>>>>>>> slice and dice the data using functional programming with filter,
>>>>>>> map, split etc.
>>>>>>>
>>>>>>> I wanted to get some ideas on how to go about it.
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>>
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>>>>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>>> FROM smallsales s, times t, channels c
>>>>>>> WHERE s.time_id = t.time_id
>>>>>>> AND s.channel_id = c.channel_id
>>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>>> """)
>>>>>>> rs.registerTempTable("tmp")
>>>>>>>
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
>>>>>>> FROM tmp
>>>>>>> ORDER BY MONTH, CHANNEL
>>>>>>> """).collect.foreach(println)
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
>>>>>>> FROM tmp
>>>>>>> GROUP BY channel_desc
>>>>>>> ORDER BY SALES DESC
>>>>>>> """).collect.foreach(println)
>>>>>>>
>>>>>>> --
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>> LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> http://talebzadehmich.wordpress.com
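For the original question of slicing and dicing the result set with filter, map, split etc., once rs is a DataFrame you can stay in the DataFrame API or drop down to the underlying RDD of Rows for arbitrary Scala functions. A sketch along those lines, reusing the rs from the listing above (names and column types are taken from the thread; this is illustrative, not the only way):

import org.apache.spark.sql.functions.col

// DataFrame-side slicing: stays inside Catalyst, so it can still be optimized.
val directSales = rs.filter(col("channel_desc") === "Direct Sales")
  .select("calendar_month_desc", "TotalSales")

// RDD-side dicing: arbitrary Scala functions over Rows, outside Catalyst.
// TotalSales is decimal(20,0) per the output above, hence java.math.BigDecimal.
val monthTotals = rs.rdd.map(row =>
  (row.getAs[String]("calendar_month_desc"),
   row.getAs[java.math.BigDecimal]("TotalSales")))

directSales.show(5)
monthTotals.take(5).foreach(println)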