This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 12b9686d3 [core] Introduce read-optimized system table (#2359)
12b9686d3 is described below
commit 12b9686d3ccce3902bcd085126fea9c6f760489a
Author: tsreaper <[email protected]>
AuthorDate: Wed Nov 22 16:59:22 2023 +0800
[core] Introduce read-optimized system table (#2359)
---
docs/content/how-to/system-tables.md | 23 ++++
docs/content/maintenance/read-performance.md | 4 +-
.../paimon/table/AbstractFileStoreTable.java | 2 +-
.../paimon/table/system/ReadOptimizedTable.java | 153 +++++++++++++++++++++
.../paimon/table/system/SystemTableLoader.java | 3 +
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 48 +++++++
.../apache/paimon/flink/CatalogTableITCase.java | 22 +++
7 files changed, 252 insertions(+), 3 deletions(-)
diff --git a/docs/content/how-to/system-tables.md
b/docs/content/how-to/system-tables.md
index 8f2f7615b..dbeb76dac 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -130,6 +130,29 @@ SELECT * FROM MyTable$audit_log;
*/
```
+### Read-optimized Table
+
+If you require extreme reading performance and can accept reading slightly old
data,
+you can use the `ro` (read-optimized) system table.
+Read-optimized system table improves reading performance by only scanning
files which do not need merging.
+
+For primary-key tables, `ro` system table only scans files on the topmost
level.
+That is to say, `ro` system table only produces the result of the latest full
compaction.
+
+{{< hint info >}}
+
+It is possible that different buckets carry out full compaction at different
times,
+so it is possible that the values of different keys come from different
snapshots.
+
+{{< /hint >}}
+
+For append-only tables, as all files can be read without merging,
+`ro` system table acts like the normal append-only table.
+
+```sql
+SELECT * FROM MyTable$ro;
+```
+
### Files Table
You can query the files of the table with specific snapshot.
diff --git a/docs/content/maintenance/read-performance.md
b/docs/content/maintenance/read-performance.md
index 70b35d14e..796391c68 100644
--- a/docs/content/maintenance/read-performance.md
+++ b/docs/content/maintenance/read-performance.md
@@ -44,8 +44,8 @@ it still cannot catch up with the ordinary AppendOnly table.
If you want to query fast enough in certain scenarios, but can only find older
data, you can:
-1. Configure 'full-compaction.delta-commits', when writing data (currently
only Flink), full compaction will be performed periodically.
-2. Configure 'scan.mode' to 'compacted-full', when reading data, snapshot of
full compaction is picked. Read performance is good.
+1. Configure 'full-compaction.delta-commits' when writing data (currently only
in Flink). For streaming jobs, full compaction will then be performed
periodically; for batch jobs, full compaction will be carried out when the job
ends.
+2. Query from [read-optimized system table]({{< ref
"how-to/system-tables#read-optimized-table" >}}). Reading from results of full
compaction avoids merging records with the same key, thus improving reading
performance.
You can flexibly balance query performance and data latency when reading.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 840173ff3..7e206d787 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -155,7 +155,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
public abstract SplitGenerator splitGenerator();
- protected abstract boolean supportStreamingReadOverwrite();
+ public abstract boolean supportStreamingReadOverwrite();
public abstract BiConsumer<FileStoreScan, Predicate>
nonPartitionFilterConsumer();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
new file mode 100644
index 000000000..13d617cf8
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/**
+ * A {@link Table} optimized for reading by avoiding merging files.
+ *
+ * <ul>
+ * <li>For primary key tables, this system table only scans files on top
level.
+ * <li>For append only tables, as all files can be read without merging,
this system table does
+ * nothing special.
+ * </ul>
+ */
+public class ReadOptimizedTable implements DataTable, ReadonlyTable {
+
+    // Name suffix of this system table; addressed as "MyTable$ro".
+    public static final String READ_OPTIMIZED = "ro";
+
+    // The underlying data table. Everything except snapshot scanning is
+    // delegated to it unchanged.
+    private final AbstractFileStoreTable dataTable;
+
+    public ReadOptimizedTable(FileStoreTable dataTable) {
+        // NOTE(review): assumes every FileStoreTable implementation extends
+        // AbstractFileStoreTable; otherwise this cast fails at runtime.
+        this.dataTable = (AbstractFileStoreTable) dataTable;
+    }
+
+    @Override
+    public String name() {
+        // e.g. "MyTable" + "$" + "ro" -> "MyTable$ro"
+        return dataTable.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
+    }
+
+    @Override
+    public RowType rowType() {
+        return dataTable.rowType();
+    }
+
+    @Override
+    public List<String> partitionKeys() {
+        return dataTable.partitionKeys();
+    }
+
+    @Override
+    public Map<String, String> options() {
+        return dataTable.options();
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return dataTable.primaryKeys();
+    }
+
+    @Override
+    public SnapshotReader newSnapshotReader() {
+        if (dataTable.schema().primaryKeys().size() > 0) {
+            // Primary key table: only scan files on the topmost level
+            // (numLevels() - 1), i.e. the result of the latest full
+            // compaction, so reads need no merging.
+            return dataTable
+                    .newSnapshotReader()
+                    .withLevelFilter(level -> level ==
coreOptions().numLevels() - 1);
+        } else {
+            // Append-only table: all files can be read without merging,
+            // so no level filter is applied.
+            return dataTable.newSnapshotReader();
+        }
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        // Batch scan built on top of the level-filtered snapshot reader.
+        return new InnerTableScanImpl(
+                coreOptions(),
+                newSnapshotReader(),
+                snapshotManager(),
+                DefaultValueAssigner.create(dataTable.schema()));
+    }
+
+    @Override
+    public InnerStreamTableScan newStreamScan() {
+        // Streaming scan, also restricted to the level-filtered reader.
+        return new InnerStreamTableScanImpl(
+                coreOptions(),
+                newSnapshotReader(),
+                snapshotManager(),
+                dataTable.supportStreamingReadOverwrite(),
+                DefaultValueAssigner.create(dataTable.schema()));
+    }
+
+    @Override
+    public CoreOptions coreOptions() {
+        return dataTable.coreOptions();
+    }
+
+    @Override
+    public Path location() {
+        return dataTable.location();
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        return dataTable.snapshotManager();
+    }
+
+    @Override
+    public TagManager tagManager() {
+        return dataTable.tagManager();
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        // Reading itself is unchanged; only which files get scanned differs.
+        return dataTable.newRead();
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        // Re-wrap the copied data table so dynamic options take effect.
+        return new ReadOptimizedTable(dataTable.copy(dynamicOptions));
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return dataTable.fileIO();
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 8b4048284..aa98d74aa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -41,6 +41,7 @@ import static org.apache.paimon.table.system.FilesTable.FILES;
import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
+import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
import static
org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE;
import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
@@ -73,6 +74,8 @@ public class SystemTableLoader {
return new TagsTable(fileIO, location);
case CONSUMERS:
return new ConsumersTable(fileIO, location);
+ case READ_OPTIMIZED:
+ return new ReadOptimizedTable(dataTable);
default:
return null;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index a06af7b74..e8115754d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -54,6 +54,7 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.system.AuditLogTable;
import org.apache.paimon.table.system.FileMonitorTable;
+import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -1101,6 +1102,53 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
commit.close();
}
+    @Test
+    public void testReadOptimizedTable() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // two commits, no compaction yet -> nothing on the topmost level
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 11, 110L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 201L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        ReadOptimizedTable roTable = new ReadOptimizedTable(table);
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()));
+
+        SnapshotReader snapshotReader = roTable.newSnapshotReader();
+        TableRead read = roTable.newRead();
+        // ro table must be empty before any full compaction has happened
+        List<String> result =
+                getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
+        assertThat(result).isEmpty();
+
+        // full-compact only the first partition; ro table now sees its rows
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 10, 100]", "+I[1,
11, 110]");
+
+        // write into both partitions, then full-compact only the second one;
+        // each partition is served as of its own latest full compaction, so
+        // the first partition still shows 100 (not the newly written 101)
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 101L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 21, 210L));
+        write.compact(binaryRow(2), 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10, 100]", "+I[1, 11, 110]", "+I[2, 20, 201]",
"+I[2, 21, 210]");
+
+        write.close();
+        commit.close();
+    }
+
@Override
protected FileStoreTable createFileStoreTable(Consumer<Options> configure)
throws Exception {
return createFileStoreTable(configure, ROW_TYPE);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 4ded19c85..234656ee9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -773,4 +773,26 @@ public class CatalogTableITCase extends CatalogITCaseBase {
+ ") ")
.doesNotContain("schema");
}
+
+    @Test
+    public void testReadOptimizedTable() {
+        sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED)");
+
+        // full compaction will always be performed at the end of batch jobs,
as long as
+        // full-compaction.delta-commits is set, regardless of its value
+        sql(
+                "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' =
'100') */ VALUES (1, 10), (2, 20)");
+        List<Row> result = sql("SELECT k, v FROM T$ro ORDER BY k");
+        assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
+
+        // no compaction, so result of ro table does not change
+        sql("INSERT INTO T VALUES (1, 11), (3, 30)");
+        result = sql("SELECT k, v FROM T$ro ORDER BY k");
+        assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
+
+        // this batch job triggers another full compaction, so ro table now
+        // reflects all previous writes merged together (k=1 picks up 11)
+        sql(
+                "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' =
'100') */ VALUES (2, 21), (3, 31)");
+        result = sql("SELECT k, v FROM T$ro ORDER BY k");
+        assertThat(result).containsExactly(Row.of(1, 11), Row.of(2, 21),
Row.of(3, 31));
+    }
}