Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3080f995c -> fc1b25660


[SPARK-18700][SQL] Add StripedLock for each table's relation in cache

## What changes were proposed in this pull request?

As described in 
[SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700), when an 
entry in cachedDataSourceTables is invalidated, the next few queries will fetch 
all FileStatus objects in the listLeafFiles function. If the table has many 
partitions, these jobs occupy a large amount of driver memory and may 
eventually cause a driver OOM.

This patch adds a striped lock for each table's relation in the cache, rather 
than a single lock for the whole cachedDataSourceTables, so that each table's 
cache-load operation is protected by that lock.

## How was this patch tested?

Added a test in `PartitionedTablePerfStatsSuite` that accesses a table from 
multiple threads and checks, using the metrics in `HiveCatalogMetrics`, that 
the table is loaded only once.

Author: xuanyuanking <xyliyuanj...@gmail.com>

Closes #16135 from xuanyuanking/SPARK-18700.

(cherry picked from commit 24482858e05bea84cacb41c62be0a9aaa33897ee)
Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc1b2566
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc1b2566
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc1b2566

Branch: refs/heads/branch-2.1
Commit: fc1b25660d8d2ac676c0b020208bcb9b711978c8
Parents: 3080f99
Author: xuanyuanking <xyliyuanj...@gmail.com>
Authored: Mon Dec 19 20:31:43 2016 +0100
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Mon Dec 19 20:32:02 2016 +0100

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 134 +++++++++++--------
 .../hive/PartitionedTablePerfStatsSuite.scala   |  31 +++++
 2 files changed, 106 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fc1b2566/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index edbde5d..0407cf6 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -32,7 +33,6 @@ import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.types._
 
-
 /**
  * Legacy catalog for interacting with the Hive metastore.
  *
@@ -53,6 +53,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
       tableIdent.table.toLowerCase)
   }
 
+  /** These locks guard against multiple attempts to instantiate a table, 
which wastes memory. */
+  private val tableCreationLocks = Striped.lazyWeakLock(100)
+
+  /** Acquires a lock on the table cache for the duration of `f`. */
+  private def withTableCreationLock[A](tableName: QualifiedTableName, f: => 
A): A = {
+    val lock = tableCreationLocks.get(tableName)
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
   /** A cache of Spark SQL data source tables that have been accessed. */
   protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, 
LogicalPlan] = {
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
@@ -209,72 +221,76 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
         }
       }
 
-      val cached = getCached(
-        tableIdentifier,
-        rootPaths,
-        metastoreRelation,
-        metastoreSchema,
-        fileFormatClass,
-        bucketSpec,
-        Some(partitionSchema))
-
-      val logicalRelation = cached.getOrElse {
-        val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
-        val fileCatalog = {
-          val catalog = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
-          if (lazyPruningEnabled) {
-            catalog
-          } else {
-            catalog.filterPartitions(Nil)  // materialize all the partitions 
in memory
+      withTableCreationLock(tableIdentifier, {
+        val cached = getCached(
+          tableIdentifier,
+          rootPaths,
+          metastoreRelation,
+          metastoreSchema,
+          fileFormatClass,
+          bucketSpec,
+          Some(partitionSchema))
+
+        val logicalRelation = cached.getOrElse {
+          val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
+          val fileCatalog = {
+            val catalog = new CatalogFileIndex(
+              sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+            if (lazyPruningEnabled) {
+              catalog
+            } else {
+              catalog.filterPartitions(Nil)  // materialize all the partitions 
in memory
+            }
           }
+          val partitionSchemaColumnNames = 
partitionSchema.map(_.name.toLowerCase).toSet
+          val dataSchema =
+            StructType(metastoreSchema
+              .filterNot(field => 
partitionSchemaColumnNames.contains(field.name.toLowerCase)))
+
+          val relation = HadoopFsRelation(
+            location = fileCatalog,
+            partitionSchema = partitionSchema,
+            dataSchema = dataSchema,
+            bucketSpec = bucketSpec,
+            fileFormat = defaultSource,
+            options = options)(sparkSession = sparkSession)
+
+          val created = LogicalRelation(relation,
+            catalogTable = Some(metastoreRelation.catalogTable))
+          cachedDataSourceTables.put(tableIdentifier, created)
+          created
         }
-        val partitionSchemaColumnNames = 
partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
-            .filterNot(field => 
partitionSchemaColumnNames.contains(field.name.toLowerCase)))
-
-        val relation = HadoopFsRelation(
-          location = fileCatalog,
-          partitionSchema = partitionSchema,
-          dataSchema = dataSchema,
-          bucketSpec = bucketSpec,
-          fileFormat = defaultSource,
-          options = options)(sparkSession = sparkSession)
-
-        val created = LogicalRelation(relation, catalogTable = 
Some(metastoreRelation.catalogTable))
-        cachedDataSourceTables.put(tableIdentifier, created)
-        created
-      }
 
-      logicalRelation
+        logicalRelation
+      })
     } else {
       val rootPath = metastoreRelation.hiveQlTable.getDataLocation
-
-      val cached = getCached(tableIdentifier,
-        Seq(rootPath),
-        metastoreRelation,
-        metastoreSchema,
-        fileFormatClass,
-        bucketSpec,
-        None)
-      val logicalRelation = cached.getOrElse {
-        val created =
-          LogicalRelation(
-            DataSource(
-              sparkSession = sparkSession,
-              paths = rootPath.toString :: Nil,
-              userSpecifiedSchema = Some(metastoreRelation.schema),
-              bucketSpec = bucketSpec,
-              options = options,
-              className = fileType).resolveRelation(),
+      withTableCreationLock(tableIdentifier, {
+        val cached = getCached(tableIdentifier,
+          Seq(rootPath),
+          metastoreRelation,
+          metastoreSchema,
+          fileFormatClass,
+          bucketSpec,
+          None)
+        val logicalRelation = cached.getOrElse {
+          val created =
+            LogicalRelation(
+              DataSource(
+                sparkSession = sparkSession,
+                paths = rootPath.toString :: Nil,
+                userSpecifiedSchema = Some(metastoreRelation.schema),
+                bucketSpec = bucketSpec,
+                options = options,
+                className = fileType).resolveRelation(),
               catalogTable = Some(metastoreRelation.catalogTable))
 
-        cachedDataSourceTables.put(tableIdentifier, created)
-        created
-      }
+          cachedDataSourceTables.put(tableIdentifier, created)
+          created
+        }
 
-      logicalRelation
+        logicalRelation
+      })
     }
     result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fc1b2566/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 65c02d4..55b72c6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
 
 import org.scalatest.BeforeAndAfterEach
 
@@ -395,4 +396,34 @@ class PartitionedTablePerfStatsSuite
       }
     }
   }
+
+  test("SPARK-18700: table loaded only once even when resolved concurrently") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+      withTable("test") {
+        withTempDir { dir =>
+          HiveCatalogMetrics.reset()
+          setupPartitionedHiveTable("test", dir, 50)
+          // select the table in multi-threads
+          val executorPool = Executors.newFixedThreadPool(10)
+          (1 to 10).map(threadId => {
+            val runnable = new Runnable {
+              override def run(): Unit = {
+                spark.sql("select * from test where partCol1 = 999").count()
+              }
+            }
+            executorPool.execute(runnable)
+            None
+          })
+          executorPool.shutdown()
+          executorPool.awaitTermination(30, TimeUnit.SECONDS)
+          // check the cache hit, we use the metric of METRIC_FILES_DISCOVERED 
and
+          // METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock 
take effect,
+          // only one thread can really do the build, so the listing job count 
is 2, the other
+          // one is cache.load func. Also METRIC_FILES_DISCOVERED is 
$partition_num * 2
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
+          
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to