This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 940c21435 [core] Support  system table which store partition related 
information  (#1646)
940c21435 is described below

commit 940c21435619342058a1f80a50c19e07ce449a13
Author: john <[email protected]>
AuthorDate: Sat Jul 29 22:15:15 2023 +0800

    [core] Support  system table which store partition related information  
(#1646)
---
 docs/content/how-to/system-tables.md               |  16 +
 .../paimon/table/system/PartitionsTable.java       | 340 +++++++++++++++++++++
 .../paimon/table/system/SystemTableLoader.java     |   3 +
 .../apache/paimon/flink/CatalogTableITCase.java    |  45 +++
 4 files changed, 404 insertions(+)

diff --git a/docs/content/how-to/system-tables.md 
b/docs/content/how-to/system-tables.md
index f82d8d10e..e97ba55f3 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -230,3 +230,19 @@ SELECT * FROM MyTable$manifests /*+ 
OPTIONS('scan.snapshot-id'='1') */;
 1 rows in set
 */
 ```
+
+## Partitions Table
+
+You can query the partitions of the table, together with the total record count and total file size (in bytes) of each partition.
+
+```sql
+SELECT * FROM MyTable$partitions;
+
+/*
++---------------+----------------+--------------------+
+|  partition    |   record_count |  file_size_in_bytes|
++---------------+----------------+--------------------+
+|  [1]          |           1    |             645    |
++---------------+----------------+--------------------+
+*/
+```
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
new file mode 100644
index 000000000..914036284
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.LazyGenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/**
+ * A {@link Table} for showing partitions info.
+ *
+ * <p>Every row describes one partition of the wrapped {@link FileStoreTable}: the partition values
+ * rendered as a string, the summed record count of its data files and their summed size in bytes.
+ */
+public class PartitionsTable implements ReadonlyTable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String PARTITIONS = "partitions";
+
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "partition", SerializationUtils.newStringType(true)),
+                            new DataField(1, "record_count", new BigIntType(false)),
+                            new DataField(2, "file_size_in_bytes", new BigIntType(false))));
+
+    private final FileStoreTable storeTable;
+
+    public PartitionsTable(FileStoreTable storeTable) {
+        this.storeTable = storeTable;
+    }
+
+    @Override
+    public String name() {
+        return storeTable.name() + SYSTEM_TABLE_SPLITTER + PARTITIONS;
+    }
+
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        // Rows are grouped by partition, so the partition column identifies a row. It must be a
+        // column that actually exists in TABLE_TYPE.
+        return Collections.singletonList("partition");
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        return new PartitionsScan(storeTable);
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new PartitionsRead(new SchemaManager(storeTable.fileIO(), storeTable.location()));
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new PartitionsTable(storeTable.copy(dynamicOptions));
+    }
+
+    /** Scan that always plans exactly one {@link PartitionsSplit} for the wrapped table. */
+    private static class PartitionsScan extends ReadOnceTableScan {
+
+        private final FileStoreTable storeTable;
+
+        private PartitionsScan(FileStoreTable storeTable) {
+            this.storeTable = storeTable;
+        }
+
+        @Override
+        public InnerTableScan withFilter(Predicate predicate) {
+            // TODO
+            return this;
+        }
+
+        @Override
+        public Plan innerPlan() {
+            return () -> Collections.singletonList(new PartitionsSplit(storeTable));
+        }
+    }
+
+    /** The single split of the partitions table; it re-plans the wrapped table on demand. */
+    private static class PartitionsSplit implements Split {
+
+        private static final long serialVersionUID = 1L;
+
+        private final FileStoreTable storeTable;
+
+        private PartitionsSplit(FileStoreTable storeTable) {
+            this.storeTable = storeTable;
+        }
+
+        /** Returns the number of rows this split produces: one per distinct partition. */
+        @Override
+        public long rowCount() {
+            // Computed from a fresh plan without any cached state, so that calling this method
+            // repeatedly always yields the same value.
+            return plan().splits().stream()
+                    .map(s -> ((DataSplit) s).partition())
+                    .collect(Collectors.toSet())
+                    .size();
+        }
+
+        private TableScan.Plan plan() {
+            return storeTable.newScan().plan();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PartitionsSplit that = (PartitionsSplit) o;
+            return Objects.equals(storeTable, that.storeTable);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(storeTable);
+        }
+    }
+
+    /** Reader that turns the data files of a {@link PartitionsSplit} into per-partition rows. */
+    private static class PartitionsRead implements InnerTableRead {
+
+        private final SchemaManager schemaManager;
+
+        private int[][] projection;
+
+        private PartitionsRead(SchemaManager schemaManager) {
+            this.schemaManager = schemaManager;
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            // TODO
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withProjection(int[][] projection) {
+            this.projection = projection;
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
+        /**
+         * Creates a reader that emits one row per partition found in the current plan.
+         *
+         * @param split must be a {@link PartitionsSplit}
+         * @throws IllegalArgumentException if {@code split} is of any other type
+         */
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            if (!(split instanceof PartitionsSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + split.getClass());
+            }
+            PartitionsSplit partitionsSplit = (PartitionsSplit) split;
+            TableScan.Plan plan = partitionsSplit.plan();
+            if (plan.splits().isEmpty()) {
+                return new IteratorRecordReader<>(Collections.emptyIterator());
+            }
+
+            RowDataToObjectArrayConverter partitionConverter =
+                    new RowDataToObjectArrayConverter(
+                            partitionsSplit.storeTable.schema().logicalPartitionType());
+
+            // One lazily evaluated row per data file. The partition string is the same for all
+            // files of a data split, so it is rendered once per split rather than once per file.
+            List<Iterator<InternalRow>> fileRows = new ArrayList<>();
+            for (Split s : plan.splits()) {
+                DataSplit dataSplit = (DataSplit) s;
+                BinaryString partitionId = toPartitionId(dataSplit.partition(), partitionConverter);
+                fileRows.add(
+                        Iterators.transform(
+                                dataSplit.dataFiles().iterator(),
+                                file -> toRow(partitionId, file)));
+            }
+
+            // Group by partition and sum the others
+            Iterator<InternalRow> resultRows = groupAndSum(Iterators.concat(fileRows.iterator()));
+
+            if (projection != null) {
+                resultRows =
+                        Iterators.transform(
+                                resultRows, row -> ProjectedRow.from(projection).replaceRow(row));
+            }
+
+            return new IteratorRecordReader<>(resultRows);
+        }
+
+        /** Renders the partition values as a string such as {@code [1]}; null stays null. */
+        private static BinaryString toPartitionId(
+                BinaryRow partition, RowDataToObjectArrayConverter partitionConverter) {
+            return partition == null
+                    ? null
+                    : BinaryString.fromString(
+                            Arrays.toString(partitionConverter.convert(partition)));
+        }
+
+        /** Builds the (partition, record_count, file_size_in_bytes) row of a single data file. */
+        private static LazyGenericRow toRow(BinaryString partitionId, DataFileMeta dataFileMeta) {
+            @SuppressWarnings("unchecked")
+            Supplier<Object>[] fields =
+                    new Supplier[] {
+                        () -> partitionId, dataFileMeta::rowCount, dataFileMeta::fileSize
+                    };
+
+            return new LazyGenericRow(fields);
+        }
+    }
+
+    /**
+     * Groups rows shaped like {@link #TABLE_TYPE} by their partition column and sums the record
+     * count and file size columns.
+     *
+     * @param rows rows with a string at position 0 and longs at positions 1 and 2
+     * @return one row per distinct partition value, in no particular order
+     */
+    public static Iterator<InternalRow> groupAndSum(Iterator<InternalRow> rows) {
+        return new GroupedIterator(rows);
+    }
+
+    /**
+     * Group by partition and sum the recordCount and fileBytes.
+     *
+     * <p>The input is consumed completely in the constructor; iteration afterwards only walks the
+     * aggregated result.
+     */
+    static class GroupedIterator implements Iterator<InternalRow> {
+        private final Iterator<Partition> resultIterator;
+
+        public GroupedIterator(Iterator<InternalRow> rows) {
+            // HashMap is used on purpose: the partition key may be null.
+            Map<BinaryString, Partition> groupedData = new HashMap<>();
+            while (rows.hasNext()) {
+                InternalRow row = rows.next();
+                BinaryString partitionId = row.getString(0);
+                long recordCount = row.getLong(1);
+                long fileSizeInBytes = row.getLong(2);
+
+                // Grouping and summing
+                Partition partition =
+                        groupedData.computeIfAbsent(partitionId, id -> new Partition(id, 0L, 0L));
+                partition.recordCount += recordCount;
+                partition.fileSizeInBytes += fileSizeInBytes;
+            }
+            this.resultIterator = groupedData.values().iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return resultIterator.hasNext();
+        }
+
+        @Override
+        public InternalRow next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException("No more elements in the iterator.");
+            }
+            Partition partition = resultIterator.next();
+            return GenericRow.of(
+                    partition.partition, partition.recordCount, partition.fileSizeInBytes);
+        }
+    }
+
+    /** Mutable accumulator holding the running totals of one partition. */
+    static class Partition {
+        private final BinaryString partition;
+        private long recordCount;
+        private long fileSizeInBytes;
+
+        Partition(BinaryString partition, long recordCount, long fileSizeInBytes) {
+            this.partition = partition;
+            this.recordCount = recordCount;
+            this.fileSizeInBytes = fileSizeInBytes;
+        }
+
+        public long recordCount() {
+            return recordCount;
+        }
+
+        public long fileSize() {
+            return fileSizeInBytes;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 2bfc39fad..511886cb2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -30,6 +30,7 @@ import static 
org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
 import static org.apache.paimon.table.system.FilesTable.FILES;
 import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
+import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
 import static org.apache.paimon.table.system.TagsTable.TAGS;
@@ -49,6 +50,8 @@ public class SystemTableLoader {
                 return new OptionsTable(fileIO, location);
             case SCHEMAS:
                 return new SchemasTable(fileIO, location);
+            case PARTITIONS:
+                return new PartitionsTable(dataTable);
             case AUDIT_LOG:
                 return new AuditLogTable(dataTable);
             case FILES:
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index e38bea757..7befc61af 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -547,4 +547,49 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         List<Row> result = sql("SELECT * FROM T$consumers");
         assertThat(result).containsExactly(Row.of("my1", 3L));
     }
+
+    /**
+     * Checks the {@code $partitions} system table for every write mode used here: changelog
+     * without primary key, changelog with primary key and append-only.
+     */
+    @Test
+    public void testPartitionsTable() throws Exception {
+        // change log with value count table
+        sql(
+                "CREATE TABLE T_VALUE_COUNT (a INT, p INT, b BIGINT, c STRING) "
+                        + "PARTITIONED BY (p) "
+                        + "WITH ('write-mode'='change-log')");
+        assertPartitionsTable("T_VALUE_COUNT");
+
+        // change log with key table
+        sql(
+                "CREATE TABLE T_WITH_KEY (a INT, p INT, b BIGINT, c STRING, PRIMARY KEY (a, p) NOT ENFORCED) "
+                        + "PARTITIONED BY (p) "
+                        + "WITH ('write-mode'='change-log')");
+        assertPartitionsTable("T_WITH_KEY");
+
+        // append only table
+        sql(
+                "CREATE TABLE T_APPEND_ONLY (a INT, p INT, b BIGINT, c STRING) "
+                        + "PARTITIONED BY (p) "
+                        + "WITH ('write-mode'='append-only')");
+        assertPartitionsTable("T_APPEND_ONLY");
+    }
+
+    /**
+     * Writes into partitions 1 to 4 of {@code tableName} and verifies that its {@code $partitions}
+     * system table lists exactly the written partitions with positive record counts and file sizes.
+     */
+    private void assertPartitionsTable(String tableName) throws Exception {
+        assertThat(sql(String.format("SELECT * FROM %s$partitions", tableName))).isEmpty();
+
+        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 'S1')", tableName));
+        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", tableName));
+        List<Row> rows1 = sql(String.format("SELECT * FROM %s$partitions", tableName));
+        // Exactly one row per partition, even though each partition was written twice.
+        assertThat(rows1.stream().map(row -> row.getField(0)))
+                .containsExactlyInAnyOrder("[1]", "[2]");
+        for (Row row : rows1) {
+            assertThat((long) row.getField(1)).isGreaterThan(0L); // check record count
+            assertThat((long) row.getField(2)).isGreaterThan(0L); // check file size
+        }
+
+        sql(String.format("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", tableName));
+        sql(String.format("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", tableName));
+
+        List<Row> rows2 = sql(String.format("SELECT * FROM %s$partitions", tableName));
+        assertThat(rows2.stream().map(row -> row.getField(0)))
+                .containsExactlyInAnyOrder("[1]", "[2]", "[3]", "[4]");
+        for (Row row : rows2) {
+            assertThat((long) row.getField(1)).isGreaterThan(0L); // check record count
+            assertThat((long) row.getField(2)).isGreaterThan(0L); // check file size
+        }
+    }
 }

Reply via email to