This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 12b9686d3 [core] Introduce read-optimized system table (#2359)
12b9686d3 is described below

commit 12b9686d3ccce3902bcd085126fea9c6f760489a
Author: tsreaper <[email protected]>
AuthorDate: Wed Nov 22 16:59:22 2023 +0800

    [core] Introduce read-optimized system table (#2359)
---
 docs/content/how-to/system-tables.md               |  23 ++++
 docs/content/maintenance/read-performance.md       |   4 +-
 .../paimon/table/AbstractFileStoreTable.java       |   2 +-
 .../paimon/table/system/ReadOptimizedTable.java    | 153 +++++++++++++++++++++
 .../paimon/table/system/SystemTableLoader.java     |   3 +
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  48 +++++++
 .../apache/paimon/flink/CatalogTableITCase.java    |  22 +++
 7 files changed, 252 insertions(+), 3 deletions(-)

diff --git a/docs/content/how-to/system-tables.md 
b/docs/content/how-to/system-tables.md
index 8f2f7615b..dbeb76dac 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -130,6 +130,29 @@ SELECT * FROM MyTable$audit_log;
 */
 ```
 
+### Read-optimized Table
+
+If you require extreme reading performance and can accept reading slightly old 
data,
+you can use the `ro` (read-optimized) system table.
+Read-optimized system table improves reading performance by only scanning 
files which do not need merging.
+
+For primary-key tables, `ro` system table only scans files on the topmost 
level.
+That is to say, `ro` system table only produces the result of the latest full 
compaction.
+
+{{< hint info >}}
+
+It is possible that different buckets carry out full compaction at different 
times,
+so it is possible that the values of different keys come from different 
snapshots.
+
+{{< /hint >}}
+
+For append-only tables, as all files can be read without merging,
+`ro` system table acts like the normal append-only table.
+
+```sql
+SELECT * FROM MyTable$ro;
+```
+
 ### Files Table
 You can query the files of the table with specific snapshot.
 
diff --git a/docs/content/maintenance/read-performance.md 
b/docs/content/maintenance/read-performance.md
index 70b35d14e..796391c68 100644
--- a/docs/content/maintenance/read-performance.md
+++ b/docs/content/maintenance/read-performance.md
@@ -44,8 +44,8 @@ it still cannot catch up with the ordinary AppendOnly table.
 
 If you want to query fast enough in certain scenarios, but can only find older 
data, you can:
 
-1. Configure 'full-compaction.delta-commits', when writing data (currently 
only Flink), full compaction will be performed periodically.
-2. Configure 'scan.mode' to 'compacted-full', when reading data, snapshot of 
full compaction is picked. Read performance is good.
+1. Configure 'full-compaction.delta-commits' when writing data (currently only 
in Flink). For streaming jobs, full compaction will then be performed 
periodically; for batch jobs, full compaction will be carried out when the job 
ends.
+2. Query from [read-optimized system table]({{< ref 
"how-to/system-tables#read-optimized-table" >}}). Reading from results of full 
compaction avoids merging records with the same key, thus improving reading 
performance.
 
 You can flexibly balance query performance and data latency when reading.
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 840173ff3..7e206d787 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -155,7 +155,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     public abstract SplitGenerator splitGenerator();
 
-    protected abstract boolean supportStreamingReadOverwrite();
+    public abstract boolean supportStreamingReadOverwrite();
 
     public abstract BiConsumer<FileStoreScan, Predicate> 
nonPartitionFilterConsumer();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
new file mode 100644
index 000000000..13d617cf8
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/**
+ * A {@link Table} optimized for reading by avoiding merging files.
+ *
+ * <ul>
+ *   <li>For primary key tables, this system table only scans files on top 
level.
+ *   <li>For append only tables, as all files can be read without merging, 
this system table does
+ *       nothing special.
+ * </ul>
+ */
+public class ReadOptimizedTable implements DataTable, ReadonlyTable {
+
+    public static final String READ_OPTIMIZED = "ro";
+
+    private final AbstractFileStoreTable dataTable;
+
+    public ReadOptimizedTable(FileStoreTable dataTable) {
+        this.dataTable = (AbstractFileStoreTable) dataTable;
+    }
+
+    @Override
+    public String name() {
+        return dataTable.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED;
+    }
+
+    @Override
+    public RowType rowType() {
+        return dataTable.rowType();
+    }
+
+    @Override
+    public List<String> partitionKeys() {
+        return dataTable.partitionKeys();
+    }
+
+    @Override
+    public Map<String, String> options() {
+        return dataTable.options();
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return dataTable.primaryKeys();
+    }
+
+    @Override
+    public SnapshotReader newSnapshotReader() {
+        if (dataTable.schema().primaryKeys().size() > 0) {
+            return dataTable
+                    .newSnapshotReader()
+                    .withLevelFilter(level -> level == 
coreOptions().numLevels() - 1);
+        } else {
+            return dataTable.newSnapshotReader();
+        }
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        return new InnerTableScanImpl(
+                coreOptions(),
+                newSnapshotReader(),
+                snapshotManager(),
+                DefaultValueAssigner.create(dataTable.schema()));
+    }
+
+    @Override
+    public InnerStreamTableScan newStreamScan() {
+        return new InnerStreamTableScanImpl(
+                coreOptions(),
+                newSnapshotReader(),
+                snapshotManager(),
+                dataTable.supportStreamingReadOverwrite(),
+                DefaultValueAssigner.create(dataTable.schema()));
+    }
+
+    @Override
+    public CoreOptions coreOptions() {
+        return dataTable.coreOptions();
+    }
+
+    @Override
+    public Path location() {
+        return dataTable.location();
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        return dataTable.snapshotManager();
+    }
+
+    @Override
+    public TagManager tagManager() {
+        return dataTable.tagManager();
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return dataTable.newRead();
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new ReadOptimizedTable(dataTable.copy(dynamicOptions));
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return dataTable.fileIO();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 8b4048284..aa98d74aa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -41,6 +41,7 @@ import static org.apache.paimon.table.system.FilesTable.FILES;
 import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
 import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS;
+import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED;
 import static org.apache.paimon.table.system.SchemasTable.SCHEMAS;
 import static 
org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE;
 import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS;
@@ -73,6 +74,8 @@ public class SystemTableLoader {
                 return new TagsTable(fileIO, location);
             case CONSUMERS:
                 return new ConsumersTable(fileIO, location);
+            case READ_OPTIMIZED:
+                return new ReadOptimizedTable(dataTable);
             default:
                 return null;
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index a06af7b74..e8115754d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -54,6 +54,7 @@ import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.system.AuditLogTable;
 import org.apache.paimon.table.system.FileMonitorTable;
+import org.apache.paimon.table.system.ReadOptimizedTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -1101,6 +1102,53 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         commit.close();
     }
 
+    @Test
+    public void testReadOptimizedTable() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 11, 110L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 201L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        ReadOptimizedTable roTable = new ReadOptimizedTable(table);
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()));
+
+        SnapshotReader snapshotReader = roTable.newSnapshotReader();
+        TableRead read = roTable.newRead();
+        List<String> result =
+                getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
+        assertThat(result).isEmpty();
+
+        write.compact(binaryRow(1), 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 10, 100]", "+I[1, 
11, 110]");
+
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 101L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 21, 210L));
+        write.compact(binaryRow(2), 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 10, 100]", "+I[1, 11, 110]", "+I[2, 20, 201]", 
"+I[2, 21, 210]");
+
+        write.close();
+        commit.close();
+    }
+
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Options> configure) 
throws Exception {
         return createFileStoreTable(configure, ROW_TYPE);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 4ded19c85..234656ee9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -773,4 +773,26 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                 + ") ")
                 .doesNotContain("schema");
     }
+
+    @Test
+    public void testReadOptimizedTable() {
+        sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED)");
+
+        // full compaction will always be performed at the end of batch jobs, 
as long as
+        // full-compaction.delta-commits is set, regardless of its value
+        sql(
+                "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = 
'100') */ VALUES (1, 10), (2, 20)");
+        List<Row> result = sql("SELECT k, v FROM T$ro ORDER BY k");
+        assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
+
+        // no compaction, so result of ro table does not change
+        sql("INSERT INTO T VALUES (1, 11), (3, 30)");
+        result = sql("SELECT k, v FROM T$ro ORDER BY k");
+        assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
+
+        sql(
+                "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = 
'100') */ VALUES (2, 21), (3, 31)");
+        result = sql("SELECT k, v FROM T$ro ORDER BY k");
+        assertThat(result).containsExactly(Row.of(1, 11), Row.of(2, 21), 
Row.of(3, 31));
+    }
 }

Reply via email to