nzw921rx commented on issue #11061:
URL: https://github.com/apache/seatunnel/issues/11061#issuecomment-4678987292

   # Calcite Transform Architecture
   
   ## High-Level Dataflow
   
   ```mermaid
   flowchart TD
       Source["Source\n(single table / multi-table CDC / sharded tables)"]
       Row["SeaTunnelRow\neach row carries tableId"]
       Multi["CalciteMultiCatalogTransform\nroute by row.getTableId()"]
   
       subgraph TA["CalciteTransform (Table A)"]
           EA["Calcite Engine A\nparse → validate → execute"]
       end
       subgraph TB["CalciteTransform (Table B)"]
           EB["Calcite Engine B\nparse → validate → execute"]
       end
       TC["IdentityTransform\n(Table C — passthrough)"]
   
       Result["List‹SeaTunnelRow›\n(0..N rows)"]
       Sink["Sink"]
   
       Source --> Row --> Multi
       Multi --> EA
       Multi --> EB
       Multi --> TC
       EA --> Result
       EB --> Result
       TC --> Result
       Result --> Sink
   ```
   
   ## Execution Model
   
   CalciteTransform implements the `SeaTunnelFlatMapTransform` interface. The 
core method is `flatMap(SeaTunnelRow) → List<SeaTunnelRow>`.
   
   The MVP supports row-level execution only — each incoming row is immediately 
executed through Calcite and results are returned with zero buffering.
   
   ```text
   flatMap(inputRow) → List<SeaTunnelRow> (0..N rows)
     → convert SeaTunnelRow to Object[], inject into Calcite single-row table
     → Calcite executes the pre-compiled execution plan
     → convert ResultSet back to SeaTunnelRow
     → return result list:
         WHERE not matched → empty list (filtered)
         regular SELECT    → 1 row (mapped)
         UNNEST expansion  → N rows (flattened)
   ```
   
   **Execution plan is compiled once**: CalciteTransform completes SQL parse → 
validate → compile during initialization (`open()`), generating a Calcite 
`Bindable` execution plan that is cached. At runtime, each row only involves 
data injection + plan execution — no recompilation.
   
   Behavior is similar to the existing SQL Transform, but benefits from 
Calcite's complete function library (200+) and automatic type inference. No 
parallelism constraints — the transform runs at the same parallelism as the 
upstream Source.
   
   CalciteTransform **does not restrict which SQL syntax the user can write**. 
Since no data is buffered, Calcite always sees a table with exactly 1 row per 
`flatMap()` call. If the user writes GROUP BY / ORDER BY / window functions, 
Calcite executes them on a 1-row table — the result is equivalent to returning 
the row as-is (COUNT is always 1, sorting 1 row is a no-op, ROW_NUMBER is 
always 1). No rejection, no error — the user decides whether the semantics make 
sense.
   
   ## Error Handling
   
   - **Initialization phase**: SQL parse or validation failure (syntax error, 
column not found, type incompatibility) → throws exception immediately, job 
fails to start, no cluster resources wasted
   - **Runtime phase**: Row execution exception (e.g., JSON path not found 
returns null, CAST overflow) → follows Calcite standard semantics: `JSON_VALUE` 
with missing path returns null, `CAST` overflow throws exception causing job 
failure. No silent exception swallowing, no row skipping
   
   ## Multi-Table Routing
   
   A single Source in SeaTunnel can produce data from multiple logical tables 
(e.g., CDC whole-database sync, sharded tables). All rows enter the Transform 
as a mixed stream, with each `SeaTunnelRow` carrying a `tableId` field.
   
   CalciteMultiCatalogTransform extends `AbstractMultiCatalogFlatMapTransform`, 
reusing the framework's existing multi-table dispatch:
   
   ```text
   Construction:
     iterate inputCatalogTables
       → table_path specified in table_transform?  → create CalciteTransform 
with per-table config
       → matches table_match_regex?                → create CalciteTransform 
with global config
       → neither matches                           → create IdentityTransform 
(passthrough)
   
   Runtime:
     flatMap(row)
       → transformMap.get(row.getTableId()).flatMap(row)
   ```
   
   Each CalciteTransform instance has its own independent Calcite engine 
(independent Schema, independent execution plan). Tables are fully isolated 
from each other.
   
   ## Type Bridging
   
   CalciteTransform performs bidirectional mapping between two type systems:
   
   **Input direction** (SeaTunnel → Calcite): When registering a `CatalogTable` 
as a Calcite table, `SeaTunnelDataType` is mapped to `RelDataType`.
   
   **Output direction** (Calcite → SeaTunnel): The output type inferred by 
Calcite after SQL execution (`validatedRowType`) is mapped back to 
`SeaTunnelRowType` for downstream schema.
   
   Mapping rules:
   
   | SeaTunnel Type | Calcite SqlTypeName | Notes |
   |---------------|---------------------|-------|
   | STRING | VARCHAR | |
   | BOOLEAN | BOOLEAN | |
   | TINYINT / SMALLINT / INT / BIGINT | TINYINT / SMALLINT / INTEGER / BIGINT 
| |
   | FLOAT / DOUBLE | FLOAT / DOUBLE | |
   | DECIMAL(p, s) | DECIMAL(p, s) | Preserves precision and scale |
   | DATE | DATE | |
   | TIME | TIME | |
   | TIMESTAMP | TIMESTAMP | |
   | BYTES | VARBINARY | |
   | ARRAY | ARRAY | Element type mapped recursively |
   | MAP | MAP | Key/value types mapped recursively |
   | ROW | ROW | Sub-fields mapped recursively |
   | NULL | NULL | |
   
   Unmappable types throw an exception during initialization, causing job 
startup failure.
   
   Output schema is fully derived by Calcite's type inference — 
CalciteTransform does not need to compute it manually.
   
   ## UDF
   
   Users can register custom functions via the `udfs` configuration 
(implementing Calcite's `ScalarFunction` interface).
   
   Calcite itself provides 200+ built-in functions (math, string, date, JSON, 
etc.) that are available without registration.
   
   ## Dependency Isolation
   
   A new `seatunnel-shade/seatunnel-calcite` module relocates Calcite and its 
transitive dependencies:
   
   - `org.apache.calcite` → `org.apache.seatunnel.shade.org.apache.calcite`
   - `org.apache.calcite.avatica` → 
`org.apache.seatunnel.shade.org.apache.calcite.avatica`
   - Transitive dependencies (Guava, Janino, etc.) are relocated accordingly
   
   This avoids classpath conflicts with the Zeta engine (Hazelcast) and other 
SeaTunnel modules.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to