[ 
https://issues.apache.org/jira/browse/SPARK-27062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

William Wong updated SPARK-27062:
---------------------------------
    Description: 
If CatalogImpl.refreshTable() method is invoked against a cached table, this 
method would first uncache corresponding query in the shared state cache 
manager, and then cache it back to refresh the cache copy. 

However, the table was recached with only 'table name'. The database name will 
be missed. Therefore, if cached table is not on the default database, the 
recreated cache may refer to a different table. For example, we may see the 
cached table name in driver's storage page will be changed after table 
refreshing. 

 

Here is related code on github for your reference. 

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala]
 

 
{code:java}
override def refreshTable(tableName: String): Unit = {
  val tableIdent = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  val tableMetadata = 
sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
  val table = sparkSession.table(tableIdent)

  if (tableMetadata.tableType == CatalogTableType.VIEW) {
    // Temp or persistent views: refresh (or invalidate) any metadata/data 
cached
    // in the plan recursively.
    table.queryExecution.analyzed.refresh()
  } else {
    // Non-temp tables: refresh the metadata cache.
    sessionCatalog.refreshTable(tableIdent)
  }

  // If this table is cached as an InMemoryRelation, drop the original
  // cached version and make the new version cached lazily.
  if (isCached(table)) {
    // Uncache the logicalPlan.
    sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, 
blocking = true)
    // Cache it again.
    sparkSession.sharedState.cacheManager.cacheQuery(table, 
Some(tableIdent.table))
  }
}
{code}
 

 

In Spark SQL module, the database name is registered together with table name 
when "CACHE TABLE" command was executed. 

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala]
 

and  CatalogImpl register cache with received table name. 
{code:java}
override def cacheTable(tableName: String): Unit = {    
sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), 
Some(tableName)) }
{code}
 

Therefore, I would like to propose aligning the behavior. RefreshTable method 
should reuse the received table name instead. 

 
{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
{code}
to 
{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName))
 {code}
 

  was:
If CatalogImpl.refreshTable() method is invoked against a cached table, this 
method would first uncache corresponding query in the shared state cache 
manager, and then cache it back to refresh the cache copy. 

However, the table was recached with only 'table name'. The database name will 
be missed. Therefore, if cached table is not on the default database, the 
recreated cache may refer to a different table. For example, we may see the 
cached table name in driver's storage page will be changed after table 
refreshing. 

 

Here is related code on github for your reference. 

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala]
 

 
{code:java}
override def refreshTable(tableName: String): Unit = {
  val tableIdent = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  val tableMetadata = 
sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
  val table = sparkSession.table(tableIdent)

  if (tableMetadata.tableType == CatalogTableType.VIEW) {
    // Temp or persistent views: refresh (or invalidate) any metadata/data 
cached
    // in the plan recursively.
    table.queryExecution.analyzed.refresh()
  } else {
    // Non-temp tables: refresh the metadata cache.
    sessionCatalog.refreshTable(tableIdent)
  }

  // If this table is cached as an InMemoryRelation, drop the original
  // cached version and make the new version cached lazily.
  if (isCached(table)) {
    // Uncache the logicalPlan.
    sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, 
blocking = true)
    // Cache it again.
    sparkSession.sharedState.cacheManager.cacheQuery(table, 
Some(tableIdent.table))
  }
}
{code}
 

 

In Spark SQL module, the database name is registered together with table name 
when "CACHE TABLE" command was executed. 

[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala]
 

 

 

Therefore, I would like to propose aligning the behavior. Full table name 
should also be used in RefreshTable case.  We should change the following line 
in CatalogImpl.refreshTable from 

 
{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
{code}
to

 

 
{code:java}
sparkSession.sharedState.cacheManager.cacheQuery(table, 
Some(tableIdent.quotedString))
 {code}
 


> Refresh Table command register table with table name only
> ---------------------------------------------------------
>
>                 Key: SPARK-27062
>                 URL: https://issues.apache.org/jira/browse/SPARK-27062
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2
>            Reporter: William Wong
>            Priority: Major
>              Labels: easyfix
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> If CatalogImpl.refreshTable() method is invoked against a cached table, this 
> method would first uncache corresponding query in the shared state cache 
> manager, and then cache it back to refresh the cache copy. 
> However, the table was recached with only 'table name'. The database name 
> will be missed. Therefore, if cached table is not on the default database, 
> the recreated cache may refer to a different table. For example, we may see 
> the cached table name in driver's storage page will be changed after table 
> refreshing. 
>  
> Here is related code on github for your reference. 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala]
>  
>  
> {code:java}
> override def refreshTable(tableName: String): Unit = {
>   val tableIdent = 
> sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
>   val tableMetadata = 
> sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
>   val table = sparkSession.table(tableIdent)
>   if (tableMetadata.tableType == CatalogTableType.VIEW) {
>     // Temp or persistent views: refresh (or invalidate) any metadata/data 
> cached
>     // in the plan recursively.
>     table.queryExecution.analyzed.refresh()
>   } else {
>     // Non-temp tables: refresh the metadata cache.
>     sessionCatalog.refreshTable(tableIdent)
>   }
>   // If this table is cached as an InMemoryRelation, drop the original
>   // cached version and make the new version cached lazily.
>   if (isCached(table)) {
>     // Uncache the logicalPlan.
>     sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true, 
> blocking = true)
>     // Cache it again.
>     sparkSession.sharedState.cacheManager.cacheQuery(table, 
> Some(tableIdent.table))
>   }
> }
> {code}
>  
>  
> In Spark SQL module, the database name is registered together with table name 
> when "CACHE TABLE" command was executed. 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala]
>  
> and  CatalogImpl register cache with received table name. 
> {code:java}
> override def cacheTable(tableName: String): Unit = {    
> sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName),
>  Some(tableName)) }
> {code}
>  
> Therefore, I would like to propose aligning the behavior. RefreshTable method 
> should reuse the received table name instead. 
>  
> {code:java}
> sparkSession.sharedState.cacheManager.cacheQuery(table, 
> Some(tableIdent.table))
> {code}
> to 
> {code:java}
> sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName))
>  {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to