[ https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355088#comment-17355088 ]
Robert Joseph Evans commented on SPARK-35089:
---------------------------------------------

I should add that the above "solution" is fragile because it relies on Spark keeping the call to {{monotonically_increasing_id}} in the same task that reads in the ORC data. It really would be best if Spark could insert something like this automatically and then drop it again before writing/returning results.

> non consistent results running count for same dataset after filter and lead window function
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35089
>                 URL: https://issues.apache.org/jira/browse/SPARK-35089
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.1, 3.1.1
>            Reporter: Domagoj
>            Priority: Major
>
> **** edit 2021-05-18
>
> I have made this simpler to reproduce: the generated data (24,000,000 records) is now in a publicly available S3 bucket, so all you need to do is run this code:
>
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
>
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
>
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>   withColumn("end", ts_lead).
>   withColumn("duration", col("end") - col("start")).
>   where("type='TypeA' and duration>4").count()
> {code}
>
> These were my results:
> - run 1: 2547559
> - run 2: 2547559
> - run 3: 2547560
> - run 4: 2547558
> - run 5: 2547558
> - run 6: 2547559
> - run 7: 2547558
>
> These results are from a new EMR cluster, version 6.3.0, so nothing has changed.
>
> **** end edit 2021-05-18
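The varying counts above are consistent with ties on "start" within a "user" partition: orderBy("start") alone is not a total order, so lead can pair rows differently from run to run. Below is a minimal sketch of the tiebreaker idea from the comment at the top, run against the same public sample data; the column name "row_id" is illustrative, and, as that comment warns, the id is only stable while it is assigned in the same task that reads the ORC data:

{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Pin a per-row id right after the read; "row_id" is an illustrative name.
// As noted in the comment above, this is fragile: it is only deterministic
// while the id is generated in the same task that performs the read.
val df = spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
  withColumn("row_id", monotonically_increasing_id())

// Add the id as a tiebreaker so the window ordering is total.
val w = Window.partitionBy("user").orderBy("start", "row_id")
val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))

df.withColumn("end", ts_lead).
  withColumn("duration", col("end") - col("start")).
  where("type='TypeA' and duration>4").count()
{code}

With a total ordering, repeated counts should agree as long as the ids themselves are reproduced on every read; materializing the intermediate result (see the sketch at the end of this message) removes even that dependency.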
> I have found an inconsistency in count results after a lead window function and a filter.
>
> I have a dataframe (this is a simplified version, but it is enough to reproduce) with millions of records, with these columns:
> * df1:
> ** start (timestamp)
> ** user_id (int)
> ** type (string)
>
> I need to define the duration between two rows and filter on that duration and on type. I used the lead window function to get the next event's start time (which defines the end of the current event), so every row gets start and end times. If the lead is NULL (the last row, for example), the next midnight is used as the end. The data is stored in an ORC file (I tried Parquet as well, with no difference).
>
> This only happens with multiple cluster nodes, for example an AWS EMR cluster or a local docker cluster setup. If I run it on a single instance (locally on a laptop), I get consistent results every time. The Spark version is 3.0.1 in AWS, locally, and in the docker setup.
>
> Here is some simple code that you can use to reproduce it; I used a JupyterLab notebook on AWS EMR, Spark 3.0.1.
>
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
> import spark.implicits._
>
> // This dataframe generation code should be executed only once; the data has
> // to be saved and then reopened from disk, so it is always the same.
> val getRandomUser = udf(() => {
>   val users = Seq("John", "Eve", "Anna", "Martin", "Joe", "Steve", "Katy")
>   users(scala.util.Random.nextInt(7))
> })
> val getRandomType = udf(() => {
>   val types = Seq("TypeA", "TypeB", "TypeC", "TypeD", "TypeE")
>   types(scala.util.Random.nextInt(5))
> })
> val getRandomStart = udf((x: Int) => {
>   x + scala.util.Random.nextInt(47)
> })
>
> // A for loop is used to avoid an out-of-memory error while creating the dataframe.
> for (a <- 0 to 23) {
>   // use the loop counter a to continue with the next million ids (24 chunks of 1,000,000 rows)
>   val x = Range(a * 1000000, (a * 1000000) + 1000000).toDF("id").
>     withColumn("start", getRandomStart(col("id"))).
>     withColumn("user", getRandomUser()).
>     withColumn("type", getRandomType()).
>     drop("id")
>   x.write.mode("append").orc("hdfs:///random.orc")
> }
> // The code above should be run only once; I used a separate cell in Jupyter.
>
> // define window and lead
> val w = Window.partitionBy("user").orderBy("start")
> // if null, replace with 30,000,000
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
>
> // read the data into a dataframe, create the end column and calculate duration
> val fox2 = spark.read.orc("hdfs:///random.orc").
>   withColumn("end", ts_lead).
>   withColumn("duration", col("end") - col("start"))
>
> // repeated executions of this line return different results for the count
> // (I have it in a separate cell in JupyterLab)
> fox2.where("type='TypeA' and duration>4").count()
> {code}
>
> My results for three consecutive runs of the last line were:
> * run 1: 2551259
> * run 2: 2550756
> * run 3: 2551279
>
> It is very important to say that if I use the filter
> fox2.where("type='TypeA' ")
> or
> fox2.where("duration>4"),
> each of them can be executed repeatedly and I get a consistent result every time.
>
> I can save the dataframe after creating the end and duration columns, and after that I get consistent results every time. It is not a very practical workaround, as I need a lot of space and time to implement it; this dataset is really big (in my eyes at least), with approximately 100,000,000 new records per day.
>
> If I run this same example on my local machine using master = local[*], everything works as expected; it happens only on a cluster setup. I tried to create a cluster using docker on my local machine, created 3.0.1 and 3.1.1 clusters with one master and two workers, and successfully reproduced the issue.
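The "save the dataframe" workaround described above can be sketched as follows: materializing the computed columns once means every later count scans the same stored rows, so it is repeatable. The output path is illustrative, and this trades the extra storage and write time the reporter mentions for determinism:

{code:java}
// Materialize the window computation once; the path is illustrative.
fox2.write.mode("overwrite").orc("hdfs:///random_with_duration.orc")

// All subsequent reads see one fixed outcome of the lead computation.
val stable = spark.read.orc("hdfs:///random_with_duration.orc")
stable.where("type='TypeA' and duration>4").count() // repeatable across runs
{code}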