This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5b2ed7b6e [spark] Compact procedure support where parameter (#3119)
5b2ed7b6e is described below
commit 5b2ed7b6eb3802ee718c54cac3386952a1e3eae9
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 1 10:46:08 2024 +0800
[spark] Compact procedure support where parameter (#3119)
---
docs/content/engines/spark.md | 15 +++++-
.../paimon/spark/procedure/BaseProcedure.java | 6 +++
.../paimon/spark/procedure/CompactProcedure.java | 56 +++++++++++++------
.../analysis/PaimonMergeIntoResolverBase.scala | 2 +-
.../analysis/expressions/ExpressionHelper.scala | 54 ++++++++++++++++++-
.../commands/DeleteFromPaimonTableCommand.scala | 5 +-
.../paimon/spark/commands/PaimonCommand.scala | 22 +-------
.../spark/procedure/CompactProcedureTestBase.scala | 62 +++++++++++++++++++++-
8 files changed, 177 insertions(+), 45 deletions(-)
diff --git a/docs/content/engines/spark.md b/docs/content/engines/spark.md
index 4a7fde0b6..ef3ffb429 100644
--- a/docs/content/engines/spark.md
+++ b/docs/content/engines/spark.md
@@ -567,8 +567,19 @@ s
<tbody style="font-size: 12px; ">
<tr>
<td>compact</td>
- <td>identifier: the target table identifier. Cannot be
empty.<br><br><nobr>partitions: partition filter. Left empty for all
partitions.<br> "," means "AND"<br>";" means "OR"</nobr><br><br>order_strategy:
'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'.
<br><br><nobr>order_columns: the columns need to be sort. Left empty if
'order_strategy' is 'none'. </nobr><br><br>If you want sort compact two
partitions date=01 and date=02, you need to write 'date=01;date=02'<br> [...]
- <td><nobr>SET spark.sql.shuffle.partitions=10; --set the compact
parallelism</nobr><br><nobr>CALL sys.compact(table => 'T', partitions => 'p=0',
order_strategy => 'zorder', order_by => 'a,b')</nobr></td>
+ <td>
+ To compact files. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>partitions: partition filter. "," means "AND"<br>";" means
"OR". If you want to compact one partition with date=01 and day=01, you need to
write 'date=01,day=01'. Left empty for all partitions. (Can't be used together
with "where")</li>
+ <li>where: partition predicate. Left empty for all partitions.
(Can't be used together with "partitions")</li>
+ <li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'.
Left empty for 'none'.</li>
+ <li>order_by: the columns to sort by. Left empty if
'order_strategy' is 'none'.</li>
+ </td>
+ <td>
+ SET spark.sql.shuffle.partitions=10; --set the compact parallelism
<br/>
+ CALL sys.compact(table => 'T', partitions => 'p=0;p=1',
order_strategy => 'zorder', order_by => 'a,b') <br/>
+ CALL sys.compact(table => 'T', where => 'p>0 and p<3', order_strategy
=> 'zorder', order_by => 'a,b')
+ </td>
</tr>
<tr>
<td>expire_snapshots</td>
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
index 38cc42d70..b506b2fa1 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
@@ -108,6 +109,11 @@ abstract class BaseProcedure implements Procedure {
}
}
+ protected LogicalPlan createRelation(Identifier ident) {
+ return DataSourceV2Relation.create(
+ loadSparkTable(ident), Option.apply(tableCatalog),
Option.apply(ident));
+ }
+
protected void refreshSparkCache(Identifier ident, Table table) {
CacheManager cacheManager = spark.sharedState().cacheManager();
DataSourceV2Relation relation =
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index df56ccfc3..111e9f75c 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -26,9 +26,9 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.BucketMode;
@@ -44,7 +44,6 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.StringUtils;
@@ -53,7 +52,11 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Utils;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.plans.logical.Filter;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
@@ -74,13 +77,14 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
* Compact procedure. Usage:
*
* <pre><code>
- * CALL sys.compact(table => 'tableId', [partitions => 'p1;p2'],
[order_strategy => 'xxx'], [order_by => 'xxx'])
+ * CALL sys.compact(table => 'tableId', [partitions =>
'p1=0,p2=0;p1=0,p2=1'], [order_strategy => 'xxx'], [order_by => 'xxx'], [where
=> 'p1>0'])
* </code></pre>
*/
public class CompactProcedure extends BaseProcedure {
@@ -91,6 +95,7 @@ public class CompactProcedure extends BaseProcedure {
ProcedureParameter.optional("partitions", StringType),
ProcedureParameter.optional("order_strategy", StringType),
ProcedureParameter.optional("order_by", StringType),
+ ProcedureParameter.optional("where", StringType)
};
private static final StructType OUTPUT_TYPE =
@@ -115,7 +120,6 @@ public class CompactProcedure extends BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
- Preconditions.checkArgument(args.numFields() >= 1);
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
String partitions = blank(args, 1) ? null : args.getString(1);
String sortType = blank(args, 2) ? TableSorter.OrderType.NONE.name() :
args.getString(2);
@@ -123,22 +127,40 @@ public class CompactProcedure extends BaseProcedure {
blank(args, 3)
? Collections.emptyList()
: Arrays.asList(args.getString(3).split(","));
+ String where = blank(args, 4) ? null : args.getString(4);
if (TableSorter.OrderType.NONE.name().equals(sortType) &&
!sortColumns.isEmpty()) {
throw new IllegalArgumentException(
"order_strategy \"none\" cannot work with order_by
columns.");
}
-
+ checkArgument(
+ partitions == null || where == null,
+ "partitions and where cannot be used together.");
+ String finalWhere = partitions != null ? toWhere(partitions) : where;
return modifyPaimonTable(
tableIdent,
table -> {
- Preconditions.checkArgument(table instanceof
FileStoreTable);
+ checkArgument(table instanceof FileStoreTable);
+ LogicalPlan relation = createRelation(tableIdent);
+ Expression condition = null;
+ if (!StringUtils.isBlank(finalWhere)) {
+ condition = ExpressionHelper.resolveFilter(spark(),
relation, finalWhere);
+ checkArgument(
+ ExpressionHelper.onlyHasPartitionPredicate(
+ spark(),
+ condition,
+ table.partitionKeys().toArray(new
String[0])),
+ "Only partition predicate is supported, your
predicate is %s, but partition keys are %s",
+ condition,
+ table.partitionKeys());
+ }
InternalRow internalRow =
newInternalRow(
execute(
(FileStoreTable) table,
sortType,
sortColumns,
- partitions));
+ relation,
+ condition));
return new InternalRow[] {internalRow};
});
}
@@ -156,18 +178,18 @@ public class CompactProcedure extends BaseProcedure {
FileStoreTable table,
String sortType,
List<String> sortColumns,
- @Nullable String partitions) {
+ LogicalPlan relation,
+ @Nullable Expression condition) {
table =
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
-
if (orderType.equals(TableSorter.OrderType.NONE)) {
JavaSparkContext javaSparkContext = new
JavaSparkContext(spark().sparkContext());
Predicate filter =
- StringUtils.isBlank(partitions)
+ condition == null
? null
- : PredicateBuilder.partitions(
- ParameterUtils.getPartitions(partitions),
table.rowType());
+ :
ExpressionHelper.convertConditionToPaimonPredicate(
+ condition, relation.output(),
table.rowType());
switch (bucketMode) {
case FIXED:
case DYNAMIC:
@@ -183,7 +205,8 @@ public class CompactProcedure extends BaseProcedure {
} else {
switch (bucketMode) {
case UNAWARE:
- sortCompactUnAwareBucketTable(table, orderType,
sortColumns, partitions);
+ sortCompactUnAwareBucketTable(
+ table, orderType, sortColumns, relation,
condition);
break;
default:
throw new UnsupportedOperationException(
@@ -336,10 +359,11 @@ public class CompactProcedure extends BaseProcedure {
FileStoreTable table,
TableSorter.OrderType orderType,
List<String> sortColumns,
- @Nullable String partitions) {
+ LogicalPlan relation,
+ @Nullable Expression condition) {
Dataset<Row> row =
-
spark().read().format("paimon").load(table.coreOptions().path().toString());
- row = StringUtils.isBlank(partitions) ? row :
row.where(toWhere(partitions));
+ Utils.createDataset(
+ spark(), condition == null ? relation : new
Filter(condition, relation));
new WriteIntoPaimonTable(
table,
DynamicOverWrite$.MODULE$,
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
index a6ce9546c..c36ee4485 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
@@ -37,7 +37,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
val matched = merge.matchedActions
val notMatched = merge.notMatchedActions
- val resolve: (Expression, LogicalPlan) => Expression =
resolveExpression(spark) _
+ val resolve: (Expression, LogicalPlan) => Expression =
resolveExpression(spark)
def resolveMergeAction(action: MergeAction): MergeAction = {
action match {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index a51a4f381..65f2a04bd 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -18,9 +18,14 @@
package org.apache.paimon.spark.catalyst.analysis.expressions
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.types.RowType
+
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast,
Expression, GetStructField, Literal, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast,
Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, NullType}
@@ -106,4 +111,49 @@ object ExpressionHelper {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children
= newChildren)
}
+
+ def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String):
Expression = {
+ val unResolvedExpression =
spark.sessionState.sqlParser.parseExpression(where)
+ val filter = Filter(unResolvedExpression, plan)
+ spark.sessionState.analyzer.execute(filter) match {
+ case filter: Filter => filter.condition
+ case _ => throw new RuntimeException(s"Could not resolve expression
$where in plan: $plan")
+ }
+ }
+
+ def onlyHasPartitionPredicate(
+ spark: SparkSession,
+ expr: Expression,
+ partitionCols: Array[String]): Boolean = {
+ val resolvedNameEquals = spark.sessionState.analyzer.resolver
+ splitConjunctivePredicates(expr).forall(
+ e =>
+ e.references.forall(r =>
partitionCols.exists(resolvedNameEquals(r.name, _))) &&
+ !SubqueryExpression.hasSubquery(expr))
+ }
+
+ def convertConditionToPaimonPredicate(
+ condition: Expression,
+ output: Seq[Attribute],
+ rowType: RowType): Predicate = {
+ val converter = new SparkFilterConverter(rowType)
+ val filters = normalizeExprs(Seq(condition), output)
+ .flatMap(splitConjunctivePredicates(_).map {
+ f =>
+ translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
+ throw new RuntimeException("Exec update failed:" +
+ s" cannot translate expression to source filter: $f"))
+ })
+ .toArray
+ val predicates = filters.map(converter.convert)
+ PredicateBuilder.and(predicates: _*)
+ }
+
+ def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+ condition match {
+ case And(cond1, cond2) =>
+ splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+ case other => other :: Nil
+ }
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index ff3aa253c..9f79664be 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
import org.apache.paimon.spark.{InsertInto, SparkTable}
+import
org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
@@ -51,7 +52,9 @@ trait DeleteFromPaimonTableCommandBase extends
PaimonLeafRunnableCommand with Pa
(None, false)
} else {
try {
- (Some(convertConditionToPaimonPredicate(condition, relation.output)),
false)
+ (
+ Some(convertConditionToPaimonPredicate(condition(), relation.output,
table.rowType())),
+ false)
} catch {
case NonFatal(_) =>
(None, true)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 383e5c2cb..02a0e0cc2 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -18,35 +18,15 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.types.RowType
-import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
-import java.io.IOException
-
/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
- protected def convertConditionToPaimonPredicate(
- condition: Expression,
- output: Seq[Attribute]): Predicate = {
- val converter = new SparkFilterConverter(table.rowType)
- val filters = normalizeExprs(Seq(condition), output)
- .flatMap(splitConjunctivePredicates(_).map {
- f =>
- translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
- throw new RuntimeException("Exec update failed:" +
- s" cannot translate expression to source filter: $f"))
- })
- .toArray
- val predicates = filters.map(converter.convert)
- PredicateBuilder.and(predicates: _*)
- }
-
/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call
the `truncate`
* methods where the `AlwaysTrue` Filter is used.
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 78f48d36d..ac3c7d77a 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure
import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.table.FileStoreTable
@@ -314,7 +315,7 @@ abstract class CompactProcedureTestBase extends
PaimonSparkTestBase with StreamT
spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
- spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')")
+ spark.sql("CALL sys.compact(table => 'T', partitions =>
'pt=\"p1\"')")
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(3)
@@ -373,7 +374,7 @@ abstract class CompactProcedureTestBase extends
PaimonSparkTestBase with StreamT
spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')")
- spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')")
+ spark.sql("CALL sys.compact(table => 'T', partitions => 'pt=\"p1\"')")
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
@@ -413,6 +414,63 @@ abstract class CompactProcedureTestBase extends
PaimonSparkTestBase with StreamT
checkAnswer(spark.sql(s"SELECT COUNT(*) FROM T"), Row(count) :: Nil)
}
+ test("Paimon Procedure: compact with wrong usage") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, value STRING, pt STRING)
+ |TBLPROPERTIES ('bucket'='-1', 'write-only'='true')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ assert(intercept[IllegalArgumentException] {
+ spark.sql(
+ "CALL sys.compact(table => 'T', partitions => 'pt = \"p1\"', where =>
'pt = \"p1\"')")
+ }.getMessage.contains("partitions and where cannot be used together"))
+
+ assert(intercept[IllegalArgumentException] {
+ spark.sql("CALL sys.compact(table => 'T', partitions => 'id = 1')")
+ }.getMessage.contains("Only partition predicate is supported"))
+
+ assert(intercept[IllegalArgumentException] {
+ spark.sql("CALL sys.compact(table => 'T', where => 'id > 1 AND pt =
\"p1\"')")
+ }.getMessage.contains("Only partition predicate is supported"))
+ }
+
+ test("Paimon Procedure: compact with where") {
+ spark.sql(
+ s"""
+ |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+ |TBLPROPERTIES ('bucket'='1', 'write-only'='true',
'compaction.min.file-num'='1', 'compaction.max.file-num'='2')
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+ val table = loadTable("T")
+ val fileIO = table.fileIO()
+
+ spark.sql(s"INSERT INTO T VALUES (1, '1', '2024-01-01', 0), (2, '2',
'2024-01-01', 1)")
+ spark.sql(s"INSERT INTO T VALUES (3, '3', '2024-01-01', 0), (4, '4',
'2024-01-01', 1)")
+ spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6',
'2024-01-02', 1)")
+ spark.sql(s"INSERT INTO T VALUES (7, '7', '2024-01-02', 0), (8, '8',
'2024-01-02', 1)")
+
+ spark.sql("CALL sys.compact(table => 'T', where => 'dt = \"2024-01-01\"
and hh >= 1')")
+
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+ Assertions
+ .assertThat(
+ fileIO.listStatus(new Path(table.location(),
"dt=2024-01-01/hh=0/bucket-0")).length)
+ .isEqualTo(2)
+ Assertions
+ .assertThat(
+ fileIO.listStatus(new Path(table.location(),
"dt=2024-01-01/hh=1/bucket-0")).length)
+ .isEqualTo(3)
+ Assertions
+ .assertThat(
+ fileIO.listStatus(new Path(table.location(),
"dt=2024-01-02/hh=0/bucket-0")).length)
+ .isEqualTo(2)
+ Assertions
+ .assertThat(
+ fileIO.listStatus(new Path(table.location(),
"dt=2024-01-02/hh=1/bucket-0")).length)
+ .isEqualTo(2)
+ }
+
test("Paimon test: toWhere method in CompactProcedure") {
val conditions = "f0=0,f1=0,f2=0;f0=1,f1=1,f2=1;f0=1,f1=2,f2=2;f3=3"