davidzollo commented on issue #10425:
URL: https://github.com/apache/seatunnel/issues/10425#issuecomment-3904720765

   > [@davidzollo](https://github.com/davidzollo) 
[@DanielCarter-stack](https://github.com/DanielCarter-stack) I've been working 
on the Multi-Table implementation for the RabbitMQ Source. I have successfully 
implemented the discovery phase and split assignment, but I am currently 
blocked by a `NullPointerException` during the data collection phase. When 
deserializing messages using `JsonDeserializationSchema`, the produced 
`SeaTunnelRow` has a null `rowType`. In Multi-Table mode, the engine calls 
`SeaTunnelRow.getBytesSize()`, which leads to this crash:
   > 
   > java.lang.NullPointerException: Cannot invoke "org.apache.seatunnel.api.table.type.SeaTunnelRowType.getFieldType(int)" because "rowType" is null
   >     at org.apache.seatunnel.api.table.type.SeaTunnelRow.getBytesSize(SeaTunnelRow.java:122)
   >     at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:103)
   > 
   > I have updated the `SourceReader` to maintain a map of 
`DeserializationSchema` and `SeaTunnelRowType` per queue. I am explicitly 
setting `record.setTableId(queueName)` inside the collector. Despite setting 
the tableId, the engine still requires rowType for size calculation.
   > 
   > The main issue is that while I have the correct `SeaTunnelRowType` 
available in the Reader, I haven't found a way to pass it into the 
`SeaTunnelRow` object. The class lacks a public `setRowType` method, and the 
`JsonDeserializationSchema` only populates the data fields, leaving the type 
information null.
   > 
   > I would like to ask:
   > 
   > * What is the intended way to bind `SeaTunnelRowType` to the row in a 
Multi-Table source before it is collected?
   
   In Multi-Table mode, **`SeaTunnelRowType` is not intended to be bound 
directly to each row object.**
   
   ### Intended Way
   
   SeaTunnel's design separates type information from row data and associates 
the two via the row's `tableId`.
   
   1.  **Collector Maintains the Mapping**:
       The `SeaTunnelSourceCollector` internally maintains a `Map<String, 
SeaTunnelRowType> rowTypeMap`. This map stores the mapping from `tableId` to 
`SeaTunnelRowType`. This information comes from the list of `CatalogTable`s 
registered during Source initialization.
   
   2.  **Row Carries the ID**:
       Each `SeaTunnelRow` object only needs to carry the correct `tableId` 
(via `row.setTableId(String id)`).
   
   3.  **Runtime Binding**:
       When the Collector receives a row and needs to calculate its size 
(`getBytesSize`) or perform other operations, it uses `row.getTableId()` to 
look up the corresponding `SeaTunnelRowType` in the `rowTypeMap`, and then 
passes that type to the generic `SeaTunnelRow` methods for the calculation.
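
The three steps above can be sketched in plain Java. This is only an illustration of the lookup pattern: `RowType`, `Row`, and `Collector` below are simplified, hypothetical stand-ins and not the real SeaTunnel classes, and the field count is used as a placeholder for the actual size calculation.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: simplified stand-ins, NOT the real SeaTunnel classes.
class TableIdLookupSketch {

    /** Hypothetical stand-in for SeaTunnelRowType: only carries field names. */
    static final class RowType {
        final String[] fieldNames;

        RowType(String... fieldNames) {
            this.fieldNames = fieldNames;
        }
    }

    /** Hypothetical stand-in for SeaTunnelRow: values plus a tableId, no type. */
    static final class Row {
        final Object[] fields;
        String tableId = "";

        Row(Object... fields) {
            this.fields = fields;
        }
    }

    /** Hypothetical stand-in for the collector: resolves the type by tableId. */
    static final class Collector {
        private final Map<String, RowType> rowTypeMap = new HashMap<>();

        // Step 1: the mapping is registered once, before any row is collected.
        void register(String tableId, RowType type) {
            rowTypeMap.put(tableId, type);
        }

        // Step 3: the type is looked up at collect time from the row's tableId.
        int collect(Row row) {
            RowType type = rowTypeMap.get(row.tableId);
            if (type == null) {
                // Fail with a readable message instead of a later NPE.
                throw new IllegalStateException(
                        "No row type registered for tableId '" + row.tableId + "'");
            }
            return type.fieldNames.length; // placeholder for the size calculation
        }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        collector.register("orders", new RowType("id", "amount"));

        // Step 2: the row only carries the tableId, never the type itself.
        Row row = new Row(1L, 9.5);
        row.tableId = "orders";
        System.out.println(collector.collect(row));
    }
}
```

In this sketch a row whose `tableId` was never registered fails with an explicit message, which is the same situation that surfaces as a `NullPointerException` when the looked-up type is used without a null check.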
   
   **Summary**: You do not (and cannot) inject a `SeaTunnelRowType` into a 
`SeaTunnelRow`. The only thing you need to do is ensure that the `tableId` 
string set by the data producer (Reader/Deserializer) **matches exactly** the 
`tableId` string registered on the Engine side (Collector). If you encounter 
an NPE, it is because these two strings do not match (e.g., one has a database 
prefix and the other doesn't, or, as we found earlier, one is a plain string 
and the other is the `toString` representation of an `Optional`).
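
A quick way to spot such mismatches is to compare the IDs the reader emits against the IDs that were registered. The sketch below uses only the JDK; the helper name `unregistered` and the sample IDs are hypothetical and chosen to reproduce the two cases mentioned above (a database prefix and an `Optional` rendered through `toString`).

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

// Illustration only: a hypothetical debugging helper, not part of SeaTunnel.
class TableIdMismatchSketch {

    /** Returns the emitted tableIds that have no exact match among the registered ones. */
    static Set<String> unregistered(Set<String> registered, Set<String> emitted) {
        Set<String> missing = new LinkedHashSet<>(emitted);
        missing.removeAll(registered);
        return missing;
    }

    public static void main(String[] args) {
        // IDs assumed to be registered on the engine side.
        Set<String> registered = new LinkedHashSet<>(Arrays.asList("orders", "payments"));

        // IDs set on rows by the reader, including two typical mistakes.
        Set<String> emitted = new LinkedHashSet<>(Arrays.asList(
                "orders",                                // exact match
                Optional.of("payments").toString(),      // becomes "Optional[payments]"
                "default.orders"));                      // extra database prefix

        // prints [Optional[payments], default.orders]
        System.out.println(unregistered(registered, emitted));
    }
}
```

Logging the result of such a comparison once, at reader start-up, is usually enough to tell whether the NPE comes from a `tableId` mismatch or from something else.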
   
   > * Is this a known issue with the current `JsonDeserializationSchema` 
implementation for multi-table scenarios?
   
   We have not seen this problem ourselves. You are welcome to help fix it, 
thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

