[
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346704#comment-17346704
]
Domagoj commented on SPARK-35089:
---------------------------------
I've made the test data available via a public S3 bucket, so it's easier to
reproduce now.
> Inconsistent count results for the same dataset after a filter and the lead
> window function
> -------------------------------------------------------------------------------------------
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.1, 3.1.1
> Reporter: Domagoj
> Priority: Major
>
> **** edit 2021-05-18
> I have made it simpler to reproduce: I've put the already generated data
> (24,000,000 records) in a publicly available S3 bucket.
> Now all you need to do is run this code:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
>
> // next event start per user; 30000000 stands in for "no next event"
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
>
> // repeated executions of this count return different results
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>   withColumn("end", ts_lead).
>   withColumn("duration", col("end") - col("start")).
>   where("type='TypeA' and duration>4").
>   count()
> {code}
>
> these were my results:
> - run 1: 2547559
> - run 2: 2547559
> - run 3: 2547560
> - run 4: 2547558
> - run 5: 2547558
> - run 6: 2547559
> - run 7: 2547558
> **** end edit 2021-05-18
> I have found an inconsistency in count results after a lead window function
> and a filter.
>
> I have a dataframe (this is a simplified version, but it's enough to
> reproduce) with millions of records, with these columns:
> * df1:
> ** start (timestamp)
> ** user_id (int)
> ** type (string)
> I need to define the duration between two rows and then filter on that
> duration and on type. I used the lead window function to get the next event
> time (which defines the end of the current event), so every row now gets
> start and stop times. If the lead is NULL (the last row per user, for
> example), the next midnight is used as the stop; a sketch of this derivation
> follows below. The data is stored in an ORC file (I tried Parquet format, no
> difference).
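> A minimal sketch of that end/duration derivation, assuming the df1 and the
> column names described above, with start as a timestamp column (the repro
> below substitutes an int sentinel for the midnight fallback):
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> // next event start per user; for the last row per user, fall back to the
> // next midnight after `start`
> val w = Window.partitionBy("user_id").orderBy("start")
> val nextMidnight = date_trunc("day", col("start") + expr("INTERVAL 1 DAY"))
>
> val withDuration = df1.
>   withColumn("end", coalesce(lead("start", 1).over(w), nextMidnight)).
>   withColumn("duration",
>     unix_timestamp(col("end")) - unix_timestamp(col("start")))
> {code}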
> This only happens with multiple cluster nodes, for example an AWS EMR
> cluster or a local Docker cluster setup. If I run it on a single instance
> (locally on a laptop), I get consistent results every time. The Spark
> version is 3.0.1 in both the AWS and the local Docker setups.
> Here is some simple code that you can use to reproduce it; I've used a
> JupyterLab notebook on AWS EMR, Spark version 3.0.1.
>
>
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
> import spark.implicits._
>
> // This dataframe generation code should be executed only once, and the data
> // has to be saved and then reopened from disk, so it is always the same.
> val getRandomUser = udf(() => {
>   val users = Seq("John", "Eve", "Anna", "Martin", "Joe", "Steve", "Katy")
>   users(scala.util.Random.nextInt(7))
> })
> val getRandomType = udf(() => {
>   val types = Seq("TypeA", "TypeB", "TypeC", "TypeD", "TypeE")
>   types(scala.util.Random.nextInt(5))
> })
> val getRandomStart = udf((x: Int) => {
>   x + scala.util.Random.nextInt(47)
> })
>
> // The for loop is used to avoid an out-of-memory error during creation of
> // the dataframe: each iteration appends the next million rows.
> for (a <- 0 to 23) {
>   val x = Range(a * 1000000, (a * 1000000) + 1000000).toDF("id").
>     withColumn("start", getRandomStart(col("id"))).
>     withColumn("user", getRandomUser()).
>     withColumn("type", getRandomType()).
>     drop("id")
>   x.write.mode("append").orc("hdfs:///random.orc")
> }
> // The code above should be run only once; I used a separate cell in Jupyter.
>
> // Define the window and the lead; if the lead is null, replace it with
> // 30000000.
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
>
> // Read the data into a dataframe, create the end column and calculate the
> // duration.
> val fox2 = spark.read.orc("hdfs:///random.orc").
>   withColumn("end", ts_lead).
>   withColumn("duration", col("end") - col("start"))
>
> // Repeated executions of this line return different results for the count.
> // I have it in a separate cell in JupyterLab.
> fox2.where("type='TypeA' and duration>4").count()
> {code}
> My results for three consecutive runs of the last line were:
> * run 1: 2551259
> * run 2: 2550756
> * run 3: 2551279
> It's very important to note that if I use either filter on its own:
> fox2.where("type='TypeA'")
> or
> fox2.where("duration>4"),
> each of them can be executed repeatedly and I get a consistent result every
> time; a quick consistency check is sketched below.
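> A minimal sketch for checking run-to-run consistency, assuming the fox2
> dataframe from the repro above (the loop count is arbitrary):
> {code:java}
> // Run the same count several times and collect the distinct results;
> // a deterministic query should yield exactly one distinct value.
> val counts = (1 to 7).map { _ =>
>   fox2.where("type='TypeA' and duration>4").count()
> }
> println(s"distinct counts: ${counts.distinct.mkString(", ")}")
> {code}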
> I can save the dataframe after creating the end and duration columns, and
> after that I get consistent results every time (a sketch follows below).
> It is not a very practical workaround, as it needs a lot of extra space and
> time.
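> A minimal sketch of that workaround, assuming the fox2 dataframe from the
> repro; the output path is illustrative:
> {code:java}
> // Materialize the derived columns once, then count from the saved copy;
> // the count is now taken over fixed data instead of a recomputed window.
> fox2.write.mode("overwrite").orc("hdfs:///random_with_duration.orc")
> val materialized = spark.read.orc("hdfs:///random_with_duration.orc")
> materialized.where("type='TypeA' and duration>4").count()
> {code}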
> This dataset is really big (in my eyes at least, approximately 100,000,000
> new records per day).
> If I run this same example on my local machine using master = local[*],
> everything works as expected; it happens only with a cluster setup. I tried
> to create a cluster using Docker on my local machine, built 3.0.1 and 3.1.1
> clusters with one master and two workers, and successfully reproduced the
> issue.
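> For completeness, a minimal sketch of switching between the two modes; the
> cluster master URL and the environment variable are illustrative:
> {code:java}
> import org.apache.spark.sql.SparkSession
>
> // "local[*]" ran consistently; a standalone cluster master such as
> // "spark://spark-master:7077" showed the varying counts.
> val spark = SparkSession.builder().
>   appName("spark-35089-repro").
>   master(sys.env.getOrElse("SPARK_MASTER", "local[*]")).
>   getOrCreate()
> {code}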
>