[
https://issues.apache.org/jira/browse/SPARK-29265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Florentino Sainz updated SPARK-29265:
-------------------------------------
Description:
Hi,
I had this problem in "real" environments and also made a self-contained test
(attached).
Given this Window definition:
{code:scala}
val myWindow = Window.partitionBy($"word").orderBy("word")
val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
{code}
As a user, I would expect either:
1- An error/warning (because the orderBy column is already one of the window's partitionBy columns), or
2- A mostly useless operation that just orders the rows inside each window partition without affecting performance much.
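What I ended up doing as a workaround (just a sketch, with my own variable names; dropping the orderBy is enough in my case because avg is computed over the whole partition anyway):
{code:scala}
// Workaround sketch: partition-only window, no ordering.
// avg() over the whole partition does not need an ORDER BY, so the sort
// described below is never requested in the first place.
val myWindowNoOrder = Window.partitionBy($"word")
val filt2NoOrder = filtrador.withColumn("avg_Time", avg($"number").over(myWindowNoOrder))
{code}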
What I currently see:
*When I use "myWindow" on any DataFrame, that Window.orderBy somehow triggers a global orderBy of the whole DataFrame, similar to dataframe.orderBy("word").*
*In my real environment the program became very slow or crashed and did not finish in time, because a global orderBy pushes all the data through a single executor.*
In the test I can see that all the rows of my DataFrame end up in a single partition (a side-effect of the global orderBy).
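Another quick way to check this (sketch only, using the filt2 DataFrame from the repro code below) is to print the physical plan and look at the exchange nodes:
{code:scala}
// Sketch: inspect the physical plan of the window query.
// A global sort shows up as Exchange rangepartitioning(...) followed by a Sort,
// while the window itself should only need Exchange hashpartitioning(word, ...)
// plus a sort within each partition.
filt2.explain(true)
{code}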
Full code showing the error (see how the mapPartitions output reports 99 rows in one partition). You can paste it into IntelliJ or any other IDE and it should run:
{code:scala}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .master("local[4]")
      .appName("Word Count")
      .getOrCreate()
    import spark.implicits._

    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)
    )
    val expectedData = Seq(
      Row(8, "bat", "test"),
      Row(64, "mouse", "test"),
      Row(-27, "horse", "test")
    )

    // Explode each of the three rows into 99 copies so the partition sizes are easy to see.
    val filtrador = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    ).withColumn("dummy", explode(array((1 until 100).map(lit): _*)))

    // Window partitioned and ordered by the same column.
    val myWindow = Window.partitionBy($"word").orderBy("word")
    val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
    filt2.show

    // Number of rows in each partition of the resulting DataFrame.
    filt2.rdd
      .mapPartitions(iter => Iterator(iter.size), preservesPartitioning = true)
      .collect()
      .foreach(println)
  }
}
{code}
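For reference, the same window expressed through Spark SQL (I only reproduced the issue through the DataFrame API; the SQL form below is untested and given purely for illustration):
{code:scala}
// Untested illustration: the same window spec written in Spark SQL,
// registered against the filtrador DataFrame from the repro above.
filtrador.createOrReplaceTempView("filtrador")
val filt2Sql = spark.sql(
  """SELECT *, avg(number) OVER (PARTITION BY word ORDER BY word) AS avg_Time
    |FROM filtrador""".stripMargin)
filt2Sql.show
{code}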
> Window orderBy causing full-DF orderBy
> ---------------------------------------
>
> Key: SPARK-29265
> URL: https://issues.apache.org/jira/browse/SPARK-29265
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0, 2.4.3, 2.4.4
> Environment: Any
> Reporter: Florentino Sainz
> Priority: Minor
>