This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bab66498c8 [HUDI-6122] Unify call procedure options (#8537)
5bab66498c8 is described below

commit 5bab66498c857c32e042488db72a018b83ea926a
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 11 09:55:32 2023 +0800

    [HUDI-6122] Unify call procedure options (#8537)
---
 .../scala/org/apache/hudi/HoodieCLIUtils.scala     |   8 +
 .../procedures/ArchiveCommitsProcedure.scala       |   4 +-
 .../hudi/command/procedures/BaseProcedure.scala    |  48 ++--
 .../procedures/CommitsCompareProcedure.scala       |   4 +-
 .../command/procedures/CopyToTableProcedure.scala  |   4 +-
 .../hudi/command/procedures/CopyToTempView.scala   |   4 +-
 .../procedures/CreateMetadataTableProcedure.scala  |   2 +-
 .../procedures/CreateSavepointProcedure.scala      |   6 +-
 .../command/procedures/DeleteMarkerProcedure.scala |   4 +-
 .../procedures/DeleteMetadataTableProcedure.scala  |   2 +-
 .../procedures/DeleteSavepointProcedure.scala      |   6 +-
 .../procedures/ExportInstantsProcedure.scala       |   4 +-
 .../procedures/HdfsParquetImportProcedure.scala    |  14 +-
 .../hudi/command/procedures/HelpProcedure.scala    |   4 +-
 .../command/procedures/HiveSyncProcedure.scala     |   2 +-
 .../procedures/InitMetadataTableProcedure.scala    |   2 +-
 .../command/procedures/ProcedureParameter.scala    |   7 +-
 .../RepairAddpartitionmetaProcedure.scala          |   2 +-
 .../RepairCorruptedCleanFilesProcedure.scala       |   2 +-
 .../procedures/RepairDeduplicateProcedure.scala    |   6 +-
 .../RepairMigratePartitionMetaProcedure.scala      |   2 +-
 .../RepairOverwriteHoodiePropsProcedure.scala      |   4 +-
 .../RollbackToInstantTimeProcedure.scala           |   4 +-
 .../procedures/RollbackToSavepointProcedure.scala  |   6 +-
 .../command/procedures/RunBootstrapProcedure.scala |  10 +-
 .../command/procedures/RunCleanProcedure.scala     |  48 ++--
 .../procedures/RunClusteringProcedure.scala        |  34 ++-
 .../procedures/RunCompactionProcedure.scala        |  15 +-
 .../procedures/ShowArchivedCommitsProcedure.scala  |   2 +-
 .../procedures/ShowBootstrapMappingProcedure.scala |   2 +-
 .../ShowBootstrapPartitionsProcedure.scala         |   2 +-
 .../procedures/ShowClusteringProcedure.scala       |   4 +-
 .../ShowCommitExtraMetadataProcedure.scala         |   6 +-
 .../procedures/ShowCommitFilesProcedure.scala      |   4 +-
 .../procedures/ShowCommitPartitionsProcedure.scala |   4 +-
 .../procedures/ShowCommitWriteStatsProcedure.scala |   4 +-
 .../command/procedures/ShowCommitsProcedure.scala  |   2 +-
 .../procedures/ShowCompactionProcedure.scala       |   4 +-
 .../procedures/ShowFileSystemViewProcedure.scala   |   6 +-
 .../procedures/ShowFsPathDetailProcedure.scala     |   2 +-
 .../ShowHoodieLogFileMetadataProcedure.scala       |   4 +-
 .../ShowHoodieLogFileRecordsProcedure.scala        |   4 +-
 .../procedures/ShowInvalidParquetProcedure.scala   |   2 +-
 .../ShowMetadataTableFilesProcedure.scala          |   2 +-
 .../ShowMetadataTablePartitionsProcedure.scala     |   2 +-
 .../ShowMetadataTableStatsProcedure.scala          |   2 +-
 .../procedures/ShowRollbacksProcedure.scala        |   6 +-
 .../procedures/ShowSavepointsProcedure.scala       |   4 +-
 .../procedures/ShowTablePropertiesProcedure.scala  |   4 +-
 .../procedures/StatsFileSizeProcedure.scala        |   2 +-
 .../StatsWriteAmplificationProcedure.scala         |   2 +-
 .../procedures/UpgradeOrDowngradeProcedure.scala   |   4 +-
 .../procedures/ValidateHoodieSyncProcedure.scala   |  10 +-
 .../ValidateMetadataTableFilesProcedure.scala      |   2 +-
 .../sql/hudi/procedure/TestCleanProcedure.scala    | 269 ++++++++++++++-------
 .../hudi/procedure/TestCompactionProcedure.scala   |  45 ++++
 56 files changed, 403 insertions(+), 261 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 5f0cba6fd7c..c9f5a8a1215 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -22,6 +22,7 @@ package org.apache.hudi
 import org.apache.hudi.avro.model.HoodieClusteringGroup
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.StringUtils
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
@@ -90,4 +91,11 @@ object HoodieCLIUtils {
         throw new SparkException(s"Unsupported identifier $table")
     }
   }
+
+  def extractOptions(s: String): Map[String, String] = {
+    StringUtils.split(s, ",").asScala
+      .map(split => StringUtils.split(split, "="))
+      .map(pair => pair.get(0) -> pair.get(1))
+      .toMap
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
index b097c942ad2..801e106419b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
@@ -30,8 +30,8 @@ class ArchiveCommitsProcedure extends BaseProcedure
   with SparkAdapterSupport
   with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
     ProcedureParameter.optional(2, "min_commits", DataTypes.IntegerType, 20),
     ProcedureParameter.optional(3, "max_commits", DataTypes.IntegerType, 30),
     ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index 2930fc36c4c..b06aea2ac58 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
 abstract class BaseProcedure extends Procedure {
-  val INVALID_ARG_INDEX: Int = -1
-
   val spark: SparkSession = SparkSession.active
   val jsc = new JavaSparkContext(spark.sparkContext)
 
@@ -41,41 +39,31 @@ abstract class BaseProcedure extends Procedure {
       .build
   }
 
-  protected def checkArgs(target: Array[ProcedureParameter], args: ProcedureArgs): Unit = {
-    val internalRow = args.internalRow
-    for (i <- target.indices) {
-      if (target(i).required) {
-        var argsIndex: Integer = null
-        if (args.isNamedArgs) {
-          argsIndex = getArgsIndex(target(i).name, args)
-        } else {
-          argsIndex = getArgsIndex(i.toString, args)
-        }
-        assert(-1 != argsIndex && internalRow.get(argsIndex, target(i).dataType) != null,
-          s"Argument: ${target(i).name} is required")
-      }
+  protected def getParamKey(parameter: ProcedureParameter, isNamedArgs: Boolean): String = {
+    if (isNamedArgs) {
+      parameter.name
+    } else {
+      parameter.index.toString
     }
   }
 
-  protected def getArgsIndex(key: String, args: ProcedureArgs): Integer = {
-    args.map.getOrDefault(key, INVALID_ARG_INDEX)
+  protected def checkArgs(parameters: Array[ProcedureParameter], args: ProcedureArgs): Unit = {
+    for (parameter <- parameters) {
+      if (parameter.required) {
+        val paramKey = getParamKey(parameter, args.isNamedArgs)
+        assert(args.map.containsKey(paramKey) &&
+          args.internalRow.get(args.map.get(paramKey), parameter.dataType) != null,
+          s"Argument: ${parameter.name} is required")
+      }
+    }
   }
 
   protected def getArgValueOrDefault(args: ProcedureArgs, parameter: ProcedureParameter): Option[Any] = {
-    var argsIndex: Int = INVALID_ARG_INDEX
-    if (args.isNamedArgs) {
-      argsIndex = getArgsIndex(parameter.name, args)
-    } else {
-      argsIndex = getArgsIndex(parameter.index.toString, args)
-    }
-
-    if (argsIndex.equals(INVALID_ARG_INDEX)) {
-      parameter.default match {
-        case option: Option[Any] => option
-        case _ => Option.apply(parameter.default)
-      }
+    val paramKey = getParamKey(parameter, args.isNamedArgs)
+    if (args.map.containsKey(paramKey)) {
+      Option.apply(getInternalRowValue(args.internalRow, args.map.get(paramKey), parameter.dataType))
     } else {
-      Option.apply(getInternalRowValue(args.internalRow, argsIndex, parameter.dataType))
+      Option.apply(parameter.default)
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala
index 379b38e5921..fdac678b477 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala
@@ -30,8 +30,8 @@ import scala.collection.JavaConverters._
 
 class CommitsCompareProcedure() extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "path", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "path", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
index f5a895239cf..ac23595048f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
@@ -28,9 +28,9 @@ class CopyToTableProcedure extends BaseProcedure with ProcedureBuilder with Logg
 
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "query_type", DataTypes.StringType, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL),
-    ProcedureParameter.required(2, "new_table", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "new_table", DataTypes.StringType),
     ProcedureParameter.optional(3, "begin_instance_time", DataTypes.StringType, ""),
     ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType, ""),
     ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
index 13259c4964f..89c00dac6e4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
@@ -27,9 +27,9 @@ import java.util.function.Supplier
 class CopyToTempView extends BaseProcedure with ProcedureBuilder with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "query_type", DataTypes.StringType, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL),
-    ProcedureParameter.required(2, "view_name", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "view_name", DataTypes.StringType),
     ProcedureParameter.optional(3, "begin_instance_time", DataTypes.StringType, ""),
     ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType, ""),
     ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
index bbed979f5cd..722ed07cc31 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
@@ -31,7 +31,7 @@ import java.util.function.Supplier
 
 class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
index 1983d825ad4..09fe59f7269 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
@@ -29,11 +29,11 @@ import java.util.function.Supplier
 
 class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None),
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "commit_time", DataTypes.StringType),
     ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
     ProcedureParameter.optional(3, "comments", DataTypes.StringType, ""),
-    ProcedureParameter.optional(4, "path", DataTypes.StringType, None)
+    ProcedureParameter.optional(4, "path", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
index 32c853345d4..87d58fa6ed0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
@@ -30,8 +30,8 @@ import scala.util.{Failure, Success, Try}
 
 class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala
index d6fccc1f9d2..1a6a2cfa5fc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala
@@ -29,7 +29,7 @@ import java.util.function.Supplier
 
 class DeleteMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
index e9d13914789..58b12c70c64 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
@@ -29,9 +29,9 @@ import java.util.function.Supplier
 
 class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
-    ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType),
+    ProcedureParameter.optional(2, "path", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 97930432e4e..31918ad080c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -47,8 +47,8 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
   val defaultActions = "clean,commit,deltacommit,rollback,savepoint,restore"
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "local_folder", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "local_folder", DataTypes.StringType),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
     ProcedureParameter.optional(3, "actions", DataTypes.StringType, defaultActions),
     ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
index c9bee569c7f..e050572899e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
@@ -27,13 +27,13 @@ import scala.language.higherKinds
 
 class HdfsParquetImportProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "src_path", DataTypes.StringType, None),
-    ProcedureParameter.required(3, "target_path", DataTypes.StringType, None),
-    ProcedureParameter.required(4, "row_key", DataTypes.StringType, None),
-    ProcedureParameter.required(5, "partition_key", DataTypes.StringType, None),
-    ProcedureParameter.required(6, "schema_file_path", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "table_type", DataTypes.StringType),
+    ProcedureParameter.required(2, "src_path", DataTypes.StringType),
+    ProcedureParameter.required(3, "target_path", DataTypes.StringType),
+    ProcedureParameter.required(4, "row_key", DataTypes.StringType),
+    ProcedureParameter.required(5, "partition_key", DataTypes.StringType),
+    ProcedureParameter.required(6, "schema_file_path", DataTypes.StringType),
     ProcedureParameter.optional(7, "format", DataTypes.StringType, "parquet"),
     ProcedureParameter.optional(8, "command", DataTypes.StringType, "insert"),
     ProcedureParameter.optional(9, "retry", DataTypes.IntegerType, 0),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
index 5052b70aff1..43b7fcfc675 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
@@ -27,7 +27,7 @@ import java.util.function.Supplier
 class HelpProcedure extends BaseProcedure with ProcedureBuilder with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "cmd", DataTypes.StringType, None)
+    ProcedureParameter.optional(0, "cmd", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -88,7 +88,7 @@ class HelpProcedure extends BaseProcedure with ProcedureBuilder with Logging {
         result.append(tab)
           .append(lengthFormat(param.name)).append(tab)
           .append(lengthFormat(param.dataType.typeName)).append(tab)
-          .append(lengthFormat(param.default.toString)).append(tab)
+          .append(lengthFormat(String.valueOf(param.default))).append(tab)
           .append(lengthFormat(param.required.toString)).append(line)
       })
       result.append("outputType:").append(line)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
index 513f40a4c8c..e97fb458a9a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
@@ -33,7 +33,7 @@ class HiveSyncProcedure extends BaseProcedure with ProcedureBuilder
   with ProvidesHoodieConfig with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "metastore_uri", DataTypes.StringType, ""),
     ProcedureParameter.optional(2, "username", DataTypes.StringType, ""),
     ProcedureParameter.optional(3, "password", DataTypes.StringType, ""),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
index 3b875e77ffa..cfeb3905126 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
@@ -32,7 +32,7 @@ import java.util.function.Supplier
 
 class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "read_only", DataTypes.BooleanType, false)
   )
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala
index a9ad252bd7a..0757314e4b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala
@@ -54,8 +54,8 @@ object ProcedureParameter {
    * @param dataType the type of the parameter
    * @return the constructed stored procedure parameter
    */
-  def required(index: Int, name: String, dataType: DataType, default: Any): ProcedureParameterImpl = {
-    ProcedureParameterImpl(index, name, dataType, default, required = true)
+  def required(index: Int, name: String, dataType: DataType): ProcedureParameterImpl = {
+    ProcedureParameterImpl(index, name, dataType, null, required = true)
   }
 
   /**
@@ -63,9 +63,10 @@ object ProcedureParameter {
    *
    * @param name     the name of the parameter.
    * @param dataType the type of the parameter.
+   * @param default  the default value of the parameter.
    * @return the constructed optional stored procedure parameter
    */
-  def optional(index: Int, name: String, dataType: DataType, default: Any): ProcedureParameterImpl = {
+  def optional(index: Int, name: String, dataType: DataType, default: Any = null): ProcedureParameterImpl = {
     ProcedureParameterImpl(index, name, dataType, default, required = false)
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
index bb65174c4b4..d636b7328b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
@@ -31,7 +31,7 @@ import scala.collection.JavaConversions._
 
 class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
   )
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
index ff185d1bdfa..4a828893bc5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
 
 class RepairCorruptedCleanFilesProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
index 8ee5055e1fd..d4d22364fe8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
@@ -30,9 +30,9 @@ import scala.util.{Failure, Success, Try}
 
 class RepairDeduplicateProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "duplicated_partition_path", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "repaired_output_path", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "duplicated_partition_path", DataTypes.StringType),
+    ProcedureParameter.required(2, "repaired_output_path", DataTypes.StringType),
     ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, true),
     ProcedureParameter.optional(4, "dedupe_type", DataTypes.StringType, "insert_type")
   )
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index 7daacb2f184..66ab250ee7f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConversions._
 
 class RepairMigratePartitionMetaProcedure extends BaseProcedure with 
ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
index 043217cf2df..81a09e147a7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
@@ -33,8 +33,8 @@ import 
scala.collection.JavaConverters.asScalaIteratorConverter
 
 class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with 
ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "new_props_file_path", 
DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "new_props_file_path", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
index 862b74b427a..49775236b78 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
@@ -32,8 +32,8 @@ import java.util.function.Supplier
 
 class RollbackToInstantTimeProcedure extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None))
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType))
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("rollback_result", DataTypes.BooleanType, nullable = true, 
Metadata.empty))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
index 955ceb01045..94bdc6db80f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
@@ -29,9 +29,9 @@ import java.util.function.Supplier
 
 class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder 
with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
-    ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType),
+    ProcedureParameter.optional(2, "path", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
index 6025276176e..bb93cf1a485 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -36,11 +36,11 @@ import java.util.function.Supplier
 import scala.collection.JavaConverters._
 class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with 
Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "bootstrap_path", DataTypes.StringType, 
None),
-    ProcedureParameter.required(3, "base_path", DataTypes.StringType, None),
-    ProcedureParameter.required(4, "rowKey_field", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "table_type", DataTypes.StringType),
+    ProcedureParameter.required(2, "bootstrap_path", DataTypes.StringType),
+    ProcedureParameter.required(3, "base_path", DataTypes.StringType),
+    ProcedureParameter.required(4, "rowKey_field", DataTypes.StringType),
     ProcedureParameter.optional(5, "base_file_format", DataTypes.StringType, 
"PARQUET"),
     ProcedureParameter.optional(6, "partition_path_field", 
DataTypes.StringType, ""),
     ProcedureParameter.optional(7, "bootstrap_index_class", 
DataTypes.StringType, 
"org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index f7653ce680e..7f7c2fe0825 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -31,15 +31,16 @@ import java.util.function.Supplier
 class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with 
Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, 
false),
     ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, 
true),
-    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, 
HoodieCleanConfig.CLEANER_POLICY.defaultValue()),
-    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue().toInt),
-    ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType, 
HoodieCleanConfig.CLEANER_HOURS_RETAINED.defaultValue().toInt),
-    ProcedureParameter.optional(6, "file_versions_retained", 
DataTypes.IntegerType, 
HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.defaultValue().toInt),
-    ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType, 
HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.defaultValue()),
-    ProcedureParameter.optional(8, "trigger_max_commits", 
DataTypes.IntegerType, HoodieCleanConfig.CLEAN_MAX_COMMITS.defaultValue().toInt)
+    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType),
+    ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType),
+    ProcedureParameter.optional(6, "file_versions_retained", 
DataTypes.IntegerType),
+    ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType),
+    ProcedureParameter.optional(8, "trigger_max_commits", 
DataTypes.IntegerType),
+    ProcedureParameter.optional(9, "options", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -69,20 +70,35 @@ class RunCleanProcedure extends BaseProcedure with 
ProcedureBuilder with Logging
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val skipLocking = getArgValueOrDefault(args, 
PARAMETERS(1)).get.asInstanceOf[Boolean]
     val scheduleInLine = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Boolean]
+    var confs: Map[String, String] = Map.empty
+    if (getArgValueOrDefault(args, PARAMETERS(3)).isDefined) {
+      confs += HoodieCleanConfig.CLEANER_POLICY.key() -> 
getArgValueOrDefault(args, PARAMETERS(3)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(4)).isDefined) {
+      confs += HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(4)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(5)).isDefined) {
+      confs += HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(5)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(6)).isDefined) {
+      confs += HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(6)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(7)).isDefined) {
+      confs += HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> 
getArgValueOrDefault(args, PARAMETERS(7)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(8)).isDefined) {
+      confs += HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> 
getArgValueOrDefault(args, PARAMETERS(8)).get.toString
+    }
+    if (getArgValueOrDefault(args, PARAMETERS(9)).isDefined) {
+      confs ++= HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, 
PARAMETERS(9)).get.asInstanceOf[String])
+    }
+
     val basePath = getBasePath(tableName, Option.empty)
     val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
-    val props: Map[String, String] = Map(
-      HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args, 
PARAMETERS(3)).get.toString,
-      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(4)).get.toString,
-      HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(5)).get.toString,
-      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> 
getArgValueOrDefault(args, PARAMETERS(6)).get.toString,
-      HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> 
getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
-      HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, 
PARAMETERS(8)).get.toString
-    )
 
     var client: SparkRDDWriteClient[_] = null
     try {
-      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
props,
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
confs,
         tableName.asInstanceOf[Option[String]])
       val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, 
skipLocking)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index eba972382ea..7cd0413e1e7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -48,17 +48,17 @@ class RunClusteringProcedure extends BaseProcedure
    * [ORDER BY (col_name1 [, ...] ) ]
    */
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
-    ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
-    ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
+    ProcedureParameter.optional(2, "predicate", DataTypes.StringType),
+    ProcedureParameter.optional(3, "order", DataTypes.StringType),
     ProcedureParameter.optional(4, "show_involved_partition", 
DataTypes.BooleanType, false),
-    ProcedureParameter.optional(5, "op", DataTypes.StringType, None),
-    ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType, 
None),
+    ProcedureParameter.optional(5, "op", DataTypes.StringType),
+    ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType),
     // params => key=value, key2=value2
-    ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
-    ProcedureParameter.optional(8, "instants", DataTypes.StringType, None),
-    ProcedureParameter.optional(9, "selected_partitions", 
DataTypes.StringType, None)
+    ProcedureParameter.optional(7, "options", DataTypes.StringType),
+    ProcedureParameter.optional(8, "instants", DataTypes.StringType),
+    ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -88,7 +88,7 @@ class RunClusteringProcedure extends BaseProcedure
 
     val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = 
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
-    var conf: Map[String, String] = Map.empty
+    var confs: Map[String, String] = Map.empty
 
     val selectedPartitions: String = (parts, predicate) match {
       case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
@@ -102,7 +102,7 @@ class RunClusteringProcedure extends BaseProcedure
       logInfo("No partition matched")
       return Seq()
     } else {
-      conf = conf ++ Map(
+      confs = confs ++ Map(
         HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> 
"SELECTED_PARTITIONS",
         HoodieClusteringConfig.PARTITION_SELECTED.key() -> selectedPartitions
       )
@@ -113,7 +113,7 @@ class RunClusteringProcedure extends BaseProcedure
     orderColumns match {
       case Some(o) =>
         validateOrderColumns(o.asInstanceOf[String], metaClient)
-        conf = conf ++ Map(
+        confs = confs ++ Map(
           HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() -> 
o.asInstanceOf[String]
         )
         logInfo(s"Order columns: $o")
@@ -124,7 +124,7 @@ class RunClusteringProcedure extends BaseProcedure
     orderStrategy match {
       case Some(o) =>
         val strategy = 
LayoutOptimizationStrategy.fromValue(o.asInstanceOf[String])
-        conf = conf ++ Map(
+        confs = confs ++ Map(
           HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key() -> 
strategy.getValue
         )
       case _ =>
@@ -133,11 +133,7 @@ class RunClusteringProcedure extends BaseProcedure
 
     options match {
       case Some(p) =>
-        val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
-        paramPairs.foreach{ pair =>
-          val values = StringUtils.split(pair, "=")
-          conf = conf ++ Map(values.get(0) -> values.get(1))
-        }
+        confs = confs ++ HoodieCLIUtils.extractOptions(p.asInstanceOf[String])
       case _ =>
         logInfo("No options")
     }
@@ -174,7 +170,7 @@ class RunClusteringProcedure extends BaseProcedure
 
     var client: SparkRDDWriteClient[_] = null
     try {
-      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
conf,
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
confs,
         tableName.asInstanceOf[Option[String]])
       if (operator.isSchedule) {
         val instantTime = HoodieActiveTimeline.createNewInstantTime
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 47a20f3cf6c..47c371f0c7f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -39,10 +39,11 @@ class RunCompactionProcedure extends BaseProcedure with 
ProcedureBuilder with Sp
    * operation = (RUN | SCHEDULE) COMPACTION  ON path = STRING   (AT 
instantTimestamp = INTEGER_VALUE)?
    */
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "op", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(2, "path", DataTypes.StringType, None),
-    ProcedureParameter.optional(3, "timestamp", DataTypes.LongType, None)
+    ProcedureParameter.required(0, "op", DataTypes.StringType),
+    ProcedureParameter.optional(1, "table", DataTypes.StringType),
+    ProcedureParameter.optional(2, "path", DataTypes.StringType),
+    ProcedureParameter.optional(3, "timestamp", DataTypes.LongType),
+    ProcedureParameter.optional(4, "options", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -62,13 +63,17 @@ class RunCompactionProcedure extends BaseProcedure with 
ProcedureBuilder with Sp
     val tableName = getArgValueOrDefault(args, PARAMETERS(1))
     val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
     val instantTimestamp = getArgValueOrDefault(args, PARAMETERS(3))
+    var confs: Map[String, String] = Map.empty
+    if (getArgValueOrDefault(args, PARAMETERS(4)).isDefined) {
+      confs = confs ++ 
HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[String])
+    }
 
     val basePath = getBasePath(tableName, tablePath)
     val metaClient = 
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
     var client: SparkRDDWriteClient[_] = null
     try {
-      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
Map.empty,
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
confs,
         tableName.asInstanceOf[Option[String]])
       var willCompactionInstants: Seq[String] = Seq.empty
       operation match {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
index 01c7465b3f4..a63125374dd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
@@ -35,7 +35,7 @@ import scala.collection.JavaConverters._
 
 class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends 
BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
     ProcedureParameter.optional(2, "start_ts", DataTypes.StringType, ""),
     ProcedureParameter.optional(3, "end_ts", DataTypes.StringType, "")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
index 99c4f5fd0df..958f37c5881 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
@@ -31,7 +31,7 @@ import scala.collection.JavaConverters._
 
 class ShowBootstrapMappingProcedure extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
     ProcedureParameter.optional(2, "file_ids", DataTypes.StringType, ""),
     ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
index ad14b8b8b6c..c62bcfa73e9 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
@@ -27,7 +27,7 @@ import java.util.function.Supplier
 
 class ShowBootstrapPartitionsProcedure extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
index 092610119e6..69aae49466e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
 
 class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with 
SparkAdapterSupport with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
     ProcedureParameter.optional(3, "show_involved_partition", 
DataTypes.BooleanType, false)
   )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
index 1a8f4dd9e44..e80fc2b36db 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
@@ -31,10 +31,10 @@ import scala.collection.JavaConversions._
 
 class ShowCommitExtraMetadataProcedure() extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
-    ProcedureParameter.optional(2, "instant_time", DataTypes.StringType, None),
-    ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType, None)
+    ProcedureParameter.optional(2, "instant_time", DataTypes.StringType),
+    ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
index 53fcd072c3b..407ebcf76d1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
@@ -34,9 +34,9 @@ import scala.collection.JavaConversions._
 
 class ShowCommitFilesProcedure() extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
index 8b0fd6156d0..8439ebf9374 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
@@ -34,9 +34,9 @@ import scala.collection.JavaConversions._
 
 class ShowCommitPartitionsProcedure() extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
index 4e3609b5334..50d55d47557 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
@@ -33,9 +33,9 @@ import scala.collection.JavaConversions._
 
 class ShowCommitWriteStatsProcedure() extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
index 3c3fde385bc..8f8ebd9ce29 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
@@ -33,7 +33,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) 
extends BaseProcedure
   var sortByFieldParameter: ProcedureParameter = _
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
index 1076b9fc44a..5aee4bf3a12 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
@@ -37,8 +37,8 @@ class ShowCompactionProcedure extends BaseProcedure with 
ProcedureBuilder with S
    * SHOW COMPACTION  ON path = STRING (LIMIT limit = INTEGER_VALUE)?
    */
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20)
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
index 6f2aa2c9187..8a696bc96fa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -34,7 +34,7 @@ import 
scala.collection.JavaConverters.{asJavaIterableConverter, asJavaIteratorC
 
 class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure 
with ProcedureBuilder {
   private val PARAMETERS_ALL: Array[ProcedureParameter] = 
Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
     ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, 
false),
     ProcedureParameter.optional(3, "include_in_flight", DataTypes.BooleanType, 
false),
@@ -55,13 +55,13 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) 
extends BaseProcedure wit
   ))
 
   private val PARAMETERS_LATEST: Array[ProcedureParameter] = 
Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
     ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, 
false),
     ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, 
false),
     ProcedureParameter.optional(4, "exclude_compaction", 
DataTypes.BooleanType, false),
     ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.required(6, "partition_path", DataTypes.StringType, 
None),
+    ProcedureParameter.required(6, "partition_path", DataTypes.StringType),
     ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
 
   )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala
index aae3858c114..b3a3b0b700c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala
@@ -27,7 +27,7 @@ import java.util.function.Supplier
 
 class ShowFsPathDetailProcedure extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "path", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "path", DataTypes.StringType),
     ProcedureParameter.optional(1, "is_sub", DataTypes.BooleanType, false),
     ProcedureParameter.optional(2, "sort", DataTypes.BooleanType, true)
   )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 3553caef515..d1da7cfed06 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -36,8 +36,8 @@ import 
scala.collection.JavaConverters.{asScalaBufferConverter, asScalaIteratorC
 
 class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with 
ProcedureBuilder {
   override def parameters: Array[ProcedureParameter] = 
Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "log_file_path_pattern", 
DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "log_file_path_pattern", 
DataTypes.StringType),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index ebb325f1936..5761a75383a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -37,8 +37,8 @@ import scala.collection.JavaConverters._
 
 class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with 
ProcedureBuilder {
   override def parameters: Array[ProcedureParameter] = 
Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "log_file_path_pattern", 
DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "log_file_path_pattern", 
DataTypes.StringType),
     ProcedureParameter.optional(2, "merge", DataTypes.BooleanType, false),
     ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10)
   )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index 4153b72d6f9..d87239675ed 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -31,7 +31,7 @@ import java.util.function.Supplier
 
 class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "path", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala
index edd0439a2bd..1d9ac40087b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala
@@ -33,7 +33,7 @@ import java.util.function.Supplier
 
 class ShowMetadataTableFilesProcedure() extends BaseProcedure with 
ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "partition", DataTypes.StringType, "")
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala
index f9a676abc91..6142b6df73f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala
@@ -33,7 +33,7 @@ import 
scala.collection.JavaConverters.asScalaIteratorConverter
 
 class ShowMetadataTablePartitionsProcedure() extends BaseProcedure with 
ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala
index 22f42e474bd..86fe0575f9c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala
@@ -30,7 +30,7 @@ import scala.collection.JavaConversions._
 
 class ShowMetadataTableStatsProcedure() extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
index 48dcc09b826..8516b8bef2c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
@@ -35,14 +35,14 @@ import scala.collection.JavaConverters._
 
 class ShowRollbacksProcedure(showDetails: Boolean) extends BaseProcedure with 
ProcedureBuilder {
   private val ROLLBACKS_PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
   )
 
   private val ROLLBACK_PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
   )
 
   private val ROLLBACKS_OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
index b7fbe46a78c..3a789f95105 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
@@ -28,8 +28,8 @@ import java.util.stream.Collectors
 
 class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "path", DataTypes.StringType, None)
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
index e245159c849..9846a2906e1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
@@ -27,8 +27,8 @@ import scala.collection.JavaConversions._
 
 class ShowTablePropertiesProcedure() extends BaseProcedure with 
ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
index 29b76276194..524767a230c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
@@ -31,7 +31,7 @@ import 
scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapCon
 class StatsFileSizeProcedure extends BaseProcedure with ProcedureBuilder {
 
   override def parameters: Array[ProcedureParameter] = 
Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
     ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
   )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
index b83d97f047e..0c0f55cca5e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
@@ -28,7 +28,7 @@ import 
scala.collection.JavaConverters.asScalaIteratorConverter
 
 class StatsWriteAmplificationProcedure extends BaseProcedure with 
ProcedureBuilder {
   override def parameters: Array[ProcedureParameter] = 
Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
index 49cbe5e2de6..0ae413040bc 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -34,8 +34,8 @@ import scala.util.{Failure, Success, Try}
 
 class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder 
with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "to_version", DataTypes.StringType, None)
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.required(1, "to_version", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
index 76c6b471740..774baf854a1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
@@ -35,11 +35,11 @@ import scala.collection.JavaConverters._
 class ValidateHoodieSyncProcedure extends BaseProcedure with ProcedureBuilder 
with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "src_table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "dst_table", DataTypes.StringType, None),
-    ProcedureParameter.required(2, "mode", DataTypes.StringType, "complete"),
-    ProcedureParameter.required(3, "hive_server_url", DataTypes.StringType, 
None),
-    ProcedureParameter.required(4, "hive_pass", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "src_table", DataTypes.StringType),
+    ProcedureParameter.required(1, "dst_table", DataTypes.StringType),
+    ProcedureParameter.required(2, "mode", DataTypes.StringType),
+    ProcedureParameter.required(3, "hive_server_url", DataTypes.StringType),
+    ProcedureParameter.required(4, "hive_pass", DataTypes.StringType),
     ProcedureParameter.optional(5, "src_db", DataTypes.StringType, "rawdata"),
     ProcedureParameter.optional(6, "target_db", DataTypes.StringType, 
"dwh_hoodie"),
     ProcedureParameter.optional(7, "partition_cnt", DataTypes.IntegerType, 5),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala
index c11d8434ddc..65b9fc49270 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala
@@ -36,7 +36,7 @@ import 
scala.collection.JavaConverters.asScalaIteratorConverter
 
 class ValidateMetadataTableFilesProcedure() extends BaseProcedure with 
ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "verbose", DataTypes.BooleanType, false)
   )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
index 4dd950e382c..e90a10e9f9a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -22,99 +22,182 @@ package org.apache.spark.sql.hudi.procedure
 class TestCleanProcedure extends HoodieSparkProcedureTestBase {
 
   test("Test Call run_clean Procedure by Table") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-          |create table $tableName (
-          | id int,
-          | name string,
-          | price double,
-          | ts long
-          | ) using hudi
-          | location '${tmp.getCanonicalPath}'
-          | tblproperties (
-          |   primaryKey = 'id',
-          |   type = 'cow',
-          |   preCombineField = 'ts'
-          | )
-          |""".stripMargin)
-
-      spark.sql("set hoodie.parquet.max.file.size = 10000")
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"update $tableName set price = 11 where id = 1")
-      spark.sql(s"update $tableName set price = 12 where id = 1")
-      spark.sql(s"update $tableName set price = 13 where id = 1")
-
-      // KEEP_LATEST_COMMITS
-      val result1 = spark.sql(s"call run_clean(table => '$tableName', 
retain_commits => 1)")
-        .collect()
-        .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), 
row.getString(3), row.getString(4), row.getInt(5)))
-
-      assertResult(1)(result1.length)
-      assertResult(2)(result1(0)(2))
-
-      val result11 = spark.sql(
-        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
-      assertResult(2)(result11.length)
-
-      checkAnswer(s"select id, name, price, ts from $tableName order by id") (
-        Seq(1, "a1", 13, 1000)
-      )
-
-      val result2 = spark.sql(s"call run_clean(table => '$tableName', 
retain_commits => 1)")
-        .collect()
-      assertResult(0)(result2.length)
-
-      // KEEP_LATEST_FILE_VERSIONS
-      spark.sql(s"update $tableName set price = 14 where id = 1")
-
-      val result3 = spark.sql(
-        s"call run_clean(table => '$tableName', clean_policy => 
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
-      assertResult(0)(result3.length)
-
-      val result4 = spark.sql(
-        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
-      assertResult(3)(result4.length)
-
-      val result5 = spark.sql(
-        s"call run_clean(table => '$tableName', clean_policy => 
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
-      assertResult(1)(result5.length)
-      assertResult(2)(result5(0)(2))
-
-      val result6 = spark.sql(
-        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
-      assertResult(1)(result6.length)
-
-      checkAnswer(s"select id, name, price, ts from $tableName order by id") (
-        Seq(1, "a1", 14, 1000)
-      )
-
-      // trigger time
-      spark.sql(s"update $tableName set price = 15 where id = 1")
-
-      // no trigger, only has 1 commit
-      val result7 = spark.sql(
-        s"call run_clean(table => '$tableName', trigger_max_commits => 2, 
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 
1)").collect()
-      assertResult(0)(result7.length)
-
-      val result8 = spark.sql(
-        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
-      assertResult(2)(result8.length)
-
-      // trigger
-      val result9 = spark.sql(
-        s"call run_clean(table => '$tableName', trigger_max_commits => 1, 
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 
1)").collect()
-      assertResult(1)(result9.length)
-      assertResult(1)(result9(0)(2))
-
-      val result10 = spark.sql(
-        s"call show_fsview_all(table => '$tableName', path_regex => '', limit 
=> 10)").collect()
-      assertResult(1)(result10.length)
-
-      checkAnswer(s"select id, name, price, ts from $tableName order by id") (
-        Seq(1, "a1", 15, 1000)
-      )
+    withSQLConf("hoodie.clean.automatic" -> "false", 
"hoodie.parquet.max.file.size" -> "10000") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             | ) using hudi
+             | location '${tmp.getCanonicalPath}'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'cow',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"update $tableName set price = 11 where id = 1")
+        spark.sql(s"update $tableName set price = 12 where id = 1")
+        spark.sql(s"update $tableName set price = 13 where id = 1")
+
+        // KEEP_LATEST_COMMITS
+        val result1 = spark.sql(s"call run_clean(table => '$tableName', 
retain_commits => 1)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), 
row.getString(3), row.getString(4), row.getInt(5)))
+
+        assertResult(1)(result1.length)
+        assertResult(2)(result1(0)(2))
+
+        val result11 = spark.sql(
+          s"call show_fsview_all(table => '$tableName', path_regex => '', 
limit => 10)").collect()
+        assertResult(2)(result11.length)
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 13, 1000)
+        )
+
+        val result2 = spark.sql(s"call run_clean(table => '$tableName', 
retain_commits => 1)")
+          .collect()
+        assertResult(0)(result2.length)
+
+        // KEEP_LATEST_FILE_VERSIONS
+        spark.sql(s"update $tableName set price = 14 where id = 1")
+
+        val result3 = spark.sql(
+          s"call run_clean(table => '$tableName', clean_policy => 
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
+        assertResult(0)(result3.length)
+
+        val result4 = spark.sql(
+          s"call show_fsview_all(table => '$tableName', path_regex => '', 
limit => 10)").collect()
+        assertResult(3)(result4.length)
+
+        val result5 = spark.sql(
+          s"call run_clean(table => '$tableName', clean_policy => 
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+        assertResult(1)(result5.length)
+        assertResult(2)(result5(0)(2))
+
+        val result6 = spark.sql(
+          s"call show_fsview_all(table => '$tableName', path_regex => '', 
limit => 10)").collect()
+        assertResult(1)(result6.length)
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 14, 1000)
+        )
+
+        // trigger time
+        spark.sql(s"update $tableName set price = 15 where id = 1")
+
+        // no trigger, only has 1 commit
+        val result7 = spark.sql(
+          s"call run_clean(table => '$tableName', trigger_max_commits => 2, 
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 
1)").collect()
+        assertResult(0)(result7.length)
+
+        val result8 = spark.sql(
+          s"call show_fsview_all(table => '$tableName', path_regex => '', 
limit => 10)").collect()
+        assertResult(2)(result8.length)
+
+        // trigger
+        val result9 = spark.sql(
+          s"call run_clean(table => '$tableName', trigger_max_commits => 1, 
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 
1)").collect()
+        assertResult(1)(result9.length)
+        assertResult(1)(result9(0)(2))
+
+        val result10 = spark.sql(
+          s"call show_fsview_all(table => '$tableName', path_regex => '', 
limit => 10)").collect()
+        assertResult(1)(result10.length)
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 15, 1000)
+        )
+      }
+    }
+  }
+
+  test("Test Call run_clean Procedure with table props") {
+    withSQLConf("hoodie.clean.automatic" -> "false") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             | ) using hudi
+             | location '${tmp.getCanonicalPath}'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'cow',
+             |   preCombineField = 'ts',
+             |   hoodie.cleaner.policy = 'KEEP_LATEST_COMMITS',
+             |   hoodie.cleaner.commits.retained = '2'
+             | )
+             |""".stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
+
+        val result1 = spark.sql(s"call run_clean(table => '$tableName')")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), 
row.getString(3), row.getString(4), row.getInt(5)))
+        assertResult(1)(result1.length)
+        assertResult(1)(result1(0)(2))
+
+        val result2 = spark.sql(
+          s"call show_fsview_all(table => '$tableName', path_regex => '', 
limit => 10)").collect()
+        assertResult(3)(result2.length)
+      }
+    }
+  }
+
+  test("Test Call run_clean Procedure with options") {
+    withSQLConf("hoodie.clean.automatic" -> "false") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | price double,
+             | ts long
+             | ) using hudi
+             | location '${tmp.getCanonicalPath}'
+             | tblproperties (
+             |   primaryKey = 'id',
+             |   type = 'cow',
+             |   preCombineField = 'ts'
+             | )
+             |""".stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
+
+        val result1 = spark.sql(
+          s"""call run_clean(table => '$tableName', options => "
+             | hoodie.cleaner.policy=KEEP_LATEST_COMMITS,
+             | hoodie.cleaner.commits.retained=2
+             |")""".stripMargin)
+          .collect()
+          .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), 
row.getString(3), row.getString(4), row.getInt(5)))
+        assertResult(1)(result1.length)
+        assertResult(1)(result1(0)(2))
+
+        val result2 = spark.sql(
+          s"call show_fsview_all(table => '$tableName', path_regex => '', 
limit => 10)").collect()
+        assertResult(3)(result2.length)
+      }
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 236d87970d2..6a6e74e8b72 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -174,6 +174,7 @@ class TestCompactionProcedure extends 
HoodieSparkProcedureTestBase {
       )
     }
   }
+
   test("Test show_compaction Procedure by Path") {
     withTempDir { tmp =>
       val tableName1 = generateTableName
@@ -205,4 +206,48 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
       assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}/$tableName1')").collect().length)
     }
   }
+
+  test("Test run_compaction Procedure with options") {
+    withSQLConf("hoodie.compact.inline" -> "false", "hoodie.compact.inline.max.delta.commits" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(1, 'a2', 10, 1000)")
+        spark.sql(s"insert into $tableName values(1, 'a3', 10, 1000)")
+
+        val result1 = spark.sql(
+          s"""call run_compaction(table => '$tableName', op => 'run', options => "
+             | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,
+             | hoodie.compaction.logfile.num.threshold=3
+             |")""".stripMargin)
+          .collect()
+        assertResult(0)(result1.length)
+
+        spark.sql(s"insert into $tableName values(1, 'a4', 10, 1000)")
+        val result2 = spark.sql(
+          s"""call run_compaction(table => '$tableName', op => 'run', options => "
+             | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,
+             | hoodie.compaction.logfile.num.threshold=3
+             |")""".stripMargin)
+          .collect()
+        assertResult(1)(result2.length)
+      }
+    }
+  }
 }


Reply via email to