This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 1c677142b4c16bc1729d1499ea45cf41321614ba
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 31 19:39:19 2024 +0800

    [core] Fix FilesTable splits too big to distribute (#3454)
---
 .../apache/paimon/table/source/SingletonSplit.java |  28 ++++++
 .../table/system/AggregationFieldsTable.java       |  18 +---
 .../paimon/table/system/AllTableOptionsTable.java  |  23 +----
 .../apache/paimon/table/system/BranchesTable.java  |  16 +---
 .../paimon/table/system/CatalogOptionsTable.java   |  11 +--
 .../apache/paimon/table/system/ConsumersTable.java |  18 +---
 .../org/apache/paimon/table/system/FilesTable.java | 104 ++++++++++-----------
 .../apache/paimon/table/system/ManifestsTable.java |  34 ++-----
 .../apache/paimon/table/system/OptionsTable.java   |  16 +---
 .../paimon/table/system/PartitionsTable.java       |   8 +-
 .../apache/paimon/table/system/SchemasTable.java   |  22 +----
 .../apache/paimon/table/system/SnapshotsTable.java |   9 +-
 .../apache/paimon/table/system/StatisticTable.java |  23 +----
 .../org/apache/paimon/table/system/TagsTable.java  |  16 +---
 14 files changed, 125 insertions(+), 221 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java
new file mode 100644
index 000000000..e5a649a72
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+/** Singleton split used for system tables, whose scan always produces exactly 
one split. */
+public abstract class SingletonSplit implements Split {
+
+    @Override
+    public long rowCount() {
+        return 1;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 29bec8502..0aa17b1b6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
@@ -120,32 +121,21 @@ public class AggregationFieldsTable implements 
ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(
-                            new AggregationSplit(
-                                    new SchemaManager(fileIO, 
location).listAllIds().size(),
-                                    location));
+            return () -> Collections.singletonList(new 
AggregationSplit(location));
         }
     }
 
     /** {@link Split} implementation for {@link AggregationFieldsTable}. */
-    private static class AggregationSplit implements Split {
+    private static class AggregationSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private AggregationSplit(long rowCount, Path location) {
-            this.rowCount = rowCount;
+        private AggregationSplit(Path location) {
             this.location = location;
         }
 
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
index a7b1236bf..eb50a38a5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
@@ -42,7 +43,6 @@ import org.apache.paimon.utils.ProjectedRow;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -121,33 +121,20 @@ public class AllTableOptionsTable implements 
ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(
-                            new AllTableSplit(
-                                    options(fileIO, 
allTablePaths).values().stream()
-                                            .flatMap(t -> t.values().stream())
-                                            .reduce(0, (a, b) -> a + b.size(), 
Integer::sum),
-                                    allTablePaths));
+            return () -> Collections.singletonList(new 
AllTableSplit(allTablePaths));
         }
     }
 
-    private static class AllTableSplit implements Split {
+    private static class AllTableSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Map<String, Map<String, Path>> allTablePaths;
 
-        private AllTableSplit(long rowCount, Map<String, Map<String, Path>> 
allTablePaths) {
-            this.rowCount = rowCount;
+        private AllTableSplit(Map<String, Map<String, Path>> allTablePaths) {
             this.allTablePaths = allTablePaths;
         }
 
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -192,7 +179,7 @@ public class AllTableOptionsTable implements ReadonlyTable {
         }
 
         @Override
-        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        public RecordReader<InternalRow> createReader(Split split) {
             if (!(split instanceof AllTableSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 7ae31095c..f37370641 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
@@ -122,28 +123,19 @@ public class BranchesTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            FileStoreTable table = FileStoreTableFactory.create(fileIO, 
location);
-            long rowCount = table.branchManager().branchCount();
-            return () -> Collections.singletonList(new BranchesSplit(rowCount, 
location));
+            return () -> Collections.singletonList(new 
BranchesSplit(location));
         }
     }
 
-    private static class BranchesSplit implements Split {
+    private static class BranchesSplit extends SingletonSplit {
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private BranchesSplit(long rowCount, Path location) {
-            this.rowCount = rowCount;
+        private BranchesSplit(Path location) {
             this.location = location;
         }
 
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
index 2e94edf1e..699d0324f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
@@ -39,7 +40,6 @@ import org.apache.paimon.utils.ProjectedRow;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -113,7 +113,7 @@ public class CatalogOptionsTable implements ReadonlyTable {
         }
     }
 
-    private static class CatalogOptionsSplit implements Split {
+    private static class CatalogOptionsSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
@@ -123,11 +123,6 @@ public class CatalogOptionsTable implements ReadonlyTable {
             this.catalogOptions = catalogOptions.toMap();
         }
 
-        @Override
-        public long rowCount() {
-            return catalogOptions.size();
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -168,7 +163,7 @@ public class CatalogOptionsTable implements ReadonlyTable {
         }
 
         @Override
-        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        public RecordReader<InternalRow> createReader(Split split) {
             if (!(split instanceof CatalogOptionsTable.CatalogOptionsSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index a3ec3017e..05b3382d2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
@@ -114,32 +115,21 @@ public class ConsumersTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(
-                            new ConsumersTable.ConsumersSplit(
-                                    new ConsumerManager(fileIO, 
location).listAllIds().size(),
-                                    location));
+            return () -> Collections.singletonList(new 
ConsumersTable.ConsumersSplit(location));
         }
     }
 
     /** {@link Split} implementation for {@link ConsumersTable}. */
-    private static class ConsumersSplit implements Split {
+    private static class ConsumersSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private ConsumersSplit(long rowCount, Path location) {
-            this.rowCount = rowCount;
+        private ConsumersSplit(Path location) {
             this.location = location;
         }
 
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 1f6eb2e0e..df5a13881 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -43,6 +43,7 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
@@ -62,7 +63,6 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -134,7 +134,7 @@ public class FilesTable implements ReadonlyTable {
 
     @Override
     public InnerTableScan newScan() {
-        return new FilesScan(storeTable);
+        return new FilesScan();
     }
 
     @Override
@@ -150,16 +150,10 @@ public class FilesTable implements ReadonlyTable {
 
     private static class FilesScan extends ReadOnceTableScan {
 
-        private final FileStoreTable storeTable;
-
         @Nullable private LeafPredicate partitionPredicate;
         @Nullable private LeafPredicate bucketPredicate;
         @Nullable private LeafPredicate levelPredicate;
 
-        private FilesScan(FileStoreTable storeTable) {
-            this.storeTable = storeTable;
-        }
-
         @Override
         public InnerTableScan withFilter(Predicate pushdown) {
             if (pushdown == null) {
@@ -176,12 +170,51 @@ public class FilesTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            // plan here, just set the result of plan to split
-            TableScan.Plan plan = tablePlan();
-            return () -> Collections.singletonList(new 
FilesSplit(plan.splits()));
+            return () ->
+                    Collections.singletonList(
+                            new FilesSplit(partitionPredicate, 
bucketPredicate, levelPredicate));
+        }
+    }
+
+    private static class FilesSplit extends SingletonSplit {
+
+        @Nullable private final LeafPredicate partitionPredicate;
+        @Nullable private final LeafPredicate bucketPredicate;
+        @Nullable private final LeafPredicate levelPredicate;
+
+        private FilesSplit(
+                @Nullable LeafPredicate partitionPredicate,
+                @Nullable LeafPredicate bucketPredicate,
+                @Nullable LeafPredicate levelPredicate) {
+            this.partitionPredicate = partitionPredicate;
+            this.bucketPredicate = bucketPredicate;
+            this.levelPredicate = levelPredicate;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            FilesSplit that = (FilesSplit) o;
+            return Objects.equals(partitionPredicate, that.partitionPredicate)
+                    && Objects.equals(bucketPredicate, that.bucketPredicate)
+                    && Objects.equals(this.levelPredicate, 
that.levelPredicate);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partitionPredicate, bucketPredicate, 
levelPredicate);
+        }
+
+        public List<Split> splits(FileStoreTable storeTable) {
+            return tablePlan(storeTable).splits();
         }
 
-        private TableScan.Plan tablePlan() {
+        private TableScan.Plan tablePlan(FileStoreTable storeTable) {
             InnerTableScan scan = storeTable.newScan();
             if (partitionPredicate != null) {
                 if (partitionPredicate.function() instanceof Equal) {
@@ -221,46 +254,6 @@ public class FilesTable implements ReadonlyTable {
         }
     }
 
-    private static class FilesSplit implements Split {
-
-        private static final long serialVersionUID = 1L;
-
-        private final List<Split> splits;
-
-        private FilesSplit(List<Split> splits) {
-            this.splits = splits;
-        }
-
-        @Override
-        public long rowCount() {
-            return splits.stream()
-                    .map(s -> (DataSplit) s)
-                    .mapToLong(s -> s.dataFiles().size())
-                    .sum();
-        }
-
-        public List<Split> splits() {
-            return splits;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            FilesSplit that = (FilesSplit) o;
-            return Objects.equals(splits, that.splits);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(splits);
-        }
-    }
-
     private static class FilesRead implements InnerTableRead {
 
         private final SchemaManager schemaManager;
@@ -292,12 +285,13 @@ public class FilesTable implements ReadonlyTable {
         }
 
         @Override
-        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        public RecordReader<InternalRow> createReader(Split split) {
             if (!(split instanceof FilesSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
             FilesSplit filesSplit = (FilesSplit) split;
-            if (filesSplit.splits().isEmpty()) {
+            List<Split> splits = filesSplit.splits(storeTable);
+            if (splits.isEmpty()) {
                 return new IteratorRecordReader<>(Collections.emptyIterator());
             }
 
@@ -332,7 +326,7 @@ public class FilesTable implements ReadonlyTable {
                                     });
                         }
                     };
-            for (Split dataSplit : filesSplit.splits()) {
+            for (Split dataSplit : splits) {
                 iteratorList.add(
                         Iterators.transform(
                                 ((DataSplit) dataSplit).dataFiles().iterator(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 7d7a9570e..e07ff602c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
@@ -48,13 +49,11 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
 
@@ -109,7 +108,7 @@ public class ManifestsTable implements ReadonlyTable {
         return new ManifestsTable(dataTable.copy(dynamicOptions));
     }
 
-    private class ManifestsScan extends ReadOnceTableScan {
+    private static class ManifestsScan extends ReadOnceTableScan {
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
@@ -119,41 +118,22 @@ public class ManifestsTable implements ReadonlyTable {
 
         @Override
         protected Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(new 
ManifestsSplit(allManifests(dataTable).size()));
+            return () -> Collections.singletonList(new ManifestsSplit());
         }
     }
 
-    private static class ManifestsSplit implements Split {
+    private static class ManifestsSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
-
-        private ManifestsSplit(long rowCount) {
-            this.rowCount = rowCount;
-        }
-
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
+        private ManifestsSplit() {}
 
         @Override
         public boolean equals(Object o) {
             if (this == o) {
                 return true;
             }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            ManifestsSplit that = (ManifestsSplit) o;
-            return Objects.equals(rowCount, that.rowCount);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(rowCount);
+            return o != null && getClass() == o.getClass();
         }
     }
 
@@ -185,7 +165,7 @@ public class ManifestsTable implements ReadonlyTable {
         }
 
         @Override
-        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        public RecordReader<InternalRow> createReader(Split split) {
             if (!(split instanceof ManifestsSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index b740ddec1..0c8ac2f25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
@@ -112,29 +113,20 @@ public class OptionsTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(
-                            new OptionsSplit(options(fileIO, location).size(), 
location));
+            return () -> Collections.singletonList(new OptionsSplit(location));
         }
     }
 
-    private static class OptionsSplit implements Split {
+    private static class OptionsSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private OptionsSplit(long rowCount, Path location) {
-            this.rowCount = rowCount;
+        private OptionsSplit(Path location) {
             this.location = location;
         }
 
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index c08cdfaa3..76c0768ee 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
@@ -124,15 +125,10 @@ public class PartitionsTable implements ReadonlyTable {
         }
     }
 
-    private static class PartitionsSplit implements Split {
+    private static class PartitionsSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        @Override
-        public long rowCount() {
-            return 1;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index 127323b42..2dcbcd2d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
@@ -47,7 +48,6 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
-import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -127,33 +127,21 @@ public class SchemasTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(
-                            new SchemasSplit(
-                                    new SchemaManager(fileIO, 
location).listAllIds().size(),
-                                    location));
+            return () -> Collections.singletonList(new SchemasSplit(location));
         }
     }
 
     /** {@link Split} implementation for {@link SchemasTable}. */
-    private static class SchemasSplit implements Split {
+    private static class SchemasSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private SchemasSplit(long rowCount, Path location) {
-            this.rowCount = rowCount;
+        private SchemasSplit(Path location) {
             this.location = location;
         }
 
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
-
-        @Override
         public boolean equals(Object o) {
             if (this == o) {
                 return true;
@@ -198,7 +186,7 @@ public class SchemasTable implements ReadonlyTable {
         }
 
         @Override
-        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        public RecordReader<InternalRow> createReader(Split split) {
             if (!(split instanceof SchemasSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index b8ff9cd8b..991129038 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
@@ -155,7 +156,7 @@ public class SnapshotsTable implements ReadonlyTable {
         }
     }
 
-    private static class SnapshotsSplit implements Split {
+    private static class SnapshotsSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
@@ -165,12 +166,6 @@ public class SnapshotsTable implements ReadonlyTable {
             this.location = location;
         }
 
-        @Override
-        public long rowCount() {
-            // dummy 1, just 1 parallelism
-            return 1;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
index 682b5b775..600c85a6b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
@@ -43,7 +44,6 @@ import org.apache.paimon.utils.IteratorRecordReader;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.SerializationUtils;
-import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
@@ -125,33 +125,18 @@ public class StatisticTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            long rowCount;
-            try {
-                rowCount = new SnapshotManager(fileIO, 
location).snapshotCount();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            return () ->
-                    Collections.singletonList(
-                            new StatisticTable.StatisticSplit(rowCount, 
location));
+            return () -> Collections.singletonList(new 
StatisticTable.StatisticSplit(location));
         }
     }
 
-    private static class StatisticSplit implements Split {
+    private static class StatisticSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private StatisticSplit(long rowCount, Path location) {
+        private StatisticSplit(Path location) {
             this.location = location;
-            this.rowCount = rowCount;
-        }
-
-        @Override
-        public long rowCount() {
-            return rowCount;
         }
 
         @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 0c1c4aa29..9e311ae15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.tag.Tag;
@@ -126,29 +127,20 @@ public class TagsTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () ->
-                    Collections.singletonList(
-                            new TagsSplit(new TagManager(fileIO, 
location).tagCount(), location));
+            return () -> Collections.singletonList(new TagsSplit(location));
         }
     }
 
-    private static class TagsSplit implements Split {
+    private static class TagsSplit extends SingletonSplit {
 
         private static final long serialVersionUID = 1L;
 
-        private final long rowCount;
         private final Path location;
 
-        private TagsSplit(long rowCount, Path location) {
-            this.rowCount = rowCount;
+        private TagsSplit(Path location) {
             this.location = location;
         }
 
-        @Override
-        public long rowCount() {
-            return rowCount;
-        }
-
         @Override
         public boolean equals(Object o) {
             if (this == o) {


Reply via email to