This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 350990a8c [core] System files table distributed execute remote (#4204)
350990a8c is described below

commit 350990a8c4844852a7bc00ccc518c189d98a013c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Sep 23 14:20:01 2024 +0800

    [core] System files table distributed execute remote (#4204)
---
 .../org/apache/paimon/table/system/FilesTable.java | 71 +++++++++++++---------
 .../apache/paimon/table/system/FilesTableTest.java |  3 +-
 2 files changed, 43 insertions(+), 31 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 99a9298d3..598cd1c9f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
@@ -47,6 +48,7 @@ import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -75,6 +77,7 @@ import java.util.Objects;
 import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
@@ -134,7 +137,7 @@ public class FilesTable implements ReadonlyTable {
 
     @Override
     public InnerTableScan newScan() {
-        return new FilesScan();
+        return new FilesScan(storeTable);
     }
 
     @Override
@@ -154,6 +157,12 @@ public class FilesTable implements ReadonlyTable {
         @Nullable private LeafPredicate bucketPredicate;
         @Nullable private LeafPredicate levelPredicate;
 
+        private final FileStoreTable fileStoreTable;
+
+        public FilesScan(FileStoreTable fileStoreTable) {
+            this.fileStoreTable = fileStoreTable;
+        }
+
         @Override
         public InnerTableScan withFilter(Predicate pushdown) {
             if (pushdown == null) {
@@ -170,23 +179,46 @@ public class FilesTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
+            SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
+            if (partitionPredicate != null && partitionPredicate.function() instanceof Equal) {
+                String partitionStr = partitionPredicate.literals().get(0).toString();
+                if (partitionStr.startsWith("[")) {
+                    partitionStr = partitionStr.substring(1);
+                }
+                if (partitionStr.endsWith("]")) {
+                    partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
+                }
+                String[] partFields = partitionStr.split(", ");
+                LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+                List<String> partitionKeys = fileStoreTable.partitionKeys();
+                if (partitionKeys.size() != partFields.length) {
+                    return Collections::emptyList;
+                }
+                for (int i = 0; i < partitionKeys.size(); i++) {
+                    partSpec.put(partitionKeys.get(i), partFields[i]);
+                }
+                snapshotReader.withPartitionFilter(partSpec);
+                // TODO support range?
+            }
+
             return () ->
-                    Collections.singletonList(
-                            new FilesSplit(partitionPredicate, bucketPredicate, levelPredicate));
+                    snapshotReader.partitions().stream()
+                            .map(p -> new FilesSplit(p, bucketPredicate, levelPredicate))
+                            .collect(Collectors.toList());
         }
     }
 
     private static class FilesSplit extends SingletonSplit {
 
-        @Nullable private final LeafPredicate partitionPredicate;
+        @Nullable private final BinaryRow partition;
         @Nullable private final LeafPredicate bucketPredicate;
         @Nullable private final LeafPredicate levelPredicate;
 
         private FilesSplit(
-                @Nullable LeafPredicate partitionPredicate,
+                @Nullable BinaryRow partition,
                 @Nullable LeafPredicate bucketPredicate,
                 @Nullable LeafPredicate levelPredicate) {
-            this.partitionPredicate = partitionPredicate;
+            this.partition = partition;
             this.bucketPredicate = bucketPredicate;
             this.levelPredicate = levelPredicate;
         }
@@ -200,14 +232,14 @@ public class FilesTable implements ReadonlyTable {
                 return false;
             }
             FilesSplit that = (FilesSplit) o;
-            return Objects.equals(partitionPredicate, that.partitionPredicate)
+            return Objects.equals(partition, that.partition)
                     && Objects.equals(bucketPredicate, that.bucketPredicate)
                     && Objects.equals(this.levelPredicate, that.levelPredicate);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(partitionPredicate, bucketPredicate, levelPredicate);
+            return Objects.hash(partition, bucketPredicate, levelPredicate);
         }
 
         public List<Split> splits(FileStoreTable storeTable) {
@@ -216,27 +248,8 @@ public class FilesTable implements ReadonlyTable {
 
         private TableScan.Plan tablePlan(FileStoreTable storeTable) {
             InnerTableScan scan = storeTable.newScan();
-            if (partitionPredicate != null) {
-                if (partitionPredicate.function() instanceof Equal) {
-                    String partitionStr = partitionPredicate.literals().get(0).toString();
-                    if (partitionStr.startsWith("[")) {
-                        partitionStr = partitionStr.substring(1);
-                    }
-                    if (partitionStr.endsWith("]")) {
-                        partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
-                    }
-                    String[] partFields = partitionStr.split(", ");
-                    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
-                    List<String> partitionKeys = storeTable.partitionKeys();
-                    if (partitionKeys.size() != partFields.length) {
-                        return Collections::emptyList;
-                    }
-                    for (int i = 0; i < partitionKeys.size(); i++) {
-                        partSpec.put(partitionKeys.get(i), partFields[i]);
-                    }
-                    scan.withPartitionFilter(partSpec);
-                }
-                // TODO support range?
+            if (partition != null) {
+                scan.withPartitionFilter(Collections.singletonList(partition));
             }
             if (bucketPredicate != null) {
                 scan.withBucketFilter(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
index 0d7a8b497..89fb201fa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java
@@ -159,8 +159,7 @@ public class FilesTableTest extends TableTestBase {
     }
 
     @Test
-    public void testReadFilesFromNotExistSnapshot() throws Exception {
-
+    public void testReadFilesFromNotExistSnapshot() {
         filesTable =
                 (FilesTable)
                         filesTable.copy(

Reply via email to