[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305162#comment-17305162 ]

Michael Chen edited comment on SPARK-34780 at 3/19/21, 8:00 PM:
----------------------------------------------------------------

I used computeStats because it was a simple way to demonstrate the problem, but 
any config read through the relation's SparkSession has the same issue: the 
captured config is read instead of the value set through SQLConf (or other 
mechanisms). For example, when DataSourceScanExec builds its file readers, 
relation.sparkSession is passed along, so the configs those readers see can be 
stale.
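
To make this concrete, here is a minimal sketch (the setup is mine, not from 
the Spark codebase) of how a conf read through a captured session can diverge 
from the conf of the session actually running the query. newSession() shares 
the SparkContext and cache manager but keeps an isolated conf, which is the 
cross-session situation the shared plan cache can set up:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val s1 = SparkSession.builder().master("local[1]").appName("staleConf").getOrCreate()
val s2 = s1.newSession() // shares SparkContext and cache manager, isolated conf

// Suppose s1 built (and cached) a relation, capturing s1 as relation.sparkSession.
s1.conf.set(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
// A query now runs under s2 with a different setting.
s2.conf.set(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")

// Readers built from the captured session read "10", not the "1" that the
// running session configured.
println(s1.conf.get(SQLConf.FILE_COMPRESSION_FACTOR.key)) // 10
println(s2.conf.get(SQLConf.FILE_COMPRESSION_FACTOR.key)) // 1
{code}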



> Cached Table (parquet) with old Configs Used
> --------------------------------------------
>
>                 Key: SPARK-34780
>                 URL: https://issues.apache.org/jira/browse/SPARK-34780
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.1.1
>            Reporter: Michael Chen
>            Priority: Major
>
> When a DataFrame is cached, its logical plan can contain copies of the Spark 
> session, meaning the SQLConfs in effect at cache time are captured with it. 
> If a different DataFrame later replaces part of its logical plan with that 
> cached logical plan, the captured SQLConfs are used when the cached plan is 
> evaluated. This is because HadoopFsRelation ignores sparkSession in equality 
> checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
>     val tableDir = dir.getAbsolutePath + "/table"
>     val df = Seq("a").toDF("key")
>     df.write.parquet(tableDir)
>
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1Stats = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
>     val df2 = spark.read.parquet(tableDir).select("key")
>     df2.cache()
>     val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1StatsWithCache = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>
>     // These stats should match because the file compression factor is the same.
>     assert(compression1Stats == compression1StatsWithCache)
>     // Instead, the compression factor captured at cache time is used along
>     // with the cached logical plan.
>     assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  
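> The equality check mentioned above behaves this way because HadoopFsRelation 
> keeps sparkSession in a second parameter list, and Scala case-class equality 
> is generated from the first parameter list only. A standalone sketch of the 
> pattern (Relation is a hypothetical stand-in, not the real class):
> {code:java}
> // equals/hashCode are generated from the first parameter list only, so the
> // captured session is invisible to equality, and cache lookups can match
> // relations created under different sessions.
> case class Relation(path: String)(val session: AnyRef)
>
> val a = Relation("/table")(new Object) // stand-in for "session A"
> val b = Relation("/table")(new Object) // stand-in for "session B"
> assert(a == b) // equal despite different captured sessions
> {code}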


