I have data streaming into my Spark Scala application in this format:

id    mark1 mark2 mark3 time
uuid1 100   200   300   Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:58 PDT 2017

I have it read into columns id, mark1, mark2, mark3, and time, and the time
column is converted to a proper timestamp. I want to group by id and compute
the lag of mark1, which gives the previous row's mark1 value. Something like
this:

id    mark1 mark2 mark3 prev_mark time
uuid1 100   200   300   null      Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   100       Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   null      Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   150       Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   150       Tue Aug  8 14:06:58 PDT 2017
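
For context, this is roughly how I build the dataframe. The input rawDF and
the timestamp pattern below are placeholders for my actual setup:

import org.apache.spark.sql.functions.{col, to_timestamp}

// rawDF is the raw streaming dataframe with every column read as a string.
// The pattern is my attempt at matching values like "Tue Aug  8 14:06:02 PDT 2017".
val markDF = rawDF
  .withColumn("time", to_timestamp(col("time"), "EEE MMM d HH:mm:ss zzz yyyy"))
  .withColumn("mark1", col("mark1").cast("int"))
  .withColumn("mark2", col("mark2").cast("int"))
  .withColumn("mark3", col("mark3").cast("int"))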

Using this markDF, I have tried:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
val window = Window.partitionBy("id").orderBy("time")
val newerDF = markDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

which fails with an error saying that non-time-based windows are not
supported on streaming DataFrames/Datasets.
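
For what it is worth, the same window and lag work fine when I test them on a
static dataframe (the file path here is just an example):

// Same window and lag, but on a batch dataframe read from disk: this works.
val staticDF = spark.read.option("header", "true").csv("/tmp/marks.csv")
val staticWithPrev = staticDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))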

I have also tried:

val window = Window.partitionBy("id").orderBy("time").rowsBetween(-10, 10)
val newerDF = markDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

This was an attempt to restrict the window to a few rows, but it did not work
either. The streaming time window, something like window("time", "10
minutes"), cannot be used to apply the lag. I am super confused about how to
do this. Any help would be awesome!!
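
In case it helps clarify what I am after: my current guess is that this needs
arbitrary stateful processing rather than a window function. Below is a rough
sketch of what I mean using flatMapGroupsWithState; the case classes are just
placeholders for my schema, and I am not sure the ordering across
micro-batches is handled correctly:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class Mark(id: String, mark1: Int, mark2: Int, mark3: Int, time: Timestamp)
case class MarkWithPrev(id: String, mark1: Int, mark2: Int, mark3: Int,
                        prev_mark: Option[Int], time: Timestamp)

val withPrev = markDF.as[Mark]
  .groupByKey(_.id)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
    (id: String, rows: Iterator[Mark], state: GroupState[Int]) =>
      // Sort this micro-batch by time, then thread the previous mark1 through,
      // carrying the last seen value across batches in the per-key state.
      val sorted = rows.toList.sortBy(_.time.getTime)
      var prev: Option[Int] = state.getOption
      val out = sorted.map { m =>
        val row = MarkWithPrev(m.id, m.mark1, m.mark2, m.mark3, prev, m.time)
        prev = Some(m.mark1)
        row
      }
      prev.foreach(state.update)
      out.iterator
  }

But I have no idea whether this is correct or whether there is a simpler way.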
