nzw921rx commented on issue #11061:
URL: https://github.com/apache/seatunnel/issues/11061#issuecomment-4678987292
# Calcite Transform Architecture
## High-Level Dataflow
```mermaid
flowchart TD
Source["Source\n(single table / multi-table CDC / sharded tables)"]
Row["SeaTunnelRow\neach row carries tableId"]
Multi["CalciteMultiCatalogTransform\nroute by row.getTableId()"]
subgraph TA["CalciteTransform (Table A)"]
EA["Calcite Engine A\nparse → validate → execute"]
end
subgraph TB["CalciteTransform (Table B)"]
EB["Calcite Engine B\nparse → validate → execute"]
end
TC["IdentityTransform\n(Table C — passthrough)"]
Result["List‹SeaTunnelRow›\n(0..N rows)"]
Sink["Sink"]
Source --> Row --> Multi
Multi --> EA
Multi --> EB
Multi --> TC
EA --> Result
EB --> Result
TC --> Result
Result --> Sink
```
## Execution Model
CalciteTransform implements the `SeaTunnelFlatMapTransform` interface. The
core method is `flatMap(SeaTunnelRow) → List<SeaTunnelRow>`.
The MVP supports row-level execution only: each incoming row is run through
Calcite immediately, and the results are returned without any buffering.
```text
flatMap(inputRow) → List<SeaTunnelRow> (0..N rows)
  → convert SeaTunnelRow to Object[], inject into Calcite single-row table
  → Calcite executes the pre-compiled execution plan
  → convert ResultSet back to SeaTunnelRow
  → return result list:
      WHERE not matched → empty list (filtered)
      regular SELECT    → 1 row (mapped)
      UNNEST expansion  → N rows (flattened)
```
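The three outcomes above can be sketched in plain Java. This is only an illustration, not the proposed implementation: the row layout and the `FlatMapSketch` class are hypothetical, and hand-written methods stand in for plans that Calcite would compile from SQL.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for compiled plans: one input row in, 0..N output rows out.
// Row layout assumed here: [name (String), age (Integer), tags (List<String>)].
final class FlatMapSketch {

    /** SELECT * FROM t WHERE age >= 18: empty list when the predicate fails. */
    static List<Object[]> filterAdults(Object[] row) {
        List<Object[]> out = new ArrayList<>();
        int age = (Integer) row[1];
        if (age >= 18) {
            out.add(row);
        }
        return out;
    }

    /** SELECT UPPER(name), age FROM t: always exactly one output row. */
    static List<Object[]> upperName(Object[] row) {
        Object[] mapped = {((String) row[0]).toUpperCase(), row[1]};
        return Collections.singletonList(mapped);
    }

    /** SELECT name, tag FROM t, UNNEST(tags): one output row per array element. */
    static List<Object[]> unnestTags(Object[] row) {
        List<Object[]> out = new ArrayList<>();
        for (Object tag : (List<?>) row[2]) {
            out.add(new Object[] {row[0], tag});
        }
        return out;
    }
}
```

In all three cases the caller receives a list and forwards whatever it contains, so filtering and flattening need no special handling downstream.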
**Execution plan is compiled once**: CalciteTransform completes SQL parse →
validate → compile during initialization (`open()`), generating a Calcite
`Bindable` execution plan that is cached. At runtime, each row only involves
data injection + plan execution — no recompilation.
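A minimal sketch of the compile-once lifecycle, assuming a hypothetical `CompileOnceSketch` class in which `compile()` stands in for Calcite's parse → validate → compile step and a cached `Function` stands in for the `Bindable` plan:

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: compile() stands in for Calcite's parse -> validate -> compile,
// and the cached Function stands in for the Bindable plan.
final class CompileOnceSketch {
    private Function<Object[], List<Object[]>> plan;
    private int compileCount = 0;

    /** Runs once at initialization; an exception here would stop the job from starting. */
    void open() {
        plan = compile();
    }

    /** Per-row path: only injects the row into the cached plan, never recompiles. */
    List<Object[]> flatMap(Object[] row) {
        if (plan == null) {
            throw new IllegalStateException("open() must be called before flatMap()");
        }
        return plan.apply(row);
    }

    /** Exposed only so the sketch can show how often compilation happened. */
    int compileCount() {
        return compileCount;
    }

    private Function<Object[], List<Object[]>> compile() {
        compileCount++;
        // Stand-in for "SELECT col0 * 2 FROM t".
        return row -> {
            Object[] out = {(Integer) row[0] * 2};
            return Collections.singletonList(out);
        };
    }
}
```

However many rows pass through `flatMap()`, `compileCount()` stays at 1 after a single `open()` call.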
Behavior is similar to the existing SQL Transform, but benefits from
Calcite's complete function library (200+) and automatic type inference. No
parallelism constraints — the transform runs at the same parallelism as the
upstream Source.
CalciteTransform **does not restrict which SQL syntax the user can write**.
Since no data is buffered, Calcite always sees a table with exactly 1 row per
`flatMap()` call. If the user writes GROUP BY / ORDER BY / window functions,
Calcite executes them on that 1-row table, so the result is determined by the
single row alone and nothing accumulates across rows (COUNT is always 1,
sorting 1 row is a no-op, ROW_NUMBER is always 1). No rejection, no error: the
user decides whether the semantics make sense.
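To make the 1-row behavior concrete, here is a small sketch that evaluates a GROUP BY count over a fresh single-row table on every call. `SingleRowSketch` and `countByKey` are hypothetical names, and Java streams stand in for the Calcite plan.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: each call sees a fresh one-row "table", as flatMap() does.
final class SingleRowSketch {

    /** SELECT key, COUNT(*) FROM t GROUP BY key, evaluated over one row only. */
    static Map<Object, Long> countByKey(Object[] row, int keyIndex) {
        List<Object[]> table = Collections.singletonList(row);
        return table.stream()
                .collect(Collectors.groupingBy(r -> r[keyIndex], Collectors.counting()));
    }
}
```

Two consecutive rows with the same key each produce a count of 1, because no state survives between calls.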
## Error Handling
- **Initialization phase**: SQL parse or validation failure (syntax error,
column not found, type incompatibility) → throws exception immediately, job
fails to start, no cluster resources wasted
- **Runtime phase**: Row execution follows Calcite's standard semantics.
`JSON_VALUE` with a missing path returns null, which is not an error; a `CAST`
overflow throws an exception, which fails the job. Exceptions are never
silently swallowed and rows are never skipped
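The two phases can be illustrated with plain Java analogues. The class and method names below are hypothetical, and the JDK calls only mimic the behavior described above: a `Map` lookup stands in for `JSON_VALUE` on a missing path, and `Math.toIntExact` stands in for a `CAST` that overflows.

```java
import java.util.List;
import java.util.Map;

// Hypothetical analogues of the two error-handling phases.
final class ErrorHandlingSketch {

    /** Initialization: fail fast if the query references an unknown column. */
    static void validateColumns(List<String> schema, List<String> referenced) {
        for (String column : referenced) {
            if (!schema.contains(column)) {
                throw new IllegalArgumentException("Column not found: " + column);
            }
        }
    }

    /** Runtime, lenient case: a missing key yields null instead of an error. */
    static Object lookup(Map<String, Object> document, String key) {
        return document.get(key);
    }

    /** Runtime, strict case: an overflowing narrowing cast throws rather than truncating. */
    static int castToInt(long value) {
        return Math.toIntExact(value); // throws ArithmeticException on overflow
    }
}
```

The caller deliberately does not catch the runtime exception, so the failure propagates and stops the job rather than dropping the row.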
## Multi-Table Routing
A single Source in SeaTunnel can produce data from multiple logical tables
(e.g., CDC whole-database sync, sharded tables). All rows enter the Transform
as a mixed stream, with each `SeaTunnelRow` carrying a `tableId` field.
CalciteMultiCatalogTransform extends `AbstractMultiCatalogFlatMapTransform`,
reusing the framework's existing multi-table dispatch:
```text
Construction:
  iterate inputCatalogTables
    → table_path specified in table_transform? → create CalciteTransform with per-table config
    → matches table_match_regex?               → create CalciteTransform with global config
    → neither matches                          → create IdentityTransform (passthrough)
Runtime:
  flatMap(row)
    → transformMap.get(row.getTableId()).flatMap(row)
```
Each CalciteTransform instance has its own independent Calcite engine
(independent Schema, independent execution plan). Tables are fully isolated
from each other.
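The construction and dispatch steps above could look roughly like this. It is a sketch under simplifying assumptions: `RoutingSketch` and `RowTransform` are hypothetical stand-ins for the real classes, table IDs are plain strings, and the global configuration is modeled as a factory so that every matching table still gets its own instance.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

// Hypothetical sketch of the dispatch; RowTransform stands in for
// CalciteTransform / IdentityTransform.
final class RoutingSketch {
    interface RowTransform {
        List<Object[]> flatMap(Object[] row);
    }

    private final Map<String, RowTransform> transformMap = new HashMap<>();

    RoutingSketch(
            List<String> tablePaths,
            Map<String, RowTransform> perTable,           // table_transform entries
            Pattern tableMatchRegex,                      // table_match_regex, may be null
            Function<String, RowTransform> globalFactory) { // one instance per matching table
        for (String path : tablePaths) {
            RowTransform transform;
            if (perTable.containsKey(path)) {
                transform = perTable.get(path);
            } else if (tableMatchRegex != null && tableMatchRegex.matcher(path).matches()) {
                transform = globalFactory.apply(path);
            } else {
                transform = row -> Collections.singletonList(row); // passthrough
            }
            transformMap.put(path, transform);
        }
    }

    /** Runtime: a single map lookup by table ID, then delegate. */
    List<Object[]> flatMap(String tableId, Object[] row) {
        return transformMap.get(tableId).flatMap(row);
    }
}
```

All routing decisions are made once in the constructor, so the per-row cost is one hash lookup.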
## Type Bridging
CalciteTransform performs bidirectional mapping between two type systems:
**Input direction** (SeaTunnel → Calcite): When registering a `CatalogTable`
as a Calcite table, `SeaTunnelDataType` is mapped to `RelDataType`.
**Output direction** (Calcite → SeaTunnel): The output type inferred by
Calcite after SQL execution (`validatedRowType`) is mapped back to
`SeaTunnelRowType` for downstream schema.
Mapping rules:
| SeaTunnel Type | Calcite SqlTypeName | Notes |
|---------------|---------------------|-------|
| STRING | VARCHAR | |
| BOOLEAN | BOOLEAN | |
| TINYINT / SMALLINT / INT / BIGINT | TINYINT / SMALLINT / INTEGER / BIGINT | |
| FLOAT / DOUBLE | FLOAT / DOUBLE | |
| DECIMAL(p, s) | DECIMAL(p, s) | Preserves precision and scale |
| DATE | DATE | |
| TIME | TIME | |
| TIMESTAMP | TIMESTAMP | |
| BYTES | VARBINARY | |
| ARRAY | ARRAY | Element type mapped recursively |
| MAP | MAP | Key/value types mapped recursively |
| ROW | ROW | Sub-fields mapped recursively |
| NULL | NULL | |
Unmappable types throw an exception during initialization, causing job
startup failure.
Output schema is fully derived by Calcite's type inference —
CalciteTransform does not need to compute it manually.
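The recursive part of the input-direction mapping can be sketched as follows. The model is deliberately simplified and hypothetical: `SType` is a stand-in for `SeaTunnelDataType`, type names are plain strings, and nested types are rendered as text, whereas a real bridge would build `RelDataType` objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the SeaTunnel -> Calcite type mapping.
final class TypeBridgeSketch {

    /** A type kind plus child types (element for ARRAY, key/value for MAP, fields for ROW). */
    static final class SType {
        final String kind;
        final List<SType> children;

        SType(String kind, SType... children) {
            this.kind = kind;
            this.children = Arrays.asList(children);
        }
    }

    /** Maps a SeaTunnel-side type to the Calcite-side name from the table above. */
    static String toCalcite(SType type) {
        switch (type.kind) {
            case "STRING":
                return "VARCHAR";
            case "INT":
                return "INTEGER";
            case "BYTES":
                return "VARBINARY";
            case "BOOLEAN": case "TINYINT": case "SMALLINT": case "BIGINT":
            case "FLOAT": case "DOUBLE": case "DECIMAL":
            case "DATE": case "TIME": case "TIMESTAMP": case "NULL":
                return type.kind; // same name on both sides (DECIMAL precision/scale omitted)
            case "ARRAY": case "MAP": case "ROW":
                // Composite types recurse into their children.
                return type.children.stream()
                        .map(TypeBridgeSketch::toCalcite)
                        .collect(Collectors.joining(", ", type.kind + "<", ">"));
            default:
                // Mirrors the fail-at-initialization rule for unmappable types.
                throw new IllegalArgumentException("Unmappable type: " + type.kind);
        }
    }
}
```

For example, a MAP from STRING to ARRAY of INT is rendered as `MAP<VARCHAR, ARRAY<INTEGER>>`, while an unknown kind raises an exception.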
## UDF
Users can register custom functions via the `udfs` configuration
(implementing Calcite's `ScalarFunction` interface).
Calcite itself provides 200+ built-in functions (math, string, date, JSON,
etc.) that are available without registration.
## Dependency Isolation
A new `seatunnel-shade/seatunnel-calcite` module relocates Calcite and its
transitive dependencies:
- `org.apache.calcite` → `org.apache.seatunnel.shade.org.apache.calcite`
- `org.apache.calcite.avatica` →
`org.apache.seatunnel.shade.org.apache.calcite.avatica`
- Transitive dependencies (Guava, Janino, etc.) are relocated accordingly
This avoids classpath conflicts with the Zeta engine (Hazelcast) and other
SeaTunnel modules.