[ 
https://issues.apache.org/jira/browse/SPARK-24869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552965#comment-16552965
 ] 

Takeshi Yamamuro commented on SPARK-24869:
------------------------------------------

Yeah, I made that branch only for testing purposes (sorry for the confusion).
 Actually, I think this query does use the cache internally when saving the 
output data. To check this, I added a test-only metric that counts cache hits, 
and the test below passed:
 [https://github.com/apache/spark/compare/master...maropu:SPARK-24869-2]

`SaveIntoDataSourceCommand` holds a logical plan (`query`) for the data to be 
saved. It internally wraps this plan in a `Dataset` and passes this inner 
dataset to `DataSource.createRelation` at runtime (in `RunnableCommand.run`):
 
[https://github.com/apache/spark/blob/ab18b02e66fd04bc8f1a4fb7b6a7f2773902a494/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala#L46]

The inner dataset replaces the logical plan with a cached plan, and then 
`DataSource.createRelation` saves the output of the cached plan.
 In the case of the JDBC datasource, `DataSource.createRelation` calls 
`JdbcUtils.saveTable`.
 Since `saveTable` directly references the RDD of the inner dataset, a 
`QueryExecutionListener` cannot observe the execution of the logical plan 
that uses the cached data.
 
[https://github.com/apache/spark/blob/ab18b02e66fd04bc8f1a4fb7b6a7f2773902a494/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L834]

Moreover, in the test from the issue description, `QueryExecution.withCachedData` 
of the save command (`SaveIntoDataSourceCommand`) is called. But, IIUC, this 
operation is meaningless because `CacheManager` doesn't replace an inner 
logical plan (`innerChildren`) with a cached one. So, 
`SaveIntoDataSourceCommand.query` is always just an analyzed logical plan.
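
To illustrate the point above, here is a minimal, self-contained sketch of why a cache substitution that walks only `children` never reaches a plan held in `innerChildren`. All the classes below (`Plan`, `Scan`, `Project`, `CachedRelation`, `SaveCommand`) and the `useCachedData` helper are hypothetical stand-ins written for this example. They are not Spark's real classes, and the real `CacheManager` logic is more involved.

```scala
// Toy model only: hypothetical stand-ins, NOT Spark's real classes or API.
sealed trait Plan {
  def children: Seq[Plan]
  // Nested plans that belong to the node but are skipped by transformations.
  def innerChildren: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan

  // Apply the rule to this node, then recurse into `children` only.
  def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
    val applied = rule.applyOrElse(this, (p: Plan) => p)
    applied.withNewChildren(applied.children.map(_.transformDown(rule)))
  }
}

case class Scan(table: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withNewChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

// Stand-in for a cached relation that replaces a matching plan fragment.
case class CachedRelation(original: Plan) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

// Stand-in for a save command: `query` is an inner child, not a child.
case class SaveCommand(query: Plan) extends Plan {
  def children: Seq[Plan] = Nil
  override def innerChildren: Seq[Plan] = Seq(query)
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

object CacheSubstitutionDemo {
  // Replace every fragment found in `cached` with a CachedRelation.
  def useCachedData(plan: Plan, cached: Set[Plan]): Plan =
    plan.transformDown { case p if cached.contains(p) => CachedRelation(p) }

  def main(args: Array[String]): Unit = {
    val query = Project(Scan("t"))
    val cached = Set[Plan](query)
    // The bare query is rewritten to use the cached relation.
    println(useCachedData(query, cached))
    // The command is returned unchanged because `query` is only an inner child.
    println(useCachedData(SaveCommand(query), cached))
  }
}
```

In this toy model the command's `query` is still the plain analyzed plan after substitution, which matches the behavior described above for `SaveIntoDataSourceCommand.query`.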

> SaveIntoDataSourceCommand's input Dataset does not use Cached Data
> ------------------------------------------------------------------
>
>                 Key: SPARK-24869
>                 URL: https://issues.apache.org/jira/browse/SPARK-24869
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Xiao Li
>            Priority: Major
>
> {code}
>     withTable("t") {
>       withTempPath { path =>
>         var numTotalCachedHit = 0
>         val listener = new QueryExecutionListener {
>           override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
>           override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
>             qe.withCachedData match {
>               case c: SaveIntoDataSourceCommand
>                   if c.query.isInstanceOf[InMemoryRelation] =>
>                 numTotalCachedHit += 1
>               case _ =>
>                 println(qe.withCachedData)
>             }
>           }
>         }
>         spark.listenerManager.register(listener)
>         val udf1 = udf({ (x: Int, y: Int) => x + y })
>         val df = spark.range(0, 3).toDF("a")
>           .withColumn("b", udf1(col("a"), lit(10)))
>         df.cache()
>         df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
>         assert(numTotalCachedHit == 1, "expected to be cached in jdbc")
>       }
>     }
> {code}
