[ https://issues.apache.org/jira/browse/SPARK-29265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Florentino Sainz updated SPARK-29265:
-------------------------------------
    Attachment: Test.scala

> Window orderBy causing full-DF orderBy 
> ---------------------------------------
>
>                 Key: SPARK-29265
>                 URL: https://issues.apache.org/jira/browse/SPARK-29265
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0, 2.4.3, 2.4.4
>         Environment: Any
>            Reporter: Florentino Sainz
>            Priority: Minor
>         Attachments: Test.scala
>
>
> Hi,
>  
> I have hit this problem in "real" environments and also wrote a self-contained 
> test (attached).
> Given this Window definition:
> {code:scala}
> val myWindow = Window.partitionBy($"word").orderBy("word")
> val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
> {code}
>  
> As a user, I would expect either:
> 1- An error or warning (because the orderBy column is one of the window's 
> partitionBy columns), or
> 2- A mostly useless operation that only orders the rows inside each window 
> partition, without a significant performance impact.
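The behavior I expect in point 2 can be sketched with plain Scala collections (no Spark here; the rows and names below are made up for illustration): sorting inside each partitionBy group only reorders rows within their group, whereas what I actually observe corresponds to one global sort over the whole dataset.

```scala
// Hypothetical sketch contrasting the two behaviors (plain collections, not Spark).
object WindowOrderSketch {
  // Expected: orderBy inside a window sorts only within each
  // partitionBy("word") group; group membership is untouched.
  def perGroupSorted(rows: Seq[(String, Int)]): Map[String, Seq[Int]] =
    rows.groupBy(_._1).map { case (word, group) => word -> group.map(_._2).sorted }

  // Observed: one global sort over everything, like dataframe.orderBy("word").
  def globallySorted(rows: Seq[(String, Int)]): Seq[(String, Int)] =
    rows.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val rows = Seq(("mouse", 64), ("bat", 8), ("horse", -27), ("bat", 3))
    println(perGroupSorted(rows)) // the "bat" group is sorted on its own: List(3, 8)
    println(globallySorted(rows)) // every row pulled into one total order
  }
}
```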
>  
> Currently what I see:
> *When I use "myWindow" on any DataFrame, that Window.orderBy performs a 
> global orderBy of the whole DataFrame, similar to 
> dataframe.orderBy("word").*
> *In my real environment this made my program extremely slow or crash before 
> finishing, because a global orderBy funnels all rows through a single 
> executor.*
>  
> In the test I can see that all elements of my DF end up in a single 
> partition (a side effect of the global orderBy).
>  
> Full code showing the error (note how the mapPartitions shows 99 rows in one 
> partition). You can paste it into IntelliJ or any other IDE and it should 
> work:
>  
> {code:scala}
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
> 
> object Test {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder
>       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .config("spark.sql.autoBroadcastJoinThreshold", -1)
>       .master("local[4]")
>       .appName("Word Count")
>       .getOrCreate()
>     import spark.implicits._
> 
>     val expectedSchema = List(
>       StructField("number", IntegerType, false),
>       StructField("word", StringType, false),
>       StructField("dummyColumn", StringType, false)
>     )
>     val expectedData = Seq(
>       Row(8, "bat", "test"),
>       Row(64, "mouse", "test"),
>       Row(-27, "horse", "test")
>     )
>     // 3 base rows x 99 dummy values each = 297 rows.
>     val filtrador = spark.createDataFrame(
>       spark.sparkContext.parallelize(expectedData),
>       StructType(expectedSchema)
>     ).withColumn("dummy", explode(array((1 until 100).map(lit): _*)))
> 
>     // orderBy on the same column used in partitionBy triggers the issue:
>     // the whole DataFrame ends up globally sorted.
>     val myWindow = Window.partitionBy($"word").orderBy("word")
>     val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
>     filt2.show
> 
>     // Count rows per partition: this shows 99 rows sitting in one partition.
>     filt2.rdd.mapPartitions(iter => Iterator(iter.size), true).collect().foreach(println)
>   }
> }
> {code}
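As a rough plain-collections analogy of the mapPartitions check above (the partition count and the modulo-hash scheme below are my assumptions, not Spark's exact internals): partitionBy alone should distribute the per-word groups across partitions by a hash of the key, keeping each word's rows together without routing the whole dataset through one partition.

```scala
// Plain-Scala analogy (no Spark): hash-distributing groups over partitions,
// which is what partitionBy alone is expected to do. The partition count (3)
// and the modulo-hash scheme are illustrative assumptions.
object PartitionCountSketch {
  // Returns the number of rows landing in each partition.
  def rowsPerPartition(keys: Seq[String], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => math.abs(k.hashCode) % numPartitions)
      .map { case (partition, ks) => partition -> ks.size }

  def main(args: Array[String]): Unit = {
    // 3 words x 99 dummy rows each, like the exploded DataFrame in the repro.
    val keys = Seq("bat", "mouse", "horse").flatMap(w => Seq.fill(99)(w))
    // Every row is accounted for, and each word's 99 rows stay together.
    println(rowsPerPartition(keys, 3))
  }
}
```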
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
