[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Domagoj updated SPARK-35089:
----------------------------
    Description: 
****   edit 2021-05-18

I have made this simpler to reproduce: I've put already generated data 
(24,000,000 records) in a publicly available S3 bucket.

Now all you need to do is run this code:
{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val w = Window.partitionBy("user").orderBy("start")
val ts_lead = coalesce(lead("start", 1) .over(w), lit(30000000))
spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
 withColumn("end", ts_lead).
 withColumn("duration", col("end")-col("start")).
 where("type='TypeA' and duration>4").count()
{code}
 

These were my results:

- run 1: 2547559
- run 2: 2547559
- run 3: 2547560
- run 4: 2547558
- run 5: 2547558
- run 6: 2547559
- run 7: 2547558

****   end edit 2021-05-18

I have found that count returns inconsistent results when it is run after a 
lead window function and a filter.

 

I have a dataframe (this is a simplified version, but it is enough to 
reproduce the issue) with millions of records and these columns:
 * df1:
 ** start(timestamp)
 ** user_id(int)
 ** type(string)

I need to compute the duration between two consecutive rows and filter on that 
duration and on type. I used the lead window function to get the next event 
time (which defines the end of the current event), so every row gets start and 
stop times. If the lead value is NULL (for the last row, for example), the next 
midnight is used as the stop time. The data is stored in ORC files (I also 
tried the Parquet format, with no difference).
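
To make the intended computation concrete, here is a minimal plain-Scala sketch of what the lead, coalesce and duration steps are expected to produce on a tiny in-memory sample. It does not use Spark; `Event`, `withEnd` and the sample rows are hypothetical names and data introduced only for illustration, and the fallback end value is the constant 30000000 from the reproduction code rather than a real midnight timestamp.

```scala
// Plain-Scala model of lead("start", 1) over a per-user window (no Spark needed).
// Event and withEnd are hypothetical names used only for this illustration.
final case class Event(user: String, start: Int, eventType: String)

// End value used when a row has no successor (30000000 in the report's code).
val DefaultEnd = 30000000

// For each user, sort by start and pair every event with the next event's start.
def withEnd(events: Seq[Event]): Seq[(Event, Int, Int)] =
  events.groupBy(_.user).values.toSeq.flatMap { rows =>
    val sorted = rows.sortBy(_.start)
    // "lead": the start of the following row, or None for the last row
    val nextStarts = sorted.drop(1).map(e => Option(e.start)) :+ None
    sorted.zip(nextStarts).map { case (e, next) =>
      val end = next.getOrElse(DefaultEnd) // coalesce(lead, default)
      (e, end, end - e.start)              // (event, end, duration)
    }
  }

// Made-up sample rows, not taken from the real dataset.
val sample = Seq(
  Event("John", 10, "TypeA"),
  Event("John", 17, "TypeB"),
  Event("Eve", 12, "TypeA"),
  Event("John", 19, "TypeA")
)

val result = withEnd(sample)

// Same predicate as in the report: type = 'TypeA' and duration > 4
val matching = result.count { case (e, _, d) => e.eventType == "TypeA" && d > 4 }
// matching == 3
```

On a fixed input like this one the count should not change between runs, which is the behaviour expected from the Spark job as well.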

This only happens with multiple cluster nodes, for example an AWS EMR cluster 
or a local Docker cluster setup. If I run it on a single instance (locally on a 
laptop), I get consistent results every time. The Spark version is 3.0.1 in the 
AWS, local and Docker setups.

Here is some simple code that you can use to reproduce it. I used a JupyterLab 
notebook on AWS EMR. Spark version is 3.0.1.

 

 
{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// This dataframe generation code should be executed only once. The data has
// to be saved and then read back from disk, so that it is always the same.

val getRandomUser = udf(()=>{
    val users = Seq("John","Eve","Anna","Martin","Joe","Steve","Katy")
   users(scala.util.Random.nextInt(7))
})

val getRandomType = udf(()=>{
    val types = Seq("TypeA","TypeB","TypeC","TypeD","TypeE")
    types(scala.util.Random.nextInt(5))
})

val getRandomStart = udf((x:Int)=>{
    x+scala.util.Random.nextInt(47)
})
// a for loop is used to avoid an out-of-memory error while creating the dataframe
for( a <- 0 to 23){
        // use loop index a to generate the next million ids (24 batches of 1 million rows)
        val x=Range(a*1000000,(a*1000000)+1000000).toDF("id").
            withColumn("start",getRandomStart(col("id"))).
            withColumn("user",getRandomUser()).
            withColumn("type",getRandomType()).
            drop("id")

        x.write.mode("append").orc("hdfs:///random.orc")
}

// above code should be run only once, I used a cell in Jupyter

// define window and lead
val w = Window.partitionBy("user").orderBy("start")
// if null, replace with 30.000.000
val ts_lead = coalesce(lead("start", 1) .over(w), lit(30000000))

// read data to dataframe, create stop column and calculate duration
val fox2 = spark.read.orc("hdfs:///random.orc").
    withColumn("end", ts_lead).
    withColumn("duration", col("end")-col("start"))


// repeated executions of this line return different results for count 
// I have it in a separate cell in JupyterLab
fox2.where("type='TypeA' and duration>4").count()
{code}
My results for three consecutive runs of the last line were:
 * run 1: 2551259
 * run 2: 2550756
 * run 3: 2551279

It is important to note that if I use only one of the two filters:

fox2.where("type='TypeA' ")

or 

fox2.where("duration>4"),

 

each of them can be executed repeatedly and I get a consistent result every time.
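
One thing that may be worth checking, as an assumption rather than a finding of this report, is whether several rows of the same user share the same start value. The generated data looks likely to contain such ties, since start is id plus a random offset below 47 and there are only 7 users. If ties exist, the window ordering is not unique, and the plain-Scala sketch below (no Spark; `Row`, `counts` and the sample rows are hypothetical) shows how two equally valid orderings keep each single-filter count the same while changing the combined count.

```scala
// Plain-Scala sketch: how tied sort keys can change a combined filter's count.
// Row and counts are hypothetical names; this is not Spark code.
final case class Row(start: Int, eventType: String)

// End value used for the last row (30000000 in the report's code).
val FallbackEnd = 30000000

// Given one user's rows in an already chosen order, compute lead-style
// durations and return (type-only count, duration-only count, combined count).
def counts(ordered: Seq[Row]): (Int, Int, Int) = {
  val ends = ordered.drop(1).map(_.start) :+ FallbackEnd
  val durations = ordered.zip(ends).map { case (r, end) => (r, end - r.start) }
  (
    durations.count { case (r, _) => r.eventType == "TypeA" },
    durations.count { case (_, d) => d > 4 },
    durations.count { case (r, d) => r.eventType == "TypeA" && d > 4 }
  )
}

// Two rows share start = 10, so both orders below are valid sorts by start.
val orderA = Seq(Row(10, "TypeA"), Row(10, "TypeB"), Row(20, "TypeC"))
val orderB = Seq(Row(10, "TypeB"), Row(10, "TypeA"), Row(20, "TypeC"))

val countsA = counts(orderA) // (1, 2, 0)
val countsB = counts(orderB) // (1, 2, 1)
```

If this turns out to apply to the real data, adding a tie-breaking column to the window's orderBy would be one way to test the hypothesis. It is only a possible explanation and has not been verified against the dataset in this report.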

I can save the dataframe after creating the stop and duration columns, and 
after that I get consistent results every time.

This is not a very practical workaround, as it needs a lot of space and time 
to implement.

This dataset is really big (in my eyes at least, approx. 100,000,000 new 
records per day).

If I run this same example on my local machine using master = local[*], 
everything works as expected; the problem appears only on a cluster setup. I 
created 3.0.1 and 3.1.1 clusters with one master and two workers using Docker 
on my local machine, and successfully reproduced the issue on both.

 

 

 

 

 


> non consistent results running count for same dataset after filter and lead 
> window function
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35089
>                 URL: https://issues.apache.org/jira/browse/SPARK-35089
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.1, 3.1.1
>            Reporter: Domagoj
>            Priority: Major


