Repository: spark
Updated Branches:
refs/heads/branch-2.0 798825c09 -> a08715c7a
[SPARK-15678] Add support to REFRESH data source paths
## What changes were proposed in this pull request?
Spark currently incorrectly continues to use cached data even if the underlying
data is overwritten.
Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using
                                     // the cached dataset
```
This patch fixes this bug by adding support for `REFRESH path` that invalidates
and refreshes all the cached data (and the associated metadata) for any
dataframe that contains the given data source path.
Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the
                                     // cached dataset
```
## How was this patch tested?
Unit tests for overwrites and appends in `ParquetQuerySuite` and
`CachedTableSuite`.
Author: Sameer Agarwal <[email protected]>
Closes #13566 from sameeragarwal/refresh-path-2.
(cherry picked from commit 468da03e23a01e02718608f05d778386cbb8416b)
Signed-off-by: Davies Liu <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a08715c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a08715c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a08715c7
Branch: refs/heads/branch-2.0
Commit: a08715c7a79ce1953b8d64a9cf0ec1c513d56eec
Parents: 798825c
Author: Sameer Agarwal <[email protected]>
Authored: Fri Jun 10 20:43:18 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Fri Jun 10 20:43:26 2016 -0700
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 1 +
.../org/apache/spark/sql/catalog/Catalog.scala | 7 +++
.../spark/sql/execution/CacheManager.scala | 51 +++++++++++++++++++-
.../spark/sql/execution/SparkSqlParser.scala | 9 +++-
.../spark/sql/execution/datasources/ddl.scala | 9 ++++
.../apache/spark/sql/internal/CatalogImpl.scala | 10 ++++
.../datasources/parquet/ParquetQuerySuite.scala | 28 +++++++++++
.../spark/sql/hive/CachedTableSuite.scala | 45 +++++++++++++++++
8 files changed, 158 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d102559..044f910 100644
---
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -113,6 +113,7 @@ statement
| (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
tableIdentifier partitionSpec? describeColName?
#describeTable
| REFRESH TABLE tableIdentifier
#refreshTable
+ | REFRESH .*?
#refreshResource
| CACHE LAZY? TABLE identifier (AS? query)?
#cacheTable
| UNCACHE TABLE identifier
#uncacheTable
| CLEAR CACHE
#clearCache
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6ddb1a7..083a63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -226,4 +226,11 @@ abstract class Catalog {
*/
def refreshTable(tableName: String): Unit
+ /**
+ * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
+ * contains the given data source path.
+ *
+ * @since 2.0.0
+ */
+ def refreshByPath(path: String): Unit
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c8bdb0d..b584cf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,10 +19,14 @@ package org.apache.spark.sql.execution
import java.util.concurrent.locks.ReentrantReadWriteLock
+import org.apache.hadoop.fs.{FileSystem, Path}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -157,4 +161,49 @@ private[sql] class CacheManager extends Logging {
case _ =>
}
}
+
+ /**
+ * Invalidates the cache of any data that contains `resourcePath` in one or more
+ * `HadoopFsRelation` node(s) as part of its logical plan.
+ */
+ private[sql] def invalidateCachedPath(
+ sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
+ val (fs, qualifiedPath) = {
+ val path = new Path(resourcePath)
+ val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+ }
+
+ cachedData.foreach {
+ case data if data.plan.find(lookupAndRefresh(_, fs,
qualifiedPath)).isDefined =>
+ val dataIndex = cachedData.indexWhere(cd =>
data.plan.sameResult(cd.plan))
+ if (dataIndex >= 0) {
+ data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking =
true)
+ cachedData.remove(dataIndex)
+ }
+
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession,
data.plan))
+ case _ => // Do Nothing
+ }
+ }
+
+ /**
+ * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
+ * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+ * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
+ * false.
+ */
+ private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem,
qualifiedPath: Path): Boolean = {
+ plan match {
+ case lr: LogicalRelation => lr.relation match {
+ case hr: HadoopFsRelation =>
+ val invalidate = hr.location.paths
+ .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
+ .contains(qualifiedPath)
+ if (invalidate) hr.location.refresh()
+ invalidate
+ case _ => false
+ }
+ case _ => false
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index dc74222..06d8f15 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,6 @@ import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -210,6 +209,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder
{
}
/**
+ * Create a [[RefreshResource]] logical plan.
+ */
+ override def visitRefreshResource(ctx: RefreshResourceContext): LogicalPlan
= withOrigin(ctx) {
+ val resourcePath = remainder(ctx.REFRESH.getSymbol).trim
+ RefreshResource(resourcePath)
+ }
+
+ /**
* Create a [[CacheTableCommand]] logical plan.
*/
override def visitCacheTable(ctx: CacheTableContext): LogicalPlan =
withOrigin(ctx) {
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index aa42eae..31a2075 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -102,6 +102,15 @@ case class RefreshTable(tableIdent: TableIdentifier)
}
}
+case class RefreshResource(path: String)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.catalog.refreshByPath(path)
+ Seq.empty[Row]
+ }
+}
+
/**
* Builds a map in which keys are case insensitive
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 70e17b1..f42fd17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -373,6 +373,16 @@ class CatalogImpl(sparkSession: SparkSession) extends
Catalog {
}
}
+ /**
+ * Refresh the cache entry and the associated metadata for all dataframes (if any), that contain
+ * the given data source path.
+ *
+ * @group cachemgmt
+ * @since 2.0.0
+ */
+ override def refreshByPath(resourcePath: String): Unit = {
+ sparkSession.sharedState.cacheManager.invalidateCachedPath(sparkSession,
resourcePath)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index ea57f71..b4fd0ef 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -68,6 +68,34 @@ class ParquetQuerySuite extends QueryTest with ParquetTest
with SharedSQLContext
TableIdentifier("tmp"), ignoreIfNotExists = true)
}
+ test("SPARK-15678: not use cache on overwrite") {
+ withTempDir { dir =>
+ val path = dir.toString
+ spark.range(1000).write.mode("overwrite").parquet(path)
+ val df = spark.read.parquet(path).cache()
+ assert(df.count() == 1000)
+ spark.range(10).write.mode("overwrite").parquet(path)
+ assert(df.count() == 1000)
+ spark.catalog.refreshByPath(path)
+ assert(df.count() == 10)
+ assert(spark.read.parquet(path).count() == 10)
+ }
+ }
+
+ test("SPARK-15678: not use cache on append") {
+ withTempDir { dir =>
+ val path = dir.toString
+ spark.range(1000).write.mode("append").parquet(path)
+ val df = spark.read.parquet(path).cache()
+ assert(df.count() == 1000)
+ spark.range(10).write.mode("append").parquet(path)
+ assert(df.count() == 1000)
+ spark.catalog.refreshByPath(path)
+ assert(df.count() == 1010)
+ assert(spark.read.parquet(path).count() == 1010)
+ }
+ }
+
test("self-join") {
// 4 rows, cells of column 1 of row 2 and row 4 are null
val data = (1 to 4).map { i =>
http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 52ba90f..5121440 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with
TestHiveSingleton {
Utils.deleteRecursively(tempPath)
}
+ test("SPARK-15678: REFRESH PATH") {
+ val tempPath: File = Utils.createTempDir()
+ tempPath.delete()
+ table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
+ sql("DROP TABLE IF EXISTS refreshTable")
+ sparkSession.catalog.createExternalTable("refreshTable",
tempPath.toString, "parquet")
+ checkAnswer(
+ table("refreshTable"),
+ table("src").collect())
+ // Cache the table.
+ sql("CACHE TABLE refreshTable")
+ assertCached(table("refreshTable"))
+ // Append new data.
+ table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
+ // We are still using the old data.
+ assertCached(table("refreshTable"))
+ checkAnswer(
+ table("refreshTable"),
+ table("src").collect())
+ // Refresh the table.
+ sql(s"REFRESH ${tempPath.toString}")
+ // We are using the new data.
+ assertCached(table("refreshTable"))
+ checkAnswer(
+ table("refreshTable"),
+ table("src").union(table("src")).collect())
+
+ // Drop the table and create it again.
+ sql("DROP TABLE refreshTable")
+ sparkSession.catalog.createExternalTable("refreshTable",
tempPath.toString, "parquet")
+ // It is not cached.
+ assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+ // Refresh the table. REFRESH command should not make a uncached
+ // table cached.
+ sql(s"REFRESH ${tempPath.toString}")
+ checkAnswer(
+ table("refreshTable"),
+ table("src").union(table("src")).collect())
+ // It is not cached.
+ assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+ sql("DROP TABLE refreshTable")
+ Utils.deleteRecursively(tempPath)
+ }
+
test("SPARK-11246 cache parquet table") {
sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]