nzw921rx opened a new issue, #11061:
URL: https://github.com/apache/seatunnel/issues/11061

   ### Search before asking
   
   - [x] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   ### Background
   
   SeaTunnel's Transform layer currently only supports row-level operations — 
column mapping, type conversion, regex replacement, and simple filtering. The 
existing `SQL` Transform is powered by a hand-written ZetaSQLEngine (JSQLParser 
+ imperative if/else), which explicitly blocks JOIN, GROUP BY, window 
functions, subqueries, and set operations due to architectural limitations (no 
relational algebra layer, no type system, no optimizer).
   
   This means SeaTunnel effectively only performs **EL** (Extract + Load). The 
**T** (Transform) is severely limited — any complex data transformation must be 
pushed to downstream databases, requiring extra pipelines and external 
coordination.
   
   ### Motivation
   
   Users frequently need in-pipeline transformations that go beyond row-level 
operations:
   
   - **Multi-source merge**: JOIN two source tables before writing to sink — 
currently requires two separate pipelines
   - **Deduplication**: `ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC)` 
— not possible today
   - **Pre-load aggregation**: `GROUP BY department HAVING COUNT(*) > 5` — must 
rely on downstream DB
   - **Top-N filtering**: `ORDER BY score DESC LIMIT 100` — blocked by engine
   
   These are standard SQL operations supported by every database. SeaTunnel's 
Transform layer should be able to handle them natively.
   
   ### Goal
   
   Add a **new** Transform plugin `Calcite` based on [Apache 
Calcite](https://calcite.apache.org/) that provides:
   
   1. **Broad SQL support from day one**: JOIN, GROUP BY, window functions, subqueries, ORDER BY / LIMIT, set operations (UNION / INTERSECT / EXCEPT; Calcite accepts `MINUS` only under certain SQL conformance levels), and Calcite's 200+ built-in functions
   2. **Multi-table merge**: Accept multiple same-schema upstream tables (via 
`plugin_input` list) — the engine unions them into a single stream, then 
Calcite performs aggregation, deduplication, and analytics on the merged data. 
Typical use case: cross-shard statistics for sharded databases.
   3. **UDF registration**: Declarative user-defined function registration via 
configuration, with built-in UDFs (`mask` / `mask_hash` for dual-field data 
masking, `url_encode` / `url_decode` for URL encoding). Note: JSON operations 
use Calcite's native SQL:2016 JSON functions (`JSON_VALUE`, `JSON_QUERY`, 
`JSON_EXISTS`) — no UDF needed.
   4. **Memory protection**: Configurable `max_partition_rows` and 
`partition_overflow_policy` (FAIL / SKIP / TRUNCATE) to prevent OOM on large 
partitions
   5. **`--check` integration**: SQL syntax validation via `ConditionExtension` at submission time, with a WARNING for window functions without `PARTITION BY`, since the whole input then forms a single partition
   6. **Documentation**: Complete en + zh docs with usage examples and memory 
guidance
   7. **E2E tests**: Covering all SQL features, multi-table JOIN, and UDF 
scenarios
   
   ### Non-Goal
   
   1. **Not replacing the existing `SQL` Transform** — it remains unchanged for 
lightweight row-level operations. Both plugins coexist.
   2. **Not supporting streaming windows** (TUMBLE / HOP / SESSION) — these 
require event time, watermark, and window trigger mechanisms that belong to a 
stream processing engine, not a batch Transform.
   3. **Not implementing disk spill** in the first version — memory protection 
uses fail/skip/truncate strategies. Disk-based external sort for large JOINs 
can be explored as a future enhancement.
   
   ### Technical Design
   
   **Architecture**:
   
   ```
   Existing SQL Transform (unchanged):
     User SQL → JSQLParser(parse) → ZetaSQLEngine(type + eval) → SeaTunnelRow
   
   New Calcite Transform:
     User SQL → Calcite(parse + validate + optimize) → SeaTunnelRow Adapter(execute) → SeaTunnelRow
   ```
   
   **Integration points**:
   
   | Component | Description |
   |-----------|-------------|
   | Schema Adapter | Map `CatalogTable` → Calcite `RelDataType`, register `SeaTunnelRow` as Calcite tables |
   | Type Mapping | Bidirectional `SeaTunnelDataType` ↔ Calcite `RelDataType` / `SqlTypeName` |
   | Execution | Calcite compiles SQL to relational algebra; SeaTunnel provides `Enumerable` row iteration |
   | Multi-table merge | Multiple same-schema `CatalogTable`s from `plugin_input` are unioned by the engine into a single stream; Calcite sees one logical table |
   | UDF | Users register functions via Calcite's `ScalarFunction` / `AggregateFunction` interfaces |
   | Dependency | `calcite-core` shaded under `org.apache.seatunnel.shade` |
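
The Type Mapping row can be pictured as a pair of lookup tables. In the minimal sketch below, `StType` and `CalciteType` are hypothetical stand-ins that mirror a subset of the constant names in SeaTunnel's `SqlType` and Calcite's `SqlTypeName`, so it runs without either library; a real adapter would build `RelDataType` instances through Calcite's `RelDataTypeFactory` and also carry nullability, precision/scale, and nested ARRAY / MAP / ROW types. Two choices are worth noting. SeaTunnel's 4-byte FLOAT maps to Calcite's `REAL`, because Calcite treats `FLOAT` as double precision. The reverse direction also accepts `CHAR`, which Calcite infers for string literals, so the mapping is not a strict bijection.

```java
import java.util.EnumMap;
import java.util.Map;

class TypeMappingDemo {
    public static void main(String[] args) {
        for (StType t : StType.values()) {
            CalciteType c = TypeMapping.toCalcite(t);
            System.out.println(t + " -> " + c + " -> " + TypeMapping.toSeaTunnel(c));
        }
    }
}

/** Hypothetical mirror of a subset of SeaTunnel's SqlType constants. */
enum StType { BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, BYTES, DATE, TIME, TIMESTAMP }

/** Hypothetical mirror of a subset of Calcite's SqlTypeName constants. */
enum CalciteType { BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, DECIMAL, CHAR, VARCHAR, VARBINARY, DATE, TIME, TIMESTAMP }

final class TypeMapping {
    private static final Map<StType, CalciteType> TO_CALCITE = new EnumMap<>(StType.class);
    private static final Map<CalciteType, StType> TO_SEATUNNEL = new EnumMap<>(CalciteType.class);

    static {
        both(StType.BOOLEAN, CalciteType.BOOLEAN);
        both(StType.TINYINT, CalciteType.TINYINT);
        both(StType.SMALLINT, CalciteType.SMALLINT);
        both(StType.INT, CalciteType.INTEGER);
        both(StType.BIGINT, CalciteType.BIGINT);
        both(StType.FLOAT, CalciteType.REAL); // 4-byte float; Calcite's FLOAT is double precision
        both(StType.DOUBLE, CalciteType.DOUBLE);
        both(StType.DECIMAL, CalciteType.DECIMAL);
        both(StType.STRING, CalciteType.VARCHAR);
        both(StType.BYTES, CalciteType.VARBINARY);
        both(StType.DATE, CalciteType.DATE);
        both(StType.TIME, CalciteType.TIME);
        both(StType.TIMESTAMP, CalciteType.TIMESTAMP);
        // One-way entry: Calcite types string literals as CHAR(n); read them back as STRING.
        TO_SEATUNNEL.put(CalciteType.CHAR, StType.STRING);
    }

    private TypeMapping() {}

    /** Registers a mapping in both directions. */
    private static void both(StType st, CalciteType ct) {
        TO_CALCITE.put(st, ct);
        TO_SEATUNNEL.put(ct, st);
    }

    static CalciteType toCalcite(StType type) {
        CalciteType mapped = TO_CALCITE.get(type);
        if (mapped == null) {
            throw new IllegalArgumentException("No Calcite type for SeaTunnel type " + type);
        }
        return mapped;
    }

    static StType toSeaTunnel(CalciteType type) {
        StType mapped = TO_SEATUNNEL.get(type);
        if (mapped == null) {
            throw new IllegalArgumentException("No SeaTunnel type for Calcite type " + type);
        }
        return mapped;
    }
}
```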
   
   **Memory protection for JOIN & window functions**:
   
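   Calcite's `EnumerableWindow` buffers its input in memory, grouped by partition, before evaluating window functions, and hash JOINs likewise keep their build side in memory. Protection is therefore applied at the SeaTunnel adapter layer: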
   
   ```hocon
   transform {
     Calcite {
       query = "SELECT ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts) ..."
       max_partition_rows = 100000
       partition_overflow_policy = "SKIP"  # SKIP / TRUNCATE / FAIL (default: FAIL)
     }
   }
   ```
   
   | Policy | Behavior |
   |--------|----------|
   | **FAIL** (default) | Throw exception with partition details, abort safely (no silent OOM) |
   | **SKIP** | Skip overflowed partition, log WARNING, continue |
   | **TRUNCATE** | Keep first N rows, discard rest, log WARNING |
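
A minimal sketch of how the adapter could enforce the three policies while buffering rows per partition key, before the rows are handed to the window or join operator. `PartitionGuard`, `OverflowPolicy`, and `PartitionOverflowException` are hypothetical names, the sketch works on plain Java objects rather than `SeaTunnelRow`, and it assumes SKIP drops the rows already buffered for that partition as well as any later ones.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Logger;

class PartitionGuardDemo {
    public static void main(String[] args) {
        String[][] rows = {{"u1", "a"}, {"u1", "b"}, {"u1", "c"}, {"u2", "d"}};
        for (OverflowPolicy policy : OverflowPolicy.values()) {
            PartitionGuard<String[], String> guard = new PartitionGuard<String[], String>(2, policy, r -> r[0]);
            try {
                for (String[] row : rows) guard.add(row);
                System.out.println(policy + ": kept partitions " + guard.partitions().keySet());
            } catch (PartitionOverflowException e) {
                System.out.println(policy + ": " + e.getMessage());
            }
        }
    }
}

/** Named after the proposed partition_overflow_policy values. */
enum OverflowPolicy { FAIL, SKIP, TRUNCATE }

class PartitionOverflowException extends RuntimeException {
    PartitionOverflowException(String message) { super(message); }
}

/** Hypothetical adapter-side buffer that enforces max_partition_rows per partition key. */
final class PartitionGuard<R, K> {
    private static final Logger LOG = Logger.getLogger(PartitionGuard.class.getName());

    private final long maxPartitionRows;
    private final OverflowPolicy policy;
    private final Function<R, K> keyOf;
    private final Map<K, List<R>> partitions = new LinkedHashMap<>();
    private final Set<K> overflowed = new HashSet<>(); // keys already skipped or truncated

    PartitionGuard(long maxPartitionRows, OverflowPolicy policy, Function<R, K> keyOf) {
        this.maxPartitionRows = maxPartitionRows;
        this.policy = policy;
        this.keyOf = keyOf;
    }

    /** Buffers one row, applying the overflow policy once its partition is full. */
    void add(R row) {
        K key = keyOf.apply(row);
        if (overflowed.contains(key)) {
            return; // partition was already skipped or truncated: drop further rows
        }
        List<R> buffer = partitions.computeIfAbsent(key, k -> new ArrayList<>());
        if (buffer.size() < maxPartitionRows) {
            buffer.add(row);
            return;
        }
        switch (policy) {
            case FAIL:
                throw new PartitionOverflowException(
                        "Partition " + key + " exceeded max_partition_rows=" + maxPartitionRows);
            case SKIP:
                partitions.remove(key);
                overflowed.add(key);
                LOG.warning("Skipping partition " + key + ": more than " + maxPartitionRows + " rows");
                break;
            case TRUNCATE:
                overflowed.add(key);
                LOG.warning("Truncating partition " + key + " to " + maxPartitionRows + " rows");
                break;
        }
    }

    /** Partitions that survived the policy, in first-seen order. */
    Map<K, List<R>> partitions() {
        return partitions;
    }
}
```

Because this guard sits in front of Calcite, TRUNCATE keeps the first N rows in arrival order, not the first N under the window's `ORDER BY`, so results such as `ROW_NUMBER() = 1` may differ from an untruncated run.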
   
   **UDF configuration**:
   
   ```hocon
   transform {
     Calcite {
       udfs = [
         { name = "mask_phone", class = "com.example.udf.MaskPhoneFunction" }
       ]
       query = "SELECT id, mask_phone(phone) as phone FROM source_table"
     }
   }
   ```
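
The issue leaves open how `udfs` entries are loaded. A minimal sketch, assuming the plugin follows the convention common in Calcite UDF classes of exposing a public method named `eval`: each `class` is loaded by name and the SQL `name` is bound to that method. `UdfRegistry` is hypothetical, the reflective `call` stands in for Calcite invoking the function after registration, and the masking rule inside `MaskPhoneFunction` (keep the first 3 and last 4 characters) is an illustrative assumption. The class is shown without the `com.example.udf` package from the config.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class UdfRegistryDemo {
    public static void main(String[] args) {
        UdfRegistry registry = new UdfRegistry();
        // Mirrors: udfs = [{ name = "mask_phone", class = "com.example.udf.MaskPhoneFunction" }]
        registry.register("mask_phone", MaskPhoneFunction.class.getName());
        System.out.println(registry.call("mask_phone", "13812345678"));
    }
}

/** Example UDF (hypothetical rule): keeps the first 3 and last 4 characters, masks the rest. */
class MaskPhoneFunction {
    public String eval(String phone) {
        if (phone == null || phone.length() <= 7) {
            return phone;
        }
        return phone.substring(0, 3) + "*".repeat(phone.length() - 7) + phone.substring(phone.length() - 4);
    }
}

/** Hypothetical registry binding a SQL function name to a class's public eval method. */
final class UdfRegistry {
    private final Map<String, Method> methods = new HashMap<>();
    private final Map<String, Object> targets = new HashMap<>();

    void register(String sqlName, String className) {
        try {
            Class<?> clazz = Class.forName(className);
            Method eval = null;
            for (Method m : clazz.getMethods()) {
                if (m.getName().equals("eval")) {
                    eval = m;
                    break;
                }
            }
            if (eval == null) {
                throw new IllegalArgumentException(className + " has no public eval method");
            }
            methods.put(sqlName, eval);
            targets.put(sqlName, clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load UDF class " + className, e);
        }
    }

    Object call(String sqlName, Object... args) {
        Method eval = methods.get(sqlName);
        if (eval == null) {
            throw new IllegalArgumentException("Unknown function " + sqlName);
        }
        try {
            return eval.invoke(targets.get(sqlName), args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("UDF " + sqlName + " failed", e);
        }
    }
}
```

Failing at `register` time when a class lacks `eval` fits the proposal's `--check` goal: a misconfigured UDF can be reported at submission rather than on the first row.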
   
   Built-in UDFs:
   - `mask(value, start, end, char)` / `mask_hash(value, start, end, char)`: a data-masking pair. `mask` returns the masked display value; `mask_hash` takes the same arguments but returns the SHA-256 hash of the original (unmasked) value. Masked values collide (every number of the form `138xxxx5678` masks to `138****5678`), so downstream JOIN / GROUP BY / DISTINCT should compare on the hash instead. Example: `mask(phone, 3, 7, '*')` → `138****5678`, `mask_hash(phone, 3, 7, '*')` → `a3f8...`
   - `url_encode(value)` / `url_decode(value)` — URL encoding/decoding, common 
in web data ETL
   
   > JSON operations do **not** require a UDF — use Calcite's native SQL:2016 
JSON functions: `JSON_VALUE(col, '$.path')`, `JSON_QUERY(col, '$.path')`, 
`JSON_EXISTS(col, '$.path')`.
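
A minimal sketch of the built-in `mask` / `mask_hash` / `url_encode` / `url_decode` semantics described above. The index convention is an assumption: treating `start` as inclusive, `end` as exclusive, and both as 0-based is the reading that turns `13812345678` into `138****5678`. Per the description, `mask_hash` hashes the original value, so its masking arguments do not affect the result. `BuiltinUdfs` is a hypothetical class name. For URL encoding the sketch uses `java.net.URLEncoder`, which produces form encoding (a space becomes `+`, not `%20`); which variant the plugin should emit is not specified in the issue.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class BuiltinUdfsDemo {
    public static void main(String[] args) {
        String phone = "13812345678";
        System.out.println(BuiltinUdfs.mask(phone, 3, 7, "*"));
        System.out.println(BuiltinUdfs.maskHash(phone, 3, 7, "*"));
        System.out.println(BuiltinUdfs.urlEncode("a b&c"));
    }
}

final class BuiltinUdfs {
    private BuiltinUdfs() {}

    /** Replaces characters in [start, end) (0-based) with maskChar, clamped to the string bounds. */
    static String mask(String value, int start, int end, String maskChar) {
        if (value == null) return null;
        int from = Math.max(0, Math.min(start, value.length()));
        int to = Math.max(from, Math.min(end, value.length()));
        return value.substring(0, from) + maskChar.repeat(to - from) + value.substring(to);
    }

    /** Lowercase hex SHA-256 of the original value; masking arguments are accepted but unused. */
    static String maskHash(String value, int start, int end, String maskChar) {
        if (value == null) return null;
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    static String urlEncode(String value) {
        return value == null ? null : URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    static String urlDecode(String value) {
        return value == null ? null : URLDecoder.decode(value, StandardCharsets.UTF_8);
    }
}
```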
   
   
   ### Usage Scenario
   
   ### 1. Sharded table merge + cross-shard aggregation
   
   ```hocon
   source {
     Jdbc { plugin_output = "users_shard_0", query = "SELECT * FROM users", url = "jdbc:mysql://shard0/db" ... }
     Jdbc { plugin_output = "users_shard_1", query = "SELECT * FROM users", url = "jdbc:mysql://shard1/db" ... }
     Jdbc { plugin_output = "users_shard_2", query = "SELECT * FROM users", url = "jdbc:mysql://shard2/db" ... }
   }
   transform {
     Calcite {
       plugin_input = ["users_shard_0", "users_shard_1", "users_shard_2"]
       plugin_output = "result"
       query = """
         SELECT city, COUNT(*) as user_count, AVG(age) as avg_age
         FROM source_table
         GROUP BY city
         ORDER BY user_count DESC
       """
     }
   }
   sink { StarRocks { plugin_input = ["result"] ... } }
   ```
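
Because the engine merges the three shards into one stream (UNION ALL semantics) before Calcite runs, the query aggregates over all rows at once. That matters for non-additive aggregates: averaging each shard's `AVG(age)` would give a different answer. The plain-Java sketch below, with hypothetical `User` / `CityStats` types, computes the expected result and could double as an oracle in an E2E test. One caveat worth checking in the implementation: Calcite's default type system derives `AVG` over an integer column as an integer, so `AVG(CAST(age AS DOUBLE))` may be needed when a fractional average is expected.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ShardMergeDemo {
    public static void main(String[] args) {
        List<User> shard0 = List.of(new User("Beijing", 20), new User("Beijing", 30));
        List<User> shard1 = List.of(new User("Beijing", 40));
        List<User> shard2 = List.of(new User("Shanghai", 25));
        // Merged Beijing average: (20 + 30 + 40) / 3 = 30.0.
        // Averaging the per-shard averages instead would give (25 + 40) / 2 = 32.5.
        ShardMerge.aggregate(List.of(shard0, shard1, shard2)).forEach(System.out::println);
    }
}

final class User {
    final String city;
    final int age;

    User(String city, int age) {
        this.city = city;
        this.age = age;
    }
}

final class CityStats {
    final String city;
    final long userCount;
    final double avgAge;

    CityStats(String city, long userCount, double avgAge) {
        this.city = city;
        this.userCount = userCount;
        this.avgAge = avgAge;
    }

    @Override
    public String toString() {
        return city + " user_count=" + userCount + " avg_age=" + avgAge;
    }
}

final class ShardMerge {
    /** UNION ALL of the shards, then GROUP BY city with COUNT(*) and AVG(age), ORDER BY user_count DESC. */
    static List<CityStats> aggregate(List<List<User>> shards) {
        Map<String, List<User>> byCity = shards.stream()
                .flatMap(List::stream)
                .collect(Collectors.groupingBy(u -> u.city));
        return byCity.entrySet().stream()
                .map(e -> new CityStats(
                        e.getKey(),
                        e.getValue().size(),
                        e.getValue().stream().mapToInt(u -> u.age).average().orElse(Double.NaN)))
                .sorted(Comparator.comparingLong((CityStats s) -> s.userCount).reversed())
                .collect(Collectors.toList());
    }
}
```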
   
   ### 2. Window function deduplication
   
   ```hocon
   transform {
     Calcite {
       query = """
         SELECT * FROM (
           SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY updated_at DESC) as rn
           FROM source_table
         ) t WHERE rn = 1
       """
     }
   }
   ```
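
The query keeps one row per `user_id`: the one ranked first by `updated_at DESC`. Two details are easy to miss. `ROW_NUMBER()` breaks ties arbitrarily, so when a user has several rows with the same `updated_at` the surviving row is not deterministic unless the `ORDER BY` adds a tie-breaker column; and the outer `SELECT *` also passes the helper column `rn` to the sink. The plain-Java sketch below, with a hypothetical `Event` type, states the intended result with an explicit tie rule (first row seen wins) and holds only one row per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DedupDemo {
    public static void main(String[] args) {
        List<Event> rows = List.of(
                new Event("u1", 100, "old"),
                new Event("u2", 150, "only"),
                new Event("u1", 200, "new"));
        LatestPerKey.dedup(rows).forEach(e -> System.out.println(e.userId + " " + e.payload));
    }
}

final class Event {
    final String userId;
    final long updatedAt;
    final String payload;

    Event(String userId, long updatedAt, String payload) {
        this.userId = userId;
        this.updatedAt = updatedAt;
        this.payload = payload;
    }
}

final class LatestPerKey {
    /** Per userId, keeps the row with the greatest updatedAt; on ties the first row seen wins. */
    static List<Event> dedup(List<Event> rows) {
        Map<String, Event> latest = new LinkedHashMap<>();
        for (Event row : rows) {
            latest.merge(row.userId, row, (kept, next) -> next.updatedAt > kept.updatedAt ? next : kept);
        }
        return new ArrayList<>(latest.values());
    }
}
```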
   
   ### 3. Pre-load aggregation
   
   ```hocon
   transform {
     Calcite {
       query = """
         SELECT department, COUNT(*) as headcount, AVG(salary) as avg_salary
         FROM employees GROUP BY department HAVING COUNT(*) > 5
       """
     }
   }
   ```
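
`HAVING` filters groups after aggregation, whereas `WHERE` filters rows before it, so departments with five or fewer employees are dropped only after they have been counted. A plain-Java sketch of the same computation, using hypothetical `Employee` / `DeptStats` types:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class PreAggregationDemo {
    public static void main(String[] args) {
        List<Employee> staff = List.of(
                new Employee("eng", 100), new Employee("eng", 200), new Employee("eng", 300),
                new Employee("eng", 400), new Employee("eng", 500), new Employee("eng", 600),
                new Employee("ops", 100), new Employee("ops", 300));
        for (DeptStats s : PreAggregation.aggregate(staff, 5)) {
            System.out.println(s.department + " headcount=" + s.headcount + " avg_salary=" + s.avgSalary);
        }
    }
}

final class Employee {
    final String department;
    final double salary;

    Employee(String department, double salary) {
        this.department = department;
        this.salary = salary;
    }
}

final class DeptStats {
    final String department;
    final long headcount;
    final double avgSalary;

    DeptStats(String department, long headcount, double avgSalary) {
        this.department = department;
        this.headcount = headcount;
        this.avgSalary = avgSalary;
    }
}

final class PreAggregation {
    /** GROUP BY department, then HAVING COUNT(*) > countGreaterThan. */
    static List<DeptStats> aggregate(List<Employee> rows, long countGreaterThan) {
        Map<String, List<Employee>> byDept = rows.stream()
                .collect(Collectors.groupingBy(e -> e.department, TreeMap::new, Collectors.toList()));
        return byDept.entrySet().stream()
                .filter(g -> g.getValue().size() > countGreaterThan) // HAVING: applied per group
                .map(g -> new DeptStats(
                        g.getKey(),
                        g.getValue().size(),
                        g.getValue().stream().mapToDouble(e -> e.salary).average().orElse(Double.NaN)))
                .collect(Collectors.toList());
    }
}
```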
   
   ### 4. Data masking (dual-field: display + hash)
   
   ```hocon
   transform {
     Calcite {
       query = """
         SELECT id, name,
                mask(phone, 3, 7, '*')       as phone_masked,
                mask_hash(phone, 3, 7, '*')  as phone_hash
         FROM source_table
       """
     }
   }
   # Output: | id | name | phone_masked | phone_hash                        |
   #         | 1  | Tom  | 138****5678  | a3f8... (SHA-256 of 13812345678)  |
   # Downstream JOIN / GROUP BY uses phone_hash for correct comparison
   ```
   
   ### 5. JSON extraction (Calcite native, no UDF)
   
   ```hocon
   transform {
     Calcite {
       query = """
         SELECT id,
                JSON_VALUE(ext_info, '$.address.city') as city,
                JSON_VALUE(ext_info, '$.tags[0]') as first_tag
         FROM source_table
       """
     }
   }
   ```
   
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

