Adamyuanyuan opened a new issue, #10288:
URL: https://github.com/apache/seatunnel/issues/10288

   ### Search before asking
   
   - [x] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
    Goal: Use database-side sampling capabilities (Oracle `SAMPLE`, Postgres 
`TABLESAMPLE`, etc.) plus quantile boundaries to reduce split generation cost 
and improve parallel balance for highly skewed data.
   
   
   ### 0. Background: Why we need a “faster and more balanced” splitter
   
   The current splitter algorithms can already cover most extraction scenarios, 
but there is still room for improvement in some extreme cases.
   
   In practice, we have very large data sources (e.g., Oracle / PostgreSQL). A 
single table can be ~300GB with ~1B rows, and the ID column is not 
auto-increment (e.g., government ID-like numbers). In such cases the split key 
distribution is highly skewed, and the existing splitter strategies are not 
always cost-effective.
   
   When JDBC Source reads in parallel using `partition_column`, it must first 
split the full table (or query result) into multiple splits. The splitting 
strategy directly impacts:
   
   - Job startup time (database pressure / time cost during split planning)
   - Parallelism utilization (many empty splits vs. a few long-tail splits)
   - Stability (many SQLs vs. a long scan)
   
   When the split key is severely skewed (large gaps, hot/cold regions, value 
range much larger than row count), simply dividing the value range evenly often 
leads to unbalanced workloads. SeaTunnel already has optimizations for uneven 
distributions (DynamicChunkSplitter + sampling sharding), but there is still an 
opportunity to reduce the cost of split planning by leveraging database-native 
sampling.
   
   ---
   
   ### 1. Current approach: SeaTunnel DynamicChunkSplitter
   
   > Note: DynamicChunkSplitter targets `split.size` (rows per split) and 
switches strategies based on data distribution.
   
   At a high level, DynamicChunkSplitter is a trade-off between “fewer SQLs” 
and “less scanning”:
   
   - If the shard count is very large: it prefers scanning the split-key column once (sampling sharding) rather than issuing thousands of `queryNextChunkMax` SQLs (a sketch of such a boundary query follows this list).
   - If the shard count is not large: it prefers multiple small boundary queries to avoid a full scan.
   
   ---
   
   ### 2. Limitations of the current approach
   
   #### 2.1 Cost of sampling sharding: it often scans the entire split-key 
column (or full query result)
   
   The SQL of `sampleDataFromColumn` is essentially `SELECT split_col FROM ...`, and the client must iterate the ResultSet to “take 1 row every N rows” (sketched after this list). This means:
   
   - For huge tables: split planning can still trigger a long full scan (although only one column is read and it can be streamed).
   - The driver/network still streams all rows to the client (the client just 
discards most of them).
   - The sample can still be large: sample size is roughly `rowCount / 
inverseSamplingRate`, and the client needs to store and sort the sample.
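
   As a rough illustration (placeholder names; the real statement is assembled by the connector), the scan behind sampling sharding boils down to:

   ```sql
   -- Sketch of the current sampling-sharding scan: the database returns every row
   -- of the split-key column, and the client keeps roughly one value per
   -- inverseSamplingRate rows while discarding the rest.
   SELECT split_col
   FROM my_table;
   ```

   Every discarded row still has to be read by the database and shipped over the network, which is exactly the cost the proposal below tries to avoid.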
   
   #### 2.2 Sample representativeness: taking “1 every N rows” is not truly 
random sampling
   
   The current sampling is a form of systematic sampling based on the ResultSet 
order, rather than random sampling at the database level:
   
   - If the result order is correlated (table layout / index / execution plan), 
the sample may be biased, reducing quantile boundary quality.
   - In extreme cases it may still produce unbalanced splits (e.g., hot data 
concentrated in certain physical ranges).
   
   #### 2.3 Query mode can be more expensive
   
   If users provide a query (with WHERE/JOIN) and also specify 
`partition_column`:
   
   - Sampling sharding must iterate the entire query result set.
   - Some complex queries are even more expensive than scanning a single column 
from a table.
   
   ---
   
   ### 3. Proposed idea: use database-side sampling to get representative 
samples/boundaries faster
   
   #### 3.1 Core goal
   
   During split planning, push “sampling” down to the database, so the database 
returns only a small number of samples / boundaries:
   
   - Avoid scanning the entire split-key column
   - Reduce network transfer
   - Shorten job startup time
   - Keep the “sampling-based balanced splitting” effect for skewed data
   
   #### 3.2 Implementation pattern
   
   Example pseudo SQL, Oracle-flavored (dialects vary by database; a PostgreSQL variant is sketched after the block):
   
   ```sql
   WITH sampled AS (
     SELECT col
     FROM table SAMPLE(<percent>)
     WHERE col IS NOT NULL
   ),
   buckets AS (
     SELECT NTILE(<bucket_number>) OVER (ORDER BY col) AS bucket_no, col
     FROM sampled
   )
   SELECT MAX(col) AS boundary
   FROM buckets
   WHERE bucket_no < <bucket_number>
   GROUP BY bucket_no
   ORDER BY bucket_no;
   ```
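
   For PostgreSQL, a concrete variant might look like the sketch below (assuming standard `TABLESAMPLE` support; `my_table`, `split_col`, the sampling percentage, and the bucket count are placeholders to be filled in from configuration):

   ```sql
   -- PostgreSQL sketch: block-level sampling (SYSTEM) plus NTILE quantile boundaries.
   -- TABLESAMPLE SYSTEM is cheap but samples whole blocks; BERNOULLI samples
   -- individual rows more uniformly at higher cost, which may matter for data that
   -- is strongly clustered on disk.
   WITH sampled AS (
     SELECT split_col
     FROM my_table TABLESAMPLE SYSTEM (0.1)   -- 0.1 percent of blocks
     WHERE split_col IS NOT NULL
   ),
   buckets AS (
     SELECT NTILE(10) OVER (ORDER BY split_col) AS bucket_no, split_col
     FROM sampled
   )
   SELECT MAX(split_col) AS boundary
   FROM buckets
   WHERE bucket_no < 10                        -- returns bucket_number - 1 boundaries
   GROUP BY bucket_no
   ORDER BY bucket_no;
   ```

   Oracle's `SAMPLE(<percent>)` / `SAMPLE BLOCK(<percent>)` clauses play the analogous role on the Oracle side.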
   
   Advantages: only returns `bucket_number - 1` boundary values; lower client 
overhead; faster planning for huge tables.
   
   Risks: relies on window functions/CTE; support varies across databases and 
versions, so this needs per-dialect implementations and a robust fallback.
   
   #### 3.3 Applicable scenarios & expected benefits
   
   Best-fit scenarios:
   
   - Very large tables (10M/100M/1B rows) with obvious skew on the split key
   - Databases with efficient sampling (e.g., Oracle/Postgres block-level 
sampling), with dialect-specific support
   - Users care about database pressure and time cost during job startup
   
   Expected benefits:
   
   - Network transfer goes from “stream all rows and drop most” to “return only 
samples/boundaries”
   - The number of splits can be controlled by `bucket_number`, making it 
easier to align with source parallelism
   
   ---
   
   ### 4. Risks and constraints
   
   - Database pressure: for huge tables, if sampling percentage is set too 
high, database-side sampling can still be costly. In our tests, 0.1% sampling 
is manageable for billion-row tables.
   - Approximation: sampled boundaries are inherently approximate, so splits are roughly rather than exactly balanced; usually acceptable.
   - Reproducibility: random sampling can produce different boundaries between 
runs; usually acceptable because splitting is only a parallel read strategy, 
but should be documented clearly.
   
   ---
   
   ### 5. Configurations
   
   - `split.sampled_balanced_sharding`: enable database-side sampled balanced 
sharding (default `false`)
   - `split.sampling_percentage`: sampling ratio in `(0.0, 1.0]` (default 
`0.001`, i.e. 0.1%)
   - `split.bucket_number`: target bucket/split count (default `10`; must be 
`>= 1`; `1` means no splitting)
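
   With the defaults above, the sampling part of the Oracle-flavored template from section 3.2 would be instantiated roughly as follows (a sketch; it assumes the splitter converts the `0.001` fraction into the percentage Oracle's `SAMPLE` clause expects):

   ```sql
   -- Sketch: defaults plugged into the Oracle-flavored template.
   -- split.sampling_percentage = 0.001 (a fraction)  ->  SAMPLE (0.1) (a percentage)
   -- split.bucket_number       = 10                  ->  NTILE(10), 9 boundaries, 10 splits
   SELECT split_col
   FROM my_table SAMPLE (0.1)
   WHERE split_col IS NOT NULL;
   ```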
   
   ---
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

