This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e2573f8b0 [core][spark] Use list partitions with predicate (#3267)
e2573f8b0 is described below
commit e2573f8b0ae6afccd64602857c0d4b397ac2ed5c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 26 15:36:42 2024 +0800
[core][spark] Use list partitions with predicate (#3267)
---
.../java/org/apache/paimon/operation/FileStoreScan.java | 6 ++++++
.../java/org/apache/paimon/operation/PartitionExpire.java | 9 ++-------
.../main/java/org/apache/paimon/table/source/TableScan.java | 2 +-
.../apache/paimon/table/source/snapshot/SnapshotReader.java | 4 +++-
.../paimon/table/source/snapshot/SnapshotReaderImpl.java | 13 +++++++------
.../java/org/apache/paimon/table/system/AuditLogTable.java | 6 ++++++
.../spark/commands/DeleteFromPaimonTableCommand.scala | 5 +++--
7 files changed, 28 insertions(+), 17 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index a963e8d42..481010269 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -83,6 +83,12 @@ public interface FileStoreScan {
List<PartitionEntry> readPartitionEntries();
+ default List<BinaryRow> listPartitions() {
+ return readPartitionEntries().stream()
+ .map(PartitionEntry::partition)
+ .collect(Collectors.toList());
+ }
+
/** Result plan of this scan. */
interface Plan {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 895121ac4..94a35cb54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.types.RowType;
@@ -38,7 +37,6 @@ import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/** Expire partitions. */
public class PartitionExpire {
@@ -117,11 +115,8 @@ public class PartitionExpire {
}
private List<BinaryRow> readPartitions(LocalDateTime expireDateTime) {
- return scan.withPartitionFilter(new
PartitionTimePredicate(expireDateTime)).plan().files()
- .stream()
- .map(ManifestEntry::partition)
- .distinct()
- .collect(Collectors.toList());
+ return scan.withPartitionFilter(new
PartitionTimePredicate(expireDateTime))
+ .listPartitions();
}
private class PartitionTimePredicate implements PartitionPredicate {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
index ba8b0b5e1..791d61f41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableScan.java
@@ -35,7 +35,7 @@ public interface TableScan {
/** Plan splits, throws {@link EndOfScanException} if the scan is ended. */
Plan plan();
- /** Get partitions from simple manifest entries. */
+ /** List partitions. */
List<BinaryRow> listPartitions();
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index bf59a01e7..ac84f4f16 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -54,6 +54,8 @@ public interface SnapshotReader {
SnapshotReader withPartitionFilter(Map<String, String> partitionSpec);
+ SnapshotReader withPartitionFilter(Predicate predicate);
+
SnapshotReader withMode(ScanMode scanMode);
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
@@ -74,7 +76,7 @@ public interface SnapshotReader {
Plan readIncrementalDiff(Snapshot before);
- /** Get partitions from a snapshot. */
+ /** List partitions. */
List<BinaryRow> partitions();
List<PartitionEntry> partitionEntries();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index c36427298..2ed258e85 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -19,7 +19,6 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.Snapshot;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
@@ -73,7 +72,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
private final FileStoreScan scan;
private final TableSchema tableSchema;
private final CoreOptions options;
- private final MergeEngine mergeEngine;
private final boolean deletionVectors;
private final SnapshotManager snapshotManager;
private final ConsumerManager consumerManager;
@@ -101,7 +99,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
this.scan = scan;
this.tableSchema = tableSchema;
this.options = options;
- this.mergeEngine = options.mergeEngine();
this.deletionVectors = options.deletionVectorsEnabled();
this.snapshotManager = snapshotManager;
this.consumerManager =
@@ -166,6 +163,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withPartitionFilter(Predicate predicate) {
+ scan.withPartitionFilter(predicate);
+ return this;
+ }
+
@Override
public SnapshotReader withFilter(Predicate predicate) {
List<String> partitionKeys = tableSchema.partitionKeys();
@@ -307,9 +310,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Override
public List<BinaryRow> partitions() {
- return scan.readPartitionEntries().stream()
- .map(PartitionEntry::partition)
- .collect(Collectors.toList());
+ return scan.listPartitions();
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index ef47e1209..b6cfa5c8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -245,6 +245,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withPartitionFilter(Predicate predicate) {
+ snapshotReader.withPartitionFilter(predicate);
+ return this;
+ }
+
@Override
public SnapshotReader withMode(ScanMode scanMode) {
snapshotReader.withMode(scanMode);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index c955acbdf..30a2e6eae 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -74,8 +75,8 @@ case class DeleteFromPaimonTableCommand(
}
if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
- val allPartitions = table.newReadBuilder.newScan.listPartitions.asScala
- val matchedPartitions =
allPartitions.filter(partitionPredicate.get.test)
+ val matchedPartitions =
+
table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
val rowDataPartitionComputer = new RowDataPartitionComputer(
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
table.schema().logicalPartitionType(),