This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b2ed7b6e [spark] Compact procedure support where parameter (#3119)
5b2ed7b6e is described below

commit 5b2ed7b6eb3802ee718c54cac3386952a1e3eae9
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 1 10:46:08 2024 +0800

    [spark] Compact procedure support where parameter (#3119)
---
 docs/content/engines/spark.md                      | 15 +++++-
 .../paimon/spark/procedure/BaseProcedure.java      |  6 +++
 .../paimon/spark/procedure/CompactProcedure.java   | 56 +++++++++++++------
 .../analysis/PaimonMergeIntoResolverBase.scala     |  2 +-
 .../analysis/expressions/ExpressionHelper.scala    | 54 ++++++++++++++++++-
 .../commands/DeleteFromPaimonTableCommand.scala    |  5 +-
 .../paimon/spark/commands/PaimonCommand.scala      | 22 +-------
 .../spark/procedure/CompactProcedureTestBase.scala | 62 +++++++++++++++++++++-
 8 files changed, 177 insertions(+), 45 deletions(-)

diff --git a/docs/content/engines/spark.md b/docs/content/engines/spark.md
index 4a7fde0b6..ef3ffb429 100644
--- a/docs/content/engines/spark.md
+++ b/docs/content/engines/spark.md
@@ -567,8 +567,19 @@ s
     <tbody style="font-size: 12px; ">
     <tr>
       <td>compact</td>
-      <td>identifier: the target table identifier. Cannot be 
empty.<br><br><nobr>partitions: partition filter. Left empty for all 
partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy: 
'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'. 
<br><br><nobr>order_columns: the columns need to be sort. Left empty if 
'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two 
partitions date=01 and date=02, you need to write 'date=01;date=02'<br> [...]
-      <td><nobr>SET spark.sql.shuffle.partitions=10; --set the compact 
parallelism</nobr><br><nobr>CALL sys.compact(table => 'T', partitions => 'p=0', 
 order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
+      <td>
+         To compact files. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>partitions: partition filter. "," means "AND"<br>";" means 
"OR". If you want to compact one partition with date=01 and day=01, you need to 
write 'date=01,day=01'. Left empty for all partitions. (Can't be used together 
with "where")</li>
+            <li>where: partition predicate. Left empty for all partitions. 
(Can't be used together with "partitions")</li>          
+            <li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. 
Left empty for 'none'.</li>
+            <li>order_by: the columns to sort by. Left empty if 
'order_strategy' is 'none'.</li>
+      </td>
+      <td>
+         SET spark.sql.shuffle.partitions=10; --set the compact parallelism 
<br/>
+         CALL sys.compact(table => 'T', partitions => 'p=0;p=1',  
order_strategy => 'zorder', order_by => 'a,b') <br/>
+         CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy 
=> 'zorder', order_by => 'a,b')
+      </td>
     </tr>
     <tr>
       <td>expire_snapshots</td>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
index 38cc42d70..b506b2fa1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
@@ -108,6 +109,11 @@ abstract class BaseProcedure implements Procedure {
         }
     }
 
+    protected LogicalPlan createRelation(Identifier ident) {
+        return DataSourceV2Relation.create(
+                loadSparkTable(ident), Option.apply(tableCatalog), 
Option.apply(ident));
+    }
+
     protected void refreshSparkCache(Identifier ident, Table table) {
         CacheManager cacheManager = spark.sharedState().cacheManager();
         DataSourceV2Relation relation =
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index df56ccfc3..111e9f75c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -26,9 +26,9 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.spark.DynamicOverWrite$;
 import org.apache.paimon.spark.SparkUtils;
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper;
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
 import org.apache.paimon.spark.sort.TableSorter;
 import org.apache.paimon.table.BucketMode;
@@ -44,7 +44,6 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParameterUtils;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializationUtils;
 import org.apache.paimon.utils.StringUtils;
 
@@ -53,7 +52,11 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Utils;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.plans.logical.Filter;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.DataTypes;
@@ -74,13 +77,14 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /**
  * Compact procedure. Usage:
  *
  * <pre><code>
- *  CALL sys.compact(table => 'tableId', [partitions => 'p1;p2'], 
[order_strategy => 'xxx'], [order_by => 'xxx'])
+ *  CALL sys.compact(table => 'tableId', [partitions => 
'p1=0,p2=0;p1=0,p2=1'], [order_strategy => 'xxx'], [order_by => 'xxx'], [where 
=> 'p1>0'])
  * </code></pre>
  */
 public class CompactProcedure extends BaseProcedure {
@@ -91,6 +95,7 @@ public class CompactProcedure extends BaseProcedure {
                 ProcedureParameter.optional("partitions", StringType),
                 ProcedureParameter.optional("order_strategy", StringType),
                 ProcedureParameter.optional("order_by", StringType),
+                ProcedureParameter.optional("where", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -115,7 +120,6 @@ public class CompactProcedure extends BaseProcedure {
 
     @Override
     public InternalRow[] call(InternalRow args) {
-        Preconditions.checkArgument(args.numFields() >= 1);
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
         String partitions = blank(args, 1) ? null : args.getString(1);
         String sortType = blank(args, 2) ? TableSorter.OrderType.NONE.name() : 
args.getString(2);
@@ -123,22 +127,40 @@ public class CompactProcedure extends BaseProcedure {
                 blank(args, 3)
                         ? Collections.emptyList()
                         : Arrays.asList(args.getString(3).split(","));
+        String where = blank(args, 4) ? null : args.getString(4);
         if (TableSorter.OrderType.NONE.name().equals(sortType) && 
!sortColumns.isEmpty()) {
             throw new IllegalArgumentException(
                     "order_strategy \"none\" cannot work with order_by 
columns.");
         }
-
+        checkArgument(
+                partitions == null || where == null,
+                "partitions and where cannot be used together.");
+        String finalWhere = partitions != null ? toWhere(partitions) : where;
         return modifyPaimonTable(
                 tableIdent,
                 table -> {
-                    Preconditions.checkArgument(table instanceof 
FileStoreTable);
+                    checkArgument(table instanceof FileStoreTable);
+                    LogicalPlan relation = createRelation(tableIdent);
+                    Expression condition = null;
+                    if (!StringUtils.isBlank(finalWhere)) {
+                        condition = ExpressionHelper.resolveFilter(spark(), 
relation, finalWhere);
+                        checkArgument(
+                                ExpressionHelper.onlyHasPartitionPredicate(
+                                        spark(),
+                                        condition,
+                                        table.partitionKeys().toArray(new 
String[0])),
+                                "Only partition predicate is supported, your 
predicate is %s, but partition keys are %s",
+                                condition,
+                                table.partitionKeys());
+                    }
                     InternalRow internalRow =
                             newInternalRow(
                                     execute(
                                             (FileStoreTable) table,
                                             sortType,
                                             sortColumns,
-                                            partitions));
+                                            relation,
+                                            condition));
                     return new InternalRow[] {internalRow};
                 });
     }
@@ -156,18 +178,18 @@ public class CompactProcedure extends BaseProcedure {
             FileStoreTable table,
             String sortType,
             List<String> sortColumns,
-            @Nullable String partitions) {
+            LogicalPlan relation,
+            @Nullable Expression condition) {
         table = 
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
         BucketMode bucketMode = table.bucketMode();
         TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
-
         if (orderType.equals(TableSorter.OrderType.NONE)) {
             JavaSparkContext javaSparkContext = new 
JavaSparkContext(spark().sparkContext());
             Predicate filter =
-                    StringUtils.isBlank(partitions)
+                    condition == null
                             ? null
-                            : PredicateBuilder.partitions(
-                                    ParameterUtils.getPartitions(partitions), 
table.rowType());
+                            : 
ExpressionHelper.convertConditionToPaimonPredicate(
+                                    condition, relation.output(), 
table.rowType());
             switch (bucketMode) {
                 case FIXED:
                 case DYNAMIC:
@@ -183,7 +205,8 @@ public class CompactProcedure extends BaseProcedure {
         } else {
             switch (bucketMode) {
                 case UNAWARE:
-                    sortCompactUnAwareBucketTable(table, orderType, 
sortColumns, partitions);
+                    sortCompactUnAwareBucketTable(
+                            table, orderType, sortColumns, relation, 
condition);
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -336,10 +359,11 @@ public class CompactProcedure extends BaseProcedure {
             FileStoreTable table,
             TableSorter.OrderType orderType,
             List<String> sortColumns,
-            @Nullable String partitions) {
+            LogicalPlan relation,
+            @Nullable Expression condition) {
         Dataset<Row> row =
-                
spark().read().format("paimon").load(table.coreOptions().path().toString());
-        row = StringUtils.isBlank(partitions) ? row : 
row.where(toWhere(partitions));
+                Utils.createDataset(
+                        spark(), condition == null ? relation : new 
Filter(condition, relation));
         new WriteIntoPaimonTable(
                         table,
                         DynamicOverWrite$.MODULE$,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
index a6ce9546c..c36ee4485 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
@@ -37,7 +37,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
     val matched = merge.matchedActions
     val notMatched = merge.notMatchedActions
 
-    val resolve: (Expression, LogicalPlan) => Expression = 
resolveExpression(spark) _
+    val resolve: (Expression, LogicalPlan) => Expression = 
resolveExpression(spark)
 
     def resolveMergeAction(action: MergeAction): MergeAction = {
       action match {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index a51a4f381..65f2a04bd 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -18,9 +18,14 @@
 
 package org.apache.paimon.spark.catalyst.analysis.expressions
 
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.types.RowType
+
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, 
Expression, GetStructField, Literal, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, 
Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, NullType}
 
@@ -106,4 +111,49 @@ object ExpressionHelper {
     override protected def withNewChildrenInternal(
         newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children 
= newChildren)
   }
+
+  def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String): 
Expression = {
+    val unResolvedExpression = 
spark.sessionState.sqlParser.parseExpression(where)
+    val filter = Filter(unResolvedExpression, plan)
+    spark.sessionState.analyzer.execute(filter) match {
+      case filter: Filter => filter.condition
+      case _ => throw new RuntimeException(s"Could not resolve expression 
$where in plan: $plan")
+    }
+  }
+
+  def onlyHasPartitionPredicate(
+      spark: SparkSession,
+      expr: Expression,
+      partitionCols: Array[String]): Boolean = {
+    val resolvedNameEquals = spark.sessionState.analyzer.resolver
+    splitConjunctivePredicates(expr).forall(
+      e =>
+        e.references.forall(r => 
partitionCols.exists(resolvedNameEquals(r.name, _))) &&
+          !SubqueryExpression.hasSubquery(expr))
+  }
+
+  def convertConditionToPaimonPredicate(
+      condition: Expression,
+      output: Seq[Attribute],
+      rowType: RowType): Predicate = {
+    val converter = new SparkFilterConverter(rowType)
+    val filters = normalizeExprs(Seq(condition), output)
+      .flatMap(splitConjunctivePredicates(_).map {
+        f =>
+          translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
+            throw new RuntimeException("Exec update failed:" +
+              s" cannot translate expression to source filter: $f"))
+      })
+      .toArray
+    val predicates = filters.map(converter.convert)
+    PredicateBuilder.and(predicates: _*)
+  }
+
+  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case And(cond1, cond2) =>
+        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index ff3aa253c..9f79664be 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.options.Options
 import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
 import org.apache.paimon.spark.{InsertInto, SparkTable}
+import 
org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
@@ -51,7 +52,9 @@ trait DeleteFromPaimonTableCommandBase extends 
PaimonLeafRunnableCommand with Pa
       (None, false)
     } else {
       try {
-        (Some(convertConditionToPaimonPredicate(condition, relation.output)), 
false)
+        (
+          Some(convertConditionToPaimonPredicate(condition(), relation.output, 
table.rowType())),
+          false)
       } catch {
         case NonFatal(_) =>
           (None, true)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 383e5c2cb..02a0e0cc2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -18,35 +18,15 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
 import org.apache.paimon.spark.SparkFilterConverter
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
 
-import java.io.IOException
-
 /** Helper trait for all paimon commands. */
 trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
 
-  protected def convertConditionToPaimonPredicate(
-      condition: Expression,
-      output: Seq[Attribute]): Predicate = {
-    val converter = new SparkFilterConverter(table.rowType)
-    val filters = normalizeExprs(Seq(condition), output)
-      .flatMap(splitConjunctivePredicates(_).map {
-        f =>
-          translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
-            throw new RuntimeException("Exec update failed:" +
-              s" cannot translate expression to source filter: $f"))
-      })
-      .toArray
-    val predicates = filters.map(converter.convert)
-    PredicateBuilder.and(predicates: _*)
-  }
-
   /**
    * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
    * methods where the `AlwaysTrue` Filter is used.
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 78f48d36d..ac3c7d77a 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.procedure
 
 import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.table.FileStoreTable
 
@@ -314,7 +315,7 @@ abstract class CompactProcedureTestBase extends 
PaimonSparkTestBase with StreamT
           spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
           spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
 
-          spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')")
+          spark.sql("CALL sys.compact(table => 'T', partitions => 
'pt=\"p1\"')")
           
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
           Assertions.assertThat(lastSnapshotId(table)).isEqualTo(3)
 
@@ -373,7 +374,7 @@ abstract class CompactProcedureTestBase extends 
PaimonSparkTestBase with StreamT
     spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
     spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')")
 
-    spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')")
+    spark.sql("CALL sys.compact(table => 'T', partitions => 'pt=\"p1\"')")
     
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
     Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
 
@@ -413,6 +414,63 @@ abstract class CompactProcedureTestBase extends 
PaimonSparkTestBase with StreamT
     checkAnswer(spark.sql(s"SELECT COUNT(*) FROM T"), Row(count) :: Nil)
   }
 
+  test("Paimon Procedure: compact with wrong usage") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, value STRING, pt STRING)
+                 |TBLPROPERTIES ('bucket'='-1', 'write-only'='true')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    assert(intercept[IllegalArgumentException] {
+      spark.sql(
+        "CALL sys.compact(table => 'T', partitions => 'pt = \"p1\"', where => 
'pt = \"p1\"')")
+    }.getMessage.contains("partitions and where cannot be used together"))
+
+    assert(intercept[IllegalArgumentException] {
+      spark.sql("CALL sys.compact(table => 'T', partitions => 'id = 1')")
+    }.getMessage.contains("Only partition predicate is supported"))
+
+    assert(intercept[IllegalArgumentException] {
+      spark.sql("CALL sys.compact(table => 'T', where => 'id > 1 AND pt = 
\"p1\"')")
+    }.getMessage.contains("Only partition predicate is supported"))
+  }
+
+  test("Paimon Procedure: compact with where") {
+    spark.sql(
+      s"""
+         |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+         |TBLPROPERTIES ('bucket'='1', 'write-only'='true', 
'compaction.min.file-num'='1', 'compaction.max.file-num'='2')
+         |PARTITIONED BY (dt, hh)
+         |""".stripMargin)
+
+    val table = loadTable("T")
+    val fileIO = table.fileIO()
+
+    spark.sql(s"INSERT INTO T VALUES (1, '1', '2024-01-01', 0), (2, '2', 
'2024-01-01', 1)")
+    spark.sql(s"INSERT INTO T VALUES (3, '3', '2024-01-01', 0), (4, '4', 
'2024-01-01', 1)")
+    spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', 
'2024-01-02', 1)")
+    spark.sql(s"INSERT INTO T VALUES (7, '7', '2024-01-02', 0), (8, '8', 
'2024-01-02', 1)")
+
+    spark.sql("CALL sys.compact(table => 'T', where => 'dt = \"2024-01-01\" 
and hh >= 1')")
+    
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    Assertions
+      .assertThat(
+        fileIO.listStatus(new Path(table.location(), 
"dt=2024-01-01/hh=0/bucket-0")).length)
+      .isEqualTo(2)
+    Assertions
+      .assertThat(
+        fileIO.listStatus(new Path(table.location(), 
"dt=2024-01-01/hh=1/bucket-0")).length)
+      .isEqualTo(3)
+    Assertions
+      .assertThat(
+        fileIO.listStatus(new Path(table.location(), 
"dt=2024-01-02/hh=0/bucket-0")).length)
+      .isEqualTo(2)
+    Assertions
+      .assertThat(
+        fileIO.listStatus(new Path(table.location(), 
"dt=2024-01-02/hh=1/bucket-0")).length)
+      .isEqualTo(2)
+  }
+
   test("Paimon test: toWhere method in CompactProcedure") {
     val conditions = "f0=0,f1=0,f2=0;f0=1,f1=1,f2=1;f0=1,f1=2,f2=2;f3=3"
 

Reply via email to