Repository: spark Updated Branches: refs/heads/branch-2.0 5f8c0b742 -> 53cd99f65
[SPARK-18700][SQL][BACKPORT-2.0] Add StripedLock for each table's relation in cache ## What changes were proposed in this pull request? Backport of #16135 to branch-2.0 ## How was this patch tested? Because of the differences between branch-2.0 and master/2.1, this patch adds a multi-threaded table-access test to `HiveMetadataCacheSuite` and uses the metrics in `HiveCatalogMetrics` to check that the table is loaded only once. Author: xuanyuanking <[email protected]> Closes #16350 from xuanyuanking/SPARK-18700-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53cd99f6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53cd99f6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53cd99f6 Branch: refs/heads/branch-2.0 Commit: 53cd99f65667c4d49db000101460a9d266f199e8 Parents: 5f8c0b7 Author: xuanyuanking <[email protected]> Authored: Wed Dec 21 22:55:42 2016 +0100 Committer: Herman van Hovell <[email protected]> Committed: Wed Dec 21 22:55:42 2016 +0100 ---------------------------------------------------------------------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 147 +++++++++++-------- 1 file changed, 82 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/53cd99f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e7d1ed3..670400f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ import com.google.common.cache.{CacheBuilder, CacheLoader, 
LoadingCache} +import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -65,6 +66,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log t.identifier.table.toLowerCase) } + /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */ + private val tableCreationLocks = Striped.lazyWeakLock(100) + + /** Acquires a lock on the table cache for the duration of `f`. */ + private def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = { + val lock = tableCreationLocks.get(tableName) + lock.lock() + try f finally { + lock.unlock() + } + } + /** A cache of Spark SQL data source tables that have been accessed. */ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { @@ -274,77 +287,81 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log partitionPaths } - val cached = getCached( - tableIdentifier, - paths, - metastoreRelation, - metastoreSchema, - fileFormatClass, - bucketSpec, - Some(partitionSpec)) - - val hadoopFsRelation = cached.getOrElse { - val fileCatalog = new MetaStorePartitionedTableFileCatalog( - sparkSession, - new Path(metastoreRelation.catalogTable.storage.locationUri.get), - partitionSpec) - - val inferredSchema = if (fileType.equals("parquet")) { - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - inferredSchema.map { inferred => - ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) - }.getOrElse(metastoreSchema) - } else { - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get - } + withTableCreationLock(tableIdentifier, { + val cached = getCached( + tableIdentifier, + paths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + Some(partitionSpec)) + + 
val hadoopFsRelation = cached.getOrElse { + val fileCatalog = new MetaStorePartitionedTableFileCatalog( + sparkSession, + new Path(metastoreRelation.catalogTable.storage.locationUri.get), + partitionSpec) + + val inferredSchema = if (fileType.equals("parquet")) { + val inferredSchema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) + inferredSchema.map { inferred => + ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) + }.getOrElse(metastoreSchema) + } else { + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get + } - val relation = HadoopFsRelation( - location = fileCatalog, - partitionSchema = partitionSchema, - dataSchema = inferredSchema, - bucketSpec = bucketSpec, - fileFormat = defaultSource, - options = options)(sparkSession = sparkSession) - - val created = LogicalRelation( - relation, - metastoreTableIdentifier = - Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) - cachedDataSourceTables.put(tableIdentifier, created) - created - } + val relation = HadoopFsRelation( + location = fileCatalog, + partitionSchema = partitionSchema, + dataSchema = inferredSchema, + bucketSpec = bucketSpec, + fileFormat = defaultSource, + options = options)(sparkSession = sparkSession) + + val created = LogicalRelation( + relation, + metastoreTableIdentifier = + Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) + cachedDataSourceTables.put(tableIdentifier, created) + created + } - hadoopFsRelation + hadoopFsRelation + }) } else { val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) - val cached = getCached(tableIdentifier, - paths, - metastoreRelation, - metastoreSchema, - fileFormatClass, - bucketSpec, - None) - val logicalRelation = cached.getOrElse { - val created = - LogicalRelation( - DataSource( - sparkSession = sparkSession, - paths = paths, - userSpecifiedSchema = Some(metastoreRelation.schema), - bucketSpec = bucketSpec, - 
options = options, - className = fileType).resolveRelation(), - metastoreTableIdentifier = - Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) - - - cachedDataSourceTables.put(tableIdentifier, created) - created - } + withTableCreationLock(tableIdentifier, { + val cached = getCached(tableIdentifier, + paths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + None) + val logicalRelation = cached.getOrElse { + val created = + LogicalRelation( + DataSource( + sparkSession = sparkSession, + paths = paths, + userSpecifiedSchema = Some(metastoreRelation.schema), + bucketSpec = bucketSpec, + options = options, + className = fileType).resolveRelation(), + metastoreTableIdentifier = + Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) + + + cachedDataSourceTables.put(tableIdentifier, created) + created + } - logicalRelation + logicalRelation + }) } result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
