Izek Greenfield created SPARK-37321:
---------------------------------------
Summary: Wrong size estimation that leads to "Cannot broadcast the
table that is larger than 8GB: 8 GB"
Key: SPARK-37321
URL: https://issues.apache.org/jira/browse/SPARK-37321
Project: Spark
Issue Type: Bug
Components: Optimizer
Affects Versions: 3.2.0, 3.1.1
Reporter: Izek Greenfield
When CBO is enabled, a situation occurs where Spark tries to broadcast a very
large DataFrame due to an incorrect output size estimation.

In `EstimationUtils.getSizePerRow`, if there are no column statistics, Spark
falls back to `DataType.defaultSize`.

When the output contains `functions.concat_ws`, `getSizePerRow` therefore
estimates the resulting column at 20 bytes (the default size of `StringType`),
while in our case the actual values are far larger.

As a result, we sometimes end up with an estimated size of < 300 KB while the
actual size can be > 8 GB. Spark decides the table can be broadcast and only
later, while building the broadcast relation, discovers that the data is too
large, which raises the exception above.
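The estimation behavior can be sketched as follows. This is a simplified,
hypothetical mirror of `getSizePerRow` written for illustration, not the actual
Spark implementation; the type hierarchy and the 8-byte per-row overhead are
assumptions, while the 20-byte figure matches `StringType.defaultSize`:

```scala
// Simplified sketch: without column statistics, each attribute
// contributes only its type's fixed defaultSize to the row estimate,
// regardless of how long the actual values are.
sealed trait DataType { def defaultSize: Int }
case object IntegerType extends DataType { val defaultSize = 4 }
case object DateType    extends DataType { val defaultSize = 4 }
case object StringType  extends DataType { val defaultSize = 20 } // fixed guess

def sizePerRow(schema: Seq[DataType]): Long =
  // assumed 8 bytes of per-row overhead, plus per-column defaults
  8L + schema.map(_.defaultSize.toLong).sum

// A concat_ws output is a StringType column, so a row with two such
// columns is estimated at 8 + 20 + 20 = 48 bytes even when the real
// concatenated strings are kilobytes or megabytes long.
val estimated = sizePerRow(Seq(StringType, StringType))
println(estimated) // 48
```

This is why the estimate stays flat no matter how many literals are fed into
`concat_ws`: the estimator never looks at the expression, only at its output
type.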
Code sample to reproduce:
{code:scala}
import scala.util.Random

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

// Two small string-typed tables on disk.
(1 to 100000).toDF("index")
  .withColumn("index", col("index").cast("string"))
  .write.parquet("/tmp/a")
(1 to 1000).toDF("index_b")
  .withColumn("index_b", col("index_b").cast("string"))
  .write.parquet("/tmp/b")

val a = spark.read
  .parquet("/tmp/a")
  .withColumn("b", col("index"))
  .withColumn("l1", functions.concat_ws("/", col("index"),
    functions.current_date(), functions.current_date(),
    functions.current_date(), functions.current_date()))
  .withColumn("l2", functions.concat_ws("/", col("index"),
    functions.current_date(), functions.current_date(),
    functions.current_date(), functions.current_date()))
  .withColumn("l3", functions.concat_ws("/", col("index"),
    functions.current_date(), functions.current_date(),
    functions.current_date(), functions.current_date()))
  .withColumn("l4", functions.concat_ws("/", col("index"),
    functions.current_date(), functions.current_date(),
    functions.current_date(), functions.current_date()))
  .withColumn("l5", functions.concat_ws("/", col("index"),
    functions.current_date(), functions.current_date(),
    functions.current_date(), functions.current_date()))

val r = Random.alphanumeric
val l = 220
val i = 2800

// Each lN column concatenates ~2801 * 2 literals of 220 characters,
// so every row of b is around a megabyte wide, while getSizePerRow
// still estimates each string column at 20 bytes.
val b = spark.read
  .parquet("/tmp/b")
  .withColumn("l1", functions.concat_ws("/", (0 to i).flatMap(a =>
    List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l2", functions.concat_ws("/", (0 to i).flatMap(a =>
    List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l3", functions.concat_ws("/", (0 to i).flatMap(a =>
    List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l4", functions.concat_ws("/", (0 to i).flatMap(a =>
    List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l5", functions.concat_ws("/", (0 to i).flatMap(a =>
    List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l6", functions.concat_ws("/", (0 to i).flatMap(a =>
    List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
  .withColumn("l7", functions.concat_ws("/", (0 to i).flatMap(a =>
    List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))

a.join(b, col("index") === col("index_b")).show(2000)
{code}
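To see the mis-estimation directly, the optimizer's size estimate for the wide
side can be inspected with Spark's `queryExecution` API; exact numbers will
vary by environment. The threshold override shown is the usual workaround and
makes the join fall back to a sort-merge join (this fragment assumes a running
`spark` session with the DataFrames from the repro above):

```scala
// The optimizer's estimate for b stays tiny (defaultSize-based),
// even though each materialized row is around a megabyte wide.
println(b.queryExecution.optimizedPlan.stats.sizeInBytes)

// Workaround until the estimator accounts for wide string outputs:
// disable auto-broadcast so Spark does not attempt the broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```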
--
This message was sent by Atlassian Jira
(v8.20.1#820001)