This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e2573f8b0 [core][spark] Use list partitions with predicate (#3267)
e2573f8b0 is described below

commit e2573f8b0ae6afccd64602857c0d4b397ac2ed5c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 26 15:36:42 2024 +0800

    [core][spark] Use list partitions with predicate (#3267)
---
 .../java/org/apache/paimon/operation/FileStoreScan.java     |  6 ++++++
 .../java/org/apache/paimon/operation/PartitionExpire.java   |  9 ++-------
 .../main/java/org/apache/paimon/table/source/TableScan.java |  2 +-
 .../apache/paimon/table/source/snapshot/SnapshotReader.java |  4 +++-
 .../paimon/table/source/snapshot/SnapshotReaderImpl.java    | 13 +++++++------
 .../java/org/apache/paimon/table/system/AuditLogTable.java  |  6 ++++++
 .../spark/commands/DeleteFromPaimonTableCommand.scala       |  5 +++--
 7 files changed, 28 insertions(+), 17 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index a963e8d42..481010269 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -83,6 +83,12 @@ public interface FileStoreScan {
 
     List<PartitionEntry> readPartitionEntries();
 
+    default List<BinaryRow> listPartitions() {
+        return readPartitionEntries().stream()
+                .map(PartitionEntry::partition)
+                .collect(Collectors.toList());
+    }
+
     /** Result plan of this scan. */
     interface Plan {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 895121ac4..94a35cb54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.types.RowType;
@@ -38,7 +37,6 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /** Expire partitions. */
 public class PartitionExpire {
@@ -117,11 +115,8 @@ public class PartitionExpire {
     }
 
     private List<BinaryRow> readPartitions(LocalDateTime expireDateTime) {
-        return scan.withPartitionFilter(new PartitionTimePredicate(expireDateTime)).plan().files()
-                .stream()
-                .map(ManifestEntry::partition)
-                .distinct()
-                .collect(Collectors.toList());
+        return scan.withPartitionFilter(new PartitionTimePredicate(expireDateTime))
+                .listPartitions();
     }
 
     private class PartitionTimePredicate implements PartitionPredicate {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
index ba8b0b5e1..791d61f41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
@@ -35,7 +35,7 @@ public interface TableScan {
     /** Plan splits, throws {@link EndOfScanException} if the scan is ended. */
     Plan plan();
 
-    /** Get partitions from simple manifest entries. */
+    /** List partitions. */
     List<BinaryRow> listPartitions();
 
     /**
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index bf59a01e7..ac84f4f16 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -54,6 +54,8 @@ public interface SnapshotReader {
 
     SnapshotReader withPartitionFilter(Map<String, String> partitionSpec);
 
+    SnapshotReader withPartitionFilter(Predicate predicate);
+
     SnapshotReader withMode(ScanMode scanMode);
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
@@ -74,7 +76,7 @@ public interface SnapshotReader {
 
     Plan readIncrementalDiff(Snapshot before);
 
-    /** Get partitions from a snapshot. */
+    /** List partitions. */
     List<BinaryRow> partitions();
 
     List<PartitionEntry> partitionEntries();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index c36427298..2ed258e85 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
@@ -73,7 +72,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private final FileStoreScan scan;
     private final TableSchema tableSchema;
     private final CoreOptions options;
-    private final MergeEngine mergeEngine;
     private final boolean deletionVectors;
     private final SnapshotManager snapshotManager;
     private final ConsumerManager consumerManager;
@@ -101,7 +99,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.options = options;
-        this.mergeEngine = options.mergeEngine();
         this.deletionVectors = options.deletionVectorsEnabled();
         this.snapshotManager = snapshotManager;
         this.consumerManager =
@@ -166,6 +163,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withPartitionFilter(Predicate predicate) {
+        scan.withPartitionFilter(predicate);
+        return this;
+    }
+
     @Override
     public SnapshotReader withFilter(Predicate predicate) {
         List<String> partitionKeys = tableSchema.partitionKeys();
@@ -307,9 +310,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
     @Override
     public List<BinaryRow> partitions() {
-        return scan.readPartitionEntries().stream()
-                .map(PartitionEntry::partition)
-                .collect(Collectors.toList());
+        return scan.listPartitions();
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index ef47e1209..b6cfa5c8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -245,6 +245,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withPartitionFilter(Predicate predicate) {
+            snapshotReader.withPartitionFilter(predicate);
+            return this;
+        }
+
         @Override
         public SnapshotReader withMode(ScanMode scanMode) {
             snapshotReader.withMode(scanMode);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index c955acbdf..30a2e6eae 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -74,8 +75,8 @@ case class DeleteFromPaimonTableCommand(
       }
 
       if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
-        val allPartitions = table.newReadBuilder.newScan.listPartitions.asScala
-        val matchedPartitions = allPartitions.filter(partitionPredicate.get.test)
+        val matchedPartitions =
+          table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
         val rowDataPartitionComputer = new RowDataPartitionComputer(
           CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
           table.schema().logicalPartitionType(),

Reply via email to