One optimization is to reduce the shuffle by first aggregating locally (keeping only the max-age record for each name), and then calling reduceByKey.
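
A minimal sketch of that idea in Scala, assuming a case class Person(name: String, age: Int, other: String) and personRDD: RDD[Person] as in your earlier snippet:

    val locallyReduced = personRDD.mapPartitions { iter =>
      // Keep only the max-age record per name within this partition,
      // so at most one record per (partition, name) gets shuffled.
      val best = scala.collection.mutable.Map[String, Person]()
      for (p <- iter) {
        if (!best.contains(p.name) || p.age > best(p.name).age)
          best(p.name) = p
      }
      best.iterator
    }
    // The final merge then sees at most one record per name per partition.
    val result = locallyReduced.reduceByKey((a, b) => if (a.age > b.age) a else b)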

Thanks.

Zhan Zhang

On Apr 24, 2015, at 10:03 PM, ayan guha <guha.a...@gmail.com> wrote:

Here you go....

    from pyspark.sql import Row

    # Assumes sc (SparkContext) and ssc (SQLContext) already exist, as in the shell.
    t = [["A",10,"A10"],["A",20,"A20"],["A",30,"A30"],
         ["B",15,"B15"],["C",10,"C10"],["C",20,"C200"]]
    TRDD = sc.parallelize(t).map(lambda r: Row(name=str(r[0]), age=int(r[1]), other=str(r[2])))
    TDF = ssc.createDataFrame(TRDD)
    TDF.printSchema()
    TDF.registerTempTable("tab")
    JN = ssc.sql("select t.name, t.age, t.other from tab t inner join "
                 "(select name, max(age) age from tab group by name) t1 "
                 "on t.name = t1.name and t.age = t1.age")
    for i in JN.collect():
        print i

Result:
Row(name=u'A', age=30, other=u'A30')
Row(name=u'B', age=15, other=u'B15')
Row(name=u'C', age=20, other=u'C200')
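
The same query can also be written directly against the DataFrame API; a rough Scala sketch (assuming tdf is a DataFrame with the same name/age/other schema as TDF above):

    import org.apache.spark.sql.functions.max

    // Max age per name, then join back to recover the full row.
    val maxAge = tdf.groupBy("name").agg(max("age").as("maxAge"))
    val result = tdf
      .join(maxAge, tdf("name") === maxAge("name") && tdf("age") === maxAge("maxAge"))
      .select(tdf("name"), tdf("age"), tdf("other"))
    result.collect().foreach(println)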

On Sat, Apr 25, 2015 at 2:48 PM, Wenlei Xie <wenlei....@gmail.com> wrote:
Sure. A simple example of data would be (there might be many other columns)

Name    Age    Other
A       10     A10
A       20     A20
A       30     A30
B       15     B15
C       10     C10
C       20     C20

The desired output would be
Name    Age    Other
A       30     A30
B       15     B15
C       20     C20

Thank you so much for the help!

On Sat, Apr 25, 2015 at 12:41 AM, ayan guha <guha.a...@gmail.com> wrote:
Can you give an example set of data and the desired output?

On Sat, Apr 25, 2015 at 2:32 PM, Wenlei Xie <wenlei....@gmail.com> wrote:
Hi,

I would like to run the following customized aggregation query with Spark SQL:
1. Group the table by the value of Name.
2. For each group, choose the tuple with the max value of Age (the ages are distinct for every name).

I am wondering what the best way to do this in Spark SQL is. Should I use a UDAF? Previously I was doing something like the following on Spark:

personRDD.map(t => (t.name, t))
    .reduceByKey((a, b) => if (a.age > b.age) a else b)

Thank you!

Best,
Wenlei



--
Best Regards,
Ayan Guha



--
Wenlei Xie (谢文磊)

Ph.D. Candidate
Department of Computer Science
456 Gates Hall, Cornell University
Ithaca, NY 14853, USA
Email: wenlei....@gmail.com



--
Best Regards,
Ayan Guha
