My assumption, which is apparently incorrect, was that the SQL gets
translated into a Catalyst plan that is executed in Spark. The DataFrame
operations (referred to by Mich as the FP results) also get translated into
a Catalyst plan that is executed on the exact same Spark platform. So
unless the SQL gets translated into a much better plan (perhaps thanks to
some pushdown into ORC?), I don't see why it should be much faster.
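
A quick (untested) sketch of how one could check that, reusing the HiveContext
val and the tables from Mich's snippet: compare the plans Spark produces for
the SQL and the DataFrame versions of the same join.

val bySql = HiveContext.sql("SELECT t.calendar_month_desc, s.amount_sold FROM smallsales s JOIN times t ON s.time_id = t.time_id")
bySql.explain(true)   // prints the parsed, analyzed, optimized and physical plans

val byDf = HiveContext.table("smallsales").join(HiveContext.table("times"), "time_id").select("calendar_month_desc", "amount_sold")
byDf.explain(true)    // if my assumption holds, the physical plan should be essentially the same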




On Wed, Feb 24, 2016 at 2:59 PM, Koert Kuipers <ko...@tresata.com> wrote:

> I am still missing something. If it is executed in the source database,
> which is Hive in this case, then it does need Hive, no? How can you execute
> in Hive without needing Hive?
>
> On Wed, Feb 24, 2016 at 1:25 PM, Sabarish Sasidharan <
> sabarish.sasidha...@manthan.com> wrote:
>
>> I never said it needs one. All I said is that when calling context.sql()
>> the SQL is executed in the source database (assuming the data source is
>> Hive or some RDBMS).
>>
>> Regards
>> Sab
>> On 24-Feb-2016 11:49 pm, "Mohannad Ali" <man...@gmail.com> wrote:
>>
>>> That is incorrect. HiveContext does not need a Hive instance to run.
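>>> For example, a minimal sketch (assuming a plain spark-shell with no Hive
>>> installed; the table name is just for illustration). HiveContext simply
>>> falls back to an embedded local metastore:
>>>
>>> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
>>> hc.sql("CREATE TABLE IF NOT EXISTS demo_tbl (id INT)")  // goes to the embedded metastore
>>> hc.sql("SHOW TABLES").collect.foreach(println)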
>>> On Feb 24, 2016 19:15, "Sabarish Sasidharan" <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
>>>> Yes
>>>>
>>>> Regards
>>>> Sab
>>>> On 24-Feb-2016 9:15 pm, "Koert Kuipers" <ko...@tresata.com> wrote:
>>>>
>>>>> Are you saying that HiveContext.sql(...) runs on Hive, and not on
>>>>> Spark SQL?
>>>>>
>>>>> On Wed, Feb 24, 2016 at 1:27 AM, Sabarish Sasidharan <
>>>>> sabarish.sasidha...@manthan.com> wrote:
>>>>>
>>>>>> When using SQL, your full query, including the joins, was executed in
>>>>>> Hive (or the RDBMS) and only the results were brought into the Spark
>>>>>> cluster. In the FP case, the data for the three tables is first pulled
>>>>>> into the Spark cluster and then the join is executed.
>>>>>>
>>>>>> Thus the time difference.
>>>>>>
>>>>>> It's not immediately obvious why the results are different.
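>>>>>>
>>>>>> For the RDBMS case, one way to actually push a full query, joins and
>>>>>> grouping included, down to the database is to hand it to the JDBC data
>>>>>> source as the dbtable. A rough sketch (connection details are made up,
>>>>>> and a JDBC driver on the classpath is assumed):
>>>>>>
>>>>>> val pushed = sqlContext.read.format("jdbc").options(Map(
>>>>>>   "url" -> "jdbc:oracle:thin:@//dbhost:1521/ORCL",
>>>>>>   "dbtable" -> """(SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>>                    FROM sales s JOIN times t ON s.time_id = t.time_id
>>>>>>                    JOIN channels c ON s.channel_id = c.channel_id
>>>>>>                    GROUP BY t.calendar_month_desc, c.channel_desc) q"""
>>>>>> )).load()
>>>>>> pushed.explain(true)  // the scan shows the whole subquery being sent to the database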
>>>>>>
>>>>>> Regards
>>>>>> Sab
>>>>>> On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
>>>>>> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> First, thanks to everyone for their suggestions. Much appreciated.
>>>>>>>
>>>>>>> These were the original queries, written in SQL and run against
>>>>>>> spark-shell:
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>>
>>>>>>> val rs = HiveContext.sql(
>>>>>>> """
>>>>>>> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
>>>>>>> TotalSales
>>>>>>> FROM smallsales s
>>>>>>> INNER JOIN times t
>>>>>>> ON s.time_id = t.time_id
>>>>>>> INNER JOIN channels c
>>>>>>> ON s.channel_id = c.channel_id
>>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>>> """)
>>>>>>> rs.registerTempTable("tmp")
>>>>>>> println ("\nfirst query")
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>>> TotalSales
>>>>>>> from tmp
>>>>>>> ORDER BY MONTH, CHANNEL LIMIT 5
>>>>>>> """).collect.foreach(println)
>>>>>>> println ("\nsecond query")
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>>> FROM tmp
>>>>>>> GROUP BY channel_desc
>>>>>>> order by SALES DESC LIMIT 5
>>>>>>> """).collect.foreach(println)
>>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> sys.exit
>>>>>>>
>>>>>>> The second set of queries was written in FP style, as far as I could manage, as below:
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>> val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM sales")
>>>>>>> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
>>>>>>> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
>>>>>>> val rs =
>>>>>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>>>>>> println ("\nfirst query")
>>>>>>> val rs1 =
>>>>>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>>>>> println ("\nsecond query")
>>>>>>> val rs2 =
>>>>>>> rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).sort(desc("SALES")).take(5).foreach(println)
>>>>>>> println ("\nFinished at"); HiveContext.sql("SELECT
>>>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>>>> ").collect.foreach(println)
>>>>>>> sys.exit
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> However, the first query results are slightly different between SQL and
>>>>>>> FP (maybe the first query code in FP is not exactly correct?) and, more
>>>>>>> importantly, the FP version takes an order of magnitude longer than the
>>>>>>> SQL one (8 minutes compared to less than a minute). I am not surprised,
>>>>>>> as I expected that the functional style has to flatten all those method
>>>>>>> calls and convert them to SQL?
>>>>>>>
>>>>>>> *The standard SQL results*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Started at
>>>>>>> [23/02/2016 23:55:30.30]
>>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>>
>>>>>>> first query
>>>>>>> [1998-01,Direct Sales,9161730]
>>>>>>> [1998-01,Internet,1248581]
>>>>>>> [1998-01,Partners,2409776]
>>>>>>> [1998-02,Direct Sales,9161840]
>>>>>>> [1998-02,Internet,1533193]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> second query
>>>>>>> [Direct Sales,9161840]
>>>>>>> [Internet,3977374]
>>>>>>> [Partners,3976291]
>>>>>>> [Tele Sales,328760]
>>>>>>>
>>>>>>> Finished at
>>>>>>> [23/02/2016 23:56:11.11]
>>>>>>>
>>>>>>> *The FP results*
>>>>>>>
>>>>>>> Started at
>>>>>>> [23/02/2016 23:45:58.58]
>>>>>>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>>>>>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
>>>>>>> TIME_ID: timestamp, CHANNEL_ID: bigint]
>>>>>>> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double,
>>>>>>> CHANNEL_DESC: string]
>>>>>>> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
>>>>>>> CALENDAR_MONTH_DESC: string]
>>>>>>> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
>>>>>>> channel_desc: string, TotalSales: decimal(20,0)]
>>>>>>>
>>>>>>> first query
>>>>>>> [1998-01,Direct Sales,9086830]
>>>>>>> [1998-01,Internet,1247641]
>>>>>>> [1998-01,Partners,2393567]
>>>>>>> [1998-02,Direct Sales,9161840]
>>>>>>> [1998-02,Internet,1533193]
>>>>>>> rs1: Unit = ()
>>>>>>>
>>>>>>> second query
>>>>>>> [Direct Sales,9161840]
>>>>>>> [Internet,3977374]
>>>>>>> [Partners,3976291]
>>>>>>> [Tele Sales,328760]
>>>>>>> rs2: Unit = ()
>>>>>>>
>>>>>>> Finished at
>>>>>>> [23/02/2016 23:53:42.42]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have data stored in Hive tables on which I want to do some simple
>>>>>>> manipulation.
>>>>>>>
>>>>>>> Currently in Spark I do the following: I get the result set from the
>>>>>>> Hive tables using SQL and register it as a temporary table in Spark.
>>>>>>>
>>>>>>> Ideally I would then get the result set into a DF and work on the DF to
>>>>>>> slice and dice the data using functional programming with filter, map,
>>>>>>> split etc. (a rough sketch of what I mean follows the code below).
>>>>>>>
>>>>>>> I wanted to get some ideas on how to go about it.
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>>>
>>>>>>> HiveContext.sql("use oraclehadoop")
>>>>>>> val rs = HiveContext.sql("""SELECT t.calendar_month_desc,
>>>>>>> c.channel_desc, SUM(s.amount_sold) AS TotalSales
>>>>>>> FROM smallsales s, times t, channels c
>>>>>>> WHERE s.time_id = t.time_id
>>>>>>> AND   s.channel_id = c.channel_id
>>>>>>> GROUP BY t.calendar_month_desc, c.channel_desc
>>>>>>> """)
>>>>>>> *rs.registerTempTable("tmp")*
>>>>>>>
>>>>>>>
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL,
>>>>>>> TotalSales
>>>>>>> from tmp
>>>>>>> ORDER BY MONTH, CHANNEL
>>>>>>> """).collect.foreach(println)
>>>>>>> HiveContext.sql("""
>>>>>>> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
>>>>>>> FROM tmp
>>>>>>> GROUP BY channel_desc
>>>>>>> order by SALES DESC
>>>>>>> """).collect.foreach(println)
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>> LinkedIn  
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>
