[ 
https://issues.apache.org/jira/browse/SPARK-29265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florentino Sainz updated SPARK-29265:
-------------------------------------
    Description: 
Hi,

 

I had this problem in "real" environments and also made a self-contained test ( 
[^Test.scala] attached).

Having this Window definition:
{code:scala}
val myWindow = Window.partitionBy($"word").orderBy("word") 
val filt2 = filtrador.withColumn("avg_Time", 
avg($"number").over(myWindow)){code}
 

As a user, I would expect either:

1- Error/warning (because trying to sort on one of the columns of the window 
partitionBy)

2- A mostly-useless operation which just orders the rows inside each Window but 
doesn't affect performance too much.

 

Currently what I see:

*When I use "myWindow" in any DataFrame, somehow that Window.orderBy is 
performing a global orderBy of the whole DataFrame. Similar to 
dataframe.orderBy("word").*

*In my real environment, my program just didn't finish in time/crashed thus 
causing my program to be very slow or crash (because as it's a global orderBy, 
it will just go to one executor).*

 

In the test I can see how all elements of my DF are in a single partition 
(side-effect of the global orderBy) 

 

Full Code showing the error (see how the mapPartitions shows 99 rows in one 
partition) attached in Test.scala

  was:
Hi,

 

I had this problem in "real" environments and also made a self-contained test 
(attached).

Having this Window definition:
{code:scala}
val myWindow = Window.partitionBy($"word").orderBy("word") 
val filt2 = filtrador.withColumn("avg_Time", 
avg($"number").over(myWindow)){code}
 

As a user, I would expect either:

1- Error/warning (because trying to sort on one of the columns of the window 
partitionBy)

2- A mostly-useless operation which just orders the rows inside each Window but 
doesn't affect performance too much.

 

Currently what I see:

*When I use "myWindow" in any DataFrame, somehow that Window.orderBy is 
performing a global orderBy of the whole DataFrame. Similar to 
dataframe.orderBy("word").*

*In my real environment, my program just didn't finish in time/crashed thus 
causing my program to be very slow or crash (because as it's a global orderBy, 
it will just go to one executor).*

 

In the test I can see how all elements of my DF are in a single partition 
(side-effect of the global orderBy) 

 

Full Code showing the error (see how the mapPartitions shows 99 rows in one 
partition) <-- You can pase it in Intellij/Any other and it should work:

 
{code:scala}

import java.io.ByteArrayOutputStream
import java.net.URL
import java.nio.charset.Charset

import org.apache.commons.io.IOUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}

import scala.collection.mutable
object Test {

  case class Bank(age:Integer, job:String, marital : String, education : 
String, balance : Integer)


  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .master("local[4]")
      .appName("Word Count")
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val sc = spark.sparkContext
    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)
    )
    val expectedData = Seq(
      Row(8, "bat", "test"),
      Row(64, "mouse", "test"),
      Row(-27, "horse", "test")
    )

    val filtrador = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    ).withColumn("dummy", explode(array((1 until 100).map(lit): _*)))

    //spark.createDataFrame(bank,Bank.getClass).createOrReplaceTempView("bank")
    //spark.createDataFrame(bank,Bank.getClass).registerTempTable("bankDos")
    //spark.createDataFrame(bank,Bank.getClass).registerTempTable("bankTres")


    //val filtrador2=filtrador.crossJoin(filtrador)
    //filtrador2.cache()
    //filtrador2.union(filtrador2).count


    val myWindow = Window.partitionBy($"word").orderBy("word")
    val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
    filt2.show

    filt2.rdd.mapPartitions(iter => Iterator(iter.size), 
true).collect().foreach(println)

  }
}

 {code}
 


> Window orderBy causing full-DF orderBy 
> ---------------------------------------
>
>                 Key: SPARK-29265
>                 URL: https://issues.apache.org/jira/browse/SPARK-29265
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0, 2.4.3, 2.4.4
>         Environment: Any
>            Reporter: Florentino Sainz
>            Priority: Minor
>         Attachments: Test.scala
>
>
> Hi,
>  
> I had this problem in "real" environments and also made a self-contained test 
> ( [^Test.scala] attached).
> Having this Window definition:
> {code:scala}
> val myWindow = Window.partitionBy($"word").orderBy("word") 
> val filt2 = filtrador.withColumn("avg_Time", 
> avg($"number").over(myWindow)){code}
>  
> As a user, I would expect either:
> 1- Error/warning (because trying to sort on one of the columns of the window 
> partitionBy)
> 2- A mostly-useless operation which just orders the rows inside each Window 
> but doesn't affect performance too much.
>  
> Currently what I see:
> *When I use "myWindow" in any DataFrame, somehow that Window.orderBy is 
> performing a global orderBy of the whole DataFrame. Similar to 
> dataframe.orderBy("word").*
> *In my real environment, my program just didn't finish in time/crashed thus 
> causing my program to be very slow or crash (because as it's a global 
> orderBy, it will just go to one executor).*
>  
> In the test I can see how all elements of my DF are in a single partition 
> (side-effect of the global orderBy) 
>  
> Full Code showing the error (see how the mapPartitions shows 99 rows in one 
> partition) attached in Test.scala



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to