davidzollo commented on issue #10425: URL: https://github.com/apache/seatunnel/issues/10425#issuecomment-3904720765
> [@davidzollo](https://github.com/davidzollo) [@DanielCarter-stack](https://github.com/DanielCarter-stack) I've been working on the Multi-Table implementation for the RabbitMQ Source. I have successfully implemented the discovery phase and split assignment, but I am currently blocked by a `NullPointerException` during the data collection phase. When deserializing messages using `JsonDeserializationSchema`, the produced `SeaTunnelRow` has a null `rowType`. In Multi-Table mode, the engine calls `SeaTunnelRow.getBytesSize()`, which leads to this crash:
>
>     java.lang.NullPointerException: Cannot invoke "org.apache.seatunnel.api.table.type.SeaTunnelRowType.getFieldType(int)" because "rowType" is null
>         at org.apache.seatunnel.api.table.type.SeaTunnelRow.getBytesSize(SeaTunnelRow.java:122)
>         at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:103)
>
> I have updated the `SourceReader` to maintain a map of `DeserializationSchema` and `SeaTunnelRowType` per queue. I am explicitly setting `record.setTableId(queueName)` inside the collector. Despite setting the tableId, the engine still requires `rowType` for size calculation.
>
> The main issue is that while I have the correct `SeaTunnelRowType` available in the Reader, I haven't found a way to pass it into the `SeaTunnelRow` object. The class lacks a public `setRowType` method, and the `JsonDeserializationSchema` only populates the data fields, leaving the type information null.
>
> I would like to ask:
>
> * What is the intended way to bind `SeaTunnelRowType` to the row in a Multi-Table source before it is collected?

In Multi-Table mode, **`SeaTunnelRowType` is not intended to be bound directly to each row object.**

### Intended Way

SeaTunnel's design separates type information from row data and associates the two via the `TableId`.

1. **Collector Maintains the Mapping**: The `SeaTunnelSourceCollector` internally maintains a `Map<String, SeaTunnelRowType> rowTypeMap`.
   This map associates each `tableId` with its `SeaTunnelRowType` and is populated from the list of `CatalogTable`s registered during Source initialization.
2. **Row Carries the ID**: Each `SeaTunnelRow` only needs to carry the correct `tableId`, set via `row.setTableId(String id)`.
3. **Runtime Binding**: When the Collector receives a row and needs to calculate its size (`getBytesSize`) or perform other type-dependent operations, it uses `row.getTableId()` to look up the corresponding `SeaTunnelRowType` in `rowTypeMap` and then passes that type to the generic `SeaTunnelRow` methods.

**Summary**: You do not (and cannot) inject a `RowType` into a `SeaTunnelRow`. The only thing you need to do is make sure the `TableId` string set by the data producer (Reader/Deserializer) **exactly matches** the `TableId` string registered on the Engine side (Collector). If you hit this NPE, it is because the two strings do not match, so the lookup finds no entry and a null row type is passed on (e.g., one has a database prefix and the other doesn't, or, as we found earlier, one is a plain string and the other is the `toString()` representation of an `Optional`).

> * Is this a known issue with the current `JsonDeserializationSchema` implementation for multi-table scenarios?

We haven't run into this problem ourselves. You're welcome to help fix it, thanks!
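To make the `tableId` lookup described above concrete, here is a minimal, self-contained sketch. `RowType`, `Row`, `Collector`, `register` and `getFieldSize` are hypothetical stand-ins, not SeaTunnel's real classes. The sketch assumes the collector resolves the type with a plain `Map.get` on the row's `tableId`, which is how the steps above describe it. It shows a matching ID resolving to a type, and an `Optional`-wrapped ID (`"Optional[orders]"`) missing the map and failing with an NPE, like the one in the stack trace.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the tableId -> row type binding.
// None of these classes are SeaTunnel's actual API.
public class MultiTableLookupSketch {

    // Stand-in for SeaTunnelRowType: only per-field byte widths.
    record RowType(List<Integer> fieldSizes) {
        int getFieldSize(int i) { return fieldSizes.get(i); }
    }

    // Stand-in for SeaTunnelRow: carries data and a tableId, but no type.
    static final class Row {
        private final Object[] fields;
        private String tableId = "";

        Row(Object... fields) { this.fields = fields; }

        void setTableId(String tableId) { this.tableId = tableId; }
        String getTableId() { return tableId; }

        // The type is supplied by the caller at call time, not stored in the row.
        // A null rowType throws NullPointerException on the first dereference.
        int getBytesSize(RowType rowType) {
            int size = 0;
            for (int i = 0; i < fields.length; i++) {
                size += rowType.getFieldSize(i);
            }
            return size;
        }
    }

    // Stand-in for the collector: owns the tableId -> RowType map,
    // filled from the tables registered at source initialization.
    static final class Collector {
        private final Map<String, RowType> rowTypeMap = new HashMap<>();

        void register(String tableId, RowType rowType) {
            rowTypeMap.put(tableId, rowType);
        }

        int collect(Row row) {
            // Map.get returns null for an unregistered key, so a mismatched
            // tableId only surfaces later, as an NPE inside getBytesSize.
            RowType rowType = rowTypeMap.get(row.getTableId());
            return row.getBytesSize(rowType);
        }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        collector.register("orders", new RowType(List.of(8, 16)));

        Row ok = new Row(1L, "abc");
        ok.setTableId("orders");
        System.out.println(collector.collect(ok)); // prints 24

        Row bad = new Row(2L, "def");
        // String.valueOf on an Optional yields "Optional[orders]", not "orders".
        bad.setTableId(String.valueOf(Optional.of("orders")));
        try {
            collector.collect(bad);
        } catch (NullPointerException e) {
            System.out.println("No row type registered for: " + bad.getTableId());
        }
    }
}
```

In this model the fix is entirely on the producer side: unwrap the `Optional` (or strip or add the database prefix) so that the string passed to `setTableId` is identical to the key the collector registered.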
