[
https://issues.apache.org/jira/browse/SPARK-37442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17579969#comment-17579969
]
EmmaYang commented on SPARK-37442:
----------------------------------
Hello, I am hitting exactly this issue, and I am using Spark 2.4.
My broadcast DataFrame is built on top of files whose overall size is > 12 GB,
but I only use a sub-DataFrame of it. The STORAGE tab shows only 2.5 GB cached,
yet the job still fails with the 8 GB broadcast limit error.
Is there any workaround for this?
Thank you.
{noformat}
: : : +- ResolvedHint (broadcast)
: : :    +- Filter isnotnull(invlv_pty_id#5078)
: : :       +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
: : :          +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
: : :             +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<invlv_pty_id:string>
: :   +- ResolvedHint (broadcast)
: :      +- Filter isnotnull(invlv_pty_id#5078)
: :         +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
: :            +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
: :               +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<invlv_pty_id:string>
:     +- ResolvedHint (broadcast)
:        +- Filter isnotnull(invlv_pty_id#5078)
:           +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
:              +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
:                 +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<invlv_pty_id:string>
+- ResolvedHint (broadcast)
   +- Filter isnotnull(invlv_pty_id#5078)
      +- InMemoryRelation [invlv_pty_id#5078, invlv_pty_id#5078], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [invlv_pty_id#5078, invlv_pty_id#5078]
            +- *(1) FileScan csv [invlv_pty_id#5078] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://gftsdev/data/gfrrsnsd/standardization/hive/gfrrsnsd_standardization/trl_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<invlv_pty_id:string>
{noformat}
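Since the plan above shows an explicit ResolvedHint (broadcast) on each cached relation, one possible workaround (a sketch only, not from this ticket; `factDf` and `cachedDf` are hypothetical names standing in for the actual DataFrames) is to drop the broadcast hint on the cached side and, if needed, disable the automatic broadcast threshold so the optimizer falls back to a shuffle-based join:

```scala
// Sketch of a possible workaround (assumption: a sort-merge join is an
// acceptable fallback for this query).
// Disabling the auto-broadcast threshold stops Spark from planning a
// broadcast join off the (possibly underestimated) InMemoryRelation stats.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Also remove any explicit broadcast() hint on the cached DataFrame, since
// an explicit hint bypasses the threshold entirely. `factDf` and `cachedDf`
// are hypothetical placeholders for the DataFrames in the plan above:
val joined = factDf.join(cachedDf, Seq("invlv_pty_id")) // no broadcast(cachedDf)
```

The trade-off is a shuffle of both sides, but it avoids the hard 8 GB broadcast limit regardless of how badly the cached relation's size is estimated.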
> In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the
> table that is larger than 8GB: 8 GB" failure
> ------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-37442
> URL: https://issues.apache.org/jira/browse/SPARK-37442
> Project: Spark
> Issue Type: Sub-task
> Components: Optimizer, SQL
> Affects Versions: 3.1.1, 3.2.0
> Reporter: Michael Chen
> Assignee: Michael Chen
> Priority: Major
> Fix For: 3.2.1, 3.3.0
>
>
> There is a period of time during which an InMemoryRelation has its cached
> buffers loaded but its statistics are inaccurate (anywhere from 0 up to the
> size in bytes reported by the accumulators). When AQE is enabled, join
> planning can happen inside this window. In that case, the sizes of join
> children that include an InMemoryRelation are greatly underestimated, and a
> broadcast join can be planned when it shouldn't be. We have seen scenarios
> where a broadcast join was planned with a build side larger than 8GB because,
> at planning time, the optimizer believed the InMemoryRelation was 0 bytes.
> Here is an example test case where the broadcast threshold is being ignored.
> It can mimic the 8GB error by increasing the size of the tables.
> {code:java}
> withSQLConf(
>   SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
>   SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
>   // Spark estimates a string column as 20 bytes, so with 60k rows these
>   // relations should be estimated at ~1.2M bytes, which is greater than
>   // the broadcast join threshold
>   Seq.fill(60000)("a").toDF("key")
>     .createOrReplaceTempView("temp")
>   Seq.fill(60000)("b").toDF("key")
>     .createOrReplaceTempView("temp2")
>   Seq("a").toDF("key").createOrReplaceTempView("smallTemp")
>   spark.sql("SELECT key as newKey FROM temp").persist()
>   val query =
>     s"""
>        |SELECT t3.newKey
>        |FROM
>        |  (SELECT t1.newKey
>        |   FROM (SELECT key as newKey FROM temp) as t1
>        |   JOIN
>        |   (SELECT key FROM smallTemp) as t2
>        |   ON t1.newKey = t2.key
>        |  ) as t3
>        |  JOIN
>        |  (SELECT key FROM temp2) as t4
>        |  ON t3.newKey = t4.key
>        |UNION
>        |SELECT t1.newKey
>        |FROM
>        |  (SELECT key as newKey FROM temp) as t1
>        |  JOIN
>        |  (SELECT key FROM temp2) as t2
>        |  ON t1.newKey = t2.key
>        |""".stripMargin
>   val df = spark.sql(query)
>   df.collect()
>   val adaptivePlan = df.queryExecution.executedPlan
>   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
>   assert(bhj.length == 1)
> }
> {code}
>
>
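The window described in the issue can also be narrowed from user code by fully materializing the cache before any join that references it is planned, so the size statistics come from populated accumulators rather than a transient 0-byte estimate. A sketch (not part of this ticket's fix; `query` is the hypothetical join query over the cached view):

```scala
// Sketch of a user-side mitigation, assuming the cached relation can be
// materialized up front. The eager count() forces the cache buffers AND the
// size accumulators to be populated before join planning sees the relation.
val cached = spark.sql("SELECT key AS newKey FROM temp").persist()
cached.count() // materializes the cache; stats are now accurate

// Only after the count() do we run the joins that reference the cached data,
// so AQE plans them against real sizes. `query` is a hypothetical placeholder.
val df = spark.sql(query)
df.collect()
```

This does not fix the underlying race (the actual fix shipped in 3.2.1/3.3.0), but it avoids planning joins inside the window where the statistics are wrong.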
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]