Hi, if you need the last value of income in a window function you can use
last_value. Not tested, but maybe something like this, building on @ayan's SQL:
spark.sql("select *, last_value(income) over (partition by id order by income_age_ts desc) r from t")

On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com> wrote:

> @ayan,
>
> Thanks for your response.
>
> I would like to use functions here (in this case calculateIncome), and the
> reason I need a function is to reuse it in other parts of the application.
> That's why I'm planning on mapGroups with a function as the argument, which
> takes a row iterator. But I'm not sure this is the best way to implement it,
> as my initial DataFrame is very large.
>
> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi,
>>
>> The tool you are looking for is a window function. Example:
>>
>> >>> df.show()
>> +--------+----+---+------+-------------+
>> |JoinDate|dept| id|income|income_age_ts|
>> +--------+----+---+------+-------------+
>> | 4/20/13|  ES|101| 19000|      4/20/17|
>> | 4/20/13|  OS|101| 10000|      10/3/15|
>> | 4/20/12|  DS|102| 13000|       5/9/17|
>> | 4/20/12|  CS|102| 12000|       5/8/17|
>> | 4/20/10|  EQ|103| 10000|       5/9/17|
>> | 4/20/10|  MD|103|  9000|       5/8/17|
>> +--------+----+---+------+-------------+
>>
>> >>> res = spark.sql("select *, row_number() over (partition by id order
>> by income_age_ts desc) r from t")
>> >>> res.show()
>> +--------+----+---+------+-------------+---+
>> |JoinDate|dept| id|income|income_age_ts|  r|
>> +--------+----+---+------+-------------+---+
>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>> | 4/20/13|  OS|101| 10000|      10/3/15|  2|
>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
>> +--------+----+---+------+-------------+---+
>>
>> >>> res = spark.sql("select * from (select *, row_number() over
>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>> >>> res.show()
>> +--------+----+---+------+-------------+---+
>> |JoinDate|dept| id|income|income_age_ts|  r|
>> +--------+----+---+------+-------------+---+
>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>> +--------+----+---+------+-------------+---+
>>
>> This should perform better, because it uses all of Spark's built-in
>> optimizations.
>>
>> Best,
>> Ayan
>>
>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Please click on the unnamed text/html link for a better view.
>>>
>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Mamillapalli, Purna Pradeep
>>>> <PurnaPradeep.Mamillapalli@capitalone.com>
>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>> Subject: Spark question
>>>> To: purna pradeep <purna2prad...@gmail.com>
>>>>
>>>> Below is the input DataFrame (in reality this is a very large
>>>> DataFrame):
>>>>
>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>> -----------+--------+---------------+----------+-----
>>>>        101 |  19000 |       4/20/17 |  4/20/13 | ES
>>>>        101 |  10000 |       10/3/15 |  4/20/13 | OS
>>>>        102 |  13000 |        5/9/17 |  4/20/12 | DS
>>>>        102 |  12000 |        5/8/17 |  4/20/12 | CS
>>>>        103 |  10000 |        5/9/17 |  4/20/10 | EQ
>>>>        103 |   9000 |        5/8/15 |  4/20/10 | MD
>>>>
>>>> Get the latest income of each employee whose income_age_ts is less
>>>> than 10 months old.
>>>>
>>>> Expected output DataFrame:
>>>>
>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>> -----------+--------+---------------+----------+-----
>>>>        101 |  19000 |       4/20/17 |  4/20/13 | ES
>>>>        102 |  13000 |        5/9/17 |  4/20/12 | DS
>>>>        103 |  10000 |        5/9/17 |  4/20/10 | EQ
>>>
>>> Below is what I'm planning to implement:
>>>>
>>>> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Int,
>>>>   JOINDATE: Int, DEPT: String)
>>>>
>>>> val empSchema = new StructType()
>>>>   .add("EmployeeID", "Int")
>>>>   .add("INCOME", "Int")
>>>>   .add("INCOMEAGE", "Date")
>>>>   .add("JOINDATE", "Date")
>>>>   .add("DEPT", "String")
>>>>
>>>> // Reading from the file
>>>> import sparkSession.implicits._
>>>>
>>>> val readEmpFile = sparkSession.read
>>>>   .option("sep", ",")
>>>>   .schema(empSchema)
>>>>   .csv(INPUT_DIRECTORY)
>>>>
>>>> // Create the employee Dataset
>>>> val custDf = readEmpFile.as[employee]
>>>>
>>>> // Group by employee
>>>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>>>>
>>>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>>>
>>>> def performETL(empData: Iterator[employee]): employee = {
>>>>   val empList = empData.toList
>>>>
>>>>   // calculateIncome has the logic to figure out the latest income
>>>>   // for an account, and returns that latest income
>>>>   val income = calculateIncome(empList)
>>>>
>>>>   for (i <- empList) {
>>>>     val row = i
>>>>     return new employee(row.EmployeeID, row.INCOMEAGE, income)
>>>>   }
>>>>   return "Done"
>>>> }
>>>>
>>>> Is this a better approach, or even the right approach, to implement
>>>> this? If not, please suggest a better way.
>>
>> --
>> Best Regards,
>> Ayan Guha

--
Ing. Ivaldi Andres
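
The window-function pattern in ayan's reply is standard SQL, so it can be tried
outside Spark as well. Below is a small runnable sketch of the same
latest-row-per-key query using Python's bundled sqlite3 module (SQLite 3.25+
supports ROW_NUMBER), on the thread's sample data with dates rewritten as ISO
yyyy-mm-dd so they sort correctly as text:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("create table t (id int, income int, income_age_ts text)")
con.executemany(
    "insert into t values (?, ?, ?)",
    [  # sample rows from the thread, dates rewritten as ISO yyyy-mm-dd
        (101, 19000, "2017-04-20"),
        (101, 10000, "2015-10-03"),
        (102, 13000, "2017-05-09"),
        (102, 12000, "2017-05-08"),
        (103, 10000, "2017-05-09"),
        (103,  9000, "2017-05-08"),
    ],
)

# Latest income per id: number the rows within each id, newest first,
# then keep only r = 1 -- the same shape as ayan's second query.
rows = con.execute("""
    select id, income, income_age_ts
    from (select *,
                 row_number() over (partition by id
                                    order by income_age_ts desc) r
          from t) x
    where r = 1
    order by id
""").fetchall()
print(rows)
```

One caveat on the last_value suggestion at the top of the thread: with an
ORDER BY and the default window frame (RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW), last_value(income) returns the current row's income rather than
the newest one in the partition; first_value over the descending order, or an
explicit frame, is usually what is wanted.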
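
For the mapGroups route purna sketches, the per-group function has to return
exactly one record per key (not loop-and-return), and the "< 10 months"
condition needs a reference date to compare against. A plain-Python sketch of
that per-group logic, independent of Spark; the helper names months_between
and latest_recent_income are mine, not from the thread:

```python
from datetime import date

def months_between(later: date, earlier: date) -> int:
    """Whole calendar months from `earlier` to `later`."""
    m = (later.year - earlier.year) * 12 + (later.month - earlier.month)
    return m - 1 if later.day < earlier.day else m

def latest_recent_income(rows, today, max_age_months=10):
    """rows: iterable of (income, income_age_ts) pairs for ONE employee.
    Returns the income with the newest income_age_ts that is younger than
    `max_age_months`, or None if no row qualifies."""
    recent = [(ts, inc) for inc, ts in rows
              if months_between(today, ts) < max_age_months]
    return max(recent)[1] if recent else None

# Employee 101, evaluated as of the thread's date: only the 2017-04-20
# income is younger than 10 months, so 19000 is returned.
print(latest_recent_income([(19000, date(2017, 4, 20)),
                            (10000, date(2015, 10, 3))],
                           today=date(2017, 8, 29)))
```

In Spark this shape maps onto mapGroups directly: the lambda receives the key
and an Iterator of rows for that employee and must produce the single output
record, rather than returning from inside a loop or falling through to a
string as in the quoted performETL.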