Repository: spark
Updated Branches:
  refs/heads/master d5aefa83a -> ee13f3e3d


[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable

## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are 
updated. As a consequence, existing sessions will use the cached information 
even though it is no longer valid. Consider the example below.

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table will be present in the catalog relation cache. Step 4 
will correctly update the metadata inside the catalog but will NOT invalidate 
the cache, so step 5 still reports the stale statistics computed in step 2.

By the way, running ``spark.sql("analyze table tab1 compute statistics")`` 
between step 3 and step 4 would also avoid the problem.
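
For a session that is already affected, the stale entry can also be dropped 
explicitly. A minimal workaround sketch, using only the public ``REFRESH TABLE`` 
command:

```
// Workaround sketch for an already-affected session: explicitly drop the
// cached relation so the next lookup re-reads the catalog metadata.
spark.sql("refresh table tab1")
// step 5 then resolves the table against the catalog rather than the stale cache
spark.sql("explain cost select distinct * from tab1").show(false)
```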

## How was this patch tested?

Current and additional unit tests.

Author: aokolnychyi <anton.okolnyc...@sap.com>

Closes #19252 from aokolnychyi/spark-21969.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee13f3e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee13f3e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee13f3e3

Branch: refs/heads/master
Commit: ee13f3e3dc3faa5152cefa91c22f8aaa8e425bb4
Parents: d5aefa8
Author: aokolnychyi <anton.okolnyc...@sap.com>
Authored: Tue Sep 19 14:19:13 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Sep 19 14:19:13 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +
 .../command/AnalyzeColumnCommand.scala          |  3 -
 .../execution/command/AnalyzeTableCommand.scala |  2 -
 .../spark/sql/StatisticsCollectionSuite.scala   | 73 ++++++++++++++++++++
 .../sql/StatisticsCollectionTestBase.scala      | 14 +++-
 5 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0908d68..9407b72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -377,6 +377,8 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(tableIdentifier)
     externalCatalog.alterTableStats(db, table, newStats)
+    // Invalidate the table relation cache
+    refreshTable(identifier)
   }
 
   /**
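
The hunk above moves cache invalidation into `SessionCatalog.alterTableStats` 
itself, so every stats-update path benefits. A sketch of the resulting method, 
with the name-resolution preamble assumed from the usual `SessionCatalog` 
conventions rather than quoted from this commit:

```
// Sketch (assumed shape, not verbatim): statistics update and cache
// invalidation now happen together, so callers no longer refresh manually.
def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
  val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(identifier.table)
  val tableIdentifier = TableIdentifier(table, Some(db))
  requireDbExists(db)
  requireTableExists(tableIdentifier)
  externalCatalog.alterTableStats(db, table, newStats)
  // Invalidate the table relation cache
  refreshTable(identifier)
}
```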

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 6588993..caf12ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -56,9 +56,6 @@ case class AnalyzeColumnCommand(
 
     sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
 
-    // Refresh the cached data source table in the catalog.
-    sessionState.catalog.refreshTable(tableIdentWithDB)
-
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 04715bd..58b53e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -48,8 +48,6 @@ case class AnalyzeTableCommand(
    val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
     if (newStats.isDefined) {
       sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
-      // Refresh the cached data source table in the catalog.
-      sessionState.catalog.refreshTable(tableIdentWithDB)
     }
 
     Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 9e459ed..2fc92f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -261,6 +261,10 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
           assert(fetched1.get.sizeInBytes == 0)
           assert(fetched1.get.colStats.size == 2)
 
+          // table lookup will make the table cached
+          spark.table(table)
+          assert(isTableInCatalogCache(table))
+
           // insert into command
           sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
           if (autoUpdate) {
@@ -270,9 +274,78 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
           } else {
            checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
           }
+
+          // check that tableRelationCache inside the catalog was invalidated after insert
+          assert(!isTableInCatalogCache(table))
+        }
+      }
+    }
+  }
+
+  test("invalidation of tableRelationCache after inserts") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).write.saveAsTable(table)
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+          spark.table(table)
+          val initialSizeInBytes = getTableFromCatalogCache(table).stats.sizeInBytes
+          spark.range(100).write.mode(SaveMode.Append).saveAsTable(table)
+          spark.table(table)
+          assert(getTableFromCatalogCache(table).stats.sizeInBytes == 2 * initialSizeInBytes)
+        }
+      }
+    }
+  }
+
+  test("invalidation of tableRelationCache after table truncation") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).write.saveAsTable(table)
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+          spark.table(table)
+          sql(s"TRUNCATE TABLE $table")
+          spark.table(table)
+          assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
         }
       }
     }
   }
 
+  test("invalidation of tableRelationCache after alter table add partition") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTempDir { dir =>
+          withTable(table) {
+            val path = dir.getCanonicalPath
+            sql(s"""
+              |CREATE TABLE $table (col1 int, col2 int)
+              |USING PARQUET
+              |PARTITIONED BY (col2)
+              |LOCATION '${dir.toURI}'""".stripMargin)
+            sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+            spark.table(table)
+            assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
+            spark.catalog.recoverPartitions(table)
+            val df = Seq((1, 2), (1, 2)).toDF("col2", "col1")
+            df.write.parquet(s"$path/col2=1")
+            sql(s"ALTER TABLE $table ADD PARTITION (col2=1) LOCATION 
'${dir.toURI}'")
+            spark.table(table)
+            val cachedTable = getTableFromCatalogCache(table)
+            val cachedTableSizeInBytes = cachedTable.stats.sizeInBytes
+            val defaultSizeInBytes = conf.defaultSizeInBytes
+            if (autoUpdate) {
+              assert(cachedTableSizeInBytes != defaultSizeInBytes && cachedTableSizeInBytes > 0)
+            } else {
+              assert(cachedTableSizeInBytes == defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 5916cd7..a2f63ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -23,9 +23,9 @@ import java.sql.{Date, Timestamp}
 import scala.collection.mutable
 import scala.util.Random
 
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -85,6 +85,16 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
   }
 
+  def getTableFromCatalogCache(tableName: String): LogicalPlan = {
+    val catalog = spark.sessionState.catalog
+    val qualifiedTableName = QualifiedTableName(catalog.getCurrentDatabase, tableName)
+    catalog.getCachedTable(qualifiedTableName)
+  }
+
+  def isTableInCatalogCache(tableName: String): Boolean = {
+    getTableFromCatalogCache(tableName) != null
+  }
+
   def getCatalogStatistics(tableName: String): CatalogStatistics = {
     getCatalogTable(tableName).stats.get
   }
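
These helpers make the relation cache observable in tests: `getTableFromCatalogCache` 
looks up the entry for the current database and `isTableInCatalogCache` reports 
whether one exists. A usage sketch mirroring the new suites (the table name `t` 
is hypothetical):

```
// Usage sketch for the helpers above; "t" is a hypothetical table name.
spark.range(10).write.saveAsTable("t")
spark.table("t")                           // a lookup populates the relation cache
assert(isTableInCatalogCache("t"))
sql("ANALYZE TABLE t COMPUTE STATISTICS")  // alterTableStats now calls refreshTable
assert(!isTableInCatalogCache("t"))        // the stale cache entry has been dropped
```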

