Repository: spark
Updated Branches:
  refs/heads/master 4e3f3cebe -> aff8f15c1


[SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS

## What changes were proposed in this pull request?

In the PR, I propose not to perform recursive parallel listing of files in 
the `scanPartitions` method, because it can cause a deadlock. Instead, I 
propose to run `scanPartitions` in parallel for top-level partitions only.
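
The deadlock comes from pool exhaustion: recursive listing submits child tasks to the same bounded pool and blocks waiting for their results, so once every worker is blocked, no child task can ever be scheduled. A minimal standalone sketch of that failure mode (illustrative only, not the Spark code; the pool size, recursion depth, and names are made up):

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object DeadlockSketch {
  // A bounded pool of 2 threads stands in for the bounded ForkJoinPool of 8
  // used by AlterTableRecoverPartitionsCommand.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  // Recursive parallel "listing": each level submits child tasks to the same
  // pool and blocks on their results. Once every pool thread is parked in
  // Await, the child tasks can never run.
  def scan(depth: Int): Seq[Int] = {
    if (depth == 0) return Seq(0)
    val children = (1 to 2).map(_ => Future(scan(depth - 1)))
    children.flatMap(f => Await.result(f, Duration.Inf))
  }

  def main(args: Array[String]): Unit = {
    scan(3)  // hangs: classic pool-exhaustion deadlock
  }
}
```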

## How was this patch tested?

I extended an existing test to trigger the deadlock.

Author: Maxim Gekk <maxim.g...@databricks.com>

Closes #22233 from MaxGekk/fix-recover-partitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aff8f15c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aff8f15c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aff8f15c

Branch: refs/heads/master
Commit: aff8f15c153f8031ceaffa237c60e040c6f8115f
Parents: 4e3f3ce
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Tue Aug 28 11:29:05 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Aug 28 11:29:05 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/command/ddl.scala       | 34 +++++------
 .../spark/sql/execution/command/DDLSuite.scala  | 59 ++++++++++++--------
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 15 ++---
 3 files changed, 61 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7a6f574..e1faece 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
-import org.apache.spark.util.ThreadUtils.parmap
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
     val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
     val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
       try {
-        implicit val ec = ExecutionContext.fromExecutor(evalPool)
         scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
-          spark.sessionState.conf.resolver)
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
       } finally {
         evalPool.shutdown()
       }
@@ -656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand(
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
 
-    val statuses = fs.listStatus(path, filter).toSeq
-    def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
+    val statuses = fs.listStatus(path, filter)
+    val statusPar: GenSeq[FileStatus] =
+      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        // parallelize the list of partitions here, then we can have better parallelism later.
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
@@ -671,7 +679,7 @@ case class AlterTableRecoverPartitionsCommand(
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got 
${ps(0)}, ignoring it")
@@ -682,14 +690,6 @@ case class AlterTableRecoverPartitionsCommand(
         Seq.empty
       }
     }
-    val result = if (partitionNames.length > 1 &&
-        statuses.length > threshold || partitionNames.length > 2) {
-      parmap(statuses)(handleStatus _)
-    } else {
-      statuses.map(handleStatus)
-    }
-
-    result.flatten
   }
 
   private def gatherPartitionStats(

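The patched `scanPartitions` above hands every `.par` collection the same `ForkJoinTaskSupport`. Unlike blocking on futures, a ForkJoin join is cooperative: a worker that reaches a nested parallel region helps execute its subtasks instead of parking, so recursion cannot exhaust the pool. A standalone sketch of that pattern (hypothetical paths, branching factor, and names; assumes Scala 2.12, where parallel collections ship with the standard library):

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.GenSeq
import scala.collection.parallel.ForkJoinTaskSupport

object TaskSupportSketch {
  // One shared pool for every level of the recursion, as in the patch.
  private val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

  // Nested .par traversals reuse the same task support; ForkJoin workers
  // join subtasks by helping to run them, so no thread blocks idle.
  def scan(path: String, depth: Int): GenSeq[String] = {
    if (depth == 0) return Seq(path)
    val children = (1 to 4).map(i => s"$path/p$depth=$i").par
    children.tasksupport = taskSupport
    children.flatMap(child => scan(child, depth - 1))
  }

  def main(args: Array[String]): Unit = {
    println(scan("/table", 3).seq.size)  // 64 leaf "partitions", no deadlock
  }
}
```
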
http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 78df1db..f8d98de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -52,23 +52,24 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): CatalogTable = {
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
     val storage =
       CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
     val metadata = new MetadataBuilder()
       .putString("key", "value")
       .build()
+    val schema = new StructType()
+      .add("col1", "int", nullable = true, metadata = metadata)
+      .add("col2", "string")
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = new StructType()
-        .add("col1", "int", nullable = true, metadata = metadata)
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "int"),
+      schema = schema.copy(
+        fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
       provider = Some("parquet"),
-      partitionColumnNames = Seq("a", "b"),
+      partitionColumnNames = partitionCols,
       createTime = 0L,
       createVersion = org.apache.spark.SPARK_VERSION,
       tracksPartitionsInCatalog = true)
@@ -176,7 +177,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   protected def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): CatalogTable
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable
 
   private val escapedIdentifier = "`(.+)`".r
 
@@ -228,8 +230,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   private def createTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): Unit = {
-    catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): Unit = {
+    catalog.createTable(
+      generateTable(catalog, name, isDataSource, partitionCols), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -1131,7 +1135,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
       testRecoverPartitions()
     }
   }
@@ -1144,23 +1148,32 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
 
     val tableIdent = TableIdentifier("tab1")
-    createTable(catalog, tableIdent)
-    val part1 = Map("a" -> "1", "b" -> "5")
+    createTable(catalog, tableIdent, partitionCols = Seq("a", "b", "c"))
+    val part1 = Map("a" -> "1", "b" -> "5", "c" -> "19")
     createTablePartition(catalog, part1, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
-    val part2 = Map("a" -> "2", "b" -> "6")
+    val part2 = Map("a" -> "2", "b" -> "6", "c" -> "31")
     val root = new Path(catalog.getTableMetadata(tableIdent).location)
     val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
     // valid
-    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
-    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
-    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
-    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
-    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+    fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS"))  // file
+
+    fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))
+
+    val parts = (10 to 100).map { a =>
+      val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
+      fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
+      fs.createNewFile(new Path(new Path(root, s"a=$a/b=5/c=42"), "a.csv"))  // file
+      createTablePartition(catalog, part, tableIdent)
+      part
+    }
 
     // invalid
     fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
@@ -1174,7 +1187,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     try {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2))
+        Set(part1, part2) ++ parts)
       if (!isUsingHiveMetastore) {
         assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
         assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")

http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7288177..6708a50 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean): CatalogTable = {
+      isDataSource: Boolean,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
     val storage =
       if (isDataSource) {
         val serde = HiveSerDe.sourceToSerDe("parquet")
@@ -84,17 +85,17 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
     val metadata = new MetadataBuilder()
       .putString("key", "value")
       .build()
+    val schema = new StructType()
+      .add("col1", "int", nullable = true, metadata = metadata)
+      .add("col2", "string")
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = new StructType()
-        .add("col1", "int", nullable = true, metadata = metadata)
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "int"),
+      schema = schema.copy(
+        fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
       provider = if (isDataSource) Some("parquet") else Some("hive"),
-      partitionColumnNames = Seq("a", "b"),
+      partitionColumnNames = partitionCols,
       createTime = 0L,
       createVersion = org.apache.spark.SPARK_VERSION,
       tracksPartitionsInCatalog = true)

