nzw921rx opened a new issue, #11061: URL: https://github.com/apache/seatunnel/issues/11061
### Search before asking

- [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

### Background

SeaTunnel's Transform layer currently supports only row-level operations: column mapping, type conversion, regex replacement, and simple filtering. The existing `SQL` Transform is powered by a hand-written ZetaSQLEngine (JSQLParser plus imperative if/else evaluation). Because of architectural limitations (no relational algebra layer, no type system, no optimizer), it explicitly blocks JOIN, GROUP BY, window functions, subqueries, and set operations.

As a result, SeaTunnel effectively performs only **EL** (Extract + Load). The **T** (Transform) is severely limited: any complex data transformation must be pushed down to downstream databases, which requires extra pipelines and external coordination.

### Motivation

Users frequently need in-pipeline transformations that go beyond row-level operations:

- **Multi-source merge**: JOIN two source tables before writing to the sink (today this requires two separate pipelines).
- **Deduplication**: `ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC)` (not possible today).
- **Pre-load aggregation**: `GROUP BY department HAVING COUNT(*) > 5` (must rely on the downstream DB).
- **Top-N filtering**: `ORDER BY score DESC LIMIT 100` (blocked by the engine).

These are standard SQL operations supported by every mainstream relational database. SeaTunnel's Transform layer should be able to handle them natively.

### Goal

Add a **new** Transform plugin, `Calcite`, based on [Apache Calcite](https://calcite.apache.org/), that provides:

1. **Full SQL support from day one**: JOIN, GROUP BY, window functions, subqueries, ORDER BY / LIMIT, set operations (UNION / INTERSECT / MINUS), and 200+ built-in functions.
2. **Multi-table merge**: Accept multiple same-schema upstream tables (via the `plugin_input` list). The engine unions them into a single stream, and Calcite then performs aggregation, deduplication, and analytics on the merged data. Typical use case: cross-shard statistics for sharded databases.
3. **UDF registration**: Declarative user-defined function registration via configuration, plus built-in UDFs (`mask` / `mask_hash` for dual-field data masking, `url_encode` / `url_decode` for URL encoding). Note: JSON operations use Calcite's native SQL:2016 JSON functions (`JSON_VALUE`, `JSON_QUERY`, `JSON_EXISTS`), so no UDF is needed.
4. **Memory protection**: Configurable `max_partition_rows` and `partition_overflow_policy` (FAIL / SKIP / TRUNCATE) to prevent OOM on large partitions.
5. **`--check` integration**: SQL syntax validation via `ConditionExtension` at submission time, with a WARNING for window functions that lack `PARTITION BY`.
6. **Documentation**: Complete en + zh docs with usage examples and memory guidance.
7. **E2E tests**: Covering all SQL features, multi-table JOIN, and UDF scenarios.

### Non-Goal

1. **Not replacing the existing `SQL` Transform**: it remains unchanged for lightweight row-level operations. Both plugins coexist.
2. **Not supporting streaming windows** (TUMBLE / HOP / SESSION): these require event time, watermarks, and window trigger mechanisms that belong to a stream processing engine, not to a batch Transform.
3. **Not implementing disk spill** in the first version: memory protection relies on the FAIL / SKIP / TRUNCATE strategies. Disk-based external sort for large JOINs can be explored as a future enhancement.
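The FAIL / SKIP / TRUNCATE strategies can be illustrated with a small, self-contained Java sketch of a per-partition row buffer that sits in front of window evaluation. This is not SeaTunnel or Calcite code. `PartitionGuard` and `OverflowPolicy` are hypothetical names, and rows are modelled as `Object[]` rather than `SeaTunnelRow`. It also assumes that SKIP drops the entire partition, including rows buffered before the limit was hit, and ignores any later rows with the same key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

/**
 * Hypothetical per-partition row buffer illustrating max_partition_rows and
 * partition_overflow_policy. Not SeaTunnel code; rows are plain Object[].
 */
final class PartitionGuard<K> {

    enum OverflowPolicy { FAIL, SKIP, TRUNCATE }

    private static final Logger LOG = Logger.getLogger(PartitionGuard.class.getName());

    private final long maxPartitionRows;
    private final OverflowPolicy policy;
    // Insertion-ordered so partitions come out in first-seen order.
    private final Map<K, List<Object[]>> partitions = new LinkedHashMap<>();
    // Partitions dropped under SKIP; later rows for these keys are ignored.
    private final Set<K> skipped = new HashSet<>();
    // Partitions already warned about under TRUNCATE (warn once per partition).
    private final Set<K> truncated = new HashSet<>();

    PartitionGuard(long maxPartitionRows, OverflowPolicy policy) {
        if (maxPartitionRows <= 0) {
            throw new IllegalArgumentException("max_partition_rows must be positive");
        }
        this.maxPartitionRows = maxPartitionRows;
        this.policy = policy;
    }

    /** Buffers one row, applying the overflow policy once its partition is full. */
    void add(K key, Object[] row) {
        if (skipped.contains(key)) {
            return; // SKIP already dropped this partition
        }
        List<Object[]> rows = partitions.computeIfAbsent(key, k -> new ArrayList<>());
        if (rows.size() < maxPartitionRows) {
            rows.add(row);
            return;
        }
        switch (policy) {
            case FAIL:
                // Abort with partition details instead of risking a silent OOM.
                throw new IllegalStateException("Partition '" + key + "' exceeded max_partition_rows="
                        + maxPartitionRows + " (policy FAIL)");
            case SKIP:
                partitions.remove(key);
                skipped.add(key);
                LOG.warning("Skipping partition '" + key + "': more than " + maxPartitionRows + " rows");
                break;
            case TRUNCATE:
                if (truncated.add(key)) {
                    LOG.warning("Truncating partition '" + key + "' to " + maxPartitionRows + " rows");
                }
                break; // this row is discarded; the first N rows are kept
            default:
                throw new AssertionError(policy);
        }
    }

    /** Partitions that survived the policy, ready for window evaluation. */
    Map<K, List<Object[]>> partitions() {
        return partitions;
    }
}
```

The guard only bounds the number of rows per key. A real adapter would also need to release each partition after its window results have been emitted, so that the total memory stays bounded as well.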
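The dual-field semantics of the proposed built-in `mask` / `mask_hash` UDFs can also be sketched in plain Java. The proposal does not confirm the following, so they are assumptions:

- `start` is 0-based and inclusive, and `end` is exclusive. This reproduces the `138****5678` example.
- Out-of-range bounds are clamped.
- A NULL input yields NULL.
- The hash is rendered as lowercase hex.

`MaskFunction` and `MaskHashFunction` are hypothetical class names. They use static `eval(...)` methods because that is the kind of method Calcite's `ScalarFunctionImpl.create(Class, String)` can wrap by reflection. How the plugin actually registers its built-ins is left open.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of the built-in mask UDF (display value). */
final class MaskFunction {
    private MaskFunction() {}

    /** Replaces characters in [start, end) (0-based, end exclusive) with the first char of maskChar. */
    public static String eval(String value, int start, int end, String maskChar) {
        if (value == null) {
            return null; // SQL NULL in, NULL out (assumption)
        }
        // Clamp bounds so out-of-range arguments never throw.
        int from = Math.max(0, Math.min(start, value.length()));
        int to = Math.max(from, Math.min(end, value.length()));
        char c = (maskChar == null || maskChar.isEmpty()) ? '*' : maskChar.charAt(0);
        StringBuilder sb = new StringBuilder(value);
        for (int i = from; i < to; i++) {
            sb.setCharAt(i, c);
        }
        return sb.toString();
    }
}

/** Hypothetical sketch of the built-in mask_hash UDF (comparison key). */
final class MaskHashFunction {
    private MaskHashFunction() {}

    /** Returns the hex SHA-256 of the ORIGINAL value; start/end/maskChar only mirror mask()'s signature. */
    public static String eval(String value, int start, int end, String maskChar) {
        if (value == null) {
            return null;
        }
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Because `mask_hash` ignores the masking window and hashes the full original value, two rows with the same phone number always produce the same `phone_hash`. This is what makes it usable as a JOIN / GROUP BY / DISTINCT key while `phone_masked` stays display-only.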
### Technical Design

**Architecture**:

```
Existing SQL Transform (unchanged):
  User SQL → JSQLParser (parse) → ZetaSQLEngine (type + eval) → SeaTunnelRow

New Calcite Transform:
  User SQL → Calcite (parse + validate + optimize) → SeaTunnelRow Adapter (execute) → SeaTunnelRow
```

**Integration points**:

| Component | Description |
|-----------|-------------|
| Schema Adapter | Map `CatalogTable` → Calcite `RelDataType`; register `SeaTunnelRow` data as Calcite tables |
| Type Mapping | Bidirectional `SeaTunnelDataType` ↔ Calcite `RelDataType` / `SqlTypeName` |
| Execution | Calcite compiles SQL to relational algebra; SeaTunnel provides `Enumerable` row iteration |
| Multi-table merge | Multiple same-schema `CatalogTable`s from `plugin_input` are unioned by the engine into a single stream; Calcite sees one logical table |
| UDF | Users register functions via Calcite's `ScalarFunction` / `AggregateFunction` interfaces |
| Dependency | `calcite-core` shaded under `org.apache.seatunnel.shade` |

**Memory protection for JOIN & window functions**: Calcite's `EnumerableWindow` loads all rows of a partition into memory. Protection is applied at the SeaTunnel adapter layer:

```hocon
transform {
  Calcite {
    query = "SELECT ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts) ..."
    max_partition_rows = 100000
    partition_overflow_policy = "SKIP"  # SKIP / TRUNCATE / FAIL (default: FAIL)
  }
}
```

| Policy | Behavior |
|--------|----------|
| **FAIL** (default) | Throw an exception with partition details and abort safely; no silent OOM |
| **SKIP** | Skip the overflowed partition, log a WARNING, and continue |
| **TRUNCATE** | Keep the first N rows (N = `max_partition_rows`), discard the rest, and log a WARNING |

**UDF configuration**:

```hocon
transform {
  Calcite {
    udfs = [
      { name = "mask_phone", class = "com.example.udf.MaskPhoneFunction" }
    ]
    query = "SELECT id, mask_phone(phone) as phone FROM source_table"
  }
}
```

Built-in UDFs:

- `mask(value, start, end, char)` / `mask_hash(value, start, end, char)`: a data-masking pair. `mask` returns the masked display value; `mask_hash` returns the SHA-256 hash of the original value, so that downstream JOIN / GROUP BY / DISTINCT can still compare values correctly. For example, with `phone = '13812345678'`, `mask(phone, 3, 7, '*')` → `138****5678` and `mask_hash(phone, 3, 7, '*')` → `a3f8...`.
- `url_encode(value)` / `url_decode(value)`: URL encoding/decoding, common in web data ETL.

> JSON operations do **not** require a UDF. Use Calcite's native SQL:2016 JSON functions: `JSON_VALUE(col, '$.path')`, `JSON_QUERY(col, '$.path')`, `JSON_EXISTS(col, '$.path')`.

### Usage Scenario

### 1. Sharded table merge + cross-shard aggregation

```hocon
source {
  MySQL {
    plugin_output = "users_shard_0"
    query = "SELECT * FROM users"
    url = "jdbc:mysql://shard0/db"
    # ...
  }
  MySQL {
    plugin_output = "users_shard_1"
    query = "SELECT * FROM users"
    url = "jdbc:mysql://shard1/db"
    # ...
  }
  MySQL {
    plugin_output = "users_shard_2"
    query = "SELECT * FROM users"
    url = "jdbc:mysql://shard2/db"
    # ...
  }
}

transform {
  Calcite {
    plugin_input = ["users_shard_0", "users_shard_1", "users_shard_2"]
    plugin_output = "result"
    query = """
      SELECT city, COUNT(*) as user_count, AVG(age) as avg_age
      FROM source_table
      GROUP BY city
      ORDER BY user_count DESC
    """
  }
}

sink {
  StarRocks {
    plugin_input = ["result"]
    # ...
  }
}
```

### 2. Window function deduplication

```hocon
transform {
  Calcite {
    query = """
      SELECT * FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY updated_at DESC) as rn
        FROM source_table
      ) t
      WHERE rn = 1
    """
  }
}
```

### 3. Pre-load aggregation

```hocon
transform {
  Calcite {
    query = """
      SELECT department, COUNT(*) as headcount, AVG(salary) as avg_salary
      FROM employees
      GROUP BY department
      HAVING COUNT(*) > 5
    """
  }
}
```

### 4. Data masking (dual-field: display + hash)

```hocon
transform {
  Calcite {
    query = """
      SELECT id, name,
             mask(phone, 3, 7, '*') as phone_masked,
             mask_hash(phone, 3, 7, '*') as phone_hash
      FROM source_table
    """
  }
}
```

Output:

| id | name | phone_masked | phone_hash |
|----|------|--------------|------------|
| 1 | Tom | 138****5678 | a3f8... (SHA-256 of `13812345678`) |

Downstream JOIN / GROUP BY uses `phone_hash` for correct comparison.

### 5. JSON extraction (Calcite native, no UDF)

```hocon
transform {
  Calcite {
    query = """
      SELECT id,
             JSON_VALUE(ext_info, '$.address.city') as city,
             JSON_VALUE(ext_info, '$.tags[0]') as first_tag
      FROM source_table
    """
  }
}
```

### Related issues

_No response_

### Are you willing to submit a PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
