kaori-seasons opened a new pull request, #6380: URL: https://github.com/apache/paimon/pull/6380
### Purpose

See: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table

# Paimon Chain Table Implementation Plan

## 1. Overview

Chain Table is a new feature in Paimon designed to solve the problem of periodically storing full data in data warehouses. It optimizes storage and computation performance by dividing data into delta branches and snapshot branches.

### 1.1 Motivation

A typical scenario in data warehouse systems is periodically storing full data (e.g., daily or hourly). Between consecutive time intervals, however, most of the data is redundant, and only a small amount is newly changed. Traditional processing methods have the following issues:

1. **Full Computation**: Merge operations involve full computation and shuffle, resulting in poor performance
2. **Full Storage**: Full data is stored every day, although newly changed data typically accounts for only a small percentage (e.g., 1%)

Chain Table addresses these issues through the following approaches:

1. **Incremental Computation**: Offline ETL jobs only need to consume the newly changed data for the day, without full merging
2. **Incremental Storage**: Only newly changed data is stored each day, with periodic asynchronous compaction to build a global chain table

## 2. Design Solution

### 2.1 Configuration Options

| Key | Default | Type | Description |
| --- | --- | --- | --- |
| chain-table.enabled | false | Boolean | Whether to enable chain table |
| scan.fallback-snapshot-branch | (none) | String | Snapshot branch when fallback to chain read as partition does not exist in the main branch |
| scan.fallback-delta-branch | (none) | String | Delta branch when fallback to chain as partition does not exist in the main branch |

### 2.2 Solution

Add two new branches, delta and snapshot, on top of the warehouse. They hold the newly changed data and the full data generated by chain compaction, respectively.

#### 2.2.1 Table Structure

1. **Snapshot Branch**: Stores full data generated by chain compaction or bootstrap
2. **Delta Branch**: Stores newly changed data

#### 2.2.2 Write Strategy

Data is written to the branch selected by the branch configuration. Using partition 20250722 as an example:

1. **Incremental Write**: Write to `t$branch_delta`
2. **Full Write (Chain Compaction)**: Write to `t$branch_snapshot`

#### 2.2.3 Read Strategy

##### Full Batch Read

The strategy depends on whether the queried partition exists in the snapshot branch:

1. When querying partition 20250722, if the partition exists in `t$branch_snapshot`, read directly from `t$branch_snapshot`
2. When querying partition 20250726, if the partition does not exist in `t$branch_snapshot`, read the nearest full partition from `t$branch_snapshot` and all later incremental partitions, up to the queried one, from `t$branch_delta`

##### Incremental Batch Read

Read incremental partitions directly from `t$branch_delta`. For example, when querying partition 20250722, read directly from `t$branch_delta`.

##### Stream Read

Read data directly from `t$branch_delta`.

#### 2.2.4 Chain Compaction

Merge the incremental data of the current cycle with the full data of the previous cycle to generate the full data for the day. For example, the full data for date=20250729 is generated by merging all incremental partitions from 20250723 to 20250729 in `t$branch_delta` with the full data of 20250722 in `t$branch_snapshot`.

## 3. Implementation Plan

### 3.1 Core Class Design

#### 3.1.1 ChainFileStoreTable

Inherits from FallbackReadFileStoreTable, implementing chain table splitting and reading functionality.
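
As a reading aid for the classes in this section, the partition selection described in section 2.2.3 can be sketched in plain Java, independent of Paimon's API. This is only a sketch under stated assumptions: `ChainReadPlanner` and `ChainPlan` are hypothetical names that do not exist in Paimon, and partition values are assumed to be `yyyyMMdd` strings so that lexicographic order matches date order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical helper (not a Paimon class): picks the partitions a full batch read touches. */
final class ChainReadPlanner {

    /** One optional snapshot partition plus the delta partitions to merge on top of it. */
    static final class ChainPlan {
        final String snapshotPartition;     // null when no earlier full partition exists
        final List<String> deltaPartitions; // ascending order, oldest first

        ChainPlan(String snapshotPartition, List<String> deltaPartitions) {
            this.snapshotPartition = snapshotPartition;
            this.deltaPartitions = deltaPartitions;
        }
    }

    static ChainPlan plan(
            String target,
            NavigableSet<String> snapshotPartitions,
            NavigableSet<String> deltaPartitions) {
        // Case 1: the partition exists in the snapshot branch, read it directly.
        if (snapshotPartitions.contains(target)) {
            return new ChainPlan(target, new ArrayList<>());
        }
        // Case 2: fall back to the nearest earlier full partition...
        String base = snapshotPartitions.lower(target);
        // ...plus every delta partition after it, up to and including the target.
        NavigableSet<String> deltas =
                base == null
                        ? deltaPartitions.headSet(target, true)
                        : deltaPartitions.subSet(base, false, target, true);
        return new ChainPlan(base, new ArrayList<>(deltas));
    }

    public static void main(String[] args) {
        NavigableSet<String> snapshot = new TreeSet<>(Arrays.asList("20250722"));
        NavigableSet<String> delta =
                new TreeSet<>(Arrays.asList("20250723", "20250724", "20250725", "20250726"));
        ChainPlan plan = plan("20250726", snapshot, delta);
        System.out.println(plan.snapshotPartition + " + " + plan.deltaPartitions);
    }
}
```

The same selection also describes the inputs of chain compaction in section 2.2.4: the previous full partition plus all delta partitions after it, up to the target date. The proposed `ChainFileStoreTable` skeleton follows.
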
```java
public class ChainFileStoreTable extends FallbackReadFileStoreTable {

    private final FileStoreTable snapshotStoreTable;
    private final FileStoreTable deltaStoreTable;

    public ChainFileStoreTable(
            AbstractFileStoreTable snapshotStoreTable,
            AbstractFileStoreTable deltaStoreTable) {
        super(snapshotStoreTable, deltaStoreTable);
        this.snapshotStoreTable = snapshotStoreTable;
        this.deltaStoreTable = deltaStoreTable;
    }

    // Implement chain table specific scan and read logic
    @Override
    public DataTableScan newScan() {
        return new ChainTableBatchScan(snapshotStoreTable.newScan(), deltaStoreTable.newScan());
    }

    @Override
    public InnerTableRead newRead() {
        return new ChainTableRead(snapshotStoreTable.newRead(), deltaStoreTable.newRead());
    }
}
```

#### 3.1.2 ChainTableBatchScan

Implements batch scan logic for chain tables.

```java
public class ChainTableBatchScan implements DataTableScan {

    private final DataTableScan snapshotScan;
    private final DataTableScan deltaScan;

    public ChainTableBatchScan(DataTableScan snapshotScan, DataTableScan deltaScan) {
        this.snapshotScan = snapshotScan;
        this.deltaScan = deltaScan;
    }

    @Override
    public TableScan.Plan plan() {
        // Implement chain table read strategy
        // 1. First read existing partitions from snapshot branch
        // 2. For non-existent partitions, get the nearest full data from snapshot branch
        // 3. Get incremental data for the corresponding time period from delta branch
        // 4. Merge data and return
        return new ChainTablePlan(snapshotScan.plan(), deltaScan.plan());
    }
}
```

#### 3.1.3 ChainTableRead

Implements read logic for chain tables.
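
This plan does not spell out how the merged read inside `ChainTableRecordReader` resolves rows that appear in both branches. One plausible reading, assuming last-write-wins semantics per primary key (as with Paimon's default `deduplicate` merge engine), is sketched below in plain Java. `ChainMergeSketch` and its map-based row representation are hypothetical and only illustrate the ordering: snapshot rows first, then delta partitions from oldest to newest.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration (not a Paimon class) of a last-write-wins chain merge. */
final class ChainMergeSketch {

    /**
     * Merges rows keyed by primary key. Rows from the snapshot partition form the base;
     * each delta partition, applied oldest first, overrides rows with the same key.
     */
    static Map<String, String> merge(
            Map<String, String> snapshotRows, List<Map<String, String>> deltasOldestFirst) {
        Map<String, String> merged = new LinkedHashMap<>(snapshotRows);
        for (Map<String, String> delta : deltasOldestFirst) {
            merged.putAll(delta); // a newer value replaces the older one for the same key
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new LinkedHashMap<>();
        snapshot.put("k1", "full-0722");
        snapshot.put("k2", "full-0722");

        Map<String, String> delta0723 = new LinkedHashMap<>();
        delta0723.put("k2", "delta-0723");

        Map<String, String> delta0724 = new LinkedHashMap<>();
        delta0724.put("k2", "delta-0724");
        delta0724.put("k3", "delta-0724");

        System.out.println(merge(snapshot, Arrays.asList(delta0723, delta0724)));
    }
}
```

A real reader would stream sorted key-value files rather than materialize maps, and other merge engines would combine rows differently; the sketch only fixes the order in which the branches are applied.
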
```java
public class ChainTableRead implements InnerTableRead {

    private final InnerTableRead snapshotRead;
    private final InnerTableRead deltaRead;

    public ChainTableRead(InnerTableRead snapshotRead, InnerTableRead deltaRead) {
        this.snapshotRead = snapshotRead;
        this.deltaRead = deltaRead;
    }

    @Override
    public RecordReader<InternalRow> createReader(Split split) throws IOException {
        // Create corresponding reader based on split type
        if (split instanceof ChainTableSplit) {
            ChainTableSplit chainSplit = (ChainTableSplit) split;
            // Implement merged reading of chain table data
            return new ChainTableRecordReader(chainSplit, snapshotRead, deltaRead);
        }
        // Default case: delegate to the snapshot branch reader
        return snapshotRead.createReader(split);
    }
}
```

### 3.2 Configuration Implementation

Add chain table related configurations in the CoreOptions class:

```java
// Chain Table related configurations
public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
        key("chain-table.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable chain table");

public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
        key("scan.fallback-snapshot-branch")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Snapshot branch when fallback to chain read as partition does not exist in the main branch");

public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
        key("scan.fallback-delta-branch")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Delta branch when fallback to chain as partition does not exist in the main branch");
```

### 3.3 Table Factory Modification

Modify FileStoreTableFactory to support chain table creation:

```java
public class FileStoreTableFactory {

    public static FileStoreTable create(
            FileIO fileIO,
            Path tablePath,
            TableSchema tableSchema,
            Options dynamicOptions,
            CatalogEnvironment catalogEnvironment) {
        // Check if chain table is enabled
        CoreOptions coreOptions = new CoreOptions(tableSchema.options());
        if (coreOptions.chainTableEnabled()) {
            // Create chain table instance
            return createChainTable(
                    fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment);
        }

        // Original logic
        FileStoreTable table =
                createWithoutFallbackBranch(
                        fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment);
        Options options = new Options(table.options());
        String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
            // ... existing fallback branch logic
        }
        return table;
    }

    private static FileStoreTable createChainTable(
            FileIO fileIO,
            Path tablePath,
            TableSchema tableSchema,
            Options dynamicOptions,
            CatalogEnvironment catalogEnvironment) {
        CoreOptions coreOptions = new CoreOptions(tableSchema.options());
        String snapshotBranch = coreOptions.scanFallbackSnapshotBranch();
        String deltaBranch = coreOptions.scanFallbackDeltaBranch();

        // Create snapshot branch table
        TableSchema snapshotSchema =
                tableSchema.copy(
                        Collections.singletonMap(CoreOptions.BRANCH.key(), snapshotBranch));
        FileStoreTable snapshotTable =
                createWithoutFallbackBranch(
                        fileIO, tablePath, snapshotSchema, dynamicOptions, catalogEnvironment);

        // Create delta branch table
        TableSchema deltaSchema =
                tableSchema.copy(
                        Collections.singletonMap(CoreOptions.BRANCH.key(), deltaBranch));
        FileStoreTable deltaTable =
                createWithoutFallbackBranch(
                        fileIO, tablePath, deltaSchema, dynamicOptions, catalogEnvironment);

        // Create chain table instance
        return new ChainFileStoreTable(
                (AbstractFileStoreTable) snapshotTable,
                (AbstractFileStoreTable) deltaTable);
    }
}
```

## 4. Usage Examples

### 4.1 Create Table

```sql
CREATE TABLE default.t (
    t1 string COMMENT 't1',
    t2 string COMMENT 't2',
    t3 string COMMENT 't3'
) PARTITIONED BY (date string COMMENT 'date')
TBLPROPERTIES (
    'primary-key' = 'date,t1',
    'bucket' = '2',
    'bucket-key' = 't1',
    'partition.timestamp-pattern' = '$date',
    'partition.timestamp-formatter' = 'yyyyMMdd',
    'chain-table.enabled' = 'true',
    'scan.fallback-snapshot-branch' = 'snapshot',
    'scan.fallback-delta-branch' = 'delta'
);
```

### 4.2 Create Branches

```sql
CALL sys.create_branch('default.t', 'snapshot');
CALL sys.create_branch('default.t', 'delta');
```

### 4.3 Write Data

```sql
-- Full write
INSERT INTO/overwrite `default`.`t$branch_snapshot` SELECT ...

-- Incremental write
INSERT INTO/overwrite `default`.`t$branch_delta` SELECT ...
```

### 4.4 Read Data

```sql
-- Full query
SELECT * FROM default.t WHERE date = '${date}'

-- Incremental query
SELECT * FROM `default`.`t$branch_delta` WHERE date = '${date}'

-- Hybrid query
SELECT * FROM default.t WHERE date = '${date}'
UNION ALL
SELECT * FROM `default`.`t$branch_delta` WHERE date = '${date-1}'
```

<!-- What is the purpose of the change -->

### Tests

<!-- List UT and IT cases to verify this change -->

### API and Format

<!-- Does this change affect API or storage format -->

### Documentation

<!-- Does this change introduce a new feature -->

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
