kaori-seasons opened a new pull request, #6380:
URL: https://github.com/apache/paimon/pull/6380

   ### Purpose
   
   See: https://cwiki.apache.org/confluence/display/PAIMON/PIP-37%3A+Introduce+Chain+Table
   
   # Paimon Chain Table Implementation Plan
   
   ## 1. Overview
   
   Chain Table is a new feature in Paimon for data warehouses that periodically store full data. It reduces storage and computation cost by splitting the data into a delta branch and a snapshot branch.
   
   ### 1.1 Motivation
   
   In data warehouse systems, there is a typical scenario: periodically storing 
full data (e.g., daily or hourly). However, between consecutive time intervals, 
most of the data is redundant, with only a small amount of newly changed data. 
Traditional processing methods have the following issues:
   
   1. **Full Computation**: Merge operations involve full computation and shuffle, resulting in poor performance.
   2. **Full Storage**: A full copy of the data is stored every day, even though newly changed data typically accounts for only a small percentage (e.g., 1%).
   
   Chain Table addresses these issues in two ways:
   
   1. **Incremental Computation**: Offline ETL jobs only need to consume newly 
changed data for the day, without full merging
   2. **Incremental Storage**: Only store newly changed data each day, with 
periodic asynchronous compaction to build a global chain table
   
   ## 2. Design Solution
   
   ### 2.1 Configuration Options
   
   | Key | Default | Type | Description |
   | --- | --- | --- | --- |
   | chain-table.enabled | false | Boolean | Whether to enable chain table |
   | scan.fallback-snapshot-branch | (none) | String | Snapshot branch to read from when falling back to chain read because the partition does not exist in the main branch |
   | scan.fallback-delta-branch | (none) | String | Delta branch to read from when falling back to chain read because the partition does not exist in the main branch |
   
   ### 2.2 Solution
   
   Add two new branches on top of the warehouse: delta and snapshot, which 
describe newly changed data and full data generated by chain compaction 
respectively.
   
   #### 2.2.1 Table Structure
   
   1. **Snapshot Branch**: Stores full data generated by chain compaction or 
bootstrap
   2. **Delta Branch**: Stores newly changed data
   
   #### 2.2.2 Write Strategy
   
   Write data to the corresponding branch based on branch configuration, using 
partition 20250722 as an example:
   
   1. **Incremental Write**: Write to t$branch_delta
   2. **Full Write (Chain Compaction)**: Write to t$branch_snapshot
   
   #### 2.2.3 Read Strategy
   
   ##### Full Batch Read
   
   Adopt corresponding strategies based on whether the partition exists in the 
snapshot branch:
   
   1. When querying partition 20250722, if the partition exists in t$branch_snapshot, read directly from t$branch_snapshot.
   2. When querying partition 20250726, if the partition does not exist in t$branch_snapshot, read the nearest full partition from t$branch_snapshot (here 20250722) and merge it with the incremental partitions from t$branch_delta that follow it, up to and including the queried partition (20250723 through 20250726).
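
The two cases above can be sketched as a small partition-resolution routine. This is only an illustration under stated assumptions: `ChainReadPlanner` and `ReadPlan` are hypothetical names that do not exist in Paimon, partitions are plain `yyyyMMdd` strings (so lexicographic order equals date order), and "nearest full partition" is taken to mean the latest snapshot partition earlier than the queried one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

/** Hypothetical helper, not part of Paimon: resolves which partitions a full batch read touches. */
final class ChainReadPlanner {

    /** The snapshot partition to read (null if none exists) and the delta partitions to merge on top. */
    static final class ReadPlan {
        final String snapshotPartition;
        final List<String> deltaPartitions;

        ReadPlan(String snapshotPartition, List<String> deltaPartitions) {
            this.snapshotPartition = snapshotPartition;
            this.deltaPartitions = deltaPartitions;
        }
    }

    static ReadPlan plan(
            NavigableSet<String> snapshotPartitions,
            NavigableSet<String> deltaPartitions,
            String queried) {
        // Case 1: the queried partition already exists as full data.
        if (snapshotPartitions.contains(queried)) {
            return new ReadPlan(queried, new ArrayList<>());
        }
        // Case 2: fall back to the latest earlier full partition (assumption) ...
        String nearestFull = snapshotPartitions.lower(queried);
        // ... plus every delta partition after it, up to and including the queried one.
        NavigableSet<String> deltas =
                nearestFull == null
                        ? deltaPartitions.headSet(queried, true)
                        : deltaPartitions.subSet(nearestFull, false, queried, true);
        return new ReadPlan(nearestFull, new ArrayList<>(deltas));
    }
}
```

With a snapshot branch containing only 20250722 and a delta branch containing 20250722 through 20250727, querying 20250726 resolves to snapshot partition 20250722 plus delta partitions 20250723 through 20250726.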
   
   ##### Incremental Batch Read
   
   Read incremental partitions directly from t$branch_delta. For example, when 
querying partition 20250722, read directly from t$branch_delta
   
   ##### Stream Read
   
   Read data directly from t$branch_delta
   
   #### 2.2.4 Chain Compaction
   
   Merge the incremental data of the current cycle with the full data of the 
previous cycle to generate the full data for the day. For example, the full 
data for date=20250729 is generated by merging all incremental partitions from 
20250723 to 20250729 in t$branch_delta and the full data of 20250722 in 
t$branch_snapshot.
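
As a rough illustration of the merge semantics, the sketch below applies delta partitions, oldest first, on top of the previous full data. It assumes simple upsert behaviour keyed by primary key (the latest value wins, with no deletes), which this description does not spell out. `ChainCompactionSketch` is a hypothetical name, and plain maps stand in for partition contents.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not Paimon code: merges previous full data with newer deltas. */
final class ChainCompactionSketch {

    /**
     * Builds the new full data for a cycle.
     *
     * @param previousFull primary key to row value for the previous full partition
     * @param deltasOldestFirst one map per delta partition, ordered from oldest to newest
     */
    static Map<String, String> compact(
            Map<String, String> previousFull, List<Map<String, String>> deltasOldestFirst) {
        Map<String, String> merged = new LinkedHashMap<>(previousFull);
        for (Map<String, String> delta : deltasOldestFirst) {
            // Assumed upsert semantics: a newer row replaces an older row with the same key.
            merged.putAll(delta);
        }
        return merged;
    }
}
```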
   
   ## 3. Implementation Plan
   
   ### 3.1 Core Class Design
   
   #### 3.1.1 ChainFileStoreTable
   
   Inherits from FallbackReadFileStoreTable, implementing chain table splitting 
and reading functionality.
   
   ```java
   public class ChainFileStoreTable extends FallbackReadFileStoreTable {
       private final FileStoreTable snapshotStoreTable;
       private final FileStoreTable deltaStoreTable;
       
       public ChainFileStoreTable(
               AbstractFileStoreTable snapshotStoreTable, 
               AbstractFileStoreTable deltaStoreTable) {
           super(snapshotStoreTable, deltaStoreTable);
           this.snapshotStoreTable = snapshotStoreTable;
           this.deltaStoreTable = deltaStoreTable;
       }
       
       // Implement chain table specific scan and read logic
       @Override
       public DataTableScan newScan() {
           return new ChainTableBatchScan(
                   snapshotStoreTable.newScan(), deltaStoreTable.newScan());
       }
       
       @Override
       public InnerTableRead newRead() {
           return new ChainTableRead(
                   snapshotStoreTable.newRead(), deltaStoreTable.newRead());
       }
   }
   ```
   
   #### 3.1.2 ChainTableBatchScan
   
   Implements batch scan logic for chain tables.
   
   ```java
   public class ChainTableBatchScan implements DataTableScan {
       private final DataTableScan snapshotScan;
       private final DataTableScan deltaScan;
       
       public ChainTableBatchScan(DataTableScan snapshotScan, DataTableScan deltaScan) {
           this.snapshotScan = snapshotScan;
           this.deltaScan = deltaScan;
       }
       
       @Override
       public TableScan.Plan plan() {
           // Implement chain table read strategy
           // 1. First read existing partitions from snapshot branch
           // 2. For non-existent partitions, get the nearest full data from the
           //    snapshot branch
           // 3. Get incremental data for the corresponding time period from the
           //    delta branch
           // 4. Merge data and return
           return new ChainTablePlan(snapshotScan.plan(), deltaScan.plan());
       }
   }
   ```
   
   #### 3.1.3 ChainTableRead
   
   Implements read logic for chain tables.
   
   ```java
   public class ChainTableRead implements InnerTableRead {
       private final InnerTableRead snapshotRead;
       private final InnerTableRead deltaRead;
       
       public ChainTableRead(InnerTableRead snapshotRead, InnerTableRead deltaRead) {
           this.snapshotRead = snapshotRead;
           this.deltaRead = deltaRead;
       }
       
       @Override
       public RecordReader<InternalRow> createReader(Split split) throws IOException {
           // Create the corresponding reader based on the split type
           if (split instanceof ChainTableSplit) {
               ChainTableSplit chainSplit = (ChainTableSplit) split;
               // Merged reading of snapshot and delta data for a chain table split
               return new ChainTableRecordReader(chainSplit, snapshotRead, deltaRead);
           }
           // Default case: delegate to the snapshot branch reader
           return snapshotRead.createReader(split);
       }
   }
   ```
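
The merged reading itself is not detailed here. One plausible shape, shown only as a sketch, is a two-way sort-merge in which the delta row wins when both sides carry the same key. It assumes both inputs are sorted by key and hold at most one row per key. `ChainMergeSketch` and `Row` are hypothetical stand-ins, not the `ChainTableRecordReader` proposed above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a sort-merge between snapshot rows and delta rows. */
final class ChainMergeSketch {

    /** Minimal stand-in for a keyed row. */
    static final class Row {
        final String key;
        final String value;

        Row(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    /** Merges two key-sorted, key-unique lists; on equal keys the delta row replaces the snapshot row. */
    static List<Row> merge(List<Row> snapshotRows, List<Row> deltaRows) {
        List<Row> out = new ArrayList<>();
        int s = 0;
        int d = 0;
        while (s < snapshotRows.size() && d < deltaRows.size()) {
            int cmp = snapshotRows.get(s).key.compareTo(deltaRows.get(d).key);
            if (cmp < 0) {
                out.add(snapshotRows.get(s++));
            } else if (cmp > 0) {
                out.add(deltaRows.get(d++));
            } else {
                // Same key on both sides: keep the newer (delta) row, drop the snapshot row.
                out.add(deltaRows.get(d++));
                s++;
            }
        }
        // Drain whichever side still has rows.
        while (s < snapshotRows.size()) {
            out.add(snapshotRows.get(s++));
        }
        while (d < deltaRows.size()) {
            out.add(deltaRows.get(d++));
        }
        return out;
    }
}
```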
   
   ### 3.2 Configuration Implementation
   
   Add chain table related configurations in CoreOptions class:
   
   ```java
   // Chain Table related configurations
   public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
       key("chain-table.enabled")
           .booleanType()
           .defaultValue(false)
           .withDescription("Whether to enable chain table");
   
   public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
       key("scan.fallback-snapshot-branch")
           .stringType()
           .noDefaultValue()
           .withDescription(
                   "Snapshot branch to read from when falling back to chain read "
                           + "because the partition does not exist in the main branch");
   
   public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
       key("scan.fallback-delta-branch")
           .stringType()
           .noDefaultValue()
           .withDescription(
                   "Delta branch to read from when falling back to chain read "
                           + "because the partition does not exist in the main branch");
   ```
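
Since both branch options have no default, a chain table presumably needs both of them whenever `chain-table.enabled` is true. The check below is a minimal sketch of that assumption over a plain option map. `ChainTableOptionsCheck` is a hypothetical class, and this validation rule is not something the PR states.

```java
import java.util.Map;

/** Hypothetical validation sketch; the option keys come from the table in section 2.1. */
final class ChainTableOptionsCheck {
    static final String ENABLED = "chain-table.enabled";
    static final String SNAPSHOT_BRANCH = "scan.fallback-snapshot-branch";
    static final String DELTA_BRANCH = "scan.fallback-delta-branch";

    /** Throws IllegalArgumentException if chain table is enabled but a branch option is missing. */
    static void validate(Map<String, String> options) {
        if (!Boolean.parseBoolean(options.getOrDefault(ENABLED, "false"))) {
            return; // Chain table disabled: nothing to check.
        }
        requireNonBlank(options, SNAPSHOT_BRANCH);
        requireNonBlank(options, DELTA_BRANCH);
    }

    private static void requireNonBlank(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "'" + key + "' must be set when '" + ENABLED + "' is true");
        }
    }
}
```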
   
   ### 3.3 Table Factory Modification
   
   Modify FileStoreTableFactory to support chain table creation:
   
   ```java
   public class FileStoreTableFactory {
       public static FileStoreTable create(
               FileIO fileIO,
               Path tablePath,
               TableSchema tableSchema,
               Options dynamicOptions,
               CatalogEnvironment catalogEnvironment) {
           
           // Check if chain table is enabled
           CoreOptions coreOptions = new CoreOptions(tableSchema.options());
           if (coreOptions.chainTableEnabled()) {
               // Create chain table instance
               return createChainTable(
                       fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment);
           }
           
           // Original logic
           FileStoreTable table =
                   createWithoutFallbackBranch(
                           fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment);
   
           Options options = new Options(table.options());
           String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
           if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
               // ... existing fallback branch logic
           }
   
           return table;
       }
       
       private static FileStoreTable createChainTable(
               FileIO fileIO,
               Path tablePath,
               TableSchema tableSchema,
               Options dynamicOptions,
               CatalogEnvironment catalogEnvironment) {
           
           CoreOptions coreOptions = new CoreOptions(tableSchema.options());
           String snapshotBranch = coreOptions.scanFallbackSnapshotBranch();
           String deltaBranch = coreOptions.scanFallbackDeltaBranch();
           
           // Create snapshot branch table
           TableSchema snapshotSchema = tableSchema.copy(
               Collections.singletonMap(CoreOptions.BRANCH.key(), snapshotBranch));
           FileStoreTable snapshotTable = createWithoutFallbackBranch(
                   fileIO, tablePath, snapshotSchema, dynamicOptions, catalogEnvironment);
           
           // Create delta branch table
           TableSchema deltaSchema = tableSchema.copy(
               Collections.singletonMap(CoreOptions.BRANCH.key(), deltaBranch));
           FileStoreTable deltaTable = createWithoutFallbackBranch(
                   fileIO, tablePath, deltaSchema, dynamicOptions, catalogEnvironment);
           
           // Create chain table instance
           return new ChainFileStoreTable(
               (AbstractFileStoreTable) snapshotTable, 
               (AbstractFileStoreTable) deltaTable);
       }
   }
   ```
   
   ## 4. Usage Examples
   
   ### 4.1 Create Table
   
   ```sql
   CREATE TABLE default.t (
     t1 string COMMENT 't1',
     t2 string COMMENT 't2',
     t3 string COMMENT 't3'
   ) PARTITIONED BY (date string COMMENT 'date')
   TBLPROPERTIES (
     'primary-key' = 'date,t1',
     'bucket' = '2',
     'bucket-key' = 't1',
     'partition.timestamp-pattern' = '$date',
     'partition.timestamp-formatter' = 'yyyyMMdd',
     'chain-table.enabled' = 'true',
     'scan.fallback-snapshot-branch' = 'snapshot',
     'scan.fallback-delta-branch' = 'delta'
   );
   ```
   
   ### 4.2 Create Branches
   
   ```sql
   CALL sys.create_branch('default.t', 'snapshot');
   CALL sys.create_branch('default.t', 'delta');
   ```
   
   ### 4.3 Write Data
   
   ```sql
   -- Full write (use INSERT OVERWRITE instead to replace existing data)
   INSERT INTO `default`.`t$branch_snapshot`
   SELECT ...
   
   -- Incremental write (use INSERT OVERWRITE instead to replace existing data)
   INSERT INTO `default`.`t$branch_delta`
   SELECT ...
   ```
   
   ### 4.4 Read Data
   
   ```sql
   -- Full query
   SELECT * FROM default.t WHERE date = '${date}'
   
   -- Incremental query
   SELECT * FROM `default`.`t$branch_delta` WHERE date = '${date}'
   
   -- Hybrid query
   SELECT * FROM default.t WHERE date = '${date}'
   UNION ALL
   SELECT * FROM `default`.`t$branch_delta` WHERE date = '${date-1}'
   ```
   
   
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   

