[ https://issues.apache.org/jira/browse/SPARK-24869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552965#comment-16552965 ]
Takeshi Yamamuro commented on SPARK-24869:
------------------------------------------

Yea, I made the branch just for this test purpose (sorry for confusing you). Actually, I think this query uses the cache internally for saving output data. For example, to check this, I added a test-purpose metric for counting cache hits, and the test below passed: [https://github.com/apache/spark/compare/master...maropu:SPARK-24869-2]

`SaveIntoDataSourceCommand` has a logical plan (`query`) for saving. It internally wraps this plan in a `Dataset` and passes this inner dataset to `DataSource.createRelation` at runtime (in `RunnableCommand.run`): [https://github.com/apache/spark/blob/ab18b02e66fd04bc8f1a4fb7b6a7f2773902a494/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala#L46]

The inner dataset replaces the logical plan with a cached plan, and then `DataSource.createRelation` saves the output of the cached plan. In the case of the JDBC data source, `DataSource.createRelation` calls `JdbcUtils.saveTable`. Since `saveTable` directly references the RDD of the inner dataset, `QueryExecutionListener` cannot observe the execution of the logical plan with cached data: [https://github.com/apache/spark/blob/ab18b02e66fd04bc8f1a4fb7b6a7f2773902a494/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L834]

Moreover, in the test in the description, `QueryExecution.withCachedData` of the save command (`SaveIntoDataSourceCommand`) is called. But, IIUC, this operation is effectively a no-op because `CacheManager` does not replace the inner logical plan (`innerChildren`) with a cached one. So, `SaveIntoDataSourceCommand.query` is always just an analyzed logical plan.
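To make the last point concrete, here is a minimal toy sketch (illustrative only; these are simplified stand-ins, NOT Spark's real classes) of why a cache-substitution pass that recurses only through a plan's visible `children` never touches a command's `query`, which is exposed only as an inner child:

```scala
// Toy model of the logical plan tree. In real Spark, SaveIntoDataSourceCommand
// keeps its `query` in `innerChildren`, outside the tree that transforms visit.
sealed trait Plan
case class Relation(name: String) extends Plan         // analyzed table scan
case class InMemoryRelation(name: String) extends Plan // cached scan
case class Project(child: Plan) extends Plan           // ordinary unary node
case class SaveCommand(query: Plan) extends Plan       // `query` ~ innerChildren

val cachedTables = Set("t") // pretend table "t" has been cached

// Models CacheManager.useCachedData: a top-down rewrite over the visible tree.
def useCachedData(plan: Plan): Plan = plan match {
  case Relation(n) if cachedTables(n) => InMemoryRelation(n)
  case Project(c)                     => Project(useCachedData(c))
  case s: SaveCommand                 => s // never descends into s.query
  case other                          => other
}

@main def demo(): Unit = {
  // A plain query picks up the cached relation...
  println(useCachedData(Project(Relation("t"))))
  // ...but wrapped in a save command, the inner query is left as the
  // analyzed plan, so a listener matching on InMemoryRelation sees nothing.
  val cmd = SaveCommand(Project(Relation("t")))
  println(useCachedData(cmd) == cmd)
}
```

The sketch shows the shape of the problem, not a fix: at runtime the inner `Dataset` built in `RunnableCommand.run` does re-apply caching to `query`, so the data is still saved from the cache even though the listener never observes a cached plan.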
> SaveIntoDataSourceCommand's input Dataset does not use Cached Data
> ------------------------------------------------------------------
>
>                 Key: SPARK-24869
>                 URL: https://issues.apache.org/jira/browse/SPARK-24869
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Xiao Li
>            Priority: Major
>
> {code}
> withTable("t") {
>   withTempPath { path =>
>     var numTotalCachedHit = 0
>     val listener = new QueryExecutionListener {
>       override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
>       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
>         qe.withCachedData match {
>           case c: SaveIntoDataSourceCommand if c.query.isInstanceOf[InMemoryRelation] =>
>             numTotalCachedHit += 1
>           case _ =>
>             println(qe.withCachedData)
>         }
>       }
>     }
>     spark.listenerManager.register(listener)
>     val udf1 = udf({ (x: Int, y: Int) => x + y })
>     val df = spark.range(0, 3).toDF("a")
>       .withColumn("b", udf1(col("a"), lit(10)))
>     df.cache()
>     df.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
>     assert(numTotalCachedHit == 1, "expected to be cached in jdbc")
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)