Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8145c82bc -> 41d698ece
[SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table

## What changes were proposed in this pull request?

Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason. We should avoid doing this when the user specifies a schema.

## How was this patch tested?

Perf stat tests.

Author: Eric Liang <[email protected]>

Closes #16090 from ericl/spark-18661.

(cherry picked from commit d9eb4c7215f26dd05527c0b9980af35087ab9d64)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41d698ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41d698ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41d698ec

Branch: refs/heads/branch-2.1
Commit: 41d698ecead46979e9a77b21e6a9c8f27cff63ac
Parents: 8145c82
Author: Eric Liang <[email protected]>
Authored: Sun Dec 4 20:44:04 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Sun Dec 4 20:44:16 2016 +0800

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala       | 10 +++-
 .../sql/execution/datasources/DataSource.scala |  2 +-
 .../spark/sql/execution/command/DDLSuite.scala | 11 ++++-
 .../hive/PartitionedTablePerfStatsSuite.scala  | 51 ++++++++++++++++++--
 4 files changed, 66 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
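To make the intended behavior concrete, here is a minimal sketch of the DDL flow this patch optimizes, assuming a SparkSession `spark`; the table name `events` and the path `/tmp/events` are hypothetical, and the statement shape mirrors the DDL used in the test suite below:

```scala
// The schema and partition columns are fully user-specified, so creating the
// table no longer needs to list files under the path to infer partitioning.
spark.sql("""
  create table events (fieldOne long, partCol1 int, partCol2 int)
  using parquet
  options (path "/tmp/events")
  partitioned by (partCol1, partCol2)
""")

// Partition metadata is only populated when the user explicitly asks for it.
spark.sql("msck repair table events")
```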
http://git-wip-us.apache.org/repos/asf/spark/blob/41d698ec/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 422700c..193a2a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
     val pathOption = table.storage.locationUri.map("path" -> _)
+    // Fill in some default table options from the session conf
+    val tableWithDefaultOptions = table.copy(
+      identifier = table.identifier.copy(
+        database = Some(
+          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
+      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
     val dataSource: BaseRelation =
       DataSource(
         sparkSession = sparkSession,
         userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+        partitionColumns = table.partitionColumnNames,
         className = table.provider.get,
         bucketSpec = table.bucketSpec,
-        options = table.storage.properties ++ pathOption).resolveRelation()
+        options = table.storage.properties ++ pathOption,
+        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
 
     dataSource match {
       case fs: HadoopFsRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/41d698ec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ccfc759..f47eb84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,7 +132,7 @@ case class DataSource(
         }.toArray
         new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
       }
-      val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+      val partitionSchema = if (partitionColumns.isEmpty) {
         // Try to infer partitioning, because no DataSource in the read path provides the partitioning
         // columns properly unless it is a Hive DataSource
         val resolved = tempFileIndex.partitionSchema.map { partitionField =>
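With the DataSource change above, the inference branch that relies on the file index is only taken when no partition columns are declared. A simplified, self-contained sketch of that decision follows; it is illustrative only, not the actual Spark implementation, and all names in it are made up for the example:

```scala
import org.apache.spark.sql.types.StructType

// If partition columns are declared, carve the partition schema out of the
// user-specified schema; only fall back to a filesystem scan when nothing was
// declared. `inferFromFiles` stands in for the expensive listing path.
def partitionSchemaFor(
    userSchema: StructType,
    declaredPartitionCols: Seq[String],
    inferFromFiles: () => StructType): StructType = {
  if (declaredPartitionCols.isEmpty) {
    inferFromFiles()  // old behavior: always paid for partition inference
  } else {
    StructType(declaredPartitionCols.map { name =>
      userSchema.find(_.name == name).getOrElse(
        sys.error(s"partition column $name is not in the declared schema"))
    })
  }
}
```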
http://git-wip-us.apache.org/repos/asf/spark/blob/41d698ec/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 10843e9..6593fa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         pathToNonPartitionedTable,
         userSpecifiedSchema = Option("num int, str string"),
         userSpecifiedPartitionCols = partitionCols,
-        expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+        expectedSchema = if (partitionCols.isDefined) {
+          // we skipped inference, so the partition col is ordered at the end
+          new StructType().add("str", StringType).add("num", IntegerType)
+        } else {
+          // no inferred partitioning, so schema is in original order
+          new StructType().add("num", IntegerType).add("str", StringType)
+        },
         expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
     }
   }
@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val table = catalog.getTableMetadata(TableIdentifier("tbl"))
       assert(table.tableType == CatalogTableType.MANAGED)
       assert(table.provider == Some("parquet"))
-      assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+      // a is ordered last since it is a user-specified partitioning column
+      assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
       assert(table.partitionColumnNames == Seq("a"))
     }
   }
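The updated DDLSuite assertions capture the one user-visible consequence of skipping inference: user-specified partition columns are ordered at the end of the persisted schema. For example, for a table declared with columns `a int, b int` and partitioned by `a`, the catalog schema is:

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

// partition column `a` is placed last in the stored schema,
// matching the updated assertion in DDLSuite
val expectedSchema = new StructType().add("b", IntegerType).add("a", IntegerType)
```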
http://git-wip-us.apache.org/repos/asf/spark/blob/41d698ec/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 9838b9a..65c02d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -60,36 +60,52 @@ class PartitionedTablePerfStatsSuite
     setupPartitionedHiveTable(tableName, dir, 5)
   }
 
-  private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedHiveTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create external table $tableName (fieldOne long)
      |partitioned by (partCol1 int, partCol2 int)
      |stored as parquet
      |location "${dir.getAbsolutePath}"""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }
 
   private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
     setupPartitionedDatasourceTable(tableName, dir, 5)
   }
 
-  private def setupPartitionedDatasourceTable(tableName: String, dir: File, scale: Int): Unit = {
+  private def setupPartitionedDatasourceTable(
+      tableName: String, dir: File, scale: Int,
+      clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
     spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
       .partitionBy("partCol1", "partCol2")
       .mode("overwrite")
       .parquet(dir.getAbsolutePath)
 
+    if (clearMetricsBeforeCreate) {
+      HiveCatalogMetrics.reset()
+    }
+
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
      |using parquet
      |options (path "${dir.getAbsolutePath}")
      |partitioned by (partCol1, partCol2)""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
+    if (repair) {
+      spark.sql(s"msck repair table $tableName")
+    }
   }
 
   genericTest("partitioned pruned table reports only selected files") { spec =>
@@ -250,6 +266,33 @@ class PartitionedTablePerfStatsSuite
     }
   }
 
+  test("datasource table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("hive table: table setup does not scan filesystem") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          HiveCatalogMetrics.reset()
+          setupPartitionedHiveTable(
+            "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
   test("hive table: num hive client calls does not scale with partition count") {
     withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
       withTable("test") {
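Outside the suite, the same counters give a quick sanity check. A hedged sketch, assuming the `HiveCatalogMetrics` object used by the tests above is importable from `org.apache.spark.metrics.source` (as in Spark 2.1), that partition management for file source tables is enabled (the SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS flag exercised above), and with a hypothetical table name and path:

```scala
// Assumed import path for the metrics object referenced by the suite above.
import org.apache.spark.metrics.source.HiveCatalogMetrics

HiveCatalogMetrics.reset()
spark.sql("""
  create table events_check (fieldOne long, partCol1 int, partCol2 int)
  using parquet
  options (path "/tmp/events")
  partitioned by (partCol1, partCol2)
""")
// With a user-specified schema, table creation should neither list any files
// nor hit the file status cache.
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
```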
