This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 940c21435 [core] Support system table which store partition related
information (#1646)
940c21435 is described below
commit 940c21435619342058a1f80a50c19e07ce449a13
Author: john <[email protected]>
AuthorDate: Sat Jul 29 22:15:15 2023 +0800
[core] Support system table which store partition related information
(#1646)
---
docs/content/how-to/system-tables.md | 16 +
.../paimon/table/system/PartitionsTable.java | 340 +++++++++++++++++++++
.../paimon/table/system/SystemTableLoader.java | 3 +
.../apache/paimon/flink/CatalogTableITCase.java | 45 +++
4 files changed, 404 insertions(+)
diff --git a/docs/content/how-to/system-tables.md
b/docs/content/how-to/system-tables.md
index f82d8d10e..e97ba55f3 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -230,3 +230,19 @@ SELECT * FROM MyTable$manifests /*+
OPTIONS('scan.snapshot-id'='1') */;
1 rows in set
*/
```
+
+## Partitions Table
+
+You can query the partitions of the table, including the total record count and data file size of each partition.
+
+```sql
+SELECT * FROM MyTable$partitions;
+
+/*
++---------------+----------------+--------------------+
+| partition | record_count | file_size_in_bytes|
++---------------+----------------+--------------------+
+| [1] | 1 | 645 |
++---------------+----------------+--------------------+
+*/
+```
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
new file mode 100644
index 000000000..914036284
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.LazyGenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/** A {@link Table} for showing partitions info. */
+public class PartitionsTable implements ReadonlyTable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String PARTITIONS = "partitions";
+
+ public static final RowType TABLE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "partition",
SerializationUtils.newStringType(true)),
+ new DataField(2, "record_count", new
BigIntType(false)),
+ new DataField(3, "file_size_in_bytes", new
BigIntType(false))));
+
+ private final FileStoreTable storeTable;
+
+ public PartitionsTable(FileStoreTable storeTable) {
+ this.storeTable = storeTable;
+ }
+
+ @Override
+ public String name() {
+ return storeTable.name() + SYSTEM_TABLE_SPLITTER + PARTITIONS;
+ }
+
+ @Override
+ public RowType rowType() {
+ return TABLE_TYPE;
+ }
+
+ @Override
+ public List<String> primaryKeys() {
+ return Collections.singletonList("file_path");
+ }
+
+ @Override
+ public InnerTableScan newScan() {
+ return new PartitionsScan(storeTable);
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new PartitionsRead(new SchemaManager(storeTable.fileIO(),
storeTable.location()));
+ }
+
+ @Override
+ public Table copy(Map<String, String> dynamicOptions) {
+ return new PartitionsTable(storeTable.copy(dynamicOptions));
+ }
+
+ private static class PartitionsScan extends ReadOnceTableScan {
+
+ private final FileStoreTable storeTable;
+
+ private PartitionsScan(FileStoreTable storeTable) {
+ this.storeTable = storeTable;
+ }
+
+ @Override
+ public InnerTableScan withFilter(Predicate predicate) {
+ // TODO
+ return this;
+ }
+
+ @Override
+ public Plan innerPlan() {
+ return () -> Collections.singletonList(new
PartitionsSplit(storeTable));
+ }
+ }
+
+ private static class PartitionsSplit implements Split {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreTable storeTable;
+
+ private Map<BinaryRow, Long> partitionRowCountMap =
+ new ConcurrentHashMap<BinaryRow, Long>();
+
+ private PartitionsSplit(FileStoreTable storeTable) {
+ this.storeTable = storeTable;
+ }
+
+ @Override
+ public long rowCount() {
+ TableScan.Plan plan = plan();
+ Map<BinaryRow, Long> currentPartitionMap =
+ plan.splits().stream()
+ .collect(
+ Collectors.groupingBy(
+ s -> ((DataSplit) s).partition(),
+ Collectors.summingLong(
+ s -> ((DataSplit)
s).dataFiles().size())));
+ currentPartitionMap.forEach((k, v) ->
partitionRowCountMap.merge(k, v, Long::sum));
+ return partitionRowCountMap.values().stream().mapToLong(v ->
v).sum();
+ }
+
+ private TableScan.Plan plan() {
+ return storeTable.newScan().plan();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PartitionsSplit that = (PartitionsSplit) o;
+ return Objects.equals(storeTable, that.storeTable);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(storeTable);
+ }
+ }
+
+ private static class PartitionsRead implements InnerTableRead {
+
+ private final SchemaManager schemaManager;
+
+ private int[][] projection;
+
+ private PartitionsRead(SchemaManager schemaManager) {
+ this.schemaManager = schemaManager;
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ // TODO
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withProjection(int[][] projection) {
+ this.projection = projection;
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ if (!(split instanceof PartitionsSplit)) {
+ throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
+ }
+ PartitionsSplit filesSplit = (PartitionsSplit) split;
+ FileStoreTable table = filesSplit.storeTable;
+ TableScan.Plan plan = filesSplit.plan();
+ if (plan.splits().isEmpty()) {
+ return new IteratorRecordReader<>(Collections.emptyIterator());
+ }
+ List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
+ RowDataToObjectArrayConverter partitionConverter =
+ new
RowDataToObjectArrayConverter(table.schema().logicalPartitionType());
+
+ for (Split dataSplit : plan.splits()) {
+ iteratorList.add(
+ Iterators.transform(
+ ((DataSplit) dataSplit).dataFiles().iterator(),
+ file -> toRow((DataSplit) dataSplit,
partitionConverter, file)));
+ }
+ Iterator<InternalRow> rows =
Iterators.concat(iteratorList.iterator());
+ // Group by partition and sum the others
+ Iterator<InternalRow> resultRows = groupAndSum(rows);
+
+ if (projection != null) {
+ resultRows =
+ Iterators.transform(
+ resultRows, row ->
ProjectedRow.from(projection).replaceRow(row));
+ }
+
+ return new IteratorRecordReader<>(resultRows);
+ }
+
+ private LazyGenericRow toRow(
+ DataSplit dataSplit,
+ RowDataToObjectArrayConverter partitionConverter,
+ DataFileMeta dataFileMeta) {
+
+ BinaryString partitionId =
+ dataSplit.partition() == null
+ ? null
+ : BinaryString.fromString(
+ Arrays.toString(
+
partitionConverter.convert(dataSplit.partition())));
+ @SuppressWarnings("unchecked")
+ Supplier<Object>[] fields =
+ new Supplier[] {
+ () -> partitionId, dataFileMeta::rowCount,
dataFileMeta::fileSize
+ };
+
+ return new LazyGenericRow(fields);
+ }
+ }
+
+ public static Iterator<InternalRow> groupAndSum(Iterator<InternalRow>
rows) {
+ return new GroupedIterator(rows);
+ }
+
+ /** group by partition and sum the recordCount and fileBytes . */
+ static class GroupedIterator implements Iterator<InternalRow> {
+ private final Iterator<InternalRow> rows;
+ private final Map<BinaryString, Partition> groupedData;
+ private Iterator<Partition> resultIterator;
+
+ public GroupedIterator(Iterator<InternalRow> rows) {
+ this.rows = rows;
+ this.groupedData = new HashMap<>();
+ groupAndSum();
+ }
+
+ private void groupAndSum() {
+ while (rows.hasNext()) {
+ InternalRow row = rows.next();
+ BinaryString partitionId = row.getString(0);
+ long recordCount = row.getLong(1);
+ long fileSizeInBytes = row.getLong(2);
+
+ // Grouping and summing
+ if (groupedData.containsKey(partitionId)) {
+ Partition rowData = groupedData.get(partitionId);
+ rowData.recordCount += recordCount;
+ rowData.fileSizeInBytes += fileSizeInBytes;
+ } else {
+ groupedData.put(
+ partitionId, new Partition(partitionId,
recordCount, fileSizeInBytes));
+ }
+ }
+ resultIterator = groupedData.values().iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return resultIterator.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ if (hasNext()) {
+ Partition partition = resultIterator.next();
+ return GenericRow.of(
+ partition.partition, partition.recordCount,
partition.fileSizeInBytes);
+ } else {
+ throw new NoSuchElementException("No more elements in the
iterator.");
+ }
+ }
+ }
+
+ static class Partition {
+ private BinaryString partition;
+ private long recordCount;
+ private long fileSizeInBytes;
+
+ Partition(BinaryString partition, long recordCount, long
fileSizeInBytes) {
+ this.partition = partition;
+ this.recordCount = recordCount;
+ this.fileSizeInBytes = fileSizeInBytes;
+ }
+
+ public long recordCount() {
+ return recordCount;
+ }
+
+ public long fileSize() {
+ return fileSizeInBytes;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 2bfc39fad..511886cb2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -30,6 +30,7 @@ import static
org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
+import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
import static org.apache.paimon.table.system.TagsTable.TAGS;
@@ -49,6 +50,8 @@ public class SystemTableLoader {
return new OptionsTable(fileIO, location);
case SCHEMAS:
return new SchemasTable(fileIO, location);
+ case PARTITIONS:
+ return new PartitionsTable(dataTable);
case AUDIT_LOG:
return new AuditLogTable(dataTable);
case FILES:
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index e38bea757..7befc61af 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -547,4 +547,49 @@ public class CatalogTableITCase extends CatalogITCaseBase {
List<Row> result = sql("SELECT * FROM T$consumers");
assertThat(result).containsExactly(Row.of("my1", 3L));
}
+
+ @Test
+ public void testPartitionsTable() throws Exception {
+ sql(
+ "CREATE TABLE T_VALUE_COUNT (a INT, p INT, b BIGINT, c STRING)
"
+ + "PARTITIONED BY (p) "
+ + "WITH ('write-mode'='change-log')"); // change log
with value count table
+ assertFilesTable("T_VALUE_COUNT");
+
+ sql(
+ "CREATE TABLE T_WITH_KEY (a INT, p INT, b BIGINT, c STRING,
PRIMARY KEY (a, p) NOT ENFORCED) "
+ + "PARTITIONED BY (p) "
+ + "WITH ('write-mode'='change-log')"); // change log
with key table
+ assertFilesTable("T_WITH_KEY");
+
+ sql(
+ "CREATE TABLE T_APPEND_ONLY (a INT, p INT, b BIGINT, c STRING)
"
+ + "PARTITIONED BY (p) "
+ + "WITH ('write-mode'='append-only')"); // append only
table
+ assertPartitionsTable("T_APPEND_ONLY");
+ }
+
+ private void assertPartitionsTable(String tableName) throws Exception {
+ assertThat(sql(String.format("SELECT * FROM %s$partitions",
tableName))).isEmpty();
+ // TODO should use sql for schema evolution after flink supports it.
+ SchemaManager schemaManager =
+ new SchemaManager(
+ LocalFileIO.create(),
+ new Path(path, String.format("default.db/%s",
tableName)));
+ sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2,
'S1')", tableName));
+ sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2,
'S4')", tableName));
+ List<Row> rows1 = sql(String.format("SELECT * FROM %s$partitions",
tableName));
+ for (Row row : rows1) {
+ assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]");
+ assertThat((long) row.getField(2)).isGreaterThan(0L); // check
file size
+ }
+
+ sql(String.format("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2,
'S4')", tableName));
+ sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2,
'S4')", tableName));
+
+ List<Row> rows2 = sql(String.format("SELECT * FROM %s$partitions",
tableName));
+ for (Row row : rows2) {
+ assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]",
"[3]", "[4]");
+ }
+ }
}