This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ee1d5417e [core] Fix FilesTable splits too big to distribute (#3454)
ee1d5417e is described below
commit ee1d5417e2af218af4a04448a306fa99e68ac258
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 31 19:39:19 2024 +0800
[core] Fix FilesTable splits too big to distribute (#3454)
---
.../apache/paimon/table/source/SingletonSplit.java | 28 ++++++
.../table/system/AggregationFieldsTable.java | 18 +---
.../paimon/table/system/AllTableOptionsTable.java | 23 +----
.../apache/paimon/table/system/BranchesTable.java | 16 +---
.../paimon/table/system/CatalogOptionsTable.java | 11 +--
.../apache/paimon/table/system/ConsumersTable.java | 18 +---
.../org/apache/paimon/table/system/FilesTable.java | 104 ++++++++++-----------
.../apache/paimon/table/system/ManifestsTable.java | 34 ++-----
.../apache/paimon/table/system/OptionsTable.java | 16 +---
.../paimon/table/system/PartitionsTable.java | 8 +-
.../apache/paimon/table/system/SchemasTable.java | 22 +----
.../apache/paimon/table/system/SnapshotsTable.java | 9 +-
.../apache/paimon/table/system/StatisticTable.java | 23 +----
.../org/apache/paimon/table/system/TagsTable.java | 16 +---
14 files changed, 125 insertions(+), 221 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java
new file mode 100644
index 000000000..e5a649a72
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/SingletonSplit.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+/** Singleton split used for system tables, for which a scan always produces
exactly one split. */
+public abstract class SingletonSplit implements Split {
+
+ @Override
+ public long rowCount() {
+ return 1;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 29bec8502..0aa17b1b6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
@@ -120,32 +121,21 @@ public class AggregationFieldsTable implements
ReadonlyTable {
@Override
public Plan innerPlan() {
- return () ->
- Collections.singletonList(
- new AggregationSplit(
- new SchemaManager(fileIO,
location).listAllIds().size(),
- location));
+ return () -> Collections.singletonList(new
AggregationSplit(location));
}
}
/** {@link Split} implementation for {@link AggregationFieldsTable}. */
- private static class AggregationSplit implements Split {
+ private static class AggregationSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Path location;
- private AggregationSplit(long rowCount, Path location) {
- this.rowCount = rowCount;
+ private AggregationSplit(Path location) {
this.location = location;
}
- @Override
- public long rowCount() {
- return rowCount;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
index a7b1236bf..eb50a38a5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
@@ -42,7 +43,6 @@ import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -121,33 +121,20 @@ public class AllTableOptionsTable implements
ReadonlyTable {
@Override
public Plan innerPlan() {
- return () ->
- Collections.singletonList(
- new AllTableSplit(
- options(fileIO,
allTablePaths).values().stream()
- .flatMap(t -> t.values().stream())
- .reduce(0, (a, b) -> a + b.size(),
Integer::sum),
- allTablePaths));
+ return () -> Collections.singletonList(new
AllTableSplit(allTablePaths));
}
}
- private static class AllTableSplit implements Split {
+ private static class AllTableSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Map<String, Map<String, Path>> allTablePaths;
- private AllTableSplit(long rowCount, Map<String, Map<String, Path>>
allTablePaths) {
- this.rowCount = rowCount;
+ private AllTableSplit(Map<String, Map<String, Path>> allTablePaths) {
this.allTablePaths = allTablePaths;
}
- @Override
- public long rowCount() {
- return rowCount;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -192,7 +179,7 @@ public class AllTableOptionsTable implements ReadonlyTable {
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof AllTableSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
index 7ae31095c..f37370641 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
@@ -122,28 +123,19 @@ public class BranchesTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- FileStoreTable table = FileStoreTableFactory.create(fileIO,
location);
- long rowCount = table.branchManager().branchCount();
- return () -> Collections.singletonList(new BranchesSplit(rowCount,
location));
+ return () -> Collections.singletonList(new
BranchesSplit(location));
}
}
- private static class BranchesSplit implements Split {
+ private static class BranchesSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Path location;
- private BranchesSplit(long rowCount, Path location) {
- this.rowCount = rowCount;
+ private BranchesSplit(Path location) {
this.location = location;
}
- @Override
- public long rowCount() {
- return rowCount;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
index 2e94edf1e..699d0324f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/CatalogOptionsTable.java
@@ -30,6 +30,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
@@ -39,7 +40,6 @@ import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -113,7 +113,7 @@ public class CatalogOptionsTable implements ReadonlyTable {
}
}
- private static class CatalogOptionsSplit implements Split {
+ private static class CatalogOptionsSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
@@ -123,11 +123,6 @@ public class CatalogOptionsTable implements ReadonlyTable {
this.catalogOptions = catalogOptions.toMap();
}
- @Override
- public long rowCount() {
- return catalogOptions.size();
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -168,7 +163,7 @@ public class CatalogOptionsTable implements ReadonlyTable {
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof CatalogOptionsTable.CatalogOptionsSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index a3ec3017e..05b3382d2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
@@ -114,32 +115,21 @@ public class ConsumersTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- return () ->
- Collections.singletonList(
- new ConsumersTable.ConsumersSplit(
- new ConsumerManager(fileIO,
location).listAllIds().size(),
- location));
+ return () -> Collections.singletonList(new
ConsumersTable.ConsumersSplit(location));
}
}
/** {@link Split} implementation for {@link ConsumersTable}. */
- private static class ConsumersSplit implements Split {
+ private static class ConsumersSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Path location;
- private ConsumersSplit(long rowCount, Path location) {
- this.rowCount = rowCount;
+ private ConsumersSplit(Path location) {
this.location = location;
}
- @Override
- public long rowCount() {
- return rowCount;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 5f47e574b..bb29a0bd7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -43,6 +43,7 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
@@ -62,7 +63,6 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -134,7 +134,7 @@ public class FilesTable implements ReadonlyTable {
@Override
public InnerTableScan newScan() {
- return new FilesScan(storeTable);
+ return new FilesScan();
}
@Override
@@ -150,16 +150,10 @@ public class FilesTable implements ReadonlyTable {
private static class FilesScan extends ReadOnceTableScan {
- private final FileStoreTable storeTable;
-
@Nullable private LeafPredicate partitionPredicate;
@Nullable private LeafPredicate bucketPredicate;
@Nullable private LeafPredicate levelPredicate;
- private FilesScan(FileStoreTable storeTable) {
- this.storeTable = storeTable;
- }
-
@Override
public InnerTableScan withFilter(Predicate pushdown) {
if (pushdown == null) {
@@ -176,12 +170,51 @@ public class FilesTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- // plan here, just set the result of plan to split
- TableScan.Plan plan = tablePlan();
- return () -> Collections.singletonList(new
FilesSplit(plan.splits()));
+ return () ->
+ Collections.singletonList(
+ new FilesSplit(partitionPredicate,
bucketPredicate, levelPredicate));
+ }
+ }
+
+ private static class FilesSplit extends SingletonSplit {
+
+ @Nullable private final LeafPredicate partitionPredicate;
+ @Nullable private final LeafPredicate bucketPredicate;
+ @Nullable private final LeafPredicate levelPredicate;
+
+ private FilesSplit(
+ @Nullable LeafPredicate partitionPredicate,
+ @Nullable LeafPredicate bucketPredicate,
+ @Nullable LeafPredicate levelPredicate) {
+ this.partitionPredicate = partitionPredicate;
+ this.bucketPredicate = bucketPredicate;
+ this.levelPredicate = levelPredicate;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FilesSplit that = (FilesSplit) o;
+ return Objects.equals(partitionPredicate, that.partitionPredicate)
+ && Objects.equals(bucketPredicate, that.bucketPredicate)
+ && Objects.equals(this.levelPredicate,
that.levelPredicate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionPredicate, bucketPredicate,
levelPredicate);
+ }
+
+ public List<Split> splits(FileStoreTable storeTable) {
+ return tablePlan(storeTable).splits();
}
- private TableScan.Plan tablePlan() {
+ private TableScan.Plan tablePlan(FileStoreTable storeTable) {
InnerTableScan scan = storeTable.newScan();
if (partitionPredicate != null) {
if (partitionPredicate.function() instanceof Equal) {
@@ -221,46 +254,6 @@ public class FilesTable implements ReadonlyTable {
}
}
- private static class FilesSplit implements Split {
-
- private static final long serialVersionUID = 1L;
-
- private final List<Split> splits;
-
- private FilesSplit(List<Split> splits) {
- this.splits = splits;
- }
-
- @Override
- public long rowCount() {
- return splits.stream()
- .map(s -> (DataSplit) s)
- .mapToLong(s -> s.dataFiles().size())
- .sum();
- }
-
- public List<Split> splits() {
- return splits;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- FilesSplit that = (FilesSplit) o;
- return Objects.equals(splits, that.splits);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(splits);
- }
- }
-
private static class FilesRead implements InnerTableRead {
private final SchemaManager schemaManager;
@@ -292,12 +285,13 @@ public class FilesTable implements ReadonlyTable {
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof FilesSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
FilesSplit filesSplit = (FilesSplit) split;
- if (filesSplit.splits().isEmpty()) {
+ List<Split> splits = filesSplit.splits(storeTable);
+ if (splits.isEmpty()) {
return new IteratorRecordReader<>(Collections.emptyIterator());
}
@@ -332,7 +326,7 @@ public class FilesTable implements ReadonlyTable {
});
}
};
- for (Split dataSplit : filesSplit.splits()) {
+ for (Split dataSplit : splits) {
iteratorList.add(
Iterators.transform(
((DataSplit) dataSplit).dataFiles().iterator(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index 7d7a9570e..e07ff602c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
@@ -48,13 +49,11 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -109,7 +108,7 @@ public class ManifestsTable implements ReadonlyTable {
return new ManifestsTable(dataTable.copy(dynamicOptions));
}
- private class ManifestsScan extends ReadOnceTableScan {
+ private static class ManifestsScan extends ReadOnceTableScan {
@Override
public InnerTableScan withFilter(Predicate predicate) {
@@ -119,41 +118,22 @@ public class ManifestsTable implements ReadonlyTable {
@Override
protected Plan innerPlan() {
- return () ->
- Collections.singletonList(new
ManifestsSplit(allManifests(dataTable).size()));
+ return () -> Collections.singletonList(new ManifestsSplit());
}
}
- private static class ManifestsSplit implements Split {
+ private static class ManifestsSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
-
- private ManifestsSplit(long rowCount) {
- this.rowCount = rowCount;
- }
-
- @Override
- public long rowCount() {
- return rowCount;
- }
+ private ManifestsSplit() {}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ManifestsSplit that = (ManifestsSplit) o;
- return Objects.equals(rowCount, that.rowCount);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(rowCount);
+ return o != null && getClass() == o.getClass();
}
}
@@ -185,7 +165,7 @@ public class ManifestsTable implements ReadonlyTable {
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof ManifestsSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index b740ddec1..0c8ac2f25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
@@ -112,29 +113,20 @@ public class OptionsTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- return () ->
- Collections.singletonList(
- new OptionsSplit(options(fileIO, location).size(),
location));
+ return () -> Collections.singletonList(new OptionsSplit(location));
}
}
- private static class OptionsSplit implements Split {
+ private static class OptionsSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Path location;
- private OptionsSplit(long rowCount, Path location) {
- this.rowCount = rowCount;
+ private OptionsSplit(Path location) {
this.location = location;
}
- @Override
- public long rowCount() {
- return rowCount;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index c08cdfaa3..76c0768ee 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
@@ -124,15 +125,10 @@ public class PartitionsTable implements ReadonlyTable {
}
}
- private static class PartitionsSplit implements Split {
+ private static class PartitionsSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- @Override
- public long rowCount() {
- return 1;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index 127323b42..2dcbcd2d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
@@ -47,7 +48,6 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -127,33 +127,21 @@ public class SchemasTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- return () ->
- Collections.singletonList(
- new SchemasSplit(
- new SchemaManager(fileIO,
location).listAllIds().size(),
- location));
+ return () -> Collections.singletonList(new SchemasSplit(location));
}
}
/** {@link Split} implementation for {@link SchemasTable}. */
- private static class SchemasSplit implements Split {
+ private static class SchemasSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Path location;
- private SchemasSplit(long rowCount, Path location) {
- this.rowCount = rowCount;
+ private SchemasSplit(Path location) {
this.location = location;
}
- @Override
- public long rowCount() {
- return rowCount;
- }
-
- @Override
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -198,7 +186,7 @@ public class SchemasTable implements ReadonlyTable {
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof SchemasSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index b8ff9cd8b..991129038 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
@@ -155,7 +156,7 @@ public class SnapshotsTable implements ReadonlyTable {
}
}
- private static class SnapshotsSplit implements Split {
+ private static class SnapshotsSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
@@ -165,12 +166,6 @@ public class SnapshotsTable implements ReadonlyTable {
this.location = location;
}
- @Override
- public long rowCount() {
- // dummy 1, just 1 parallelism
- return 1;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
index 682b5b775..600c85a6b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
@@ -43,7 +44,6 @@ import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;
-import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
@@ -125,33 +125,18 @@ public class StatisticTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- long rowCount;
- try {
- rowCount = new SnapshotManager(fileIO,
location).snapshotCount();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return () ->
- Collections.singletonList(
- new StatisticTable.StatisticSplit(rowCount,
location));
+ return () -> Collections.singletonList(new
StatisticTable.StatisticSplit(location));
}
}
- private static class StatisticSplit implements Split {
+ private static class StatisticSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Path location;
- private StatisticSplit(long rowCount, Path location) {
+ private StatisticSplit(Path location) {
this.location = location;
- this.rowCount = rowCount;
- }
-
- @Override
- public long rowCount() {
- return rowCount;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 0c1c4aa29..9e311ae15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.tag.Tag;
@@ -126,29 +127,20 @@ public class TagsTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- return () ->
- Collections.singletonList(
- new TagsSplit(new TagManager(fileIO,
location).tagCount(), location));
+ return () -> Collections.singletonList(new TagsSplit(location));
}
}
- private static class TagsSplit implements Split {
+ private static class TagsSplit extends SingletonSplit {
private static final long serialVersionUID = 1L;
- private final long rowCount;
private final Path location;
- private TagsSplit(long rowCount, Path location) {
- this.rowCount = rowCount;
+ private TagsSplit(Path location) {
this.location = location;
}
- @Override
- public long rowCount() {
- return rowCount;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {