This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5bab66498c8 [HUDI-6122] Unify call procedure options (#8537)
5bab66498c8 is described below
commit 5bab66498c857c32e042488db72a018b83ea926a
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 11 09:55:32 2023 +0800
[HUDI-6122] Unify call procedure options (#8537)
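In short: required parameters drop the unused default argument, optional
parameters now default to null when no default is supplied, and run_clean and
run_compaction gain an "options" parameter (a comma-separated key=value string,
parsed by the new HoodieCLIUtils.extractOptions helper), matching the options
parameter run_clustering already had. Illustrative call sketches follow the
relevant hunks below.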
---
.../scala/org/apache/hudi/HoodieCLIUtils.scala | 8 +
.../procedures/ArchiveCommitsProcedure.scala | 4 +-
.../hudi/command/procedures/BaseProcedure.scala | 48 ++--
.../procedures/CommitsCompareProcedure.scala | 4 +-
.../command/procedures/CopyToTableProcedure.scala | 4 +-
.../hudi/command/procedures/CopyToTempView.scala | 4 +-
.../procedures/CreateMetadataTableProcedure.scala | 2 +-
.../procedures/CreateSavepointProcedure.scala | 6 +-
.../command/procedures/DeleteMarkerProcedure.scala | 4 +-
.../procedures/DeleteMetadataTableProcedure.scala | 2 +-
.../procedures/DeleteSavepointProcedure.scala | 6 +-
.../procedures/ExportInstantsProcedure.scala | 4 +-
.../procedures/HdfsParquetImportProcedure.scala | 14 +-
.../hudi/command/procedures/HelpProcedure.scala | 4 +-
.../command/procedures/HiveSyncProcedure.scala | 2 +-
.../procedures/InitMetadataTableProcedure.scala | 2 +-
.../command/procedures/ProcedureParameter.scala | 7 +-
.../RepairAddpartitionmetaProcedure.scala | 2 +-
.../RepairCorruptedCleanFilesProcedure.scala | 2 +-
.../procedures/RepairDeduplicateProcedure.scala | 6 +-
.../RepairMigratePartitionMetaProcedure.scala | 2 +-
.../RepairOverwriteHoodiePropsProcedure.scala | 4 +-
.../RollbackToInstantTimeProcedure.scala | 4 +-
.../procedures/RollbackToSavepointProcedure.scala | 6 +-
.../command/procedures/RunBootstrapProcedure.scala | 10 +-
.../command/procedures/RunCleanProcedure.scala | 48 ++--
.../procedures/RunClusteringProcedure.scala | 34 ++-
.../procedures/RunCompactionProcedure.scala | 15 +-
.../procedures/ShowArchivedCommitsProcedure.scala | 2 +-
.../procedures/ShowBootstrapMappingProcedure.scala | 2 +-
.../ShowBootstrapPartitionsProcedure.scala | 2 +-
.../procedures/ShowClusteringProcedure.scala | 4 +-
.../ShowCommitExtraMetadataProcedure.scala | 6 +-
.../procedures/ShowCommitFilesProcedure.scala | 4 +-
.../procedures/ShowCommitPartitionsProcedure.scala | 4 +-
.../procedures/ShowCommitWriteStatsProcedure.scala | 4 +-
.../command/procedures/ShowCommitsProcedure.scala | 2 +-
.../procedures/ShowCompactionProcedure.scala | 4 +-
.../procedures/ShowFileSystemViewProcedure.scala | 6 +-
.../procedures/ShowFsPathDetailProcedure.scala | 2 +-
.../ShowHoodieLogFileMetadataProcedure.scala | 4 +-
.../ShowHoodieLogFileRecordsProcedure.scala | 4 +-
.../procedures/ShowInvalidParquetProcedure.scala | 2 +-
.../ShowMetadataTableFilesProcedure.scala | 2 +-
.../ShowMetadataTablePartitionsProcedure.scala | 2 +-
.../ShowMetadataTableStatsProcedure.scala | 2 +-
.../procedures/ShowRollbacksProcedure.scala | 6 +-
.../procedures/ShowSavepointsProcedure.scala | 4 +-
.../procedures/ShowTablePropertiesProcedure.scala | 4 +-
.../procedures/StatsFileSizeProcedure.scala | 2 +-
.../StatsWriteAmplificationProcedure.scala | 2 +-
.../procedures/UpgradeOrDowngradeProcedure.scala | 4 +-
.../procedures/ValidateHoodieSyncProcedure.scala | 10 +-
.../ValidateMetadataTableFilesProcedure.scala | 2 +-
.../sql/hudi/procedure/TestCleanProcedure.scala | 269 ++++++++++++++-------
.../hudi/procedure/TestCompactionProcedure.scala | 45 ++++
56 files changed, 403 insertions(+), 261 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 5f0cba6fd7c..c9f5a8a1215 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -22,6 +22,7 @@ package org.apache.hudi
import org.apache.hudi.avro.model.HoodieClusteringGroup
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.StringUtils
import org.apache.spark.SparkException
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
@@ -90,4 +91,11 @@ object HoodieCLIUtils {
throw new SparkException(s"Unsupported identifier $table")
}
}
+
+ def extractOptions(s: String): Map[String, String] = {
+ StringUtils.split(s, ",").asScala
+ .map(split => StringUtils.split(split, "="))
+ .map(pair => pair.get(0) -> pair.get(1))
+ .toMap
+ }
}
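A standalone sketch (plain Scala, independent of Hudi's StringUtils; the option
keys in the comment are illustrative) of the key=value parsing the new
extractOptions helper performs:

    // "k1=v1,k2=v2" -> Map("k1" -> "v1", "k2" -> "v2")
    def extractOptionsSketch(s: String): Map[String, String] =
      s.split(",")
        .map(_.split("="))
        .map(parts => parts(0) -> parts(1))
        .toMap

    // extractOptionsSketch("hoodie.cleaner.commits.retained=2,hoodie.clean.async=false")
    //   == Map("hoodie.cleaner.commits.retained" -> "2", "hoodie.clean.async" -> "false")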
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
index b097c942ad2..801e106419b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
@@ -30,8 +30,8 @@ class ArchiveCommitsProcedure extends BaseProcedure
with SparkAdapterSupport
with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType),
ProcedureParameter.optional(2, "min_commits", DataTypes.IntegerType, 20),
ProcedureParameter.optional(3, "max_commits", DataTypes.IntegerType, 30),
ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType,
10),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index 2930fc36c4c..b06aea2ac58 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
abstract class BaseProcedure extends Procedure {
- val INVALID_ARG_INDEX: Int = -1
-
val spark: SparkSession = SparkSession.active
val jsc = new JavaSparkContext(spark.sparkContext)
@@ -41,41 +39,31 @@ abstract class BaseProcedure extends Procedure {
.build
}
- protected def checkArgs(target: Array[ProcedureParameter], args: ProcedureArgs): Unit = {
- val internalRow = args.internalRow
- for (i <- target.indices) {
- if (target(i).required) {
- var argsIndex: Integer = null
- if (args.isNamedArgs) {
- argsIndex = getArgsIndex(target(i).name, args)
- } else {
- argsIndex = getArgsIndex(i.toString, args)
- }
- assert(-1 != argsIndex && internalRow.get(argsIndex, target(i).dataType) != null,
- s"Argument: ${target(i).name} is required")
- }
+ protected def getParamKey(parameter: ProcedureParameter, isNamedArgs: Boolean): String = {
+ if (isNamedArgs) {
+ parameter.name
+ } else {
+ parameter.index.toString
}
}
- protected def getArgsIndex(key: String, args: ProcedureArgs): Integer = {
- args.map.getOrDefault(key, INVALID_ARG_INDEX)
+ protected def checkArgs(parameters: Array[ProcedureParameter], args: ProcedureArgs): Unit = {
+ for (parameter <- parameters) {
+ if (parameter.required) {
+ val paramKey = getParamKey(parameter, args.isNamedArgs)
+ assert(args.map.containsKey(paramKey) &&
+ args.internalRow.get(args.map.get(paramKey), parameter.dataType) != null,
+ s"Argument: ${parameter.name} is required")
+ }
+ }
}
protected def getArgValueOrDefault(args: ProcedureArgs, parameter: ProcedureParameter): Option[Any] = {
- var argsIndex: Int = INVALID_ARG_INDEX
- if (args.isNamedArgs) {
- argsIndex = getArgsIndex(parameter.name, args)
- } else {
- argsIndex = getArgsIndex(parameter.index.toString, args)
- }
-
- if (argsIndex.equals(INVALID_ARG_INDEX)) {
- parameter.default match {
- case option: Option[Any] => option
- case _ => Option.apply(parameter.default)
- }
+ val paramKey = getParamKey(parameter, args.isNamedArgs)
+ if (args.map.containsKey(paramKey)) {
+ Option.apply(getInternalRowValue(args.internalRow, args.map.get(paramKey), parameter.dataType))
} else {
- Option.apply(getInternalRowValue(args.internalRow, argsIndex, parameter.dataType))
+ Option.apply(parameter.default)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala
index 379b38e5921..fdac678b477 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CommitsCompareProcedure.scala
@@ -30,8 +30,8 @@ import scala.collection.JavaConverters._
class CommitsCompareProcedure() extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "path", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "path", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
index f5a895239cf..ac23595048f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTableProcedure.scala
@@ -28,9 +28,9 @@ class CopyToTableProcedure extends BaseProcedure with ProcedureBuilder with Logg
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "query_type", DataTypes.StringType,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL),
- ProcedureParameter.required(2, "new_table", DataTypes.StringType, None),
+ ProcedureParameter.required(2, "new_table", DataTypes.StringType),
ProcedureParameter.optional(3, "begin_instance_time",
DataTypes.StringType, ""),
ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType,
""),
ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
index 13259c4964f..89c00dac6e4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CopyToTempView.scala
@@ -27,9 +27,9 @@ import java.util.function.Supplier
class CopyToTempView extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "query_type", DataTypes.StringType,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL),
- ProcedureParameter.required(2, "view_name", DataTypes.StringType, None),
+ ProcedureParameter.required(2, "view_name", DataTypes.StringType),
ProcedureParameter.optional(3, "begin_instance_time",
DataTypes.StringType, ""),
ProcedureParameter.optional(4, "end_instance_time", DataTypes.StringType,
""),
ProcedureParameter.optional(5, "as_of_instant", DataTypes.StringType, ""),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
index bbed979f5cd..722ed07cc31 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateMetadataTableProcedure.scala
@@ -31,7 +31,7 @@ import java.util.function.Supplier
class CreateMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
index 1983d825ad4..09fe59f7269 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
@@ -29,11 +29,11 @@ import java.util.function.Supplier
class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None),
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "commit_time", DataTypes.StringType),
ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
ProcedureParameter.optional(3, "comments", DataTypes.StringType, ""),
- ProcedureParameter.optional(4, "path", DataTypes.StringType, None)
+ ProcedureParameter.optional(4, "path", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
index 32c853345d4..87d58fa6ed0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
@@ -30,8 +30,8 @@ import scala.util.{Failure, Success, Try}
class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "instant_time", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala
index d6fccc1f9d2..1a6a2cfa5fc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMetadataTableProcedure.scala
@@ -29,7 +29,7 @@ import java.util.function.Supplier
class DeleteMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
index e9d13914789..58b12c70c64 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
@@ -29,9 +29,9 @@ import java.util.function.Supplier
class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
- ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "instant_time", DataTypes.StringType),
+ ProcedureParameter.optional(2, "path", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 97930432e4e..31918ad080c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -47,8 +47,8 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
val defaultActions = "clean,commit,deltacommit,rollback,savepoint,restore"
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "local_folder", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "local_folder", DataTypes.StringType),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
ProcedureParameter.optional(3, "actions", DataTypes.StringType,
defaultActions),
ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
index c9bee569c7f..e050572899e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
@@ -27,13 +27,13 @@ import scala.language.higherKinds
class HdfsParquetImportProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
- ProcedureParameter.required(2, "src_path", DataTypes.StringType, None),
- ProcedureParameter.required(3, "target_path", DataTypes.StringType, None),
- ProcedureParameter.required(4, "row_key", DataTypes.StringType, None),
- ProcedureParameter.required(5, "partition_key", DataTypes.StringType,
None),
- ProcedureParameter.required(6, "schema_file_path", DataTypes.StringType,
None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "table_type", DataTypes.StringType),
+ ProcedureParameter.required(2, "src_path", DataTypes.StringType),
+ ProcedureParameter.required(3, "target_path", DataTypes.StringType),
+ ProcedureParameter.required(4, "row_key", DataTypes.StringType),
+ ProcedureParameter.required(5, "partition_key", DataTypes.StringType),
+ ProcedureParameter.required(6, "schema_file_path", DataTypes.StringType),
ProcedureParameter.optional(7, "format", DataTypes.StringType, "parquet"),
ProcedureParameter.optional(8, "command", DataTypes.StringType, "insert"),
ProcedureParameter.optional(9, "retry", DataTypes.IntegerType, 0),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
index 5052b70aff1..43b7fcfc675 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HelpProcedure.scala
@@ -27,7 +27,7 @@ import java.util.function.Supplier
class HelpProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "cmd", DataTypes.StringType, None)
+ ProcedureParameter.optional(0, "cmd", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -88,7 +88,7 @@ class HelpProcedure extends BaseProcedure with ProcedureBuilder with Logging {
result.append(tab)
.append(lengthFormat(param.name)).append(tab)
.append(lengthFormat(param.dataType.typeName)).append(tab)
- .append(lengthFormat(param.default.toString)).append(tab)
+ .append(lengthFormat(String.valueOf(param.default))).append(tab)
.append(lengthFormat(param.required.toString)).append(line)
})
result.append("outputType:").append(line)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
index 513f40a4c8c..e97fb458a9a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
@@ -33,7 +33,7 @@ class HiveSyncProcedure extends BaseProcedure with ProcedureBuilder
with ProvidesHoodieConfig with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "metastore_uri", DataTypes.StringType, ""),
ProcedureParameter.optional(2, "username", DataTypes.StringType, ""),
ProcedureParameter.optional(3, "password", DataTypes.StringType, ""),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
index 3b875e77ffa..cfeb3905126 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/InitMetadataTableProcedure.scala
@@ -32,7 +32,7 @@ import java.util.function.Supplier
class InitMetadataTableProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "read_only", DataTypes.BooleanType, false)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala
index a9ad252bd7a..0757314e4b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ProcedureParameter.scala
@@ -54,8 +54,8 @@ object ProcedureParameter {
* @param dataType the type of the parameter
* @return the constructed stored procedure parameter
*/
- def required(index: Int, name: String, dataType: DataType, default: Any): ProcedureParameterImpl = {
- ProcedureParameterImpl(index, name, dataType, default, required = true)
+ def required(index: Int, name: String, dataType: DataType): ProcedureParameterImpl = {
+ ProcedureParameterImpl(index, name, dataType, null, required = true)
}
/**
@@ -63,9 +63,10 @@ object ProcedureParameter {
*
* @param name the name of the parameter.
* @param dataType the type of the parameter.
+ * @param default the default value of the parameter.
* @return the constructed optional stored procedure parameter
*/
- def optional(index: Int, name: String, dataType: DataType, default: Any): ProcedureParameterImpl = {
+ def optional(index: Int, name: String, dataType: DataType, default: Any = null): ProcedureParameterImpl = {
ProcedureParameterImpl(index, name, dataType, default, required = false)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
index bb65174c4b4..d636b7328b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
@@ -31,7 +31,7 @@ import scala.collection.JavaConversions._
class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
index ff185d1bdfa..4a828893bc5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairCorruptedCleanFilesProcedure.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
class RepairCorruptedCleanFilesProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
index 8ee5055e1fd..d4d22364fe8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairDeduplicateProcedure.scala
@@ -30,9 +30,9 @@ import scala.util.{Failure, Success, Try}
class RepairDeduplicateProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "duplicated_partition_path",
DataTypes.StringType, None),
- ProcedureParameter.required(2, "repaired_output_path",
DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "duplicated_partition_path",
DataTypes.StringType),
+ ProcedureParameter.required(2, "repaired_output_path",
DataTypes.StringType),
ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, true),
ProcedureParameter.optional(4, "dedupe_type", DataTypes.StringType,
"insert_type")
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index 7daacb2f184..66ab250ee7f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConversions._
class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "dry_run", DataTypes.BooleanType, true)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
index 043217cf2df..81a09e147a7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairOverwriteHoodiePropsProcedure.scala
@@ -33,8 +33,8 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
class RepairOverwriteHoodiePropsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "new_props_file_path",
DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "new_props_file_path", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
index 862b74b427a..49775236b78 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
@@ -32,8 +32,8 @@ import java.util.function.Supplier
class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None))
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "instant_time", DataTypes.StringType))
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("rollback_result", DataTypes.BooleanType, nullable = true,
Metadata.empty))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
index 955ceb01045..94bdc6db80f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
@@ -29,9 +29,9 @@ import java.util.function.Supplier
class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
- ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "instant_time", DataTypes.StringType),
+ ProcedureParameter.optional(2, "path", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
index 6025276176e..bb93cf1a485 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunBootstrapProcedure.scala
@@ -36,11 +36,11 @@ import java.util.function.Supplier
import scala.collection.JavaConverters._
class RunBootstrapProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "table_type", DataTypes.StringType, None),
- ProcedureParameter.required(2, "bootstrap_path", DataTypes.StringType,
None),
- ProcedureParameter.required(3, "base_path", DataTypes.StringType, None),
- ProcedureParameter.required(4, "rowKey_field", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "table_type", DataTypes.StringType),
+ ProcedureParameter.required(2, "bootstrap_path", DataTypes.StringType),
+ ProcedureParameter.required(3, "base_path", DataTypes.StringType),
+ ProcedureParameter.required(4, "rowKey_field", DataTypes.StringType),
ProcedureParameter.optional(5, "base_file_format", DataTypes.StringType,
"PARQUET"),
ProcedureParameter.optional(6, "partition_path_field",
DataTypes.StringType, ""),
ProcedureParameter.optional(7, "bootstrap_index_class",
DataTypes.StringType,
"org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex"),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index f7653ce680e..7f7c2fe0825 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -31,15 +31,16 @@ import java.util.function.Supplier
class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType,
false),
ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType,
true),
- ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType,
HoodieCleanConfig.CLEANER_POLICY.defaultValue()),
- ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType,
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue().toInt),
- ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType,
HoodieCleanConfig.CLEANER_HOURS_RETAINED.defaultValue().toInt),
- ProcedureParameter.optional(6, "file_versions_retained",
DataTypes.IntegerType,
HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.defaultValue().toInt),
- ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType,
HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.defaultValue()),
- ProcedureParameter.optional(8, "trigger_max_commits",
DataTypes.IntegerType, HoodieCleanConfig.CLEAN_MAX_COMMITS.defaultValue().toInt)
+ ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType),
+ ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType),
+ ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType),
+ ProcedureParameter.optional(6, "file_versions_retained",
DataTypes.IntegerType),
+ ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType),
+ ProcedureParameter.optional(8, "trigger_max_commits",
DataTypes.IntegerType),
+ ProcedureParameter.optional(9, "options", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -69,20 +70,35 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val skipLocking = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
val scheduleInLine = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
+ var confs: Map[String, String] = Map.empty
+ if (getArgValueOrDefault(args, PARAMETERS(3)).isDefined) {
+ confs += HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args, PARAMETERS(3)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(4)).isDefined) {
+ confs += HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(4)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(5)).isDefined) {
+ confs += HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(5)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(6)).isDefined) {
+ confs += HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(6)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(7)).isDefined) {
+ confs += HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(8)).isDefined) {
+ confs += HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
+ }
+ if (getArgValueOrDefault(args, PARAMETERS(9)).isDefined) {
+ confs ++= HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[String])
+ }
+
val basePath = getBasePath(tableName, Option.empty)
val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
- val props: Map[String, String] = Map(
- HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args, PARAMETERS(3)).get.toString,
- HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(4)).get.toString,
- HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(5)).get.toString,
- HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(6)).get.toString,
- HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
- HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
- )
var client: SparkRDDWriteClient[_] = null
try {
- client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, props,
+ client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
tableName.asInstanceOf[Option[String]])
val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
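With the hunk above, cleaner configs can be passed either through the dedicated
parameters or through the unified options string. A hedged usage sketch (table
name and values are hypothetical; config keys assumed from HoodieCleanConfig):

    spark.sql(
      "CALL run_clean(table => 'h1', " +
        "options => 'hoodie.cleaner.policy=KEEP_LATEST_COMMITS,hoodie.cleaner.commits.retained=2')")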
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index eba972382ea..7cd0413e1e7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -48,17 +48,17 @@ class RunClusteringProcedure extends BaseProcedure
* [ORDER BY (col_name1 [, ...] ) ]
*/
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
- ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
- ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType),
+ ProcedureParameter.optional(2, "predicate", DataTypes.StringType),
+ ProcedureParameter.optional(3, "order", DataTypes.StringType),
ProcedureParameter.optional(4, "show_involved_partition",
DataTypes.BooleanType, false),
- ProcedureParameter.optional(5, "op", DataTypes.StringType, None),
- ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType,
None),
+ ProcedureParameter.optional(5, "op", DataTypes.StringType),
+ ProcedureParameter.optional(6, "order_strategy", DataTypes.StringType),
// params => key=value, key2=value2
- ProcedureParameter.optional(7, "options", DataTypes.StringType, None),
- ProcedureParameter.optional(8, "instants", DataTypes.StringType, None),
- ProcedureParameter.optional(9, "selected_partitions",
DataTypes.StringType, None)
+ ProcedureParameter.optional(7, "options", DataTypes.StringType),
+ ProcedureParameter.optional(8, "instants", DataTypes.StringType),
+ ProcedureParameter.optional(9, "selected_partitions", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -88,7 +88,7 @@ class RunClusteringProcedure extends BaseProcedure
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
- var conf: Map[String, String] = Map.empty
+ var confs: Map[String, String] = Map.empty
val selectedPartitions: String = (parts, predicate) match {
case (_, Some(p)) => prunePartition(metaClient, p.asInstanceOf[String])
@@ -102,7 +102,7 @@ class RunClusteringProcedure extends BaseProcedure
logInfo("No partition matched")
return Seq()
} else {
- conf = conf ++ Map(
+ confs = confs ++ Map(
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key() -> "SELECTED_PARTITIONS",
HoodieClusteringConfig.PARTITION_SELECTED.key() -> selectedPartitions
)
@@ -113,7 +113,7 @@ class RunClusteringProcedure extends BaseProcedure
orderColumns match {
case Some(o) =>
validateOrderColumns(o.asInstanceOf[String], metaClient)
- conf = conf ++ Map(
+ confs = confs ++ Map(
HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() -> o.asInstanceOf[String]
)
logInfo(s"Order columns: $o")
@@ -124,7 +124,7 @@ class RunClusteringProcedure extends BaseProcedure
orderStrategy match {
case Some(o) =>
val strategy = LayoutOptimizationStrategy.fromValue(o.asInstanceOf[String])
- conf = conf ++ Map(
+ confs = confs ++ Map(
HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key() -> strategy.getValue
)
case _ =>
@@ -133,11 +133,7 @@ class RunClusteringProcedure extends BaseProcedure
options match {
case Some(p) =>
- val paramPairs = StringUtils.split(p.asInstanceOf[String], ",").asScala
- paramPairs.foreach{ pair =>
- val values = StringUtils.split(pair, "=")
- conf = conf ++ Map(values.get(0) -> values.get(1))
- }
+ confs = confs ++ HoodieCLIUtils.extractOptions(p.asInstanceOf[String])
case _ =>
logInfo("No options")
}
@@ -174,7 +170,7 @@ class RunClusteringProcedure extends BaseProcedure
var client: SparkRDDWriteClient[_] = null
try {
- client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, conf,
+ client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
tableName.asInstanceOf[Option[String]])
if (operator.isSchedule) {
val instantTime = HoodieActiveTimeline.createNewInstantTime
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 47a20f3cf6c..47c371f0c7f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -39,10 +39,11 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
* operation = (RUN | SCHEDULE) COMPACTION ON path = STRING (AT instantTimestamp = INTEGER_VALUE)?
*/
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "op", DataTypes.StringType, None),
- ProcedureParameter.optional(1, "table", DataTypes.StringType, None),
- ProcedureParameter.optional(2, "path", DataTypes.StringType, None),
- ProcedureParameter.optional(3, "timestamp", DataTypes.LongType, None)
+ ProcedureParameter.required(0, "op", DataTypes.StringType),
+ ProcedureParameter.optional(1, "table", DataTypes.StringType),
+ ProcedureParameter.optional(2, "path", DataTypes.StringType),
+ ProcedureParameter.optional(3, "timestamp", DataTypes.LongType),
+ ProcedureParameter.optional(4, "options", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -62,13 +63,17 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
val tableName = getArgValueOrDefault(args, PARAMETERS(1))
val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
val instantTimestamp = getArgValueOrDefault(args, PARAMETERS(3))
+ var confs: Map[String, String] = Map.empty
+ if (getArgValueOrDefault(args, PARAMETERS(4)).isDefined) {
+ confs = confs ++ HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String])
+ }
val basePath = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
var client: SparkRDDWriteClient[_] = null
try {
- client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
+ client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
tableName.asInstanceOf[Option[String]])
var willCompactionInstants: Seq[String] = Seq.empty
operation match {
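The compaction procedure follows the same pattern. A hedged call sketch (values
hypothetical; the config key is assumed from HoodieCompactionConfig):

    spark.sql(
      "CALL run_compaction(op => 'run', table => 'h1', " +
        "options => 'hoodie.compact.inline.max.delta.commits=2')")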
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
index 01c7465b3f4..a63125374dd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
@@ -35,7 +35,7 @@ import scala.collection.JavaConverters._
class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
ProcedureParameter.optional(2, "start_ts", DataTypes.StringType, ""),
ProcedureParameter.optional(3, "end_ts", DataTypes.StringType, "")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
index 99c4f5fd0df..958f37c5881 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapMappingProcedure.scala
@@ -31,7 +31,7 @@ import scala.collection.JavaConverters._
class ShowBootstrapMappingProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
ProcedureParameter.optional(2, "file_ids", DataTypes.StringType, ""),
ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
index ad14b8b8b6c..c62bcfa73e9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowBootstrapPartitionsProcedure.scala
@@ -27,7 +27,7 @@ import java.util.function.Supplier
class ShowBootstrapPartitionsProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
index 092610119e6..69aae49466e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
@@ -31,8 +31,8 @@ import scala.collection.JavaConverters._
class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
ProcedureParameter.optional(3, "show_involved_partition",
DataTypes.BooleanType, false)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
index 1a8f4dd9e44..e80fc2b36db 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
@@ -31,10 +31,10 @@ import scala.collection.JavaConversions._
class ShowCommitExtraMetadataProcedure() extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
- ProcedureParameter.optional(2, "instant_time", DataTypes.StringType, None),
- ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType, None)
+ ProcedureParameter.optional(2, "instant_time", DataTypes.StringType),
+ ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
index 53fcd072c3b..407ebcf76d1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitFilesProcedure.scala
@@ -34,9 +34,9 @@ import scala.collection.JavaConversions._
class ShowCommitFilesProcedure() extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+ ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
index 8b0fd6156d0..8439ebf9374 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitPartitionsProcedure.scala
@@ -34,9 +34,9 @@ import scala.collection.JavaConversions._
class ShowCommitPartitionsProcedure() extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+ ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
index 4e3609b5334..50d55d47557 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitWriteStatsProcedure.scala
@@ -33,9 +33,9 @@ import scala.collection.JavaConversions._
class ShowCommitWriteStatsProcedure() extends BaseProcedure with
ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+ ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
index 3c3fde385bc..8f8ebd9ce29 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
@@ -33,7 +33,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
var sortByFieldParameter: ProcedureParameter = _
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
index 1076b9fc44a..5aee4bf3a12 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
@@ -37,8 +37,8 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
* SHOW COMPACTION ON path = STRING (LIMIT limit = INTEGER_VALUE)?
*/
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
index 6f2aa2c9187..8a696bc96fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -34,7 +34,7 @@ import scala.collection.JavaConverters.{asJavaIterableConverter, asJavaIteratorC
class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType,
false),
ProcedureParameter.optional(3, "include_in_flight", DataTypes.BooleanType,
false),
@@ -55,13 +55,13 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
))
private val PARAMETERS_LATEST: Array[ProcedureParameter] = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType,
false),
ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType,
false),
ProcedureParameter.optional(4, "exclude_compaction",
DataTypes.BooleanType, false),
ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.required(6, "partition_path", DataTypes.StringType,
None),
+ ProcedureParameter.required(6, "partition_path", DataTypes.StringType),
ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala
index aae3858c114..b3a3b0b700c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFsPathDetailProcedure.scala
@@ -27,7 +27,7 @@ import java.util.function.Supplier
class ShowFsPathDetailProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "path", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "path", DataTypes.StringType),
ProcedureParameter.optional(1, "is_sub", DataTypes.BooleanType, false),
ProcedureParameter.optional(2, "sort", DataTypes.BooleanType, true)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 3553caef515..d1da7cfed06 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -36,8 +36,8 @@ import scala.collection.JavaConverters.{asScalaBufferConverter, asScalaIteratorC
class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBuilder {
override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "log_file_path_pattern",
DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "log_file_path_pattern",
DataTypes.StringType),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index ebb325f1936..5761a75383a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -37,8 +37,8 @@ import scala.collection.JavaConverters._
class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuilder {
override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "log_file_path_pattern",
DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "log_file_path_pattern",
DataTypes.StringType),
ProcedureParameter.optional(2, "merge", DataTypes.BooleanType, false),
ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index 4153b72d6f9..d87239675ed 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -31,7 +31,7 @@ import java.util.function.Supplier
class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "path", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala
index edd0439a2bd..1d9ac40087b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableFilesProcedure.scala
@@ -33,7 +33,7 @@ import java.util.function.Supplier
class ShowMetadataTableFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "partition", DataTypes.StringType, "")
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala
index f9a676abc91..6142b6df73f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTablePartitionsProcedure.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
class ShowMetadataTablePartitionsProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala
index 22f42e474bd..86fe0575f9c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableStatsProcedure.scala
@@ -30,7 +30,7 @@ import scala.collection.JavaConversions._
class ShowMetadataTableStatsProcedure() extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
index 48dcc09b826..8516b8bef2c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
@@ -35,14 +35,14 @@ import scala.collection.JavaConverters._
class ShowRollbacksProcedure(showDetails: Boolean) extends BaseProcedure with ProcedureBuilder {
private val ROLLBACKS_PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
)
private val ROLLBACK_PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+ ProcedureParameter.required(2, "instant_time", DataTypes.StringType)
)
private val ROLLBACKS_OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
index b7fbe46a78c..3a789f95105 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
@@ -28,8 +28,8 @@ import java.util.stream.Collectors
class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.optional(1, "path", DataTypes.StringType, None)
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
index e245159c849..9846a2906e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
@@ -27,8 +27,8 @@ import scala.collection.JavaConversions._
class ShowTablePropertiesProcedure() extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
- ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
index 29b76276194..524767a230c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsFileSizeProcedure.scala
@@ -31,7 +31,7 @@ import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapCon
class StatsFileSizeProcedure extends BaseProcedure with ProcedureBuilder {
override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "partition_path", DataTypes.StringType, ""),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
index b83d97f047e..0c0f55cca5e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
class StatsWriteAmplificationProcedure extends BaseProcedure with ProcedureBuilder {
override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
index 49cbe5e2de6..0ae413040bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -34,8 +34,8 @@ import scala.util.{Failure, Success, Try}
class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "to_version", DataTypes.StringType, None)
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
+ ProcedureParameter.required(1, "to_version", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
index 76c6b471740..774baf854a1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
@@ -35,11 +35,11 @@ import scala.collection.JavaConverters._
class ValidateHoodieSyncProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "src_table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "dst_table", DataTypes.StringType, None),
- ProcedureParameter.required(2, "mode", DataTypes.StringType, "complete"),
- ProcedureParameter.required(3, "hive_server_url", DataTypes.StringType,
None),
- ProcedureParameter.required(4, "hive_pass", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "src_table", DataTypes.StringType),
+ ProcedureParameter.required(1, "dst_table", DataTypes.StringType),
+ ProcedureParameter.required(2, "mode", DataTypes.StringType),
+ ProcedureParameter.required(3, "hive_server_url", DataTypes.StringType),
+ ProcedureParameter.required(4, "hive_pass", DataTypes.StringType),
ProcedureParameter.optional(5, "src_db", DataTypes.StringType, "rawdata"),
ProcedureParameter.optional(6, "target_db", DataTypes.StringType,
"dwh_hoodie"),
ProcedureParameter.optional(7, "partition_cnt", DataTypes.IntegerType, 5),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala
index c11d8434ddc..65b9fc49270 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataTableFilesProcedure.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConverters.asScalaIteratorConverter
class ValidateMetadataTableFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "verbose", DataTypes.BooleanType, false)
)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
index 4dd950e382c..e90a10e9f9a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -22,99 +22,182 @@ package org.apache.spark.sql.hudi.procedure
class TestCleanProcedure extends HoodieSparkProcedureTestBase {
test("Test Call run_clean Procedure by Table") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- | ) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey = 'id',
- | type = 'cow',
- | preCombineField = 'ts'
- | )
- |""".stripMargin)
-
- spark.sql("set hoodie.parquet.max.file.size = 10000")
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"update $tableName set price = 11 where id = 1")
- spark.sql(s"update $tableName set price = 12 where id = 1")
- spark.sql(s"update $tableName set price = 13 where id = 1")
-
- // KEEP_LATEST_COMMITS
- val result1 = spark.sql(s"call run_clean(table => '$tableName',
retain_commits => 1)")
- .collect()
- .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2),
row.getString(3), row.getString(4), row.getInt(5)))
-
- assertResult(1)(result1.length)
- assertResult(2)(result1(0)(2))
-
- val result11 = spark.sql(
- s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
- assertResult(2)(result11.length)
-
- checkAnswer(s"select id, name, price, ts from $tableName order by id") (
- Seq(1, "a1", 13, 1000)
- )
-
- val result2 = spark.sql(s"call run_clean(table => '$tableName',
retain_commits => 1)")
- .collect()
- assertResult(0)(result2.length)
-
- // KEEP_LATEST_FILE_VERSIONS
- spark.sql(s"update $tableName set price = 14 where id = 1")
-
- val result3 = spark.sql(
- s"call run_clean(table => '$tableName', clean_policy =>
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
- assertResult(0)(result3.length)
-
- val result4 = spark.sql(
- s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
- assertResult(3)(result4.length)
-
- val result5 = spark.sql(
- s"call run_clean(table => '$tableName', clean_policy =>
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
- assertResult(1)(result5.length)
- assertResult(2)(result5(0)(2))
-
- val result6 = spark.sql(
- s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
- assertResult(1)(result6.length)
-
- checkAnswer(s"select id, name, price, ts from $tableName order by id") (
- Seq(1, "a1", 14, 1000)
- )
-
- // trigger time
- spark.sql(s"update $tableName set price = 15 where id = 1")
-
- // no trigger, only has 1 commit
- val result7 = spark.sql(
- s"call run_clean(table => '$tableName', trigger_max_commits => 2,
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained =>
1)").collect()
- assertResult(0)(result7.length)
-
- val result8 = spark.sql(
- s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
- assertResult(2)(result8.length)
-
- // trigger
- val result9 = spark.sql(
- s"call run_clean(table => '$tableName', trigger_max_commits => 1,
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained =>
1)").collect()
- assertResult(1)(result9.length)
- assertResult(1)(result9(0)(2))
-
- val result10 = spark.sql(
- s"call show_fsview_all(table => '$tableName', path_regex => '', limit
=> 10)").collect()
- assertResult(1)(result10.length)
-
- checkAnswer(s"select id, name, price, ts from $tableName order by id") (
- Seq(1, "a1", 15, 1000)
- )
+ withSQLConf("hoodie.clean.automatic" -> "false",
"hoodie.parquet.max.file.size" -> "10000") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ |""".stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"update $tableName set price = 11 where id = 1")
+ spark.sql(s"update $tableName set price = 12 where id = 1")
+ spark.sql(s"update $tableName set price = 13 where id = 1")
+
+ // KEEP_LATEST_COMMITS
+ val result1 = spark.sql(s"call run_clean(table => '$tableName',
retain_commits => 1)")
+ .collect()
+ .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2),
row.getString(3), row.getString(4), row.getInt(5)))
+
+ assertResult(1)(result1.length)
+ assertResult(2)(result1(0)(2))
+
+ val result11 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '',
limit => 10)").collect()
+ assertResult(2)(result11.length)
+
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 13, 1000)
+ )
+
+ val result2 = spark.sql(s"call run_clean(table => '$tableName',
retain_commits => 1)")
+ .collect()
+ assertResult(0)(result2.length)
+
+ // KEEP_LATEST_FILE_VERSIONS
+ spark.sql(s"update $tableName set price = 14 where id = 1")
+
+ val result3 = spark.sql(
+ s"call run_clean(table => '$tableName', clean_policy =>
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
+ assertResult(0)(result3.length)
+
+ val result4 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '',
limit => 10)").collect()
+ assertResult(3)(result4.length)
+
+ val result5 = spark.sql(
+ s"call run_clean(table => '$tableName', clean_policy =>
'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+ assertResult(1)(result5.length)
+ assertResult(2)(result5(0)(2))
+
+ val result6 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '',
limit => 10)").collect()
+ assertResult(1)(result6.length)
+
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 14, 1000)
+ )
+
+ // trigger time
+ spark.sql(s"update $tableName set price = 15 where id = 1")
+
+ // no trigger, only has 1 commit
+ val result7 = spark.sql(
+ s"call run_clean(table => '$tableName', trigger_max_commits => 2,
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained =>
1)").collect()
+ assertResult(0)(result7.length)
+
+ val result8 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '',
limit => 10)").collect()
+ assertResult(2)(result8.length)
+
+ // trigger
+ val result9 = spark.sql(
+ s"call run_clean(table => '$tableName', trigger_max_commits => 1,
clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained =>
1)").collect()
+ assertResult(1)(result9.length)
+ assertResult(1)(result9(0)(2))
+
+ val result10 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '',
limit => 10)").collect()
+ assertResult(1)(result10.length)
+
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 15, 1000)
+ )
+ }
+ }
+ }
+
+ test("Test Call run_clean Procedure with table props") {
+ withSQLConf("hoodie.clean.automatic" -> "false") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | preCombineField = 'ts',
+ | hoodie.cleaner.policy = 'KEEP_LATEST_COMMITS',
+ | hoodie.cleaner.commits.retained = '2'
+ | )
+ |""".stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
+
+ val result1 = spark.sql(s"call run_clean(table => '$tableName')")
+ .collect()
+ .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5)))
+ assertResult(1)(result1.length)
+ assertResult(1)(result1(0)(2))
+
+ val result2 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '',
limit => 10)").collect()
+ assertResult(3)(result2.length)
+ }
+ }
+ }
+
+ test("Test Call run_clean Procedure with options") {
+ withSQLConf("hoodie.clean.automatic" -> "false") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ |""".stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1001)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1002)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1003)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1004)")
+
+ val result1 = spark.sql(
+ s"""call run_clean(table => '$tableName', options => "
+ | hoodie.cleaner.policy=KEEP_LATEST_COMMITS,
+ | hoodie.cleaner.commits.retained=2
+ |")""".stripMargin)
+ .collect()
+ .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5)))
+ assertResult(1)(result1.length)
+ assertResult(1)(result1(0)(2))
+
+ val result2 = spark.sql(
+ s"call show_fsview_all(table => '$tableName', path_regex => '',
limit => 10)").collect()
+ assertResult(3)(result2.length)
+ }
}
}
}
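Note: the new tests above pass cleaner configs through a single comma-separated `options` string instead of one procedure parameter per config. A hedged sketch of how such a string can be split into key/value write configs (the helper that ships with this commit may differ in name and edge-case handling):

    object OptionsParsingSketch {
      // Turns "k1=v1, k2=v2" into Map("k1" -> "v1", "k2" -> "v2"); purely illustrative.
      def parseOptions(options: String): Map[String, String] =
        options.split(",").map(_.trim).filter(_.nonEmpty).map { kv =>
          val idx = kv.indexOf('=')
          require(idx > 0, s"invalid option: $kv")
          kv.substring(0, idx).trim -> kv.substring(idx + 1).trim
        }.toMap
    }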
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 236d87970d2..6a6e74e8b72 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -174,6 +174,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
)
}
}
+
test("Test show_compaction Procedure by Path") {
withTempDir { tmp =>
val tableName1 = generateTableName
@@ -205,4 +206,48 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}/$tableName1')").collect().length)
}
}
+
+ test("Test run_compaction Procedure with options") {
+ withSQLConf("hoodie.compact.inline" -> "false",
"hoodie.compact.inline.max.delta.commits" -> "1") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a2', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a3', 10, 1000)")
+
+ val result1 = spark.sql(
+ s"""call run_compaction(table => '$tableName', op => 'run', options
=> "
+ |
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,
+ | hoodie.compaction.logfile.num.threshold=3
+ |")""".stripMargin)
+ .collect()
+ assertResult(0)(result1.length)
+
+ spark.sql(s"insert into $tableName values(1, 'a4', 10, 1000)")
+ val result2 = spark.sql(
+ s"""call run_compaction(table => '$tableName', op => 'run', options
=> "
+ |
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,
+ | hoodie.compaction.logfile.num.threshold=3
+ |")""".stripMargin)
+ .collect()
+ assertResult(1)(result2.length)
+ }
+ }
+ }
}
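Note: for reference, the same unified call pattern from an interactive Spark session, mirroring the test above (the table name is illustrative):

    spark.sql(
      """call run_compaction(table => 'hudi_tbl', op => 'run', options => "
        |  hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,
        |  hoodie.compaction.logfile.num.threshold=3
        |")""".stripMargin).show()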