This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ba4576657e [core] Support to query indexes (#4788)
ba4576657e is described below
commit ba4576657edf1dde60ac321423d4fa6032169119
Author: yuzelin <[email protected]>
AuthorDate: Fri Dec 27 13:40:34 2024 +0800
[core] Support to query indexes (#4788)
---
docs/content/concepts/system-tables.md | 19 ++
.../paimon/table/system/SystemTableLoader.java | 2 +
.../paimon/table/system/TableIndexesTable.java | 238 +++++++++++++++++++++
.../org/apache/paimon/flink/SystemTableITCase.java | 32 +++
4 files changed, 291 insertions(+)
diff --git a/docs/content/concepts/system-tables.md
b/docs/content/concepts/system-tables.md
index 5795aea419..92119874e2 100644
--- a/docs/content/concepts/system-tables.md
+++ b/docs/content/concepts/system-tables.md
@@ -389,6 +389,25 @@ SELECT * FROM T$statistics;
*/
```
+### Table Indexes Table
+
+You can query a table's index files through the `table_indexes` system table. It lists the hash index files
+generated for dynamic bucket tables (index_type = HASH) and the deletion vector index files
+(index_type = DELETION_VECTORS).
+
+```sql
+SELECT * FROM my_table$table_indexes;
+
+/*
++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
+| partition | bucket |
index_type | file_name | file_size |
row_count | dv_ranges |
++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
+| [2024-10-01] | 0 |
HASH | index-70abfebf-149e-4796-9f... | 12 |
3 | <NULL> |
+| [2024-10-01] | 0 |
DELETION_VECTORS | index-633857e7-cdce-47d2-87... | 33 |
1 | [(data-346cb9c8-4032-4d66-a... |
++--------------------------------+-------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
+2 rows in set
+*/
+```
+
## Global System Table
Global system tables contain the statistical information of all the tables
that exist in paimon. For convenience of searching, we create a reference system
database called `sys`.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index b77b72e412..57c3c2caac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -48,6 +48,7 @@ import static
org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
import static org.apache.paimon.table.system.StatisticTable.STATISTICS;
+import static org.apache.paimon.table.system.TableIndexesTable.TABLE_INDEXES;
import static org.apache.paimon.table.system.TagsTable.TAGS;
/** Loader to load system {@link Table}s. */
@@ -70,6 +71,7 @@ public class SystemTableLoader {
.put(AGGREGATION_FIELDS, AggregationFieldsTable::new)
.put(STATISTICS, StatisticTable::new)
.put(BINLOG, BinlogTable::new)
+ .put(TABLE_INDEXES, TableIndexesTable::new)
.build();
public static final List<String> SYSTEM_TABLES = new
ArrayList<>(SYSTEM_TABLE_LOADERS.keySet());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
new file mode 100644
index 0000000000..08731e768a
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.SingletonSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/** A {@link Table} for showing indexes. */
+public class TableIndexesTable implements ReadonlyTable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TableIndexesTable.class);
+
+ public static final String TABLE_INDEXES = "table_indexes";
+
+ public static final RowType TABLE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "partition",
SerializationUtils.newStringType(true)),
+ new DataField(1, "bucket", new IntType(false)),
+ new DataField(2, "index_type",
newStringType(false)),
+ new DataField(3, "file_name",
newStringType(false)),
+ new DataField(4, "file_size", new
BigIntType(false)),
+ new DataField(5, "row_count", new
BigIntType(false)),
+ new DataField(
+ 6,
+ "dv_ranges",
+ new ArrayType(true,
DeletionVectorMeta.SCHEMA))));
+
+ private final FileStoreTable dataTable;
+
+ public TableIndexesTable(FileStoreTable dataTable) {
+ this.dataTable = dataTable;
+ }
+
+ @Override
+ public InnerTableScan newScan() {
+ return new IndexesScan();
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new IndexesRead(dataTable);
+ }
+
+ @Override
+ public String name() {
+ return dataTable.name() + SYSTEM_TABLE_SPLITTER + TABLE_INDEXES;
+ }
+
+ @Override
+ public RowType rowType() {
+ return TABLE_TYPE;
+ }
+
+ @Override
+ public List<String> primaryKeys() {
+ return Collections.singletonList("file_name");
+ }
+
+ @Override
+ public Table copy(Map<String, String> dynamicOptions) {
+ return new TableIndexesTable(dataTable.copy(dynamicOptions));
+ }
+
+ private static class IndexesScan extends ReadOnceTableScan {
+
+ @Override
+ public InnerTableScan withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ protected Plan innerPlan() {
+ return () -> Collections.singletonList(new IndexesSplit());
+ }
+ }
+
+ private static class IndexesSplit extends SingletonSplit {
+
+ private static final long serialVersionUID = 1L;
+
+ private IndexesSplit() {}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o != null && getClass() == o.getClass();
+ }
+ }
+
+ private static class IndexesRead implements InnerTableRead {
+
+ private RowType readType;
+
+ private final FileStoreTable dataTable;
+
+ public IndexesRead(FileStoreTable dataTable) {
+ this.dataTable = dataTable;
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withReadType(RowType readType) {
+ this.readType = readType;
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) {
+ if (!(split instanceof IndexesSplit)) {
+ throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
+ }
+ List<IndexManifestEntry> manifestFileMetas =
allIndexEntries(dataTable);
+
+ RowDataToObjectArrayConverter partitionConverter =
+ new
RowDataToObjectArrayConverter(dataTable.schema().logicalPartitionType());
+
+ Iterator<InternalRow> rows =
+ Iterators.transform(
+ manifestFileMetas.iterator(),
+ indexManifestEntry -> toRow(indexManifestEntry,
partitionConverter));
+ if (readType != null) {
+ rows =
+ Iterators.transform(
+ rows,
+ row ->
+ ProjectedRow.from(readType,
TableIndexesTable.TABLE_TYPE)
+ .replaceRow(row));
+ }
+ return new IteratorRecordReader<>(rows);
+ }
+
+ private InternalRow toRow(
+ IndexManifestEntry indexManifestEntry,
+ RowDataToObjectArrayConverter partitionConverter) {
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+ indexManifestEntry.indexFile().deletionVectorMetas();
+ return GenericRow.of(
+ BinaryString.fromString(
+ Arrays.toString(
+
partitionConverter.convert(indexManifestEntry.partition()))),
+ indexManifestEntry.bucket(),
+
BinaryString.fromString(indexManifestEntry.indexFile().indexType()),
+
BinaryString.fromString(indexManifestEntry.indexFile().fileName()),
+ indexManifestEntry.indexFile().fileSize(),
+ indexManifestEntry.indexFile().rowCount(),
+ dvMetas == null
+ ? null
+ :
IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()));
+ }
+ }
+
+ private static List<IndexManifestEntry> allIndexEntries(FileStoreTable
dataTable) {
+ IndexFileHandler indexFileHandler =
dataTable.store().newIndexFileHandler();
+ Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
+ if (snapshot == null) {
+ LOG.warn("Check if your snapshot is empty.");
+ return Collections.emptyList();
+ }
+ String indexManifest = snapshot.indexManifest();
+ if (indexManifest == null ||
!indexFileHandler.existsManifest(indexManifest)) {
+ LOG.warn("indexManifest doesn't exist.");
+ return Collections.emptyList();
+ }
+
+ return indexFileHandler.readManifest(indexManifest);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
index 771f4acc5e..98ec635e85 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
@@ -63,4 +63,36 @@ public class SystemTableITCase extends CatalogTableITCase {
Row.of("+I", new Integer[] {1}, new Integer[] {3}),
Row.of("+I", new Integer[] {2}, new Integer[] {2}));
}
+
+ @Test
+ public void testIndexesTable() {
+ sql(
+ "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt,
a) NOT ENFORCED)"
+ + " PARTITIONED BY (pt) with
('deletion-vectors.enabled'='true')");
+ sql(
+ "INSERT INTO T VALUES ('2024-10-01', 1,
'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
+ sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01',
3, 'c_new1')");
+
+ List<Row> rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type
= 'HASH'");
+ assertThat(rows.size()).isEqualTo(1);
+ Row row = rows.get(0);
+ assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
+ assertThat(row.getField(1)).isEqualTo(0);
+ assertThat(row.getField(2)).isEqualTo("HASH");
+ assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+ assertThat(row.getField(4)).isEqualTo(12L);
+ assertThat(row.getField(5)).isEqualTo(3L);
+ assertThat(row.getField(6)).isNull();
+
+ rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type =
'DELETION_VECTORS'");
+ assertThat(rows.size()).isEqualTo(1);
+ row = rows.get(0);
+ assertThat(row.getField(0)).isEqualTo("[2024-10-01]");
+ assertThat(row.getField(1)).isEqualTo(0);
+ assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
+ assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+ assertThat(row.getField(4)).isEqualTo(33L);
+ assertThat(row.getField(5)).isEqualTo(1L);
+ assertThat(row.getField(6)).isNotNull();
+ }
}