This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 927a668c370d61171e9fbbcd8416783f3233acbe
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Dec 11 10:17:07 2023 +0800

    [spark] Optimize compact procedure in non-sort scenario (#2457)
---
 docs/content/maintenance/dedicated-compaction.md   |   4 +-
 .../apache/paimon/predicate/PredicateBuilder.java  |   7 +
 .../AppendOnlyTableCompactionCoordinator.java      |   4 +-
 .../AppendOnlyTableCompactionCoordinatorTest.java  |   2 +-
 .../UnawareBucketCompactionTopoBuilder.java        |  20 +--
 .../paimon/spark/procedure/CompactProcedure.java   | 200 +++++++++++++++++++--
 .../org/apache/paimon/spark/sort/TableSorter.java  |   6 +-
 .../spark/procedure/CompactProcedureTest.scala     | 130 +++++++++++++-
 .../paimon/spark/sql/PaimonPushDownTest.scala      |   3 +-
 9 files changed, 332 insertions(+), 44 deletions(-)
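
For context, the reworked procedure is driven from Spark SQL. Below is a minimal, hypothetical driver program showing the call shapes this commit supports; the table name 'T', the partition spec, and the sort columns are illustrative placeholders taken from the javadoc and tests in this diff, not fixed values:

```java
import org.apache.spark.sql.SparkSession;

public class CompactUsageSketch {
    public static void main(String[] args) {
        // Assumes a Spark session with the Paimon catalog already configured.
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Plain compaction; the procedure now dispatches on the table's bucket mode.
        spark.sql("CALL sys.compact(table => 'T')");

        // Compaction restricted to selected partitions ('p1;p2' style specs per the javadoc).
        spark.sql("CALL sys.compact(table => 'T', partitions => 'pt=p1')");

        // Sort compaction, still limited to unaware-bucket append-only tables.
        spark.sql("CALL sys.compact(table => 'T', order_strategy => 'zorder', order_by => 'id')");
    }
}
```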

diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md
index 7e569178b..7f00f7355 100644
--- a/docs/content/maintenance/dedicated-compaction.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -223,8 +223,8 @@ For more usage of the compact_database action, see
 
 ## Sort Compact
 
-If your table is configured with [dynamic bucket]({{< ref "concepts/primary-key-table#dynamic-bucket" >}})
-or [append table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}) ,
+If your table is configured with [dynamic bucket primary key table]({{< ref "concepts/primary-key-table#dynamic-bucket" >}})
+or [unaware bucket append table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}) ,
 you can trigger a compact with specified column sort to speed up queries.
 
 ```bash  
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index e2d529d6b..ee84d59d8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -360,6 +360,13 @@ public class PredicateBuilder {
         return predicate;
     }
 
+    public static Predicate partitions(List<Map<String, String>> partitions, RowType rowType) {
+        return PredicateBuilder.or(
+                partitions.stream()
+                        .map(p -> PredicateBuilder.partition(p, rowType))
+                        .toArray(Predicate[]::new));
+    }
+
     public static Predicate equalPartition(BinaryRow partition, RowType partitionType) {
         Preconditions.checkArgument(
                 partition.getFieldCount() == partitionType.getFieldCount(),
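
The new partitions helper ORs together one partition predicate per spec map. A minimal sketch of the equivalent expansion, using only PredicateBuilder methods visible in this diff (the wrapper class and method name are illustrative, not part of the commit):

```java
import java.util.List;
import java.util.Map;

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;

public class PartitionsPredicateSketch {
    // Each map is one partition spec (column -> value); the per-spec predicates are
    // OR-ed, so a row matches if it falls in any of the listed partitions.
    public static Predicate partitionsFilter(List<Map<String, String>> specs, RowType rowType) {
        Predicate[] perPartition =
                specs.stream()
                        .map(spec -> PredicateBuilder.partition(spec, rowType))
                        .toArray(Predicate[]::new);
        return PredicateBuilder.or(perPartition);
    }
}
```
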
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
index 62a238419..e37d57336 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
@@ -212,12 +212,12 @@ public class AppendOnlyTableCompactionCoordinator {
         }
 
         public boolean readyToRemove() {
-            return toCompact.size() == 0 || age > REMOVE_AGE;
+            return toCompact.isEmpty() || age > REMOVE_AGE;
         }
 
         private List<List<DataFileMeta>> agePack() {
             List<List<DataFileMeta>> packed = pack();
-            if (packed.size() == 0) {
+            if (packed.isEmpty()) {
                 // non-packed, we need to grow up age, and check whether to compact once
                 if (++age > COMPACT_AGE && toCompact.size() > 1) {
                     List<DataFileMeta> all = new ArrayList<>(toCompact);
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
index 7b20dc7f5..83e5be14c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java
@@ -105,7 +105,7 @@ public class AppendOnlyTableCompactionCoordinatorTest {
             assertThat(compactionCoordinator.partitionCompactCoordinators.size()).isEqualTo(1);
         }
 
-        // age enough, generate less file comaction
+        // age enough, generate less file compaction
         List<AppendOnlyCompactionTask> tasks = compactionCoordinator.compactPlan();
         assertThat(tasks.size()).isEqualTo(1);
         assertThat(new HashSet<>(files))
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index 8c92cb1b2..84031474a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -24,7 +24,6 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 
@@ -95,7 +94,12 @@ public class UnawareBucketCompactionTopoBuilder {
         long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
         BucketUnawareCompactSource source =
                 new BucketUnawareCompactSource(
-                        table, isContinuous, scanInterval, getPartitionFilter());
+                        table,
+                        isContinuous,
+                        scanInterval,
+                        specifiedPartitions != null
+                                ? PredicateBuilder.partitions(specifiedPartitions, table.rowType())
+                                : null);
 
         return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
     }
@@ -123,16 +127,4 @@ public class UnawareBucketCompactionTopoBuilder {
         }
         return new DataStream<>(env, transformation);
     }
-
-    private Predicate getPartitionFilter() {
-        Predicate partitionPredicate = null;
-        if (specifiedPartitions != null) {
-            partitionPredicate =
-                    PredicateBuilder.or(
-                            specifiedPartitions.stream()
-                                    .map(p -> PredicateBuilder.partition(p, table.rowType()))
-                                    .toArray(Predicate[]::new));
-        }
-        return partitionPredicate;
-    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 7239efad3..13c086e24 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -20,16 +20,37 @@ package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.spark.DynamicOverWrite$;
+import org.apache.paimon.spark.SparkUtils;
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
 import org.apache.paimon.spark.sort.TableSorter;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CompactionTaskSerializer;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -42,15 +63,26 @@ import org.apache.spark.sql.types.StructType;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
-/** Compact procedure for tables. */
+/**
+ * Compact procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.compact(table => 'tableId', [partitions => 'p1;p2'], [order_strategy => 'xxx'], [order_by => 'xxx'])
+ * </code></pre>
+ */
 public class CompactProcedure extends BaseProcedure {
 
     private static final ProcedureParameter[] PARAMETERS =
@@ -85,7 +117,7 @@ public class CompactProcedure extends BaseProcedure {
     public InternalRow[] call(InternalRow args) {
         Preconditions.checkArgument(args.numFields() >= 1);
         Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-        String partitionFilter = blank(args, 1) ? null : toWhere(args.getString(1));
+        String partitions = blank(args, 1) ? null : args.getString(1);
         String sortType = blank(args, 2) ? TableSorter.OrderType.NONE.name() : args.getString(2);
         List<String> sortColumns =
                 blank(args, 3)
@@ -106,14 +138,14 @@ public class CompactProcedure extends BaseProcedure {
                                             (FileStoreTable) table,
                                             sortType,
                                             sortColumns,
-                                            partitionFilter));
+                                            partitions));
                     return new InternalRow[] {internalRow};
                 });
     }
 
     @Override
     public String description() {
-        return "This procedure execute sort compact action on unaware-bucket table.";
+        return "This procedure execute compact action on paimon table.";
     }
 
     private boolean blank(InternalRow args, int index) {
@@ -124,28 +156,160 @@ public class CompactProcedure extends BaseProcedure {
             FileStoreTable table,
             String sortType,
             List<String> sortColumns,
-            @Nullable String filter) {
-        CoreOptions coreOptions = table.store().options();
-
-        // sort only works with bucket=-1 yet
-        if (!TableSorter.OrderType.of(sortType).equals(TableSorter.OrderType.NONE)) {
-            if (!(table instanceof AppendOnlyFileStoreTable) || coreOptions.bucket() != -1) {
-                throw new UnsupportedOperationException(
-                        "Spark compact with sort_type "
-                                + sortType
-                                + " only support unaware-bucket append-only table yet.");
+            @Nullable String partitions) {
+        table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        BucketMode bucketMode = table.bucketMode();
+        TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
+
+        if (orderType.equals(TableSorter.OrderType.NONE)) {
+            JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
+            Predicate filter =
+                    StringUtils.isBlank(partitions)
+                            ? null
+                            : PredicateBuilder.partitions(
+                                    ParameterUtils.getPartitions(partitions), table.rowType());
+            switch (bucketMode) {
+                case FIXED:
+                case DYNAMIC:
+                    compactAwareBucketTable(table, filter, javaSparkContext);
+                    break;
+                case UNAWARE:
+                    compactUnAwareBucketTable(table, filter, javaSparkContext);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Spark compact with " + bucketMode + " is not support yet.");
+            }
+        } else {
+            switch (bucketMode) {
+                case UNAWARE:
+                    sortCompactUnAwareBucketTable(table, orderType, sortColumns, partitions);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Spark compact with sort_type "
+                                    + sortType
+                                    + " only support unaware-bucket append-only table yet.");
             }
         }
+        return true;
+    }
+
+    private void compactAwareBucketTable(
+            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
+        InnerTableScan scan = table.newScan();
+        if (filter != null) {
+            scan.withFilter(filter);
+        }
+
+        List<Pair<BinaryRow, Integer>> partitionBuckets =
+                scan.plan().splits().stream()
+                        .map(split -> (DataSplit) split)
+                        .map(dataSplit -> Pair.of(dataSplit.partition(), dataSplit.bucket()))
+                        .distinct()
+                        .collect(Collectors.toList());
+
+        if (partitionBuckets.isEmpty()) {
+            return;
+        }
 
-        Dataset<Row> row = spark().read().format("paimon").load(coreOptions.path().toString());
-        row = StringUtils.isBlank(filter) ? row : row.where(filter);
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        JavaRDD<CommitMessage> commitMessageJavaRDD =
+                javaSparkContext
+                        .parallelize(partitionBuckets)
+                        .mapPartitions(
+                                (FlatMapFunction<Iterator<Pair<BinaryRow, Integer>>, CommitMessage>)
+                                        pairIterator -> {
+                                            IOManager ioManager = SparkUtils.createIOManager();
+                                            BatchTableWrite write = writeBuilder.newWrite();
+                                            write.withIOManager(ioManager);
+                                            try {
+                                                while (pairIterator.hasNext()) {
+                                                    Pair<BinaryRow, Integer> pair =
+                                                            pairIterator.next();
+                                                    write.compact(
+                                                            pair.getLeft(), pair.getRight(), true);
+                                                }
+                                                return write.prepareCommit().iterator();
+                                            } finally {
+                                                write.close();
+                                                ioManager.close();
+                                            }
+                                        });
+
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            commit.commit(commitMessageJavaRDD.collect());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void compactUnAwareBucketTable(
+            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
+        AppendOnlyFileStoreTable fileStoreTable = (AppendOnlyFileStoreTable) table;
+        List<AppendOnlyCompactionTask> compactionTasks =
+                new AppendOnlyTableCompactionCoordinator(fileStoreTable, false, filter).run();
+        if (compactionTasks.isEmpty()) {
+            return;
+        }
+
+        CompactionTaskSerializer serializer = new CompactionTaskSerializer();
+        List<byte[]> serializedTasks = new ArrayList<>();
+        try {
+            for (AppendOnlyCompactionTask compactionTask : compactionTasks) {
+                serializedTasks.add(serializer.serialize(compactionTask));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("serialize compaction task failed");
+        }
+
+        String commitUser = UUID.randomUUID().toString();
+        JavaRDD<CommitMessage> commitMessageJavaRDD =
+                javaSparkContext
+                        .parallelize(serializedTasks)
+                        .mapPartitions(
+                                (FlatMapFunction<Iterator<byte[]>, CommitMessage>)
+                                        taskIterator -> {
+                                            AppendOnlyFileStoreWrite write =
+                                                    fileStoreTable.store().newWrite(commitUser);
+                                            CompactionTaskSerializer ser =
+                                                    new CompactionTaskSerializer();
+                                            ArrayList<CommitMessage> messages = new ArrayList<>();
+                                            try {
+                                                while (taskIterator.hasNext()) {
+                                                    AppendOnlyCompactionTask task =
+                                                            ser.deserialize(
+                                                                    ser.getVersion(),
+                                                                    taskIterator.next());
+                                                    messages.add(task.doCompact(write));
+                                                }
+                                                return messages.iterator();
+                                            } finally {
+                                                write.close();
+                                            }
+                                        });
+
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            commit.commit(commitMessageJavaRDD.collect());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void sortCompactUnAwareBucketTable(
+            FileStoreTable table,
+            TableSorter.OrderType orderType,
+            List<String> sortColumns,
+            @Nullable String partitions) {
+        Dataset<Row> row =
+                spark().read().format("paimon").load(table.coreOptions().path().toString());
+        row = StringUtils.isBlank(partitions) ? row : row.where(toWhere(partitions));
         new WriteIntoPaimonTable(
                         table,
                         DynamicOverWrite$.MODULE$,
-                        TableSorter.getSorter(table, sortType, sortColumns).sort(row),
+                        TableSorter.getSorter(table, orderType, sortColumns).sort(row),
                         new Options())
                 .run(spark());
-        return true;
     }
 
     @VisibleForTesting
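
Both new non-sort paths above follow the same driver/executor split: plan compaction units on the driver, run them in parallel via mapPartitions, and commit once from the driver. A condensed, hedged sketch of the aware-bucket path (the partition filter and IOManager wiring from the real method are elided; the class and method names here are illustrative):

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.Pair;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class AwareBucketCompactSketch {
    public static void compact(FileStoreTable table, JavaSparkContext jsc) throws Exception {
        // Driver: collect the distinct (partition, bucket) pairs that currently hold data.
        List<Pair<BinaryRow, Integer>> partitionBuckets =
                table.newScan().plan().splits().stream()
                        .map(split -> (DataSplit) split)
                        .map(ds -> Pair.of(ds.partition(), ds.bucket()))
                        .distinct()
                        .collect(Collectors.toList());

        // Executors: fully compact each bucket and emit commit messages.
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        List<CommitMessage> messages =
                jsc.parallelize(partitionBuckets)
                        .mapPartitions(
                                (FlatMapFunction<Iterator<Pair<BinaryRow, Integer>>, CommitMessage>)
                                        pairs -> {
                                            BatchTableWrite write = writeBuilder.newWrite();
                                            try {
                                                while (pairs.hasNext()) {
                                                    Pair<BinaryRow, Integer> pb = pairs.next();
                                                    // true => full compaction of this bucket
                                                    write.compact(pb.getLeft(), pb.getRight(), true);
                                                }
                                                return write.prepareCommit().iterator();
                                            } finally {
                                                write.close();
                                            }
                                        })
                        .collect();

        // Driver: one commit covering everything the executors produced.
        try (BatchTableCommit commit = writeBuilder.newCommit()) {
            commit.commit(messages);
        }
    }
}
```

The unaware-bucket path is analogous, except planning is done by AppendOnlyTableCompactionCoordinator and the tasks are shipped to executors through CompactionTaskSerializer, as the diff above shows.
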
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
index d6cdbefa8..b480dc8d4 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
@@ -62,8 +62,8 @@ public abstract class TableSorter {
     public abstract Dataset<Row> sort(Dataset<Row> input);
 
     public static TableSorter getSorter(
-            FileStoreTable table, String sortStrategy, List<String> orderColumns) {
-        switch (OrderType.of(sortStrategy)) {
+            FileStoreTable table, TableSorter.OrderType orderType, List<String> orderColumns) {
+        switch (orderType) {
             case ORDER:
                 return new OrderSorter(table, orderColumns);
             case ZORDER:
@@ -79,7 +79,7 @@ public abstract class TableSorter {
                     }
                 };
             default:
-                throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
+                throw new IllegalArgumentException("cannot match order type: " + orderType);
         }
     }
 
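With the signature change above, callers parse the strategy string once via OrderType.of and pass the enum through. A small illustrative sketch of the new call shape (the 'zorder' strategy and 'id' column are placeholders):

```java
import java.util.Collections;

import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SorterUsageSketch {
    // Parse the order strategy once, then hand the enum to getSorter.
    public static Dataset<Row> zorderById(FileStoreTable table, Dataset<Row> input) {
        TableSorter.OrderType orderType = TableSorter.OrderType.of("zorder");
        return TableSorter.getSorter(table, orderType, Collections.singletonList("id")).sort(input);
    }
}
```
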
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
index 5e826237b..788c8dd3e 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
@@ -17,17 +17,18 @@
  */
 package org.apache.paimon.spark.procedure
 
+import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.AbstractFileStoreTable
 
 import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.SparkSession.setActiveSession
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.StreamTest
 import org.assertj.core.api.Assertions
 
 import java.util
 
-/** Test sort compact procedure. See {@link CompactProcedure}. */
+/** Test sort compact procedure. See [[CompactProcedure]]. */
 class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
 
   import testImplicits._
@@ -257,7 +258,122 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
     }
   }
 
-  test("Piamon test: toWhere method in CompactProcedure") {
+  test("Paimon Procedure: compact aware bucket pk table") {
+    Seq(1, -1).foreach(
+      bucket => {
+        withTable("T") {
+          spark.sql(
+            s"""
+               |CREATE TABLE T (id INT, value STRING, pt STRING)
+               |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='$bucket', 'write-only'='true')
+               |PARTITIONED BY (pt)
+               |""".stripMargin)
+
+          val table = loadTable("T")
+
+          spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+          spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
+
+          spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')")
+          Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+          Assertions.assertThat(lastSnapshotId(table)).isEqualTo(3)
+
+          spark.sql(s"CALL sys.compact(table => 'T')")
+          Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+          Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
+
+          // compact condition no longer met
+          spark.sql(s"CALL sys.compact(table => 'T')")
+          Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
+
+          checkAnswer(
+            spark.sql(s"SELECT * FROM T ORDER BY id"),
+            Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") :: Nil)
+        }
+      })
+  }
+
+  test("Paimon Procedure: compact aware bucket pk table with many small 
files") {
+    Seq(3, -1).foreach(
+      bucket => {
+        withTable("T") {
+          spark.sql(
+            s"""
+               |CREATE TABLE T (id INT, value STRING, pt STRING)
+               |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='$bucket', 'write-only'='true',
+               |'source.split.target-size'='128m','source.split.open-file-cost'='32m') -- simulate multiple splits in a single bucket
+               |PARTITIONED BY (pt)
+               |""".stripMargin)
+
+          val table = loadTable("T")
+
+          val count = 100
+          for (i <- 0 until count) {
+            spark.sql(s"INSERT INTO T VALUES ($i, 'a', 'p${i % 2}')")
+          }
+
+          spark.sql(s"CALL sys.compact(table => 'T')")
+          Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+          checkAnswer(spark.sql(s"SELECT COUNT(*) FROM T"), Row(count) :: Nil)
+        }
+      })
+  }
+
+  test("Paimon Procedure: compact unaware bucket append table") {
+    spark.sql(
+      s"""
+         |CREATE TABLE T (id INT, value STRING, pt STRING)
+         |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2', 'compaction.max.file-num' = '3')
+         |PARTITIONED BY (pt)
+         |""".stripMargin)
+
+    val table = loadTable("T")
+
+    spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+    spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
+    spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')")
+
+    spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')")
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
+
+    spark.sql(s"CALL sys.compact(table => 'T')")
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+    // compact condition no longer met
+    spark.sql(s"CALL sys.compact(table => 'T')")
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+    checkAnswer(
+      spark.sql(s"SELECT * FROM T ORDER BY id"),
+      Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") :: Row(
+        5,
+        "e",
+        "p1") :: Row(6, "f", "p2") :: Nil)
+  }
+
+  test("Paimon Procedure: compact unaware bucket append table with many small 
files") {
+    spark.sql(
+      s"""
+         |CREATE TABLE T (id INT, value STRING, pt STRING)
+         |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.max.file-num' = '10')
+         |PARTITIONED BY (pt)
+         |""".stripMargin)
+
+    val table = loadTable("T")
+
+    val count = 100
+    for (i <- 0 until count) {
+      spark.sql(s"INSERT INTO T VALUES ($i, 'a', 'p${i % 2}')")
+    }
+
+    spark.sql(s"CALL sys.compact(table => 'T')")
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    checkAnswer(spark.sql(s"SELECT COUNT(*) FROM T"), Row(count) :: Nil)
+  }
+
+  test("Paimon test: toWhere method in CompactProcedure") {
     val conditions = "f0=0,f1=0,f2=0;f0=1,f1=1,f2=1;f0=1,f1=2,f2=2;f3=3"
 
     val where = CompactProcedure.toWhere(conditions)
@@ -266,4 +382,12 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
 
     Assertions.assertThat(where).isEqualTo(whereExpected)
   }
+
+  def lastSnapshotCommand(table: AbstractFileStoreTable): CommitKind = {
+    table.snapshotManager().latestSnapshot().commitKind()
+  }
+
+  def lastSnapshotId(table: AbstractFileStoreTable): Long = {
+    table.snapshotManager().latestSnapshotId()
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
index 472f41118..4837bca79 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -15,8 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
+import org.apache.paimon.spark.{PaimonSparkTestBase, SparkInputPartition, SparkTable}
 import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.sql.Row
