This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6566fc6625 [HUDI-3500] Add call procedure for RepairsCommand (#6053)
6566fc6625 is described below

commit 6566fc6625072b76dd121f0b28ce7b1ef11b6259
Author: superche <[email protected]>
AuthorDate: Sat Jul 9 09:29:14 2022 +0800

    [HUDI-3500] Add call procedure for RepairsCommand (#6053)
---
 .../org/apache/spark/sql/hudi/DeDupeType.scala     |  28 ++
 .../org/apache/spark/sql/hudi/DedupeSparkJob.scala | 245 ++++++++++
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   | 134 ++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   5 +
 .../RepairAddpartitionmetaProcedure.scala          |  89 ++++
 .../RepairCorruptedCleanFilesProcedure.scala       |  86 ++++
 .../procedures/RepairDeduplicateProcedure.scala    |  86 ++++
 .../RepairMigratePartitionMetaProcedure.scala      | 112 +++++
 .../RepairOverwriteHoodiePropsProcedure.scala      |  89 ++++
 .../src/test/resources/table-config.properties     |  21 +
 .../sql/hudi/procedure/TestRepairsProcedure.scala  | 507 +++++++++++++++++++++
 11 files changed, 1402 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DeDupeType.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DeDupeType.scala
new file mode 100644
index 0000000000..93cec470ec
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DeDupeType.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+/**
+  * Strategies the de-duplication job can apply, each identified by a string name:
+  * duplicates created by inserts only, by updates only, or by a mix of both.
+  */
+object DeDupeType extends Enumeration {
+  type dedupeType = Value
+
+  // Declaration order is kept: Enumeration hands out ids in creation order.
+  val INSERT_TYPE: Value = Value("insert_type")
+  val UPDATE_TYPE: Value = Value("update_type")
+  val UPSERT_TYPE: Value = Value("upsert_type")
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
new file mode 100644
index 0000000000..b6f610e7d7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DedupeSparkJob.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{HoodieBaseFile, HoodieRecord}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+import java.util.stream.Collectors
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{Buffer, HashMap, HashSet, ListBuffer}
+
+/**
+  * Spark job to de-duplicate data present in a partition path
+  *
+  * @param basePath                base path of the table; used to build the meta client and,
+  *                                joined with the partition path, to list the partition's files
+  * @param duplicatedPartitionPath partition path, relative to `basePath`, that holds duplicates
+  * @param repairOutputPath        directory the latest base files are copied to and rewritten in
+  * @param sqlContext              SQL context used to read the files and run the duplicate queries
+  * @param fs                      file system used for listing, copying and deleting files
+  * @param dedupeType              strategy that decides which of the duplicated rows are dropped
+  */
+class DedupeSparkJob(basePath: String,
+                     duplicatedPartitionPath: String,
+                     repairOutputPath: String,
+                     sqlContext: SQLContext,
+                     fs: FileSystem,
+                     dedupeType: DeDupeType.Value) {
+
+  val sparkHelper = new SparkHelper(sqlContext, fs)
+  val LOG = Logger.getLogger(this.getClass)
+
+  /**
+    * Lists every record key that occurs more than once in a table.
+    *
+    * @param tblName name of the (temp) table to scan
+    * @return one row per duplicated key, with the key as `dupe_key` and its count as `dupe_cnt`
+    */
+  def getDupeKeyDF(tblName: String): DataFrame = {
+    val recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD
+    sqlContext.sql(
+      s"""
+      select  `$recordKeyField` as dupe_key,
+      count(*) as dupe_cnt
+      from $tblName
+      group by `$recordKeyField`
+      having dupe_cnt > 1
+      """)
+  }
+
+  /**
+    * Check a given partition for duplicates and suggest the deletions that need to be done
+    * in each file, in order to set things right.
+    *
+    * Reads the latest base files of the duplicated partition, registers them as a temp table,
+    * finds the record keys that occur more than once and passes their rows to `getDedupePlan`.
+    *
+    * @return map from the file-name prefix (text before the first `_`) to the record keys
+    *         that should be removed from that file
+    */
+  private def planDuplicateFix(): HashMap[String, HashSet[String]] = {
+    val tmpTableName = s"htbl_${System.currentTimeMillis()}"
+    val dedupeTblName = s"${tmpTableName}_dupeKeys"
+
+    val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
+
+    val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
+    val fsView = new HoodieTableFileSystemView(metadata,
+      metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
+    val latestFiles: java.util.List[HoodieBaseFile] =
+      fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
+    val filteredStatuses = latestFiles.map(f => f.getPath)
+    // Name the partition in the message; an empty `${}` block would render as "()".
+    LOG.info(s" List of files under partition: $duplicatedPartitionPath =>  ${filteredStatuses.mkString(" ")}")
+
+    // read.parquet is the non-deprecated equivalent of SQLContext.parquetFile.
+    val df = sqlContext.read.parquet(filteredStatuses: _*)
+    df.registerTempTable(tmpTableName)
+    val dupeKeyDF = getDupeKeyDF(tmpTableName)
+    dupeKeyDF.registerTempTable(dedupeTblName)
+
+    // Obtain necessary satellite information for duplicate rows
+    val dupeDataSql =
+      s"""
+        SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time`
+        FROM $tmpTableName h
+        JOIN $dedupeTblName d
+        ON h.`_hoodie_record_key` = d.dupe_key
+                      """
+    // Group the duplicated rows by record key (column 0 of the projection above).
+    val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0))
+    getDedupePlan(dupeMap)
+  }
+
+  /**
+    * Turns the duplicated rows into a per-file deletion plan, according to `dedupeType`.
+    *
+    * Every row is read positionally: index 2 is the file name and index 3 the commit time,
+    * matching the projection built in `planDuplicateFix`.
+    *
+    * @param dupeMap duplicated rows grouped by record key
+    * @return map from the file-name prefix (text before the first `_`) to the keys to drop
+    * @throws IllegalArgumentException if `dedupeType` is none of the known strategies
+    */
+  private def getDedupePlan(dupeMap: Map[String, Buffer[Row]]): HashMap[String, HashSet[String]] = {
+    val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()
+
+    // Commit time of a row, parsed as a number so that commits can be compared.
+    def commitOf(row: Row): Long = row(3).asInstanceOf[String].toLong
+
+    // Largest commit time among the rows; the search starts from -1.
+    def newestCommit(rows: Buffer[Row]): Long =
+      rows.foldLeft(-1L)((newest, row) => math.max(newest, commitOf(row)))
+
+    // Records that `key` has to be removed from the file the given row lives in.
+    def scheduleDeletion(row: Row, key: String): Unit = {
+      val fileId = row(2).asInstanceOf[String].split("_")(0)
+      fileToDeleteKeyMap.getOrElseUpdate(fileId, HashSet[String]()).add(key)
+    }
+
+    dupeMap.foreach { case (key, rows) =>
+      dedupeType match {
+        case DeDupeType.UPDATE_TYPE =>
+          // All duplicates have been updated at least once. Once updated, duplicates are bound
+          // to have the same commit time unless forcefully modified, so keep only the last row.
+          rows.init.foreach(scheduleDeletion(_, key))
+
+        case DeDupeType.INSERT_TYPE =>
+          // Duplicates were created by INSERT and never updated: drop every row that does not
+          // carry the newest commit time.
+          val maxCommit = newestCommit(rows)
+          rows.filter(commitOf(_) != maxCommit).foreach(scheduleDeletion(_, key))
+
+        case DeDupeType.UPSERT_TYPE =>
+          // Duplicates come from inserts as well as updates, i.e. a few duplicate records were
+          // updated while others never were: drop the older rows, then all but the last of the
+          // rows that share the newest commit time.
+          val maxCommit = newestCommit(rows)
+          val (rowsWithMaxCommit, olderRows) = rows.partition(commitOf(_) == maxCommit)
+          olderRows.foreach(scheduleDeletion(_, key))
+          rowsWithMaxCommit.toList.init.foreach(scheduleDeletion(_, key))
+
+        case _ => throw new IllegalArgumentException("Please provide valid type for deduping!")
+      }
+    }
+    LOG.debug(s"fileToDeleteKeyMap size: ${fileToDeleteKeyMap.size}, map: $fileToDeleteKeyMap")
+    fileToDeleteKeyMap
+  }
+
+  /**
+    * Rewrites the latest base files of the duplicated partition without the duplicated keys.
+    *
+    * The files are copied to `repairOutputPath`, the ones named in the fix plan are rewritten
+    * there, and the result is verified: no duplicate keys may remain and no source key may be
+    * missing. Only then are the files copied back over the partition.
+    *
+    * @param dryRun when true (the default) the final copy back into the partition is only logged
+    * @throws HoodieException if duplicates remain or record keys went missing after the rewrite
+    */
+  def fixDuplicates(dryRun: Boolean = true) = {
+    val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
+
+    val partitionFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
+    val fsView = new HoodieTableFileSystemView(metaClient,
+      metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), partitionFiles)
+
+    val latestFiles: java.util.List[HoodieBaseFile] =
+      fsView.getLatestBaseFiles().collect(Collectors.toList[HoodieBaseFile]())
+
+    val fileIdToPath = latestFiles.map(baseFile => baseFile.getFileId -> new Path(baseFile.getPath)).toMap
+    val dupeFixPlan = planDuplicateFix()
+
+    // Location of a file with the given name inside the repair output directory.
+    def inRepairDir(fileName: String): Path = new Path(s"$repairOutputPath/$fileName")
+
+    // 1. Copy all latest files into the temp fix path; files that need fixing get a ".bad" suffix
+    fileIdToPath.foreach { case (fileId, sourcePath) =>
+      val suffix = if (dupeFixPlan.contains(fileId)) ".bad" else ""
+      val targetPath = inRepairDir(sourcePath.getName + suffix)
+      LOG.info(s"Copying from $sourcePath to $targetPath")
+      FileUtil.copy(fs, sourcePath, fs, targetPath, false, true, fs.getConf)
+    }
+
+    // 2. Remove duplicates from the bad files
+    dupeFixPlan.foreach { case (fileId, keysToSkip) =>
+      val originalName = fileIdToPath(fileId).getName
+      val instantTime = FSUtils.getCommitTime(originalName)
+      val badFilePath = inRepairDir(s"${originalName}.bad")
+      val newFilePath = inRepairDir(originalName)
+      LOG.info(" Skipping and writing new file for : " + fileId)
+      SparkHelpers.skipKeysAndWriteNewFile(instantTime, fs, badFilePath, newFilePath, keysToSkip)
+      fs.delete(badFilePath, true)
+    }
+
+    // 3. Check that there are no duplicates anymore.
+    val repairedDF = sqlContext.read.parquet(s"$repairOutputPath/*.parquet")
+    repairedDF.registerTempTable("fixedTbl")
+    val remainingDupes = getDupeKeyDF("fixedTbl")
+    if (remainingDupes.count() != 0) {
+      remainingDupes.show()
+      throw new HoodieException("Still found some duplicates!!.. Inspect output")
+    }
+
+    // 4. Additionally ensure no record keys are left behind.
+    val sourcePaths = fileIdToPath.values.map(_.toString).toList
+    val repairedPaths = fileIdToPath.values.map(path => s"$repairOutputPath/${path.getName}").toList
+    val sourceKeys = sparkHelper.getDistinctKeyDF(sourcePaths)
+    val repairedKeys = sparkHelper.getDistinctKeyDF(repairedPaths)
+    val missedRecordKeysDF = sourceKeys.except(repairedKeys)
+    if (missedRecordKeysDF.count() != 0) {
+      missedRecordKeysDF.show()
+      throw new HoodieException("Some records in source are not found in fixed files. Inspect output!!")
+    }
+
+    println("No duplicates found & counts are in check!!!! ")
+    // 5. Prepare to copy the fixed files back.
+    fileIdToPath.values.foreach { originalPath =>
+      val srcPath = inRepairDir(originalPath.getName)
+      val dstPath = new Path(s"$basePath/$duplicatedPartitionPath/${originalPath.getName}")
+      if (!dryRun) {
+        // for real
+        LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
+        FileUtil.copy(fs, srcPath, fs, dstPath, false, true, fs.getConf)
+      } else {
+        LOG.info(s"[JUST KIDDING!!!] Copying from $srcPath to $dstPath")
+      }
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
new file mode 100644
index 0000000000..1ed0e5e1a4
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.IndexedRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.avro.HoodieAvroWriteSupport
+import org.apache.hudi.client.SparkTaskContextSupplier
+import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.util.BaseFileUtils
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
+import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, 
HoodieParquetConfig}
+import org.apache.parquet.avro.AvroSchemaConverter
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable._
+
+object SparkHelpers {
+  /**
+    * Copies the records of a parquet file into a new parquet file, leaving out every record
+    * whose record key is listed in `keysToSkip`.
+    *
+    * The output is written with the schema of the first source record, a bloom filter and
+    * parquet settings built from the Hudi default config values, and GZIP compression.
+    *
+    * @param instantTime     instant time handed to the parquet writer
+    * @param fs              file system whose configuration is used for reading and writing
+    * @param sourceFile      parquet file to read the records from
+    * @param destinationFile parquet file to create
+    * @param keysToSkip      record keys that must not be copied
+    * @throws IllegalArgumentException if the source file holds no records, since the write
+    *                                  schema is taken from the first record
+    */
+  @throws[Exception]
+  def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: Path,
+                              destinationFile: Path, keysToSkip: Set[String]): Unit = {
+    val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, sourceFile)
+    // Fail with a clear message instead of an IndexOutOfBoundsException from get(0).
+    if (sourceRecords.isEmpty) {
+      throw new IllegalArgumentException(s"No records found in $sourceFile; cannot derive the schema to write")
+    }
+    val schema: Schema = sourceRecords.get(0).getSchema
+    val filter: BloomFilter = BloomFilterFactory.createBloomFilter(
+      HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt,
+      HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
+      HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt,
+      HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue)
+    val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(
+      new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
+    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(
+      writeSupport,
+      CompressionCodecName.GZIP,
+      HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
+      HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
+      HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
+      fs.getConf,
+      HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+
+    // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
+    parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+
+    val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime,
+      new SparkTaskContextSupplier(), true)
+    // Close the writer even when a write fails, so the output stream is not leaked.
+    try {
+      for (rec <- sourceRecords) {
+        val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
+        if (!keysToSkip.contains(key)) {
+          writer.writeAvro(key, rec)
+        }
+      }
+    } finally {
+      writer.close()
+    }
+  }
+}
+
+/**
+  * Bunch of Spark Shell/Scala stuff useful for debugging
+  */
+class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
+
+  /**
+    * Print keys from a file
+    *
+    * @param file path of the parquet file to read
+    */
+  def printKeysFromFile(file: String) = {
+    getRowKeyDF(file).collect().foreach(println(_))
+  }
+
+  /**
+    * Loads the record-key column of a parquet file.
+    *
+    * @param file path of the parquet file to read
+    * @return DataFrame with the record-key metadata column only
+    */
+  def getRowKeyDF(file: String): DataFrame = {
+    sqlContext.read.parquet(file).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
+  }
+
+  /**
+    * Does the rowKey actually exist in the file.
+    *
+    * @param rowKey record key to look for
+    * @param file   path of the parquet file to search
+    * @return true when at least one row of the file carries the key
+    */
+  def isFileContainsKey(rowKey: String, file: String): Boolean = {
+    println(s"Checking $file for key $rowKey")
+    val keys = getRowKeyDF(file)
+    // Compare through the Column API so keys containing quotes cannot break the predicate.
+    keys.filter(keys(HoodieRecord.RECORD_KEY_METADATA_FIELD) === rowKey).count() > 0
+  }
+
+  /**
+    * Number of keys in a given file
+    *
+    * @param file       path of the parquet file to read
+    * @param sqlContext not used by this method; kept so existing callers keep compiling
+    * @return number of rows in the record-key column (also printed to stdout)
+    */
+  def getKeyCount(file: String, sqlContext: org.apache.spark.sql.SQLContext) = {
+    val keyCount = getRowKeyDF(file).collect().length
+    println(keyCount)
+    keyCount
+  }
+
+  /**
+    *
+    * Checks that all the keys in the file, have been added to the bloom filter
+    * in the footer
+    *
+    * @param conf       Hadoop configuration used to read the bloom filter from the file metadata
+    * @param sqlContext SQL context used to read the keys of the file
+    * @param file       path of the parquet file to check
+    * @return true when the bloom filter reports every key of the file as possibly present
+    */
+  def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = {
+    val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new Path(file))
+    // Count the keys the filter recognises. Counting the misses instead would make the final
+    // comparison succeed only when no key at all is in the filter.
+    val foundCount = sqlContext.read.parquet(file)
+      .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
+      .collect().count(r => bf.mightContain(r.getString(0)))
+    val totalCount = getKeyCount(file, sqlContext)
+    println(s"totalCount: $totalCount, foundCount: $foundCount")
+    totalCount == foundCount
+  }
+
+  /**
+    * Loads the distinct record keys found across several parquet files.
+    *
+    * @param paths parquet paths to read together
+    * @return DataFrame with one row per distinct record key
+    */
+  def getDistinctKeyDF(paths: List[String]): DataFrame = {
+    sqlContext.read.parquet(paths: _*).select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`").distinct()
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 5f2728597e..1eb82d97c5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -75,6 +75,11 @@ object HoodieProcedures {
     mapBuilder.put(ValidateMetadataFilesProcedure.NAME, 
ValidateMetadataFilesProcedure.builder)
     mapBuilder.put(ShowFsPathDetailProcedure.NAME, 
ShowFsPathDetailProcedure.builder)
     mapBuilder.put(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
+    mapBuilder.put(RepairAddpartitionmetaProcedure.NAME, 
RepairAddpartitionmetaProcedure.builder)
+    mapBuilder.put(RepairCorruptedCleanFilesProcedure.NAME, 
RepairCorruptedCleanFilesProcedure.builder)
+    mapBuilder.put(RepairDeduplicateProcedure.NAME, 
RepairDeduplicateProcedure.builder)
+    mapBuilder.put(RepairMigratePartitionMetaProcedure.NAME, 
RepairMigratePartitionMetaProcedure.builder)
+    mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, 
RepairOverwriteHoodiePropsProcedure.builder)
     mapBuilder.build
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
new file mode 100644
index 0000000000..bb65174c4b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodiePartitionMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
+/**
+  * Procedure that reports, for each partition folder of a table, whether the partition
+  * metadata is present, and creates the missing metadata unless `dry_run` is set.
+  */
+class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  // Accepted arguments: the table (required) and the dry-run switch (defaults to true).
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
+  )
+
+  // One output row per partition: its path, "Yes"/"No" for metadata presence, and the action taken.
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("metadata_is_present", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+    * Inspects every partition folder and, when not in dry-run mode, writes the partition
+    * metadata that is missing, stamped with the latest commit of the table.
+    *
+    * @param args procedure arguments, validated against `PARAMETERS`
+    * @return one row per partition folder, in listing order
+    */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val dryRun = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+    val tablePath = getBasePath(tableName)
+
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
+
+    val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
+    val partitionPaths: util.List[String] =
+      FSUtils.getAllPartitionFoldersThreeLevelsDown(metaClient.getFs, tablePath)
+    val basePath: Path = new Path(tablePath)
+
+    partitionPaths.map { partition =>
+      val partitionPath: Path = FSUtils.getPartitionPath(basePath, partition)
+      val metadataExists = HoodiePartitionMetadata.hasPartitionMetadata(metaClient.getFs, partitionPath)
+      // Metadata is only written when it is missing and this is not a dry run.
+      val repairNow = !metadataExists && !dryRun
+      if (repairNow) {
+        val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs,
+          latestCommit, basePath, partitionPath, metaClient.getTableConfig.getPartitionMetafileFormat)
+        partitionMetadata.trySave(0)
+      }
+      Row(partition, if (metadataExists) "Yes" else "No", if (repairNow) "Repaired" else "None")
+    }.toList
+  }
+
+  override def build: Procedure = new RepairAddpartitionmetaProcedure()
+}
+
+/** Companion exposing the procedure name and a supplier of fresh builders. */
+object RepairAddpartitionmetaProcedure {
+  val NAME: String = "repair_add_partition_meta"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new RepairAddpartitionmetaProcedure()
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
new file mode 100644
index 0000000000..ff185d1bdf
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.avro.AvroRuntimeException
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieInstant}
+import org.apache.hudi.common.util.CleanerUtils
+import org.apache.hudi.exception.HoodieIOException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.io.IOException
+import java.util.function.Supplier
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+/**
+  * Procedure that scans the pending clean instants of a table and deletes the instant files
+  * whose cleaner plan cannot be read because the file is corrupted.
+  */
+class RepairCorruptedCleanFilesProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+    * Tries to read the cleaner plan of every inflight or requested clean instant and removes
+    * the instant file when reading fails with an Avro error.
+    *
+    * @param args procedure arguments, validated against `PARAMETERS`
+    * @return a single row holding `true` once all pending clean instants were inspected
+    * @throws HoodieIOException if a plan fails to load with an I/O error that does not
+    *                           indicate a corrupted Avro file
+    */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getBasePath(tableName)
+
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
+
+    val cleanerTimeline = metaClient.getActiveTimeline.getCleanerTimeline
+    logInfo("Inspecting pending clean metadata in timeline for corrupted files")
+
+    // Deletes the instant file of a pending clean whose plan could not be read.
+    def removeCorruptedInstant(instant: HoodieInstant): Unit = {
+      logWarning("Corruption found. Trying to remove corrupted clean instant file: " + instant)
+      HoodieActiveTimeline.deleteInstantFile(metaClient.getFs, metaClient.getMetaPath, instant)
+    }
+
+    cleanerTimeline.filterInflightsAndRequested.getInstants.iterator().asScala.foreach((instant: HoodieInstant) => {
+      try {
+        CleanerUtils.getCleanerPlan(metaClient, instant)
+      } catch {
+        case _: AvroRuntimeException =>
+          removeCorruptedInstant(instant)
+        case ioe: IOException =>
+          // getMessage may be null; treat a missing message as "not a corruption marker".
+          if (Option(ioe.getMessage).exists(_.contains("Not an Avro data file"))) {
+            removeCorruptedInstant(instant)
+          } else {
+            throw new HoodieIOException(ioe.getMessage, ioe)
+          }
+      }
+    })
+    // Reaching this point means no unexpected error was raised, so the result is always true.
+    Seq(Row(true))
+  }
+
+  override def build: Procedure = new RepairCorruptedCleanFilesProcedure()
+}
+
+/** Companion exposing the procedure name and a supplier of fresh builders. */
+object RepairCorruptedCleanFilesProcedure {
+  val NAME: String = "repair_corrupted_clean_files"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new RepairCorruptedCleanFilesProcedure()
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
new file mode 100644
index 0000000000..8ee5055e1f
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+import java.util.function.Supplier
+
+import org.apache.spark.sql.hudi.{DeDupeType, DedupeSparkJob}
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Call procedure `repair_deduplicate`: runs a [[DedupeSparkJob]] against one partition of a table.
+ *
+ * Arguments:
+ *  - `table` (required): table name; resolved to its base path via `getBasePath`.
+ *  - `duplicated_partition_path` (required): partition path passed to the dedupe job.
+ *  - `repaired_output_path` (required): output path passed to the dedupe job.
+ *  - `dry_run` (optional, defaults to `true`): forwarded to `DedupeSparkJob.fixDuplicates`.
+ *  - `dedupe_type` (optional, defaults to `insert_type`): name of a [[DeDupeType]] value.
+ *
+ * Output: a single row with one string column, `result`.
+ */
+class RepairDeduplicateProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "duplicated_partition_path", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "repaired_output_path", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(4, "dedupe_type", DataTypes.StringType, "insert_type")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Validates the arguments, runs the dedupe job and reports where the deduplicated files are.
+   *
+   * @param args arguments of the `call` statement
+   * @return one row with a message naming `repaired_output_path` for a dry run, or
+   *         `duplicated_partition_path` otherwise
+   * @throws IllegalArgumentException if `dedupe_type` does not name a [[DeDupeType]] value
+   * @throws HoodieException if creating or running the dedupe job fails
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val duplicatedPartitionPath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val repairedOutputPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val dryRun = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val dedupeTypeName = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+
+    // Look the value up by name instead of calling DeDupeType.withName: withName throws
+    // NoSuchElementException for an unknown name, so a check built on it can never report
+    // the intended IllegalArgumentException.
+    val dedupeType = DeDupeType.values.find(_.toString == dedupeTypeName).getOrElse(
+      throw new IllegalArgumentException("Please provide valid dedupe type!"))
+    val basePath = getBasePath(tableName)
+
+    Try {
+      val job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, spark.sqlContext,
+        FSUtils.getFs(basePath, jsc.hadoopConfiguration), dedupeType)
+      job.fixDuplicates(dryRun)
+    } match {
+      case Success(_) =>
+        // A dry run leaves its output in the repaired path; a real run reports the partition itself.
+        val outputLocation = if (dryRun) repairedOutputPath else duplicatedPartitionPath
+        Seq(Row(s"Deduplicated files placed in: $outputLocation."))
+      case Failure(e) =>
+        throw new HoodieException("Deduplication failed!", e)
+    }
+  }
+
+  /** Returns a new [[RepairDeduplicateProcedure]] on every call. */
+  override def build: Procedure = new RepairDeduplicateProcedure()
+}
+
+/** Companion holding the procedure name and a builder supplier. */
+object RepairDeduplicateProcedure {
+  /** Procedure name; the tests invoke it as `call repair_deduplicate(...)`. */
+  val NAME: String = "repair_deduplicate"
+
+  /** Supplier that hands out a new procedure builder on each `get()`. */
+  def builder: Supplier[ProcedureBuilder] = {
+    // Explicit anonymous class rather than a lambda, as in the rest of this file.
+    new Supplier[ProcedureBuilder] {
+      override def get(): ProcedureBuilder = new RepairDeduplicateProcedure()
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
new file mode 100644
index 0000000000..7daacb2f18
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodiePartitionMetadata
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.util.Option
+import org.apache.hudi.exception.HoodieIOException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.io.IOException
+import java.util
+import java.util.Properties
+import java.util.function.{Consumer, Supplier}
+import scala.collection.JavaConversions._
+
+/**
+ * Call procedure `repair_migrate_partition_meta`: for every partition of a table, saves a
+ * partition metafile in the table's base file format when none exists and deletes the
+ * text-format metafile when one exists.
+ *
+ * Arguments:
+ *  - `table` (required): table name; resolved to its base path via `getBasePath`.
+ *  - `dry_run` (optional, defaults to `true`): when `true`, nothing is saved, deleted or updated
+ *    and the rows only report what was found.
+ *
+ * Output: one row per partition with the string columns `partition_path`,
+ * `text_metafile_present`, `base_metafile_present` and `action`. `action` is `MIGRATE` or `NONE`
+ * for a dry run (depending on whether a text-format metafile exists) and `MIGRATED` otherwise.
+ */
+class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("text_metafile_present", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("base_metafile_present", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Inspects every partition and, unless this is a dry run, migrates its metafile.
+   *
+   * @param args arguments of the `call` statement
+   * @return one row per partition path, in the order returned by `FSUtils.getAllPartitionPaths`
+   * @throws HoodieIOException if deleting a text-format metafile fails with an `IOException`
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val dryRun = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+    val tablePath = getBasePath(tableName)
+
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
+
+    val engineContext: HoodieLocalEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf)
+    val partitionPaths: util.List[String] = FSUtils.getAllPartitionPaths(engineContext, tablePath, false, false)
+    val basePath: Path = new Path(tablePath)
+    // Same for every partition, and only needed when a base-format metafile has to be written;
+    // lazy so that a dry run never has to resolve the last commit.
+    lazy val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
+
+    val rows: List[Row] = partitionPaths.map { partitionPath =>
+      val partition: Path = FSUtils.getPartitionPath(tablePath, partitionPath)
+      val textFormatFile: Option[Path] = HoodiePartitionMetadata.textFormatMetaPathIfExists(metaClient.getFs, partition)
+      val baseFormatFile: Option[Path] = HoodiePartitionMetadata.baseFormatMetaPathIfExists(metaClient.getFs, partition)
+
+      if (!dryRun) {
+        if (!baseFormatFile.isPresent) {
+          val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs, latestCommit,
+            basePath, partition, Option.of(metaClient.getTableConfig.getBaseFileFormat))
+          partitionMetadata.trySave(0)
+        }
+        // delete it, in case we failed midway last time.
+        if (textFormatFile.isPresent) {
+          try metaClient.getFs.delete(textFormatFile.get, false)
+          catch {
+            case e: IOException =>
+              throw new HoodieIOException(e.getMessage, e)
+          }
+        }
+      }
+
+      val action =
+        if (!dryRun) "MIGRATED"
+        else if (textFormatFile.isPresent) "MIGRATE"
+        else "NONE"
+      // The presence flags describe the state found before any migration was applied.
+      Row(partitionPath, String.valueOf(textFormatFile.isPresent),
+        String.valueOf(baseFormatFile.isPresent), action)
+    }.toList
+
+    // Only flip the table-level flag after a real migration; a dry run must not modify the table config.
+    if (!dryRun) {
+      val props: Properties = new Properties
+      props.setProperty(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key, "true")
+      HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), props)
+    }
+
+    rows
+  }
+
+  /** Returns a new [[RepairMigratePartitionMetaProcedure]] on every call. */
+  override def build: Procedure = new RepairMigratePartitionMetaProcedure()
+}
+
+/** Companion holding the procedure name and a builder supplier. */
+object RepairMigratePartitionMetaProcedure {
+  /** Procedure name; the tests invoke it as `call repair_migrate_partition_meta(...)`. */
+  val NAME: String = "repair_migrate_partition_meta"
+
+  /** Supplier that hands out a new procedure builder on each `get()`. */
+  def builder: Supplier[ProcedureBuilder] = {
+    // Explicit anonymous class rather than a lambda, as in the rest of this file.
+    new Supplier[ProcedureBuilder] {
+      override def get(): ProcedureBuilder = new RepairMigratePartitionMetaProcedure()
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
new file mode 100644
index 0000000000..043217cf2d
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.io.FileInputStream
+import java.util
+import java.util.Properties
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+/**
+ * Call procedure `repair_overwrite_hoodie_props`: replaces the table config of a table with the
+ * properties loaded from a file and reports every property before and after.
+ *
+ * Arguments:
+ *  - `table` (required): table name; resolved to its base path via `getBasePath`.
+ *  - `new_props_file_path` (required): path of the properties file. It is opened with
+ *    `java.io.FileInputStream`, i.e. read from the local file system of the calling JVM.
+ *
+ * Output: one row per property key found in the old or the new config, sorted by key, with the
+ * string columns `property`, `old_value` and `new_value`. A missing value is reported as the
+ * literal string `null`.
+ */
+class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "new_props_file_path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("property", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("old_value", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("new_value", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Loads the replacement properties, recreates the table config from them and diffs the result
+   * against the previous config.
+   *
+   * @param args arguments of the `call` statement
+   * @return one row per property key, ordered by key
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val overwriteFilePath = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val tablePath = getBasePath(tableName)
+
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(tablePath).build
+
+    // Properties.load leaves the stream open, so close it here to avoid leaking the file handle.
+    val overwriteProps = new Properties
+    val propsInput = new FileInputStream(overwriteFilePath)
+    try overwriteProps.load(propsInput)
+    finally propsInput.close()
+
+    // Capture the current config before it is replaced.
+    val oldProps = metaClient.getTableConfig.propsMap
+    val metaPathDir = new Path(tablePath, METAFOLDER_NAME)
+    HoodieTableConfig.create(metaClient.getFs, metaPathDir, overwriteProps)
+    // reload new props as checksum would have been added
+    val newProps: Properties = HoodieTableMetaClient.reload(metaClient).getTableConfig.getProps
+
+    // TreeSet keeps the union of old and new keys sorted, which fixes the row order.
+    val allPropKeys = new util.TreeSet[String]
+    newProps.keySet.iterator().asScala.foreach(key => allPropKeys.add(key.toString))
+    allPropKeys.addAll(oldProps.keySet)
+
+    allPropKeys.iterator().asScala.map { propKey =>
+      Row(propKey, oldProps.getOrDefault(propKey, "null"),
+        newProps.getOrDefault(propKey, "null").toString)
+    }.toList
+  }
+
+  /** Returns a new [[RepairOverwriteHoodiePropsProcedure]] on every call. */
+  override def build: Procedure = new RepairOverwriteHoodiePropsProcedure()
+}
+
+/** Companion holding the procedure name and a builder supplier. */
+object RepairOverwriteHoodiePropsProcedure {
+  /** Procedure name; the tests invoke it as `call repair_overwrite_hoodie_props(...)`. */
+  val NAME: String = "repair_overwrite_hoodie_props"
+
+  /** Supplier that hands out a new procedure builder on each `get()`. */
+  def builder: Supplier[ProcedureBuilder] = {
+    // Explicit anonymous class rather than a lambda, as in the rest of this file.
+    new Supplier[ProcedureBuilder] {
+      override def get(): ProcedureBuilder = new RepairOverwriteHoodiePropsProcedure()
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/table-config.properties 
b/hudi-spark-datasource/hudi-spark/src/test/resources/table-config.properties
new file mode 100644
index 0000000000..d74c0444a5
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/table-config.properties
@@ -0,0 +1,21 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+hoodie.table.name=test_table
+hoodie.table.type=COPY_ON_WRITE
+hoodie.archivelog.folder=archive
+hoodie.timeline.layout.version=1
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
new file mode 100644
index 0000000000..587f7a4bdd
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
SchemaTestUtil}
+import org.apache.hudi.testutils.HoodieSparkWriteableTestTable
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+import java.io.IOException
+import java.net.URL
+import java.nio.file.{Files, Paths}
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class TestRepairsProcedure extends HoodieSparkSqlTestBase {
+
+  // Creates three partition directories under a fresh table and checks that
+  // repair_add_partition_meta reports one row per partition, for both a dry run and a real run.
+  test("Test Call repair_add_partition_meta Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // create commit instant
+      Files.createFile(Paths.get(tablePath, ".hoodie", "100.commit"))
+
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      // create partition path
+      Seq("2016/03/15", "2015/03/16", "2015/03/17").foreach { relativePartition =>
+        val partitionDir = Paths.get(tablePath, relativePartition).toString
+        // assertResult takes the expected value first, then the actual one.
+        assertResult(true)(metaClient.getFs.mkdirs(new Path(partitionDir)))
+      }
+
+      // default is dry run
+      val dryResult = spark.sql(s"""call repair_add_partition_meta(table => '$tableName')""").collect()
+      assertResult(3) {
+        dryResult.length
+      }
+
+      // real run
+      val realRunResult = spark.sql(s"""call repair_add_partition_meta(table => '$tableName', dry_run => false)""").collect()
+      assertResult(3) {
+        realRunResult.length
+      }
+    }
+  }
+
+  // Overwrites the table config with the table-config.properties test resource and checks the
+  // number of property rows the procedure reports.
+  test("Test Call repair_overwrite_hoodie_props Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // locate the replacement properties file through the class loader
+      val newProps: URL = this.getClass.getClassLoader.getResource("table-config.properties")
+
+      // overwrite hoodie props
+      val overwriteStatement =
+        s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')"""
+      val overwriteResult = spark.sql(overwriteStatement).collect()
+      assertResult(15)(overwriteResult.length)
+    }
+  }
+
+  // Writes four empty requested-clean files and checks that repair_corrupted_clean_files
+  // leaves no inflight or requested instant behind.
+  test("Test Call repair_corrupted_clean_files Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      val initialMetaClient = HoodieTableMetaClient.builder
+        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      // Create four requested files, each a corrupted (empty) requested Clean File
+      (100 until 104).foreach { instantTime =>
+        createEmptyCleanRequestedFile(tablePath, String.valueOf(instantTime), initialMetaClient.getHadoopConf)
+      }
+
+      // reload meta client
+      val clientBeforeRepair = HoodieTableMetaClient.reload(initialMetaClient)
+      // first, there are four instants
+      assertResult(4)(clientBeforeRepair.getActiveTimeline.filterInflightsAndRequested.getInstants.count)
+
+      checkAnswer(s"""call repair_corrupted_clean_files(table => '$tableName')""")(Seq(true))
+
+      // reload meta client
+      val clientAfterRepair = HoodieTableMetaClient.reload(clientBeforeRepair)
+      // after clearing, there should be 0 instant
+      assertResult(0)(clientAfterRepair.getActiveTimeline.filterInflightsAndRequested.getInstants.count)
+    }
+  }
+
+  // Mutable state shared by the repair_deduplicate tests below. The four path fields are
+  // (re)assigned at the start of generateRecords, which each of those tests calls before
+  // reading them, so the values always belong to the current test's temp directory.
+  private var duplicatedPartitionPath: String = null
+  private var duplicatedPartitionPathWithUpdates: String = null
+  private var duplicatedPartitionPathWithUpserts: String = null
+  private var repairedOutputPath: String = null
+  // NOTE(review): no assignment of fileFormat is visible in this part of the file - confirm it is set and used.
+  private var fileFormat: HoodieFileFormat = null
+
+  // repair_deduplicate with default arguments: 3 base files holding 210 records go in, and the
+  // files found under repairedOutputPath afterwards hold 200 records.
+  test("Test Call repair_deduplicate Procedure with insert") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath
+      val tablePath = s"$basePath/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  name string,
+           |  favorite_number int,
+           |  favorite_color string
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'name',
+           |  type = 'cow'
+           | )
+       """.stripMargin)
+      val initialMetaClient = HoodieTableMetaClient.builder
+        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      generateRecords(tablePath, basePath, initialMetaClient)
+
+      // reload meta client
+      val metaClient = HoodieTableMetaClient.reload(initialMetaClient)
+
+      // get fs and check number of latest files
+      val completedCommits = metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants
+      val partitionStatuses = metaClient.getFs.listStatus(new Path(duplicatedPartitionPath))
+      val fsView = new HoodieTableFileSystemView(metaClient, completedCommits, partitionStatuses)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.map(baseFile => baseFile.getPath).toList
+      // there should be 3 files
+      assertResult(3)(latestBaseFiles.size)
+
+      // before deduplicate, all files contain 210 records
+      val filesBefore = latestBaseFiles.toArray
+      assertResult(210)(getRecordCount(filesBefore))
+
+      val partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
+      val result = spark.sql(
+        s"""call repair_deduplicate(table => '$tableName',
+           | duplicated_partition_path => '$partitionPath',
+           | repaired_output_path => '$repairedOutputPath')""".stripMargin).collect()
+      assertResult(1)(result.length)
+
+      // after deduplicate, there are 200 records
+      val repairedStatuses = metaClient.getFs.listStatus(new Path(repairedOutputPath))
+      val filesAfter = repairedStatuses.map((status: FileStatus) => status.getPath.toString)
+      assertResult(200)(getRecordCount(filesAfter))
+    }
+  }
+
+  // repair_deduplicate with dedupe_type 'update_type': 2 base files holding 110 records go in,
+  // and the files found under repairedOutputPath afterwards hold 100 records.
+  test("Test Call repair_deduplicate Procedure with update") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath
+      val tablePath = s"$basePath/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  name string,
+           |  favorite_number int,
+           |  favorite_color string
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'name',
+           |  type = 'cow'
+           | )
+       """.stripMargin)
+      val initialMetaClient = HoodieTableMetaClient.builder
+        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      generateRecords(tablePath, basePath, initialMetaClient)
+
+      // reload meta client
+      val metaClient = HoodieTableMetaClient.reload(initialMetaClient)
+
+      // get fs and check number of latest files
+      val completedCommits = metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants
+      val partitionStatuses = metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpdates))
+      val fsView = new HoodieTableFileSystemView(metaClient, completedCommits, partitionStatuses)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.map(baseFile => baseFile.getPath).toList
+      // there should be 2 files
+      assertResult(2)(latestBaseFiles.size)
+
+      // before deduplicate, all files contain 110 records
+      val filesBefore = latestBaseFiles.toArray
+      assertResult(110)(getRecordCount(filesBefore))
+
+      val partitionPath = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH
+      val result = spark.sql(
+        s"""call repair_deduplicate(table => '$tableName',
+           | duplicated_partition_path => '$partitionPath',
+           | repaired_output_path => '$repairedOutputPath',
+           | dedupe_type => 'update_type')""".stripMargin).collect()
+      assertResult(1)(result.length)
+
+      // after deduplicate, there are 100 records
+      val repairedStatuses = metaClient.getFs.listStatus(new Path(repairedOutputPath))
+      val filesAfter = repairedStatuses.map((status: FileStatus) => status.getPath.toString)
+      assertResult(100)(getRecordCount(filesAfter))
+    }
+  }
+
+  // repair_deduplicate with dedupe_type 'upsert_type': 3 base files holding 120 records go in,
+  // and the files found under repairedOutputPath afterwards hold 100 records.
+  test("Test Call repair_deduplicate Procedure with upsert") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath
+      val tablePath = s"$basePath/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  name string,
+           |  favorite_number int,
+           |  favorite_color string
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'name',
+           |  type = 'cow'
+           | )
+       """.stripMargin)
+      val initialMetaClient = HoodieTableMetaClient.builder
+        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      generateRecords(tablePath, basePath, initialMetaClient)
+
+      // reload meta client
+      val metaClient = HoodieTableMetaClient.reload(initialMetaClient)
+
+      // get fs and check number of latest files
+      val completedCommits = metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants
+      val partitionStatuses = metaClient.getFs.listStatus(new Path(duplicatedPartitionPathWithUpserts))
+      val fsView = new HoodieTableFileSystemView(metaClient, completedCommits, partitionStatuses)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.map(baseFile => baseFile.getPath).toList
+      // there should be 3 files
+      assertResult(3)(latestBaseFiles.size)
+
+      // before deduplicate, all files contain 120 records
+      val filesBefore = latestBaseFiles.toArray
+      assertResult(120)(getRecordCount(filesBefore))
+
+      val partitionPath = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH
+      val result = spark.sql(
+        s"""call repair_deduplicate(table => '$tableName',
+           | duplicated_partition_path => '$partitionPath',
+           | repaired_output_path => '$repairedOutputPath',
+           | dedupe_type => 'upsert_type')""".stripMargin).collect()
+      assertResult(1)(result.length)
+
+      // after deduplicate, there are 100 records
+      val repairedStatuses = metaClient.getFs.listStatus(new Path(repairedOutputPath))
+      val filesAfter = repairedStatuses.map((status: FileStatus) => status.getPath.toString)
+      assertResult(100)(getRecordCount(filesAfter))
+    }
+  }
+
+  // repair_deduplicate with dry_run => false: 3 base files holding 210 records go in, and the
+  // .parquet files found in the duplicated partition itself afterwards hold 200 records.
+  test("Test Call repair_deduplicate Procedure with real") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath
+      val tablePath = s"$basePath/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  name string,
+           |  favorite_number int,
+           |  favorite_color string
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'name',
+           |  type = 'cow'
+           | )
+       """.stripMargin)
+      val initialMetaClient = HoodieTableMetaClient.builder
+        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      generateRecords(tablePath, basePath, initialMetaClient)
+
+      // reload meta client
+      val metaClient = HoodieTableMetaClient.reload(initialMetaClient)
+
+      // get fs and check number of latest files
+      val completedCommits = metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants
+      val partitionStatuses = metaClient.getFs.listStatus(new Path(duplicatedPartitionPath))
+      val fsView = new HoodieTableFileSystemView(metaClient, completedCommits, partitionStatuses)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.map(baseFile => baseFile.getPath).toList
+      // there should be 3 files
+      assertResult(3)(latestBaseFiles.size)
+
+      // before deduplicate, all files contain 210 records
+      val filesBefore = latestBaseFiles.toArray
+      assertResult(210)(getRecordCount(filesBefore))
+
+      val partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
+      val result = spark.sql(
+        s"""call repair_deduplicate(table => '$tableName',
+           | duplicated_partition_path => '$partitionPath',
+           | repaired_output_path => '$repairedOutputPath',
+           | dry_run => false)""".stripMargin).collect()
+      assertResult(1)(result.length)
+
+      // after deduplicate, there are 200 records
+      val partitionStatusesAfter = metaClient.getFs.listStatus(new Path(duplicatedPartitionPath))
+      val filesAfter = partitionStatusesAfter
+        .map((status: FileStatus) => status.getPath.toString)
+        .filter(filePath => filePath.endsWith(".parquet"))
+      assertResult(200)(getRecordCount(filesAfter))
+    }
+  }
+
+  // Inserts rows into two partitions and checks that repair_migrate_partition_meta reports one
+  // row per partition, for both a dry run and a real run.
+  test("Test Call repair_migrate_partition_meta Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | partitioned by (ts)
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // default is dry run
+      val dryRunResult = spark.sql(s"""call repair_migrate_partition_meta(table => '$tableName')""").collect()
+      assertResult(2)(dryRunResult.length)
+
+      // real run
+      val realRunResult =
+        spark.sql(s"""call repair_migrate_partition_meta(table => '$tableName', dry_run => false)""").collect()
+      assertResult(2)(realRunResult.length)
+    }
+  }
+
+  /**
+   * Writes test base files with overlapping records into the three default test
+   * partitions and stores the derived paths and the table's base file format in
+   * this suite's fields (duplicatedPartitionPath, duplicatedPartitionPathWithUpdates,
+   * duplicatedPartitionPathWithUpserts, repairedOutputPath, fileFormat).
+   *
+   * @param tablePath  table root; the three partition paths are resolved under it
+   * @param bashpath   directory under which the "tmp" repaired-output path is placed
+   * @param metaClient meta client used to build the test table and to read the base file format
+   */
+  private def generateRecords(tablePath: String,
+                              bashpath: String,
+                              metaClient: HoodieTableMetaClient): Unit = {
+    val firstPartition = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
+    val secondPartition = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH
+    val thirdPartition = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH
+
+    duplicatedPartitionPath = Paths.get(tablePath, firstPartition).toString
+    duplicatedPartitionPathWithUpdates = Paths.get(tablePath, secondPartition).toString
+    duplicatedPartitionPathWithUpserts = Paths.get(tablePath, thirdPartition).toString
+    repairedOutputPath = Paths.get(bashpath, "tmp").toString
+
+    // generate 200 records, in two batches of 100
+    val schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema)
+    val writeableTable = HoodieSparkWriteableTestTable.of(metaClient, schemaWithMetaFields)
+    val firstBatch = SchemaTestUtil.generateHoodieTestRecords(0, 100, schemaWithMetaFields)
+    val secondBatch = SchemaTestUtil.generateHoodieTestRecords(100, 100, schemaWithMetaFields)
+
+    // first commit: both batches go to the first partition as file ids "1" and "2"
+    writeableTable.addCommit("20160401010101")
+      .withInserts(firstPartition, "1", firstBatch)
+    writeableTable.withInserts(firstPartition, "2", secondBatch)
+    // result intentionally unused; the call is kept for whatever effect it has on the table
+    writeableTable.getFileIdWithLogFile(firstPartition)
+
+    writeableTable.withInserts(secondPartition, "4", firstBatch)
+    writeableTable.withInserts(thirdPartition, "6", firstBatch)
+
+    // re-insert the first 10 records of the first batch to create duplicates
+    val duplicates = firstBatch.subList(0, 10)
+    writeableTable.withInserts(secondPartition, "5", duplicates)
+    // second commit: duplicates land in the first partition as file id "3"
+    writeableTable.addCommit("20160401010202")
+      .withInserts(firstPartition, "3", duplicates)
+    writeableTable.withInserts(thirdPartition, "7", duplicates)
+    writeableTable.withInserts(thirdPartition, "8", duplicates)
+
+    fileFormat = metaClient.getTableConfig.getBaseFileFormat
+  }
+
+  /**
+   * Adds up the row counts of the given files, reading each one with the Spark
+   * reader that corresponds to the suite's `fileFormat` field.
+   *
+   * @param files paths of the files to count
+   * @return the total number of rows over all files; 0 for an empty array
+   * @throws UnsupportedOperationException when a file has to be read and `fileFormat`
+   *                                       is neither PARQUET nor ORC
+   */
+  private def getRecordCount(files: Array[String]): Long = {
+    files.map { path =>
+      // the format is checked per file, so an empty input never reaches the throw
+      fileFormat match {
+        case HoodieFileFormat.PARQUET => spark.sqlContext.read.parquet(path).count()
+        case HoodieFileFormat.ORC => spark.sqlContext.read.orc(path).count()
+        case _ =>
+          throw new UnsupportedOperationException(fileFormat.name + " format not supported yet.")
+      }
+    }.sum
+  }
+
+  /**
+   * Creates an empty requested-clean instant file for `instantTime` inside the
+   * meta folder of the table rooted at `basePath`.
+   *
+   * @param basePath      table root path; the meta folder is resolved directly under it
+   * @param instantTime   instant time used to derive the requested-clean file name
+   * @param configuration configuration passed to FSUtils.getFs to obtain the file system
+   */
+  @throws[IOException]
+  def createEmptyCleanRequestedFile(basePath: String,
+                                    instantTime: String,
+                                    configuration: Configuration): Unit = {
+    val metaFolder = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME
+    val requestedCleanFile =
+      new Path(metaFolder + "/" + HoodieTimeline.makeRequestedCleanerFileName(instantTime))
+    // create with the boolean flag set to true, then close right away without
+    // writing anything, which leaves an empty file behind
+    FSUtils.getFs(basePath, configuration).create(requestedCleanFile, true).close()
+  }
+}

Reply via email to