Repository: spark
Updated Branches:
  refs/heads/master 6a0fda2c0 -> 48caec251


[SPARK-17063] [SQL] Improve performance of MSCK REPAIR TABLE with Hive metastore

## What changes were proposed in this pull request?

This PR split the the single `createPartitions()` call into smaller batches, 
which could prevent Hive metastore from OOM (caused by millions of partitions).

It will also try to gather all the fast stats (number of files and total size 
of all files) in parallel to avoid the bottle neck of listing the files in 
metastore sequential, which is controlled by spark.sql.gatherFastStats (enabled 
by default).

## How was this patch tested?

Tested locally with 10000 partitions and 100 files with embedded metastore, 
without gathering fast stats in parallel, adding partitions took 153 seconds, 
after enable that, gathering the fast stats took about 34 seconds, adding these 
partitions took 25 seconds (most of the time spent in object store), 59 seconds 
in total, 2.5X faster (with larger cluster, gathering will much faster).

Author: Davies Liu <[email protected]>

Closes #14607 from davies/repair_batch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48caec25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48caec25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48caec25

Branch: refs/heads/master
Commit: 48caec2516ef35bfa1a3de2dc0a80d0dc819e6bd
Parents: 6a0fda2
Author: Davies Liu <[email protected]>
Authored: Mon Aug 29 11:23:53 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Mon Aug 29 11:23:53 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |   4 +-
 .../spark/sql/execution/command/ddl.scala       | 156 +++++++++++++++----
 .../org/apache/spark/sql/internal/SQLConf.scala |  10 ++
 .../spark/sql/execution/command/DDLSuite.scala  |  13 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |   4 +-
 .../apache/spark/sql/hive/client/HiveShim.scala |   8 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  38 +++++
 7 files changed, 200 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/48caec25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 83e01f9..8408d76 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -81,10 +81,12 @@ object CatalogStorageFormat {
  *
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
+ * @param parameters some parameters for the partition, for example, stats.
  */
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
-    storage: CatalogStorageFormat)
+    storage: CatalogStorageFormat,
+    parameters: Map[String, String] = Map.empty)
 
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/48caec25/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 3817f91..53fb684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.collection.GenSeq
+import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -32,6 +33,7 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -422,6 +424,9 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+
+case class PartitionStatistics(numFiles: Int, totalSize: Long)
+
 /**
  * Recover Partitions in ALTER TABLE: recover all the partition in the 
directory of a table and
  * update the catalog.
@@ -435,6 +440,31 @@ case class AlterTableDropPartitionCommand(
 case class AlterTableRecoverPartitionsCommand(
     tableName: TableIdentifier,
     cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+
+  // These are list of statistics that can be collected quickly without 
requiring a scan of the data
+  // see https://github.com/apache/hive/blob/master/
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+  val NUM_FILES = "numFiles"
+  val TOTAL_SIZE = "totalSize"
+  val DDL_TIME = "transient_lastDdlTime"
+
+  private def getPathFilter(hadoopConf: Configuration): PathFilter = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() 
is slow)
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    new PathFilter {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        if (name != "_SUCCESS" && name != "_temporary" && 
!name.startsWith(".")) {
+          pathFilter == null || pathFilter.accept(path)
+        } else {
+          false
+        }
+      }
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
@@ -449,10 +479,6 @@ case class AlterTableRecoverPartitionsCommand(
       throw new AnalysisException(
         s"Operation not allowed: $cmd on datasource tables: $tableName")
     }
-    if (table.tableType != CatalogTableType.EXTERNAL) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd only works on external tables: 
$tableName")
-    }
     if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: 
$tableName")
@@ -463,19 +489,26 @@ case class AlterTableRecoverPartitionsCommand(
     }
 
     val root = new Path(table.storage.locationUri.get)
+    logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() 
is slow)
-    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, 
this.getClass)
-    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val pathFilter = getPathFilter(hadoopConf)
     val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase))
-    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
-      // inherit table storage format (possibly except for location)
-      CatalogTablePartition(spec, table.storage.copy(locationUri = 
Some(location.toUri.toString)))
+      spark, fs, pathFilter, root, Map(), 
table.partitionColumnNames.map(_.toLowerCase), threshold)
+    val total = partitionSpecsAndLocs.length
+    logInfo(s"Found $total partitions in $root")
+
+    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, 
threshold)
+    } else {
+      GenMap.empty[String, PartitionStatistics]
     }
-    spark.sessionState.catalog.createPartitions(tableName,
-      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+    logInfo(s"Finished to gather the fast stats for all $total partitions.")
+
+    addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
 
@@ -487,15 +520,16 @@ case class AlterTableRecoverPartitionsCommand(
       filter: PathFilter,
       path: Path,
       spec: TablePartitionSpec,
-      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
-    if (partitionNames.length == 0) {
+      partitionNames: Seq[String],
+      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+    if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
 
-    val statuses = fs.listStatus(path)
-    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", 
"10").toInt
+    val statuses = fs.listStatus(path, filter)
     val statusPar: GenSeq[FileStatus] =
       if (partitionNames.length > 1 && statuses.length > threshold || 
partitionNames.length > 2) {
+        // parallelize the list of partitions here, then we can have better 
parallelism later.
         val parArray = statuses.par
         parArray.tasksupport = evalTaskSupport
         parArray
@@ -510,21 +544,89 @@ case class AlterTableRecoverPartitionsCommand(
         // TODO: Validate the value
         val value = PartitioningUtils.unescapePathName(ps(1))
         // comparing with case-insensitive, but preserve the case
-        if (columnName == partitionNames(0)) {
-          scanPartitions(
-            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), 
partitionNames.drop(1))
+        if (columnName == partitionNames.head) {
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName 
-> value),
+            partitionNames.drop(1), threshold)
         } else {
-          logWarning(s"expect partition column ${partitionNames(0)}, but got 
${ps(0)}, ignore it")
+          logWarning(s"expect partition column ${partitionNames.head}, but got 
${ps(0)}, ignore it")
           Seq()
         }
       } else {
-        if (name != "_SUCCESS" && name != "_temporary" && 
!name.startsWith(".")) {
-          logWarning(s"ignore ${new Path(path, name)}")
-        }
+        logWarning(s"ignore ${new Path(path, name)}")
         Seq()
       }
     }
   }
+
+  private def gatherPartitionStats(
+      spark: SparkSession,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      fs: FileSystem,
+      pathFilter: PathFilter,
+      threshold: Int): GenMap[String, PartitionStatistics] = {
+    if (partitionSpecsAndLocs.length > threshold) {
+      val hadoopConf = spark.sparkContext.hadoopConfiguration
+      val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+      val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+
+      // Set the number of parallelism to prevent following file listing from 
generating many tasks
+      // in case of large #defaultParallelism.
+      val numParallelism = Math.min(serializedPaths.length,
+        Math.min(spark.sparkContext.defaultParallelism, 10000))
+      // gather the fast stats for all the partitions otherwise Hive metastore 
will list all the
+      // files for all the new partitions in sequential way, which is super 
slow.
+      logInfo(s"Gather the fast stats in parallel using $numParallelism 
tasks.")
+      spark.sparkContext.parallelize(serializedPaths, numParallelism)
+        .mapPartitions { paths =>
+          val pathFilter = getPathFilter(serializableConfiguration.value)
+          paths.map(new Path(_)).map{ path =>
+            val fs = path.getFileSystem(serializableConfiguration.value)
+            val statuses = fs.listStatus(path, pathFilter)
+            (path.toString, PartitionStatistics(statuses.length, 
statuses.map(_.getLen).sum))
+          }
+        }.collectAsMap()
+    } else {
+      partitionSpecsAndLocs.map { case (_, location) =>
+        val statuses = fs.listStatus(location, pathFilter)
+        (location.toString, PartitionStatistics(statuses.length, 
statuses.map(_.getLen).sum))
+      }.toMap
+    }
+  }
+
+  private def addPartitions(
+      spark: SparkSession,
+      table: CatalogTable,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      partitionStats: GenMap[String, PartitionStatistics]): Unit = {
+    val total = partitionSpecsAndLocs.length
+    var done = 0L
+    // Hive metastore may not have enough memory to handle millions of 
partitions in single RPC,
+    // we should split them into smaller batches. Since Hive client is not 
thread safe, we cannot
+    // do this in parallel.
+    val batchSize = 100
+    partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+      val now = System.currentTimeMillis() / 1000
+      val parts = batch.map { case (spec, location) =>
+        val params = partitionStats.get(location.toString).map {
+          case PartitionStatistics(numFiles, totalSize) =>
+            // This two fast stat could prevent Hive metastore to list the 
files again.
+            Map(NUM_FILES -> numFiles.toString,
+              TOTAL_SIZE -> totalSize.toString,
+              // Workaround a bug in HiveMetastore that try to mutate a 
read-only parameters.
+              // see 
metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+              DDL_TIME -> now.toString)
+        }.getOrElse(Map.empty)
+        // inherit table storage format (possibly except for location)
+        CatalogTablePartition(
+          spec,
+          table.storage.copy(locationUri = Some(location.toUri.toString)),
+          params)
+      }
+      spark.sessionState.catalog.createPartitions(tableName, parts, 
ignoreIfExists = true)
+      done += parts.length
+      logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/48caec25/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f2b1afd..9198827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -310,6 +310,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
+      .internal()
+      .doc("When true, fast stats (number of files and total size of all 
files) will be gathered" +
+        " in parallel while repairing table partitions to avoid the sequential 
listing in Hive" +
+        " metastore.")
+      .booleanConf
+      .createWithDefault(true)
+
   // This is used to control the when we will split a schema's JSON string to 
multiple pieces
   // in order to fit the JSON string in metastore's table property (by 
default, the value has
   // a length restriction of 4000 characters). We will split the JSON string 
of a schema
@@ -608,6 +616,8 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = 
getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
+
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
 
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/48caec25/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b343454..0073659 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -824,13 +824,13 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
   }
 
   test("alter table: recover partitions (sequential)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
       testRecoverPartitions()
     }
   }
 
   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
       testRecoverPartitions()
     }
   }
@@ -853,7 +853,14 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
     // valid
     fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
     fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // 
file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
     // invalid
     fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
     fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
@@ -867,6 +874,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterEach {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
         Set(part1, part2))
+      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == 
"1")
+      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == 
"2")
     } finally {
       fs.delete(root, true)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/48caec25/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 81d5a12..b45ad30 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -829,6 +829,8 @@ private[hive] class HiveClientImpl(
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
         compressed = apiPartition.getSd.isCompressed,
         properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
-          .map(_.asScala.toMap).orNull))
+          .map(_.asScala.toMap).orNull),
+        parameters =
+          if (hp.getParameters() != null) hp.getParameters().asScala.toMap 
else Map.empty)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/48caec25/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 41527fc..3238770 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
       val location = s.storage.locationUri.map(new Path(table.getPath, 
_)).orNull
+      val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
         // Ignore this partition since it already exists and ignoreIfExists == 
true
@@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
           table,
           spec,
           location,
-          null, // partParams
+          params, // partParams
           null, // inputFormat
           null, // outputFormat
           -1: JInteger, // numBuckets
@@ -459,8 +460,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
-    parts.foreach { s =>
+    parts.zipWithIndex.foreach { case (s, i) =>
       addPartitionDesc.addPartition(s.spec.asJava, 
s.storage.locationUri.orNull)
+      if (s.parameters.nonEmpty) {
+        addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+      }
     }
     hive.createPartitions(addPartitionDesc)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/48caec25/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f00a99b..9019333 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -378,6 +378,44 @@ class HiveDDLSuite
       expectedSerdeProps)
   }
 
+  test("MSCK REPAIR RABLE") {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1")
+    sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b 
INT)")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new 
Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // valid
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // 
file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
+    // invalid
+    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
+    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
+    fs.mkdirs(new Path(root, "a=4")) // not enough columns
+    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
+    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
+    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .
+
+    try {
+      sql("MSCK REPAIR TABLE tab1")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == 
"1")
+      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == 
"2")
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("drop table using drop view") {
     withTable("tab1") {
       sql("CREATE TABLE tab1(c1 int)")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to