This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 210d6c929cb [HUDI-6447] Unify operation in compaction and clustering procedure (#9068)
210d6c929cb is described below
commit 210d6c929cb9ee17048240fa27acd90fff8fce6b
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jul 6 11:45:27 2023 +0800
[HUDI-6447] Unify operation in compaction and clustering procedure (#9068)
---
.../apache/hudi/config/HoodieClusteringConfig.java | 48 -----------
.../command/procedures/HoodieProcedureUtils.scala | 79 ++++++++++++++++++
.../procedures/RunClusteringProcedure.scala | 68 ++++------------
.../procedures/RunCompactionProcedure.scala | 95 ++++++++++------------
.../spark/sql/hudi/TestCompactionTable.scala | 2 +-
.../hudi/procedure/TestClusteringProcedure.scala | 5 +-
.../hudi/procedure/TestCompactionProcedure.scala | 95 +++++++++++++++++++---
7 files changed, 226 insertions(+), 166 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e9ff847a6f0..a124a75639b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -750,52 +750,4 @@ public class HoodieClusteringConfig extends HoodieConfig {
return value;
}
}
-
- public enum ClusteringOperator {
-
- /**
- * only schedule the clustering plan
- */
- SCHEDULE("schedule"),
-
- /**
- * only execute then pending clustering plans
- */
- EXECUTE("execute"),
-
- /**
- * schedule cluster first, and execute all pending clustering plans
- */
- SCHEDULE_AND_EXECUTE("scheduleandexecute");
-
- private static final Map<String, ClusteringOperator> VALUE_TO_ENUM_MAP =
- TypeUtils.getValueToEnumMap(ClusteringOperator.class, e ->
e.value);
-
- private final String value;
-
- ClusteringOperator(String value) {
- this.value = value;
- }
-
- @Nonnull
- public static ClusteringOperator fromValue(String value) {
- ClusteringOperator enumValue = VALUE_TO_ENUM_MAP.get(value);
- if (enumValue == null) {
- throw new HoodieException(String.format("Invalid value (%s)", value));
- }
- return enumValue;
- }
-
- public boolean isSchedule() {
- return this != ClusteringOperator.EXECUTE;
- }
-
- public boolean isExecute() {
- return this != ClusteringOperator.SCHEDULE;
- }
-
- public String getValue() {
- return value;
- }
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
index 374f86773d1..22f6c3c9fd5 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
@@ -19,9 +19,13 @@
package org.apache.spark.sql.hudi.command.procedures
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.HoodieException
+
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import java.util
+import scala.collection.JavaConverters._
object HoodieProcedureUtils {
@@ -43,4 +47,79 @@ object HoodieProcedureUtils {
ProcedureArgs(isNamedArgs = true, map, new GenericInternalRow(values))
}
+
+ sealed trait Operation {
+ def value: String
+
+ def isSchedule: Boolean
+
+ def isExecute: Boolean
+ }
+
+ /**
+ * schedule: schedule a new plan
+ */
+ case object Schedule extends Operation {
+ override def value: String = "schedule"
+
+ override def isSchedule: Boolean = true
+
+ override def isExecute: Boolean = false
+ }
+
+ /**
+ * execute: if specific instants exist, execute them, otherwise execute all pending plans
+ */
+ case object Execute extends Operation {
+ override def value: String = "execute"
+
+ override def isSchedule: Boolean = false
+
+ override def isExecute: Boolean = true
+ }
+
+ /**
+ * scheduleAndExecute: schedule a new plan and then execute it, if no plan is generated during
+ * schedule, execute all pending plans
+ */
+ case object ScheduleAndExecute extends Operation {
+ override def value: String = "scheduleandexecute"
+
+ override def isSchedule: Boolean = true
+
+ override def isExecute: Boolean = true
+ }
+
+ object Operation {
+ private val ValueToEnumMap: Map[String, Operation with Product with
Serializable] = Seq(Schedule, Execute, ScheduleAndExecute)
+ .map(enum => enum.value -> enum).toMap
+
+ def fromValue(value: String): Operation = {
+ ValueToEnumMap.getOrElse(value, throw new HoodieException(s"Invalid
value ($value)"))
+ }
+ }
+
+ def fileterPendingInstantsAndGetOperation(pendingInstants: Seq[String],
specificInstants: Option[String], op: Option[String]): (Seq[String], Operation)
= {
+ specificInstants match {
+ case Some(inst) =>
+ if (op.exists(o => !Execute.value.equalsIgnoreCase(o))) {
+ throw new HoodieException("specific instants only can be used in
'execute' op or not specific op")
+ }
+ // No op specified, set it as 'execute' with instants specified
+ (HoodieProcedureUtils.checkAndFilterPendingInstants(pendingInstants,
inst), Execute)
+ case _ =>
+ // No op specified, set it as 'scheduleAndExecute' default
+ (pendingInstants, op.map(o =>
Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+ }
+ }
+
+ def checkAndFilterPendingInstants(pendingInstants: Seq[String], instantStr:
String): Seq[String] = {
+ val instants = StringUtils.split(instantStr, ",").asScala
+ val pendingSet = pendingInstants.toSet
+ val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
+ if (noneInstants.nonEmpty) {
+ throw new HoodieException (s"specific ${noneInstants.mkString(",")}
instants is not exist")
+ }
+ instants.sortBy(f => f)
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index edc9aee8090..bf04470a71f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -22,11 +22,12 @@ import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline,
HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option =>
HOption}
+import org.apache.hudi.common.util.{ClusteringUtils, HoodieTimer, Option =>
HOption}
import org.apache.hudi.config.HoodieClusteringConfig
-import org.apache.hudi.config.HoodieClusteringConfig.{ClusteringOperator,
LayoutOptimizationStrategy}
+import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr,
splitPartitionAndDataPredicates}
import org.apache.spark.sql.Row
@@ -34,7 +35,6 @@ import
org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.types._
-import java.util.Locale
import java.util.function.Supplier
import scala.collection.JavaConverters._
@@ -83,7 +83,7 @@ class RunClusteringProcedure extends BaseProcedure
val op = getArgValueOrDefault(args, PARAMETERS(5))
val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
val options = getArgValueOrDefault(args, PARAMETERS(7))
- val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
+ val specificInstants = getArgValueOrDefault(args, PARAMETERS(8))
val parts = getArgValueOrDefault(args, PARAMETERS(9))
val basePath: String = getBasePath(tableName, tablePath)
@@ -140,57 +140,34 @@ class RunClusteringProcedure extends BaseProcedure
logInfo("No options")
}
- // Get all pending clustering instants
- var pendingClustering =
ClusteringUtils.getAllPendingClusteringPlans(metaClient)
+ val pendingClusteringInstants =
ClusteringUtils.getAllPendingClusteringPlans(metaClient)
.iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
- var operator: ClusteringOperator = ClusteringOperator.SCHEDULE_AND_EXECUTE
- pendingClustering = instantsStr match {
- case Some(inst) =>
- op match {
- case Some(o) =>
- if
(!ClusteringOperator.EXECUTE.name().equalsIgnoreCase(o.asInstanceOf[String])) {
- throw new HoodieClusteringException("specific instants only can
be used in 'execute' op or not specific op")
- }
- case _ =>
- logInfo("No op and set it to EXECUTE with instants specified.")
- }
- operator = ClusteringOperator.EXECUTE
- checkAndFilterPendingInstants(pendingClustering,
inst.asInstanceOf[String])
- case _ =>
- logInfo("No specific instants")
- op match {
- case Some(o) =>
- operator =
ClusteringOperator.fromValue(o.asInstanceOf[String].toLowerCase(Locale.ROOT))
- case _ =>
- logInfo("No op, use default scheduleAndExecute")
- }
- pendingClustering
- }
-
- logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
+ var (filteredPendingClusteringInstants, operation) =
HoodieProcedureUtils.fileterPendingInstantsAndGetOperation(
+ pendingClusteringInstants,
specificInstants.asInstanceOf[Option[String]], op.asInstanceOf[Option[String]])
var client: SparkRDDWriteClient[_] = null
try {
client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath,
confs,
tableName.asInstanceOf[Option[String]])
- if (operator.isSchedule) {
+
+ if (operation.isSchedule) {
val instantTime = HoodieActiveTimeline.createNewInstantTime
if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
- pendingClustering ++= Seq(instantTime)
+ filteredPendingClusteringInstants = Seq(instantTime)
}
}
- logInfo(s"Clustering instants to run:
${pendingClustering.mkString(",")}.")
+ logInfo(s"Clustering instants to run:
${filteredPendingClusteringInstants.mkString(",")}.")
- if (operator.isExecute) {
- val startTs = System.currentTimeMillis()
- pendingClustering.foreach(client.cluster(_, true))
- logInfo(s"Finish clustering all the instants:
${pendingClustering.mkString(",")}," +
- s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+ if (operation.isExecute) {
+ val timer = HoodieTimer.start
+ filteredPendingClusteringInstants.foreach(client.cluster(_, true))
+ logInfo(s"Finish clustering at instants:
${filteredPendingClusteringInstants.mkString(",")}," +
+ s" spend: ${timer.endTimer()}ms.")
}
val clusteringInstants =
metaClient.reloadActiveTimeline().getInstants.iterator().asScala
- .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION &&
pendingClustering.contains(p.getTimestamp))
+ .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION &&
filteredPendingClusteringInstants.contains(p.getTimestamp))
.toSeq
.sortBy(f => f.getTimestamp)
.reverse
@@ -251,17 +228,6 @@ class RunClusteringProcedure extends BaseProcedure
}
})
}
-
- private def checkAndFilterPendingInstants(pendingInstants: Seq[String],
instantStr: String): Seq[String] = {
- val instants = StringUtils.split(instantStr, ",").asScala
- val pendingSet = pendingInstants.toSet
- val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
- if (noneInstants.nonEmpty) {
- throw new HoodieClusteringException(s"specific
${noneInstants.mkString(",")} instants is not exist")
- }
- instants.sortBy(f => f)
- }
-
}
object RunClusteringProcedure {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 47c371f0c7f..35e3b8331fd 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -24,6 +24,7 @@ import
org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option =>
HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
@@ -43,7 +44,8 @@ class RunCompactionProcedure extends BaseProcedure with
ProcedureBuilder with Sp
ProcedureParameter.optional(1, "table", DataTypes.StringType),
ProcedureParameter.optional(2, "path", DataTypes.StringType),
ProcedureParameter.optional(3, "timestamp", DataTypes.LongType),
- ProcedureParameter.optional(4, "options", DataTypes.StringType)
+ ProcedureParameter.optional(4, "options", DataTypes.StringType),
+ ProcedureParameter.optional(5, "instants", DataTypes.StringType)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -59,7 +61,7 @@ class RunCompactionProcedure extends BaseProcedure with
ProcedureBuilder with Sp
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
- val operation = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String].toLowerCase
+ var op = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String].toLowerCase
val tableName = getArgValueOrDefault(args, PARAMETERS(1))
val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
val instantTimestamp = getArgValueOrDefault(args, PARAMETERS(3))
@@ -67,68 +69,55 @@ class RunCompactionProcedure extends BaseProcedure with
ProcedureBuilder with Sp
if (getArgValueOrDefault(args, PARAMETERS(4)).isDefined) {
confs = confs ++
HoodieCLIUtils.extractOptions(getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[String])
}
+ var specificInstants = getArgValueOrDefault(args, PARAMETERS(5))
+
+ // For old version compatibility
+ if (op.equals("run")) {
+ op = "scheduleandexecute"
+ }
+ if (instantTimestamp.isDefined && specificInstants.isEmpty) {
+ specificInstants = Option(instantTimestamp.get.toString)
+ op = "execute"
+ }
val basePath = getBasePath(tableName, tablePath)
val metaClient =
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+ val pendingCompactionInstants =
metaClient.getActiveTimeline.getWriteTimeline.getInstants.iterator().asScala
+ .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+ .map(_.getTimestamp)
+ .toSeq.sortBy(f => f)
+
+ var (filteredPendingCompactionInstants, operation) =
HoodieProcedureUtils.fileterPendingInstantsAndGetOperation(
+ pendingCompactionInstants,
specificInstants.asInstanceOf[Option[String]], Option(op))
+
var client: SparkRDDWriteClient[_] = null
try {
client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath,
confs,
tableName.asInstanceOf[Option[String]])
- var willCompactionInstants: Seq[String] = Seq.empty
- operation match {
- case "schedule" =>
- val instantTime =
instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
- if (client.scheduleCompactionAtInstant(instantTime,
HOption.empty[java.util.Map[String, String]])) {
- willCompactionInstants = Seq(instantTime)
- }
- case "run" =>
- // Do compaction
- val timeLine = metaClient.getActiveTimeline
- val pendingCompactionInstants =
timeLine.getWriteTimeline.getInstants.iterator().asScala
- .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
- .map(_.getTimestamp)
- .toSeq.sortBy(f => f)
- willCompactionInstants = if (instantTimestamp.isEmpty) {
- if (pendingCompactionInstants.nonEmpty) {
- pendingCompactionInstants
- } else { // If there are no pending compaction, schedule to
generate one.
- // CompactionHoodiePathCommand will return instanceTime for
SCHEDULE.
- val instantTime = HoodieActiveTimeline.createNewInstantTime()
- if (client.scheduleCompactionAtInstant(instantTime,
HOption.empty[java.util.Map[String, String]])) {
- Seq(instantTime)
- } else {
- Seq.empty
- }
- }
- } else {
- // Check if the compaction timestamp has exists in the pending
compaction
- if
(pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
- Seq(instantTimestamp.get.toString)
- } else {
- throw new IllegalArgumentException(s"Compaction instant:
${instantTimestamp.get} is not found in " +
- s"$basePath, Available pending compaction instants are:
${pendingCompactionInstants.mkString(",")} ")
- }
- }
-
- if (willCompactionInstants.isEmpty) {
- logInfo(s"No need to compaction on $basePath")
- } else {
- logInfo(s"Run compaction at instants:
[${willCompactionInstants.mkString(",")}] on $basePath")
- val timer = HoodieTimer.start
- willCompactionInstants.foreach { compactionInstant =>
- val writeResponse = client.compact(compactionInstant)
- handleResponse(writeResponse.getCommitMetadata.get())
- client.commitCompaction(compactionInstant,
writeResponse.getCommitMetadata.get(), HOption.empty())
- }
- logInfo(s"Finish Run compaction at instants:
[${willCompactionInstants.mkString(",")}]," +
- s" spend: ${timer.endTimer()}ms")
- }
- case _ => throw new UnsupportedOperationException(s"Unsupported
compaction operation: $operation")
+
+ if (operation.isSchedule) {
+ val instantTime = HoodieActiveTimeline.createNewInstantTime
+ if (client.scheduleCompactionAtInstant(instantTime,
HOption.empty[java.util.Map[String, String]])) {
+ filteredPendingCompactionInstants = Seq(instantTime)
+ }
+ }
+
+ logInfo(s"Compaction instants to run:
${filteredPendingCompactionInstants.mkString(",")}.")
+
+ if (operation.isExecute) {
+ val timer = HoodieTimer.start
+ filteredPendingCompactionInstants.foreach { compactionInstant =>
+ val writeResponse = client.compact(compactionInstant)
+ handleResponse(writeResponse.getCommitMetadata.get())
+ client.commitCompaction(compactionInstant,
writeResponse.getCommitMetadata.get(), HOption.empty())
+ }
+ logInfo(s"Finish Run compaction at instants:
[${filteredPendingCompactionInstants.mkString(",")}]," +
+ s" spend: ${timer.endTimer()}ms")
}
val compactionInstants =
metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
- .filter(instant =>
willCompactionInstants.contains(instant.getTimestamp))
+ .filter(instant =>
filteredPendingCompactionInstants.contains(instant.getTimestamp))
.toSeq
.sortBy(p => p.getTimestamp)
.reverse
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index a51912b5feb..ea9588419b3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -122,7 +122,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
assertResult(2)(spark.sql(s"show compaction on
'${tmp.getCanonicalPath}'").collect().length)
checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")(
- s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath},
Available pending compaction instants are: "
+ s"specific 12345 instants is not exist"
)
})
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 7b8dc8f8a90..b4cfc8e9d8c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -19,8 +19,6 @@
package org.apache.spark.sql.hudi.procedure
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
@@ -29,6 +27,9 @@ import
org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstan
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils,
HoodieDataSourceHelpers, HoodieFileIndex}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Literal}
import org.apache.spark.sql.types.{DataTypes, Metadata, StringType,
StructField, StructType}
import org.apache.spark.sql.{Dataset, Row}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 6a6e74e8b72..02e9406cdde 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -19,8 +19,11 @@
package org.apache.spark.sql.hudi.procedure
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hadoop.conf.Configuration
+
class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
test("Test Call run_compaction Procedure by Table") {
@@ -170,7 +173,7 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
assertResult(2)(spark.sql(s"call show_compaction(path =>
'${tmp.getCanonicalPath}')").collect().length)
checkException(s"call run_compaction(op => 'run', path =>
'${tmp.getCanonicalPath}', timestamp => 12345L)")(
- s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath},
Available pending compaction instants are: "
+ s"specific 12345 instants is not exist"
)
}
}
@@ -196,13 +199,9 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)
spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
-
- spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)")
-
- spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)")
-
- spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)")
-
+ spark.sql(s"update $tableName1 set name = 'a2' where id = 1")
+ spark.sql(s"update $tableName1 set name = 'a3' where id = 1")
+ spark.sql(s"update $tableName1 set name = 'a4' where id = 1")
assertResult(2)(spark.sql(s"call show_compaction(path =>
'${tmp.getCanonicalPath}/$tableName1')").collect().length)
}
}
@@ -228,8 +227,8 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a2', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a3', 10, 1000)")
+ spark.sql(s"update $tableName set name = 'a2' where id = 1")
+ spark.sql(s"update $tableName set name = 'a3' where id = 1")
val result1 = spark.sql(
s"""call run_compaction(table => '$tableName', op => 'run', options
=> "
@@ -239,7 +238,7 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
.collect()
assertResult(0)(result1.length)
- spark.sql(s"insert into $tableName values(1, 'a4', 10, 1000)")
+ spark.sql(s"update $tableName set name = 'a4' where id = 1")
val result2 = spark.sql(
s"""call run_compaction(table => '$tableName', op => 'run', options
=> "
|
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,
@@ -250,4 +249,78 @@ class TestCompactionProcedure extends
HoodieSparkProcedureTestBase {
}
}
}
+
+ test("Test Call run_compaction Procedure with specific instants") {
+ withSQLConf("hoodie.compact.inline" -> "false",
"hoodie.compact.inline.max.delta.commits" -> "1") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"update $tableName set name = 'a2' where id = 1")
+
+ spark.sql(s"call run_compaction(table => '$tableName', op =>
'schedule')")
+
+ val metaClient = HoodieTableMetaClient.builder.setConf(new
Configuration).setBasePath(tmp.getCanonicalPath).build
+ val instants =
metaClient.getActiveTimeline.filterPendingCompactionTimeline().getInstants
+ assertResult(1)(instants.size())
+ val ts = instants.get(0).getTimestamp
+ assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName',
op => 'execute', instants => '$ts')").collect().length)
+
+ checkExceptionContain(
+ s"call run_compaction(table => '$tableName', op => 'execute',
instants => '000000')"
+ )("specific 000000 instants is not exist")
+ }
+ }
+ }
+
+ test("Test Call run_compaction Procedure with operation") {
+ withSQLConf("hoodie.compact.inline" -> "false",
"hoodie.compact.inline.max.delta.commits" -> "1") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"update $tableName set name = 'a2' where id = 1")
+
+ assertResult(0)(spark.sql(s"call run_compaction(table => '$tableName',
op => 'execute')").collect().length)
+
+ assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName',
op => 'schedule')").collect().length)
+
+ assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName',
op => 'execute')").collect().length)
+
+ spark.sql(s"update $tableName set name = 'a3' where id = 1")
+
+ assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName',
op => 'scheduleAndExecute')").collect().length)
+ }
+ }
+ }
}