This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 350990a8c [core] System files table distributed execute remote (#4204)
350990a8c is described below
commit 350990a8c4844852a7bc00ccc518c189d98a013c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Sep 23 14:20:01 2024 +0800
[core] System files table distributed execute remote (#4204)
---
.../org/apache/paimon/table/system/FilesTable.java | 71 +++++++++++++---------
.../apache/paimon/table/system/FilesTableTest.java | 3 +-
2 files changed, 43 insertions(+), 31 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 99a9298d3..598cd1c9f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
@@ -47,6 +48,7 @@ import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -75,6 +77,7 @@ import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -134,7 +137,7 @@ public class FilesTable implements ReadonlyTable {
@Override
public InnerTableScan newScan() {
- return new FilesScan();
+ return new FilesScan(storeTable);
}
@Override
@@ -154,6 +157,12 @@ public class FilesTable implements ReadonlyTable {
@Nullable private LeafPredicate bucketPredicate;
@Nullable private LeafPredicate levelPredicate;
+ private final FileStoreTable fileStoreTable;
+
+ public FilesScan(FileStoreTable fileStoreTable) {
+ this.fileStoreTable = fileStoreTable;
+ }
+
@Override
public InnerTableScan withFilter(Predicate pushdown) {
if (pushdown == null) {
@@ -170,23 +179,46 @@ public class FilesTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
+ SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
+ if (partitionPredicate != null && partitionPredicate.function() instanceof Equal) {
+ String partitionStr = partitionPredicate.literals().get(0).toString();
+ if (partitionStr.startsWith("[")) {
+ partitionStr = partitionStr.substring(1);
+ }
+ if (partitionStr.endsWith("]")) {
+ partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
+ }
+ String[] partFields = partitionStr.split(", ");
+ LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+ List<String> partitionKeys = fileStoreTable.partitionKeys();
+ if (partitionKeys.size() != partFields.length) {
+ return Collections::emptyList;
+ }
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ partSpec.put(partitionKeys.get(i), partFields[i]);
+ }
+ snapshotReader.withPartitionFilter(partSpec);
+ // TODO support range?
+ }
+
return () ->
- Collections.singletonList(
- new FilesSplit(partitionPredicate, bucketPredicate, levelPredicate));
+ snapshotReader.partitions().stream()
+ .map(p -> new FilesSplit(p, bucketPredicate, levelPredicate))
+ .collect(Collectors.toList());
}
}
private static class FilesSplit extends SingletonSplit {
- @Nullable private final LeafPredicate partitionPredicate;
+ @Nullable private final BinaryRow partition;
@Nullable private final LeafPredicate bucketPredicate;
@Nullable private final LeafPredicate levelPredicate;
private FilesSplit(
- @Nullable LeafPredicate partitionPredicate,
+ @Nullable BinaryRow partition,
@Nullable LeafPredicate bucketPredicate,
@Nullable LeafPredicate levelPredicate) {
- this.partitionPredicate = partitionPredicate;
+ this.partition = partition;
this.bucketPredicate = bucketPredicate;
this.levelPredicate = levelPredicate;
}
@@ -200,14 +232,14 @@ public class FilesTable implements ReadonlyTable {
return false;
}
FilesSplit that = (FilesSplit) o;
- return Objects.equals(partitionPredicate, that.partitionPredicate)
+ return Objects.equals(partition, that.partition)
&& Objects.equals(bucketPredicate, that.bucketPredicate)
&& Objects.equals(this.levelPredicate, that.levelPredicate);
}
@Override
public int hashCode() {
- return Objects.hash(partitionPredicate, bucketPredicate, levelPredicate);
+ return Objects.hash(partition, bucketPredicate, levelPredicate);
}
public List<Split> splits(FileStoreTable storeTable) {
@@ -216,27 +248,8 @@ public class FilesTable implements ReadonlyTable {
private TableScan.Plan tablePlan(FileStoreTable storeTable) {
InnerTableScan scan = storeTable.newScan();
- if (partitionPredicate != null) {
- if (partitionPredicate.function() instanceof Equal) {
- String partitionStr = partitionPredicate.literals().get(0).toString();
- if (partitionStr.startsWith("[")) {
- partitionStr = partitionStr.substring(1);
- }
- if (partitionStr.endsWith("]")) {
- partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
- }
- String[] partFields = partitionStr.split(", ");
- LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
- List<String> partitionKeys = storeTable.partitionKeys();
- if (partitionKeys.size() != partFields.length) {
- return Collections::emptyList;
- }
- for (int i = 0; i < partitionKeys.size(); i++) {
- partSpec.put(partitionKeys.get(i), partFields[i]);
- }
- scan.withPartitionFilter(partSpec);
- }
- // TODO support range?
+ if (partition != null) {
+ scan.withPartitionFilter(Collections.singletonList(partition));
}
if (bucketPredicate != null) {
scan.withBucketFilter(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 0d7a8b497..89fb201fa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -159,8 +159,7 @@ public class FilesTableTest extends TableTestBase {
}
@Test
- public void testReadFilesFromNotExistSnapshot() throws Exception {
-
+ public void testReadFilesFromNotExistSnapshot() {
filesTable =
(FilesTable)
filesTable.copy(