This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 927a668c370d61171e9fbbcd8416783f3233acbe Author: Zouxxyy <[email protected]> AuthorDate: Mon Dec 11 10:17:07 2023 +0800 [spark] Optimize compact procedure in non-sort scenario (#2457) --- docs/content/maintenance/dedicated-compaction.md | 4 +- .../apache/paimon/predicate/PredicateBuilder.java | 7 + .../AppendOnlyTableCompactionCoordinator.java | 4 +- .../AppendOnlyTableCompactionCoordinatorTest.java | 2 +- .../UnawareBucketCompactionTopoBuilder.java | 20 +-- .../paimon/spark/procedure/CompactProcedure.java | 200 +++++++++++++++++++-- .../org/apache/paimon/spark/sort/TableSorter.java | 6 +- .../spark/procedure/CompactProcedureTest.scala | 130 +++++++++++++- .../paimon/spark/sql/PaimonPushDownTest.scala | 3 +- 9 files changed, 332 insertions(+), 44 deletions(-) diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md index 7e569178b..7f00f7355 100644 --- a/docs/content/maintenance/dedicated-compaction.md +++ b/docs/content/maintenance/dedicated-compaction.md @@ -223,8 +223,8 @@ For more usage of the compact_database action, see ## Sort Compact -If your table is configured with [dynamic bucket]({{< ref "concepts/primary-key-table#dynamic-bucket" >}}) -or [append table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}) , +If your table is configured with [dynamic bucket primary key table]({{< ref "concepts/primary-key-table#dynamic-bucket" >}}) +or [unaware bucket append table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}) , you can trigger a compact with specified column sort to speed up queries. ```bash diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java index e2d529d6b..ee84d59d8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java @@ -360,6 +360,13 @@ public class PredicateBuilder { return predicate; } + public static Predicate partitions(List<Map<String, String>> partitions, RowType rowType) { + return PredicateBuilder.or( + partitions.stream() + .map(p -> PredicateBuilder.partition(p, rowType)) + .toArray(Predicate[]::new)); + } + public static Predicate equalPartition(BinaryRow partition, RowType partitionType) { Preconditions.checkArgument( partition.getFieldCount() == partitionType.getFieldCount(), diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java index 62a238419..e37d57336 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java @@ -212,12 +212,12 @@ public class AppendOnlyTableCompactionCoordinator { } public boolean readyToRemove() { - return toCompact.size() == 0 || age > REMOVE_AGE; + return toCompact.isEmpty() || age > REMOVE_AGE; } private List<List<DataFileMeta>> agePack() { List<List<DataFileMeta>> packed = pack(); - if (packed.size() == 0) { + if (packed.isEmpty()) { // non-packed, we need to grow up age, and check whether to compact once if (++age > COMPACT_AGE && toCompact.size() > 1) { List<DataFileMeta> all = new ArrayList<>(toCompact); diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java index 7b20dc7f5..83e5be14c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java @@ -105,7 +105,7 @@ public class AppendOnlyTableCompactionCoordinatorTest { assertThat(compactionCoordinator.partitionCompactCoordinators.size()).isEqualTo(1); } - // age enough, generate less file comaction + // age enough, generate less file compaction List<AppendOnlyCompactionTask> tasks = compactionCoordinator.compactPlan(); assertThat(tasks.size()).isEqualTo(1); assertThat(new HashSet<>(files)) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java index 8c92cb1b2..84031474a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java @@ -24,7 +24,6 @@ import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.UnawareBucketCompactionSink; import org.apache.paimon.flink.source.BucketUnawareCompactSource; import org.apache.paimon.options.Options; -import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.AppendOnlyFileStoreTable; @@ -95,7 +94,12 @@ public class UnawareBucketCompactionTopoBuilder { long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis(); BucketUnawareCompactSource source = new BucketUnawareCompactSource( - table, isContinuous, scanInterval, getPartitionFilter()); + table, + isContinuous, + scanInterval, + specifiedPartitions != null + ? 
PredicateBuilder.partitions(specifiedPartitions, table.rowType()) + : null); return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier); } @@ -123,16 +127,4 @@ public class UnawareBucketCompactionTopoBuilder { } return new DataStream<>(env, transformation); } - - private Predicate getPartitionFilter() { - Predicate partitionPredicate = null; - if (specifiedPartitions != null) { - partitionPredicate = - PredicateBuilder.or( - specifiedPartitions.stream() - .map(p -> PredicateBuilder.partition(p, table.rowType())) - .toArray(Predicate[]::new)); - } - return partitionPredicate; - } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 7239efad3..13c086e24 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -20,16 +20,37 @@ package org.apache.paimon.spark.procedure; import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.spark.DynamicOverWrite$; +import org.apache.paimon.spark.SparkUtils; import org.apache.paimon.spark.commands.WriteIntoPaimonTable; import org.apache.paimon.spark.sort.TableSorter; import org.apache.paimon.table.AppendOnlyFileStoreTable; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CompactionTaskSerializer; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ParameterUtils; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -42,15 +63,26 @@ import org.apache.spark.sql.types.StructType; import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.spark.sql.types.DataTypes.StringType; -/** Compact procedure for tables. */ +/** + * Compact procedure. 
Usage: + * + * <pre><code> + * CALL sys.compact(table => 'tableId', [partitions => 'p1;p2'], [order_strategy => 'xxx'], [order_by => 'xxx']) + * </code></pre> + */ public class CompactProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = @@ -85,7 +117,7 @@ public class CompactProcedure extends BaseProcedure { public InternalRow[] call(InternalRow args) { Preconditions.checkArgument(args.numFields() >= 1); Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String partitionFilter = blank(args, 1) ? null : toWhere(args.getString(1)); + String partitions = blank(args, 1) ? null : args.getString(1); String sortType = blank(args, 2) ? TableSorter.OrderType.NONE.name() : args.getString(2); List<String> sortColumns = blank(args, 3) @@ -106,14 +138,14 @@ public class CompactProcedure extends BaseProcedure { (FileStoreTable) table, sortType, sortColumns, - partitionFilter)); + partitions)); return new InternalRow[] {internalRow}; }); } @Override public String description() { - return "This procedure execute sort compact action on unaware-bucket table."; + return "This procedure execute compact action on paimon table."; } private boolean blank(InternalRow args, int index) { @@ -124,28 +156,160 @@ public class CompactProcedure extends BaseProcedure { FileStoreTable table, String sortType, List<String> sortColumns, - @Nullable String filter) { - CoreOptions coreOptions = table.store().options(); - - // sort only works with bucket=-1 yet - if (!TableSorter.OrderType.of(sortType).equals(TableSorter.OrderType.NONE)) { - if (!(table instanceof AppendOnlyFileStoreTable) || coreOptions.bucket() != -1) { - throw new UnsupportedOperationException( - "Spark compact with sort_type " - + sortType - + " only support unaware-bucket append-only table yet."); + @Nullable String partitions) { + table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false")); + BucketMode bucketMode = table.bucketMode(); + TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType); + + if (orderType.equals(TableSorter.OrderType.NONE)) { + JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext()); + Predicate filter = + StringUtils.isBlank(partitions) + ? 
null + : PredicateBuilder.partitions( + ParameterUtils.getPartitions(partitions), table.rowType()); + switch (bucketMode) { + case FIXED: + case DYNAMIC: + compactAwareBucketTable(table, filter, javaSparkContext); + break; + case UNAWARE: + compactUnAwareBucketTable(table, filter, javaSparkContext); + break; + default: + throw new UnsupportedOperationException( + "Spark compact with " + bucketMode + " is not support yet."); + } + } else { + switch (bucketMode) { + case UNAWARE: + sortCompactUnAwareBucketTable(table, orderType, sortColumns, partitions); + break; + default: + throw new UnsupportedOperationException( + "Spark compact with sort_type " + + sortType + + " only support unaware-bucket append-only table yet."); } } + return true; + } + + private void compactAwareBucketTable( + FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) { + InnerTableScan scan = table.newScan(); + if (filter != null) { + scan.withFilter(filter); + } + + List<Pair<BinaryRow, Integer>> partitionBuckets = + scan.plan().splits().stream() + .map(split -> (DataSplit) split) + .map(dataSplit -> Pair.of(dataSplit.partition(), dataSplit.bucket())) + .distinct() + .collect(Collectors.toList()); + + if (partitionBuckets.isEmpty()) { + return; + } - Dataset<Row> row = spark().read().format("paimon").load(coreOptions.path().toString()); - row = StringUtils.isBlank(filter) ? row : row.where(filter); + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + JavaRDD<CommitMessage> commitMessageJavaRDD = + javaSparkContext + .parallelize(partitionBuckets) + .mapPartitions( + (FlatMapFunction<Iterator<Pair<BinaryRow, Integer>>, CommitMessage>) + pairIterator -> { + IOManager ioManager = SparkUtils.createIOManager(); + BatchTableWrite write = writeBuilder.newWrite(); + write.withIOManager(ioManager); + try { + while (pairIterator.hasNext()) { + Pair<BinaryRow, Integer> pair = + pairIterator.next(); + write.compact( + pair.getLeft(), pair.getRight(), true); + } + return write.prepareCommit().iterator(); + } finally { + write.close(); + ioManager.close(); + } + }); + + try (BatchTableCommit commit = writeBuilder.newCommit()) { + commit.commit(commitMessageJavaRDD.collect()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void compactUnAwareBucketTable( + FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) { + AppendOnlyFileStoreTable fileStoreTable = (AppendOnlyFileStoreTable) table; + List<AppendOnlyCompactionTask> compactionTasks = + new AppendOnlyTableCompactionCoordinator(fileStoreTable, false, filter).run(); + if (compactionTasks.isEmpty()) { + return; + } + + CompactionTaskSerializer serializer = new CompactionTaskSerializer(); + List<byte[]> serializedTasks = new ArrayList<>(); + try { + for (AppendOnlyCompactionTask compactionTask : compactionTasks) { + serializedTasks.add(serializer.serialize(compactionTask)); + } + } catch (IOException e) { + throw new RuntimeException("serialize compaction task failed"); + } + + String commitUser = UUID.randomUUID().toString(); + JavaRDD<CommitMessage> commitMessageJavaRDD = + javaSparkContext + .parallelize(serializedTasks) + .mapPartitions( + (FlatMapFunction<Iterator<byte[]>, CommitMessage>) + taskIterator -> { + AppendOnlyFileStoreWrite write = + fileStoreTable.store().newWrite(commitUser); + CompactionTaskSerializer ser = + new CompactionTaskSerializer(); + ArrayList<CommitMessage> messages = new ArrayList<>(); + try { + while (taskIterator.hasNext()) { + 
AppendOnlyCompactionTask task = + ser.deserialize( + ser.getVersion(), + taskIterator.next()); + messages.add(task.doCompact(write)); + } + return messages.iterator(); + } finally { + write.close(); + } + }); + + try (TableCommitImpl commit = table.newCommit(commitUser)) { + commit.commit(commitMessageJavaRDD.collect()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void sortCompactUnAwareBucketTable( + FileStoreTable table, + TableSorter.OrderType orderType, + List<String> sortColumns, + @Nullable String partitions) { + Dataset<Row> row = + spark().read().format("paimon").load(table.coreOptions().path().toString()); + row = StringUtils.isBlank(partitions) ? row : row.where(toWhere(partitions)); new WriteIntoPaimonTable( table, DynamicOverWrite$.MODULE$, - TableSorter.getSorter(table, sortType, sortColumns).sort(row), + TableSorter.getSorter(table, orderType, sortColumns).sort(row), new Options()) .run(spark()); - return true; } @VisibleForTesting diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java index d6cdbefa8..b480dc8d4 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java @@ -62,8 +62,8 @@ public abstract class TableSorter { public abstract Dataset<Row> sort(Dataset<Row> input); public static TableSorter getSorter( - FileStoreTable table, String sortStrategy, List<String> orderColumns) { - switch (OrderType.of(sortStrategy)) { + FileStoreTable table, TableSorter.OrderType orderType, List<String> orderColumns) { + switch (orderType) { case ORDER: return new OrderSorter(table, orderColumns); case ZORDER: @@ -79,7 +79,7 @@ public abstract class TableSorter { } }; default: - throw new IllegalArgumentException("cannot match order type: " + sortStrategy); + throw new IllegalArgumentException("cannot match order type: " + orderType); } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala index 5e826237b..788c8dd3e 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala @@ -17,17 +17,18 @@ */ package org.apache.paimon.spark.procedure +import org.apache.paimon.Snapshot.CommitKind import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.AbstractFileStoreTable import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.SparkSession.setActiveSession import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.StreamTest import org.assertj.core.api.Assertions import java.util -/** Test sort compact procedure. See {@link CompactProcedure}. */ +/** Test sort compact procedure. See [[CompactProcedure]]. 
*/ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest { import testImplicits._ @@ -257,7 +258,122 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest { } } - test("Piamon test: toWhere method in CompactProcedure") { + test("Paimon Procedure: compact aware bucket pk table") { + Seq(1, -1).foreach( + bucket => { + withTable("T") { + spark.sql( + s""" + |CREATE TABLE T (id INT, value STRING, pt STRING) + |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='$bucket', 'write-only'='true') + |PARTITIONED BY (pt) + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')") + spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')") + + spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')") + Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(3) + + spark.sql(s"CALL sys.compact(table => 'T')") + Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4) + + // compact condition no longer met + spark.sql(s"CALL sys.compact(table => 'T')") + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4) + + checkAnswer( + spark.sql(s"SELECT * FROM T ORDER BY id"), + Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") :: Nil) + } + }) + } + + test("Paimon Procedure: compact aware bucket pk table with many small files") { + Seq(3, -1).foreach( + bucket => { + withTable("T") { + spark.sql( + s""" + |CREATE TABLE T (id INT, value STRING, pt STRING) + |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='$bucket', 'write-only'='true', + |'source.split.target-size'='128m','source.split.open-file-cost'='32m') -- simulate multiple splits in a single bucket + |PARTITIONED BY (pt) + |""".stripMargin) + + val table = loadTable("T") + + val count = 100 + for (i <- 0 until count) { + spark.sql(s"INSERT INTO T VALUES ($i, 'a', 'p${i % 2}')") + } + + spark.sql(s"CALL sys.compact(table => 'T')") + Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue + checkAnswer(spark.sql(s"SELECT COUNT(*) FROM T"), Row(count) :: Nil) + } + }) + } + + test("Paimon Procedure: compact unaware bucket append table") { + spark.sql( + s""" + |CREATE TABLE T (id INT, value STRING, pt STRING) + |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2', 'compaction.max.file-num' = '3') + |PARTITIONED BY (pt) + |""".stripMargin) + + val table = loadTable("T") + + spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')") + spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')") + spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')") + + spark.sql(s"CALL sys.compact(table => 'T', partitions => 'pt=p1')") + Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4) + + spark.sql(s"CALL sys.compact(table => 'T')") + Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5) + + // compact condition no longer met + spark.sql(s"CALL sys.compact(table => 'T')") + Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5) + + checkAnswer( + spark.sql(s"SELECT * FROM T ORDER BY id"), + Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") :: Row( + 5, + "e", + 
"p1") :: Row(6, "f", "p2") :: Nil) + } + + test("Paimon Procedure: compact unaware bucket append table with many small files") { + spark.sql( + s""" + |CREATE TABLE T (id INT, value STRING, pt STRING) + |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.max.file-num' = '10') + |PARTITIONED BY (pt) + |""".stripMargin) + + val table = loadTable("T") + + val count = 100 + for (i <- 0 until count) { + spark.sql(s"INSERT INTO T VALUES ($i, 'a', 'p${i % 2}')") + } + + spark.sql(s"CALL sys.compact(table => 'T')") + Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue + checkAnswer(spark.sql(s"SELECT COUNT(*) FROM T"), Row(count) :: Nil) + } + + test("Paimon test: toWhere method in CompactProcedure") { val conditions = "f0=0,f1=0,f2=0;f0=1,f1=1,f2=1;f0=1,f1=2,f2=2;f3=3" val where = CompactProcedure.toWhere(conditions) @@ -266,4 +382,12 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest { Assertions.assertThat(where).isEqualTo(whereExpected) } + + def lastSnapshotCommand(table: AbstractFileStoreTable): CommitKind = { + table.snapshotManager().latestSnapshot().commitKind() + } + + def lastSnapshotId(table: AbstractFileStoreTable): Long = { + table.snapshotManager().latestSnapshotId() + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala index 472f41118..4837bca79 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala @@ -15,8 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.paimon.spark +package org.apache.paimon.spark.sql +import org.apache.paimon.spark.{PaimonSparkTestBase, SparkInputPartition, SparkTable} import org.apache.paimon.table.source.DataSplit import org.apache.spark.sql.Row

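For quick reference, the extended procedure above is invoked from Spark SQL as shown in the Javadoc added by this commit and exercised in the new tests. The following is an illustrative sketch only; the table identifier, partition values, and sort columns are placeholders, not part of the commit:

    -- compaction in the non-sort scenario optimized here; after this change it covers
    -- fixed/dynamic-bucket primary key tables as well as unaware-bucket append tables
    CALL sys.compact(table => 'T');

    -- limit compaction to specific partitions (';' separates partition specs, ',' separates fields)
    CALL sys.compact(table => 'T', partitions => 'pt=p1;pt=p2');

    -- sort compaction, still supported only for unaware-bucket append tables
    CALL sys.compact(table => 'T', order_strategy => 'zorder', order_by => 'f0,f1');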