Repository: spark Updated Branches: refs/heads/master 8e7b56f3d -> 468da03e2
[SPARK-15678] Add support to REFRESH data source paths ## What changes were proposed in this pull request? Spark currently incorrectly continues to use cached data even if the underlying data is overwritten. Current behavior: ```scala val dir = "/tmp/test" sqlContext.range(1000).write.mode("overwrite").parquet(dir) val df = sqlContext.read.parquet(dir).cache() df.count() // outputs 1000 sqlContext.range(10).write.mode("overwrite").parquet(dir) sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset ``` This patch fixes this bug by adding support for `REFRESH path` that invalidates and refreshes all the cached data (and the associated metadata) for any dataframe that contains the given data source path. Expected behavior: ```scala val dir = "/tmp/test" sqlContext.range(1000).write.mode("overwrite").parquet(dir) val df = sqlContext.read.parquet(dir).cache() df.count() // outputs 1000 sqlContext.range(10).write.mode("overwrite").parquet(dir) spark.catalog.refreshResource(dir) sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset ``` ## How was this patch tested? Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`. Author: Sameer Agarwal <sam...@databricks.com> Closes #13566 from sameeragarwal/refresh-path-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/468da03e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/468da03e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/468da03e Branch: refs/heads/master Commit: 468da03e23a01e02718608f05d778386cbb8416b Parents: 8e7b56f Author: Sameer Agarwal <sam...@databricks.com> Authored: Fri Jun 10 20:43:18 2016 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Fri Jun 10 20:43:18 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 1 + .../org/apache/spark/sql/catalog/Catalog.scala | 7 +++ .../spark/sql/execution/CacheManager.scala | 51 +++++++++++++++++++- .../spark/sql/execution/SparkSqlParser.scala | 9 +++- .../spark/sql/execution/datasources/ddl.scala | 9 ++++ .../apache/spark/sql/internal/CatalogImpl.scala | 10 ++++ .../datasources/parquet/ParquetQuerySuite.scala | 28 +++++++++++ .../spark/sql/hive/CachedTableSuite.scala | 45 +++++++++++++++++ 8 files changed, 158 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index d102559..044f910 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -113,6 +113,7 @@ statement | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)? tableIdentifier partitionSpec? describeColName? #describeTable | REFRESH TABLE tableIdentifier #refreshTable + | REFRESH .*? #refreshResource | CACHE LAZY? TABLE identifier (AS? query)? #cacheTable | UNCACHE TABLE identifier #uncacheTable | CLEAR CACHE #clearCache http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 6ddb1a7..083a63c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -226,4 +226,11 @@ abstract class Catalog { */ def refreshTable(tableName: String): Unit + /** + * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that + * contains the given data source path. + * + * @since 2.0.0 + */ + def refreshByPath(path: String): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index c8bdb0d..b584cf4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -19,10 +19,14 @@ package org.apache.spark.sql.execution import java.util.concurrent.locks.ReentrantReadWriteLock +import org.apache.hadoop.fs.{FileSystem, Path} + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.Dataset +import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK @@ -157,4 +161,49 @@ private[sql] class CacheManager extends Logging { case _ => } } + + /** + * Invalidates the cache of any data that contains `resourcePath` in one or more + * `HadoopFsRelation` node(s) as part of its logical plan. + */ + private[sql] def invalidateCachedPath( + sparkSession: SparkSession, resourcePath: String): Unit = writeLock { + val (fs, qualifiedPath) = { + val path = new Path(resourcePath) + val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf()) + (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory)) + } + + cachedData.foreach { + case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => + val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan)) + if (dataIndex >= 0) { + data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true) + cachedData.remove(dataIndex) + } + sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan)) + case _ => // Do Nothing + } + } + + /** + * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the + * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes + * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns + * false. + */ + private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem, qualifiedPath: Path): Boolean = { + plan match { + case lr: LogicalRelation => lr.relation match { + case hr: HadoopFsRelation => + val invalidate = hr.location.paths + .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory)) + .contains(qualifiedPath) + if (invalidate) hr.location.refresh() + invalidate + case _ => false + } + case _ => false + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index dc74222..06d8f15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -25,7 +25,6 @@ import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ @@ -210,6 +209,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** + * Create a [[RefreshTable]] logical plan. + */ + override def visitRefreshResource(ctx: RefreshResourceContext): LogicalPlan = withOrigin(ctx) { + val resourcePath = remainder(ctx.REFRESH.getSymbol).trim + RefreshResource(resourcePath) + } + + /** * Create a [[CacheTableCommand]] logical plan. */ override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) { http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index aa42eae..31a2075 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -102,6 +102,15 @@ case class RefreshTable(tableIdent: TableIdentifier) } } +case class RefreshResource(path: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + sparkSession.catalog.refreshByPath(path) + Seq.empty[Row] + } +} + /** * Builds a map in which keys are case insensitive */ http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 70e17b1..f42fd17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -373,6 +373,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } } + /** + * Refresh the cache entry and the associated metadata for all dataframes (if any), that contain + * the given data source path. + * + * @group cachemgmt + * @since 2.0.0 + */ + override def refreshByPath(resourcePath: String): Unit = { + sparkSession.sharedState.cacheManager.invalidateCachedPath(sparkSession, resourcePath) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index ea57f71..b4fd0ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -68,6 +68,34 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext TableIdentifier("tmp"), ignoreIfNotExists = true) } + test("SPARK-15678: not use cache on overwrite") { + withTempDir { dir => + val path = dir.toString + spark.range(1000).write.mode("overwrite").parquet(path) + val df = spark.read.parquet(path).cache() + assert(df.count() == 1000) + spark.range(10).write.mode("overwrite").parquet(path) + assert(df.count() == 1000) + spark.catalog.refreshByPath(path) + assert(df.count() == 10) + assert(spark.read.parquet(path).count() == 10) + } + } + + test("SPARK-15678: not use cache on append") { + withTempDir { dir => + val path = dir.toString + spark.range(1000).write.mode("append").parquet(path) + val df = spark.read.parquet(path).cache() + assert(df.count() == 1000) + spark.range(10).write.mode("append").parquet(path) + assert(df.count() == 1000) + spark.catalog.refreshByPath(path) + assert(df.count() == 1010) + assert(spark.read.parquet(path).count() == 1010) + } + } + test("self-join") { // 4 rows, cells of column 1 of row 2 and row 4 are null val data = (1 to 4).map { i => http://git-wip-us.apache.org/repos/asf/spark/blob/468da03e/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 52ba90f..5121440 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { Utils.deleteRecursively(tempPath) } + test("SPARK-15678: REFRESH PATH") { + val tempPath: File = Utils.createTempDir() + tempPath.delete() + table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString) + sql("DROP TABLE IF EXISTS refreshTable") + sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet") + checkAnswer( + table("refreshTable"), + table("src").collect()) + // Cache the table. + sql("CACHE TABLE refreshTable") + assertCached(table("refreshTable")) + // Append new data. + table("src").write.mode(SaveMode.Append).parquet(tempPath.toString) + // We are still using the old data. + assertCached(table("refreshTable")) + checkAnswer( + table("refreshTable"), + table("src").collect()) + // Refresh the table. + sql(s"REFRESH ${tempPath.toString}") + // We are using the new data. + assertCached(table("refreshTable")) + checkAnswer( + table("refreshTable"), + table("src").union(table("src")).collect()) + + // Drop the table and create it again. + sql("DROP TABLE refreshTable") + sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet") + // It is not cached. + assert(!isCached("refreshTable"), "refreshTable should not be cached.") + // Refresh the table. REFRESH command should not make a uncached + // table cached. + sql(s"REFRESH ${tempPath.toString}") + checkAnswer( + table("refreshTable"), + table("src").union(table("src")).collect()) + // It is not cached. + assert(!isCached("refreshTable"), "refreshTable should not be cached.") + + sql("DROP TABLE refreshTable") + Utils.deleteRecursively(tempPath) + } + test("SPARK-11246 cache parquet table") { sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org