This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 210d6c929cb [HUDI-6447] Unify operation in compaction and clustering 
procedure (#9068)
210d6c929cb is described below

commit 210d6c929cb9ee17048240fa27acd90fff8fce6b
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jul 6 11:45:27 2023 +0800

    [HUDI-6447] Unify operation in compaction and clustering procedure (#9068)
---
 .../apache/hudi/config/HoodieClusteringConfig.java | 48 -----------
 .../command/procedures/HoodieProcedureUtils.scala  | 79 ++++++++++++++++++
 .../procedures/RunClusteringProcedure.scala        | 68 ++++------------
 .../procedures/RunCompactionProcedure.scala        | 95 ++++++++++------------
 .../spark/sql/hudi/TestCompactionTable.scala       |  2 +-
 .../hudi/procedure/TestClusteringProcedure.scala   |  5 +-
 .../hudi/procedure/TestCompactionProcedure.scala   | 95 +++++++++++++++++++---
 7 files changed, 226 insertions(+), 166 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e9ff847a6f0..a124a75639b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -750,52 +750,4 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return value;
     }
   }
-
-  public enum ClusteringOperator {
-
-    /**
-     * only schedule the clustering plan
-     */
-    SCHEDULE("schedule"),
-
-    /**
-     * only execute the pending clustering plans
-     */
-    EXECUTE("execute"),
-
-    /**
-     * schedule clustering first, and execute all pending clustering plans
-     */
-    SCHEDULE_AND_EXECUTE("scheduleandexecute");
-
-    private static final Map<String, ClusteringOperator> VALUE_TO_ENUM_MAP =
-            TypeUtils.getValueToEnumMap(ClusteringOperator.class, e -> 
e.value);
-
-    private final String value;
-
-    ClusteringOperator(String value) {
-      this.value = value;
-    }
-
-    @Nonnull
-    public static ClusteringOperator fromValue(String value) {
-      ClusteringOperator enumValue = VALUE_TO_ENUM_MAP.get(value);
-      if (enumValue == null) {
-        throw new HoodieException(String.format("Invalid value (%s)", value));
-      }
-      return enumValue;
-    }
-
-    public boolean isSchedule() {
-      return this != ClusteringOperator.EXECUTE;
-    }
-
-    public boolean isExecute() {
-      return this != ClusteringOperator.SCHEDULE;
-    }
-
-    public String getValue() {
-      return value;
-    }
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
index 374f86773d1..22f6c3c9fd5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala
@@ -19,9 +19,13 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.HoodieException
+
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
 import java.util
+import scala.collection.JavaConverters._
 
 object HoodieProcedureUtils {
 
@@ -43,4 +47,79 @@ object HoodieProcedureUtils {
 
     ProcedureArgs(isNamedArgs = true, map, new GenericInternalRow(values))
   }
+
+  sealed trait Operation {
+    def value: String
+
+    def isSchedule: Boolean
+
+    def isExecute: Boolean
+  }
+
+  /**
+   * schedule: schedule a new plan
+   */
+  case object Schedule extends Operation {
+    override def value: String = "schedule"
+
+    override def isSchedule: Boolean = true
+
+    override def isExecute: Boolean = false
+  }
+
+  /**
+   * execute: if specific instants exist, execute them, otherwise execute all 
pending plans
+   */
+  case object Execute extends Operation {
+    override def value: String = "execute"
+
+    override def isSchedule: Boolean = false
+
+    override def isExecute: Boolean = true
+  }
+
+  /**
+   * scheduleAndExecute: schedule a new plan and then execute it; if no plan 
is generated during
+   * schedule, execute all pending plans
+   */
+  case object ScheduleAndExecute extends Operation {
+    override def value: String = "scheduleandexecute"
+
+    override def isSchedule: Boolean = true
+
+    override def isExecute: Boolean = true
+  }
+
+  object Operation {
+    private val ValueToEnumMap: Map[String, Operation with Product with 
Serializable] = Seq(Schedule, Execute, ScheduleAndExecute)
+      .map(enum => enum.value -> enum).toMap
+
+    def fromValue(value: String): Operation = {
+      ValueToEnumMap.getOrElse(value, throw new HoodieException(s"Invalid 
value ($value)"))
+    }
+  }
+
+  def fileterPendingInstantsAndGetOperation(pendingInstants: Seq[String], 
specificInstants: Option[String], op: Option[String]): (Seq[String], Operation) 
= {
+    specificInstants match {
+      case Some(inst) =>
+        if (op.exists(o => !Execute.value.equalsIgnoreCase(o))) {
+          throw new HoodieException("specific instants only can be used in 
'execute' op or not specific op")
+        }
+        // If no op is specified, default it to 'execute' since specific 
instants are given
+        (HoodieProcedureUtils.checkAndFilterPendingInstants(pendingInstants, 
inst), Execute)
+      case _ =>
+        // If no op is specified, default it to 'scheduleAndExecute'
+        (pendingInstants, op.map(o => 
Operation.fromValue(o.toLowerCase)).getOrElse(ScheduleAndExecute))
+    }
+  }
+
+  def checkAndFilterPendingInstants(pendingInstants: Seq[String], instantStr: 
String): Seq[String] = {
+    val instants = StringUtils.split(instantStr, ",").asScala
+    val pendingSet = pendingInstants.toSet
+    val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
+    if (noneInstants.nonEmpty) {
+      throw new HoodieException (s"specific ${noneInstants.mkString(",")} 
instants is not exist")
+    }
+    instants.sortBy(f => f)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index edc9aee8090..bf04470a71f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -22,11 +22,12 @@ import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
-import org.apache.hudi.common.util.{ClusteringUtils, StringUtils, Option => 
HOption}
+import org.apache.hudi.common.util.{ClusteringUtils, HoodieTimer, Option => 
HOption}
 import org.apache.hudi.config.HoodieClusteringConfig
-import org.apache.hudi.config.HoodieClusteringConfig.{ClusteringOperator, 
LayoutOptimizationStrategy}
+import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
 import org.apache.hudi.exception.HoodieClusteringException
 import org.apache.hudi.{AvroConversionUtils, HoodieCLIUtils, HoodieFileIndex}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.{resolveExpr, 
splitPartitionAndDataPredicates}
 import org.apache.spark.sql.Row
@@ -34,7 +35,6 @@ import 
org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.types._
 
-import java.util.Locale
 import java.util.function.Supplier
 import scala.collection.JavaConverters._
 
@@ -83,7 +83,7 @@ class RunClusteringProcedure extends BaseProcedure
     val op = getArgValueOrDefault(args, PARAMETERS(5))
     val orderStrategy = getArgValueOrDefault(args, PARAMETERS(6))
     val options = getArgValueOrDefault(args, PARAMETERS(7))
-    val instantsStr = getArgValueOrDefault(args, PARAMETERS(8))
+    val specificInstants = getArgValueOrDefault(args, PARAMETERS(8))
     val parts = getArgValueOrDefault(args, PARAMETERS(9))
 
     val basePath: String = getBasePath(tableName, tablePath)
@@ -140,57 +140,34 @@ class RunClusteringProcedure extends BaseProcedure
         logInfo("No options")
     }
 
-    // Get all pending clustering instants
-    var pendingClustering = 
ClusteringUtils.getAllPendingClusteringPlans(metaClient)
+    val pendingClusteringInstants = 
ClusteringUtils.getAllPendingClusteringPlans(metaClient)
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
 
-    var operator: ClusteringOperator = ClusteringOperator.SCHEDULE_AND_EXECUTE
-    pendingClustering = instantsStr match {
-      case Some(inst) =>
-        op match {
-          case Some(o) =>
-            if 
(!ClusteringOperator.EXECUTE.name().equalsIgnoreCase(o.asInstanceOf[String])) {
-              throw new HoodieClusteringException("specific instants only can 
be used in 'execute' op or not specific op")
-            }
-          case _ =>
-            logInfo("No op and set it to EXECUTE with instants specified.")
-        }
-        operator = ClusteringOperator.EXECUTE
-        checkAndFilterPendingInstants(pendingClustering, 
inst.asInstanceOf[String])
-      case _ =>
-        logInfo("No specific instants")
-        op match {
-          case Some(o) =>
-            operator = 
ClusteringOperator.fromValue(o.asInstanceOf[String].toLowerCase(Locale.ROOT))
-          case _ =>
-            logInfo("No op, use default scheduleAndExecute")
-        }
-        pendingClustering
-    }
-
-    logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
+    var (filteredPendingClusteringInstants, operation) = 
HoodieProcedureUtils.fileterPendingInstantsAndGetOperation(
+      pendingClusteringInstants, 
specificInstants.asInstanceOf[Option[String]], op.asInstanceOf[Option[String]])
 
     var client: SparkRDDWriteClient[_] = null
     try {
       client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
confs,
         tableName.asInstanceOf[Option[String]])
-      if (operator.isSchedule) {
+
+      if (operation.isSchedule) {
         val instantTime = HoodieActiveTimeline.createNewInstantTime
         if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
-          pendingClustering ++= Seq(instantTime)
+          filteredPendingClusteringInstants = Seq(instantTime)
         }
       }
-      logInfo(s"Clustering instants to run: 
${pendingClustering.mkString(",")}.")
+      logInfo(s"Clustering instants to run: 
${filteredPendingClusteringInstants.mkString(",")}.")
 
-      if (operator.isExecute) {
-        val startTs = System.currentTimeMillis()
-        pendingClustering.foreach(client.cluster(_, true))
-        logInfo(s"Finish clustering all the instants: 
${pendingClustering.mkString(",")}," +
-          s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+      if (operation.isExecute) {
+        val timer = HoodieTimer.start
+        filteredPendingClusteringInstants.foreach(client.cluster(_, true))
+        logInfo(s"Finish clustering at instants: 
${filteredPendingClusteringInstants.mkString(",")}," +
+          s" spend: ${timer.endTimer()}ms.")
       }
 
       val clusteringInstants = 
metaClient.reloadActiveTimeline().getInstants.iterator().asScala
-        .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && 
pendingClustering.contains(p.getTimestamp))
+        .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && 
filteredPendingClusteringInstants.contains(p.getTimestamp))
         .toSeq
         .sortBy(f => f.getTimestamp)
         .reverse
@@ -251,17 +228,6 @@ class RunClusteringProcedure extends BaseProcedure
       }
     })
   }
-
-  private def checkAndFilterPendingInstants(pendingInstants: Seq[String], 
instantStr: String): Seq[String] = {
-    val instants = StringUtils.split(instantStr, ",").asScala
-    val pendingSet = pendingInstants.toSet
-    val noneInstants = instants.filter(ins => !pendingSet.contains(ins))
-    if (noneInstants.nonEmpty) {
-      throw new HoodieClusteringException(s"specific 
${noneInstants.mkString(",")} instants is not exist")
-    }
-    instants.sortBy(f => f)
-  }
-
 }
 
 object RunClusteringProcedure {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index 47c371f0c7f..35e3b8331fd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -24,6 +24,7 @@ import 
org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
 import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option => 
HOption}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
@@ -43,7 +44,8 @@ class RunCompactionProcedure extends BaseProcedure with 
ProcedureBuilder with Sp
     ProcedureParameter.optional(1, "table", DataTypes.StringType),
     ProcedureParameter.optional(2, "path", DataTypes.StringType),
     ProcedureParameter.optional(3, "timestamp", DataTypes.LongType),
-    ProcedureParameter.optional(4, "options", DataTypes.StringType)
+    ProcedureParameter.optional(4, "options", DataTypes.StringType),
+    ProcedureParameter.optional(5, "instants", DataTypes.StringType)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -59,7 +61,7 @@ class RunCompactionProcedure extends BaseProcedure with 
ProcedureBuilder with Sp
   override def call(args: ProcedureArgs): Seq[Row] = {
     super.checkArgs(PARAMETERS, args)
 
-    val operation = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String].toLowerCase
+    var op = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String].toLowerCase
     val tableName = getArgValueOrDefault(args, PARAMETERS(1))
     val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
     val instantTimestamp = getArgValueOrDefault(args, PARAMETERS(3))
@@ -67,68 +69,55 @@ class RunCompactionProcedure extends BaseProcedure with 
ProcedureBuilder with Sp
     if (getArgValueOrDefault(args, PARAMETERS(4)).isDefined) {
       confs = confs ++ 
HoodieCLIUtils.extractOptions(getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[String])
     }
+    var specificInstants = getArgValueOrDefault(args, PARAMETERS(5))
+
+    // For old version compatibility
+    if (op.equals("run")) {
+      op = "scheduleandexecute"
+    }
+    if (instantTimestamp.isDefined && specificInstants.isEmpty) {
+      specificInstants = Option(instantTimestamp.get.toString)
+      op = "execute"
+    }
 
     val basePath = getBasePath(tableName, tablePath)
     val metaClient = 
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
+    val pendingCompactionInstants = 
metaClient.getActiveTimeline.getWriteTimeline.getInstants.iterator().asScala
+      .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+      .map(_.getTimestamp)
+      .toSeq.sortBy(f => f)
+
+    var (filteredPendingCompactionInstants, operation) = 
HoodieProcedureUtils.fileterPendingInstantsAndGetOperation(
+      pendingCompactionInstants, 
specificInstants.asInstanceOf[Option[String]], Option(op))
+
     var client: SparkRDDWriteClient[_] = null
     try {
       client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
confs,
         tableName.asInstanceOf[Option[String]])
-      var willCompactionInstants: Seq[String] = Seq.empty
-      operation match {
-        case "schedule" =>
-          val instantTime = 
instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
-          if (client.scheduleCompactionAtInstant(instantTime, 
HOption.empty[java.util.Map[String, String]])) {
-            willCompactionInstants = Seq(instantTime)
-          }
-        case "run" =>
-          // Do compaction
-          val timeLine = metaClient.getActiveTimeline
-          val pendingCompactionInstants = 
timeLine.getWriteTimeline.getInstants.iterator().asScala
-            .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
-            .map(_.getTimestamp)
-            .toSeq.sortBy(f => f)
-          willCompactionInstants = if (instantTimestamp.isEmpty) {
-            if (pendingCompactionInstants.nonEmpty) {
-              pendingCompactionInstants
-            } else { // If there are no pending compaction, schedule to 
generate one.
-              // CompactionHoodiePathCommand will return instanceTime for 
SCHEDULE.
-              val instantTime = HoodieActiveTimeline.createNewInstantTime()
-              if (client.scheduleCompactionAtInstant(instantTime, 
HOption.empty[java.util.Map[String, String]])) {
-                Seq(instantTime)
-              } else {
-                Seq.empty
-              }
-            }
-          } else {
-            // Check if the compaction timestamp has exists in the pending 
compaction
-            if 
(pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
-              Seq(instantTimestamp.get.toString)
-            } else {
-              throw new IllegalArgumentException(s"Compaction instant: 
${instantTimestamp.get} is not found in " +
-                s"$basePath, Available pending compaction instants are: 
${pendingCompactionInstants.mkString(",")} ")
-            }
-          }
-
-          if (willCompactionInstants.isEmpty) {
-            logInfo(s"No need to compaction on $basePath")
-          } else {
-            logInfo(s"Run compaction at instants: 
[${willCompactionInstants.mkString(",")}] on $basePath")
-            val timer = HoodieTimer.start
-            willCompactionInstants.foreach { compactionInstant =>
-              val writeResponse = client.compact(compactionInstant)
-              handleResponse(writeResponse.getCommitMetadata.get())
-              client.commitCompaction(compactionInstant, 
writeResponse.getCommitMetadata.get(), HOption.empty())
-            }
-            logInfo(s"Finish Run compaction at instants: 
[${willCompactionInstants.mkString(",")}]," +
-              s" spend: ${timer.endTimer()}ms")
-          }
-        case _ => throw new UnsupportedOperationException(s"Unsupported 
compaction operation: $operation")
+
+      if (operation.isSchedule) {
+        val instantTime = HoodieActiveTimeline.createNewInstantTime
+        if (client.scheduleCompactionAtInstant(instantTime, 
HOption.empty[java.util.Map[String, String]])) {
+          filteredPendingCompactionInstants = Seq(instantTime)
+        }
+      }
+
+      logInfo(s"Compaction instants to run: 
${filteredPendingCompactionInstants.mkString(",")}.")
+
+      if (operation.isExecute) {
+        val timer = HoodieTimer.start
+        filteredPendingCompactionInstants.foreach { compactionInstant =>
+          val writeResponse = client.compact(compactionInstant)
+          handleResponse(writeResponse.getCommitMetadata.get())
+          client.commitCompaction(compactionInstant, 
writeResponse.getCommitMetadata.get(), HOption.empty())
+        }
+        logInfo(s"Finish Run compaction at instants: 
[${filteredPendingCompactionInstants.mkString(",")}]," +
+          s" spend: ${timer.endTimer()}ms")
       }
 
       val compactionInstants = 
metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
-        .filter(instant => 
willCompactionInstants.contains(instant.getTimestamp))
+        .filter(instant => 
filteredPendingCompactionInstants.contains(instant.getTimestamp))
         .toSeq
         .sortBy(p => p.getTimestamp)
         .reverse
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index a51912b5feb..ea9588419b3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -122,7 +122,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
       assertResult(2)(spark.sql(s"show compaction on 
'${tmp.getCanonicalPath}'").collect().length)
 
       checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")(
-        s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, 
Available pending compaction instants are:  "
+        s"specific 12345 instants is not exist"
       )
     })
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 7b8dc8f8a90..b4cfc8e9d8c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql.hudi.procedure
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
@@ -29,6 +27,9 @@ import 
org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstan
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, 
HoodieDataSourceHelpers, HoodieFileIndex}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Literal}
 import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, 
StructField, StructType}
 import org.apache.spark.sql.{Dataset, Row}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index 6a6e74e8b72..02e9406cdde 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -19,8 +19,11 @@
 
 package org.apache.spark.sql.hudi.procedure
 
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
 
+import org.apache.hadoop.conf.Configuration
+
 class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
 
   test("Test Call run_compaction Procedure by Table") {
@@ -170,7 +173,7 @@ class TestCompactionProcedure extends 
HoodieSparkProcedureTestBase {
       assertResult(2)(spark.sql(s"call show_compaction(path => 
'${tmp.getCanonicalPath}')").collect().length)
 
       checkException(s"call run_compaction(op => 'run', path => 
'${tmp.getCanonicalPath}', timestamp => 12345L)")(
-        s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, 
Available pending compaction instants are:  "
+        s"specific 12345 instants is not exist"
       )
     }
   }
@@ -196,13 +199,9 @@ class TestCompactionProcedure extends 
HoodieSparkProcedureTestBase {
            | location '${tmp.getCanonicalPath}/$tableName1'
        """.stripMargin)
       spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
-
-      spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)")
-
-      spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)")
-
-      spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)")
-
+      spark.sql(s"update $tableName1 set name = 'a2' where id = 1")
+      spark.sql(s"update $tableName1 set name = 'a3' where id = 1")
+      spark.sql(s"update $tableName1 set name = 'a4' where id = 1")
       assertResult(2)(spark.sql(s"call show_compaction(path => 
'${tmp.getCanonicalPath}/$tableName1')").collect().length)
     }
   }
@@ -228,8 +227,8 @@ class TestCompactionProcedure extends 
HoodieSparkProcedureTestBase {
        """.stripMargin)
 
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-        spark.sql(s"insert into $tableName values(1, 'a2', 10, 1000)")
-        spark.sql(s"insert into $tableName values(1, 'a3', 10, 1000)")
+        spark.sql(s"update $tableName set name = 'a2' where id = 1")
+        spark.sql(s"update $tableName set name = 'a3' where id = 1")
 
         val result1 = spark.sql(
           s"""call run_compaction(table => '$tableName', op => 'run', options 
=> "
@@ -239,7 +238,7 @@ class TestCompactionProcedure extends 
HoodieSparkProcedureTestBase {
           .collect()
         assertResult(0)(result1.length)
 
-        spark.sql(s"insert into $tableName values(1, 'a4', 10, 1000)")
+        spark.sql(s"update $tableName set name = 'a4' where id = 1")
         val result2 = spark.sql(
           s"""call run_compaction(table => '$tableName', op => 'run', options 
=> "
              | 
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy,
@@ -250,4 +249,78 @@ class TestCompactionProcedure extends 
HoodieSparkProcedureTestBase {
       }
     }
   }
+
+  test("Test Call run_compaction Procedure with specific instants") {
+    withSQLConf("hoodie.compact.inline" -> "false", 
"hoodie.compact.inline.max.delta.commits" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"update $tableName set name = 'a2' where id = 1")
+
+        spark.sql(s"call run_compaction(table => '$tableName', op => 
'schedule')")
+
+        val metaClient = HoodieTableMetaClient.builder.setConf(new 
Configuration).setBasePath(tmp.getCanonicalPath).build
+        val instants = 
metaClient.getActiveTimeline.filterPendingCompactionTimeline().getInstants
+        assertResult(1)(instants.size())
+        val ts = instants.get(0).getTimestamp
+        assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName', 
op => 'execute', instants => '$ts')").collect().length)
+
+        checkExceptionContain(
+          s"call run_compaction(table => '$tableName', op => 'execute', 
instants => '000000')"
+        )("specific 000000 instants is not exist")
+      }
+    }
+  }
+
+  test("Test Call run_compaction Procedure with operation") {
+    withSQLConf("hoodie.compact.inline" -> "false", 
"hoodie.compact.inline.max.delta.commits" -> "1") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"update $tableName set name = 'a2' where id = 1")
+
+        assertResult(0)(spark.sql(s"call run_compaction(table => '$tableName', 
op => 'execute')").collect().length)
+
+        assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName', 
op => 'schedule')").collect().length)
+
+        assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName', 
op => 'execute')").collect().length)
+
+        spark.sql(s"update $tableName set name = 'a3' where id = 1")
+
+        assertResult(1)(spark.sql(s"call run_compaction(table => '$tableName', 
op => 'scheduleAndExecute')").collect().length)
+      }
+    }
+  }
 }

Reply via email to