This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ef4182bffb [core] Format code, remove redundant code (#5885)
ef4182bffb is described below
commit ef4182bffbd37d5b4f45ba0f05928141d2a5c949
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jul 14 15:52:58 2025 +0800
[core] Format code, remove redundant code (#5885)
---
.../paimon/table/source/AbstractDataTableScan.java | 3 +-
.../paimon/table/source/DataTableBatchScan.java | 1 -
.../apache/paimon/flink/action/CompactAction.java | 1 -
.../paimon/spark/sql/DeletionVectorTest.scala | 74 ----------------------
4 files changed, 1 insertion(+), 78 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 76b348691e..b019f5dbda 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -84,7 +84,6 @@ abstract class AbstractDataTableScan implements DataTableScan
{
private final TableQueryAuth queryAuth;
@Nullable private RowType readType;
- @Nullable private Predicate predicate;
protected AbstractDataTableScan(
TableSchema schema,
@@ -99,7 +98,7 @@ abstract class AbstractDataTableScan implements DataTableScan
{
@Override
public InnerTableScan withFilter(Predicate predicate) {
- this.predicate = predicate;
+ snapshotReader.withFilter(predicate);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 32ac9f5d1f..d7f02c06ad 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -62,7 +62,6 @@ public class DataTableBatchScan extends AbstractDataTableScan
{
@Override
public InnerTableScan withFilter(Predicate predicate) {
super.withFilter(predicate);
- snapshotReader.withFilter(predicate);
return this;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 5528fb2ee5..fa6af663e0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -206,7 +206,6 @@ public class CompactAction extends TableActionBase {
boolean fullMode =
partitions.stream()
.allMatch(part -> part.size() ==
partitionType.getFieldCount());
- PartitionPredicate partitionFilter;
if (fullMode) {
List<BinaryRow> binaryPartitions =
createBinaryPartitions(partitions, partitionType,
partitionDefaultName);
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index ffc31bb46f..f85213da7c 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -443,80 +443,6 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
}
}
- test(s"Paimon DeletionVector: delete for append partitioned table with
bucket = 1") {
- val bucket = -1
- withTable("T") {
- val bucketKey = if (bucket > 1) {
- ", 'bucket-key' = 'id'"
- } else {
- ""
- }
- spark.sql(
- s"""
- |CREATE TABLE T (id INT, name STRING, pt STRING)
- |PARTITIONED BY(pt)
- |TBLPROPERTIES ('deletion-vectors.enabled' = 'true',
'deletion-vectors.bitmap64' = '${Random
- .nextBoolean()}', 'bucket' = '$bucket' $bucketKey)
- |""".stripMargin)
-
- val table = loadTable("T")
- val dvMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
-
- def getDeletionVectors(ptValues: Seq[String]): Map[String,
DeletionVector] = {
- getLatestDeletionVectors(table, dvMaintainerFactory,
ptValues.map(BinaryRow.singleColumn))
- }
-
- spark.sql(
- "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c',
'2025'), (4, 'd', '2025')")
- val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
- Assertions.assertEquals(0, deletionVectors1.size)
-
- val cond1 = "id = 2"
- val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
- runAndCheckSplitScan(s"DELETE FROM T WHERE $cond1")
- checkAnswer(
- spark.sql(s"SELECT * from T ORDER BY id"),
- Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025") ::
Nil)
- val deletionVectors2 = getDeletionVectors(Seq("2024", "2025"))
- Assertions.assertEquals(1, deletionVectors2.size)
- deletionVectors2
- .foreach {
- case (filePath, dv) =>
- rowMetaInfo1(filePath).foreach(index =>
Assertions.assertTrue(dv.isDeleted(index)))
- }
-
- val cond2 = "id = 3"
- val rowMetaInfo2 = rowMetaInfo1 ++ getFilePathAndRowIndex(cond2)
- runAndCheckSplitScan(s"DELETE FROM T WHERE $cond2")
- checkAnswer(
- spark.sql(s"SELECT * from T ORDER BY id"),
- Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
- val deletionVectors3 = getDeletionVectors(Seq("2024"))
- Assertions.assertTrue(deletionVectors2 == deletionVectors3)
- val deletionVectors4 = getDeletionVectors(Seq("2024", "2025"))
- deletionVectors4
- .foreach {
- case (filePath, dv) =>
- rowMetaInfo2(filePath).foreach(index =>
Assertions.assertTrue(dv.isDeleted(index)))
- }
-
- spark.sql("""CALL sys.compact(table => 'T', partitions => "pt =
'2024'")""")
- Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
- Assertions.assertTrue(getDeletionVectors(Seq("2025")).nonEmpty)
- checkAnswer(
- spark.sql(s"SELECT * from T ORDER BY id"),
- Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
-
- spark.sql("""CALL sys.compact(table => 'T', where => "pt = '2025'")""")
- Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
- Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
- checkAnswer(
- spark.sql(s"SELECT * from T ORDER BY id"),
- Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
- }
- }
-
test("Paimon deletionVector: deletion vector write verification") {
withTable("T") {
spark.sql(s"""