Anthippi opened a new pull request, #10557:
URL: https://github.com/apache/seatunnel/pull/10557

   ### Purpose of this pull request
   
   This PR introduces multi-table read support for the RabbitMQ Source Connector.
   Part of #10425
   
   **Implementation Details:**
   1. **`SupportMultipleTable` Implementation:** Followed the official contribution guide to implement the multi-table API:
      - Added the `TABLE_CONFIGS` option to `RabbitmqSourceFactory#optionRule()`, declared as exclusive with the legacy single-table options.
      - Refactored `RabbitmqSource` initialization to parse the `ReadonlyConfig` and build a `List<CatalogTable>`. In multi-table mode it iterates over `tables_configs`; otherwise a fallback branch builds a single table from the legacy single-table configuration.
      - Overrode `getProducedCatalogTables()` to return this pre-computed `List<CatalogTable>`, exposing the multi-table metadata (schemas and table identifiers) to the SeaTunnel engine.
   2. **Reader Initialization & Message Origin Tracking:** Because a single `RabbitmqSourceReader` can now consume from multiple queues/tables concurrently, this PR introduces the `DeliveryMessage` wrapper class. `QueueingConsumer` and `RabbitmqClient` were updated to tag every incoming RabbitMQ `Delivery` with its originating `splitId` (the queue name) before placing it in the reader's internal `BlockingQueue`.
   3. **Exact TableId Binding:** To keep rows from losing their table identifiers in multi-table mode, `RabbitmqSourceReader` caches, during initialization, the exact `TablePath` (Catalog/Database/Table) generated by `CatalogTableUtil`. At runtime it uses each message's `splitId` to inject the correct `TableId` into the `SeaTunnelRow` via `row.setTableId()`.
   4. **Split Distribution & Graceful Shutdown:** `RabbitmqSplitEnumerator` now discovers and assigns splits per table/queue. `signalNoMoreSplits` is implemented in the enumerator and `handleNoMoreSplits` in the reader, so the job no longer hangs in bounded/E2E environments when the parallelism exceeds the number of available splits.
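
The origin tracking and table binding in items 2 and 3 can be illustrated with a minimal plain-Java sketch. It is not the connector's code: the `DeliveryMessage` fields shown here, `TaggedRow` (a stand-in for `SeaTunnelRow`), `MultiQueueReaderSketch`, and the `db.orders`-style table ids are all hypothetical, and the real reader works with RabbitMQ `Delivery` objects and `TablePath`-derived identifiers.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper: a raw message body plus the queue (splitId) it was consumed from.
final class DeliveryMessage {
    final String splitId;
    final byte[] body;

    DeliveryMessage(String splitId, byte[] body) {
        this.splitId = splitId;
        this.body = body;
    }
}

// Hypothetical stand-in for SeaTunnelRow: a payload plus a table id set via setTableId().
final class TaggedRow {
    private final String payload;
    private String tableId;

    TaggedRow(String payload) { this.payload = payload; }
    void setTableId(String tableId) { this.tableId = tableId; }
    String getTableId() { return tableId; }
    String getPayload() { return payload; }
}

final class MultiQueueReaderSketch {
    // Handoff queue shared by all consumer callbacks and the reader loop.
    private final BlockingQueue<DeliveryMessage> handoff = new LinkedBlockingQueue<>();
    // splitId (queue name) -> table id, cached once at reader initialization.
    private final Map<String, String> tableIdBySplit;

    MultiQueueReaderSketch(Map<String, String> tableIdBySplit) {
        this.tableIdBySplit = new HashMap<>(tableIdBySplit);
    }

    // Consumer side: tag the delivery with its originating queue before enqueueing.
    // offer() on an unbounded LinkedBlockingQueue always succeeds.
    void onDelivery(String queueName, byte[] body) {
        handoff.offer(new DeliveryMessage(queueName, body));
    }

    // Reader side: take one pending message (or return null) and bind its table id.
    TaggedRow pollNext() {
        DeliveryMessage msg = handoff.poll();
        if (msg == null) {
            return null;
        }
        String tableId = tableIdBySplit.get(msg.splitId);
        if (tableId == null) {
            throw new IllegalStateException("No table registered for split " + msg.splitId);
        }
        TaggedRow row = new TaggedRow(new String(msg.body, StandardCharsets.UTF_8));
        row.setTableId(tableId);
        return row;
    }
}
```

In this sketch the split-to-table map is fixed after construction, so the per-message cost is a single map lookup, and an unknown `splitId` fails loudly instead of producing a row without a table id.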
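
The split handling in item 4 can be sketched the same way, assuming a simple round-robin assignment of one split per queue (the actual `RabbitmqSplitEnumerator` strategy may differ). `SplitAssignmentSketch` and `ReaderState` are hypothetical names; the `noMoreSplits` flag only models the effect of `signalNoMoreSplits` and `handleNoMoreSplits`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of enumerator-side split assignment; not the connector's actual strategy.
final class SplitAssignmentSketch {

    // Hypothetical per-reader state tracked by the enumerator.
    static final class ReaderState {
        final List<String> splits = new ArrayList<>();
        boolean noMoreSplits;

        // An idle reader may complete only once it knows no further splits will arrive.
        boolean canFinishImmediately() {
            return noMoreSplits && splits.isEmpty();
        }
    }

    static List<ReaderState> assign(List<String> queueNames, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<ReaderState> readers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            readers.add(new ReaderState());
        }
        // One split per queue, spread round-robin by queue index (an assumption).
        for (int i = 0; i < queueNames.size(); i++) {
            readers.get(i % parallelism).splits.add(queueNames.get(i));
        }
        // Models signalNoMoreSplits: every reader, including idle ones, is told discovery is done.
        for (ReaderState reader : readers) {
            reader.noMoreSplits = true;
        }
        return readers;
    }
}
```

With two queues and a parallelism of 3, the third reader receives no split; because it has also been told that no more splits will arrive, it can finish instead of waiting indefinitely, which is the hang described above.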
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Users can now use the `tables_configs` array in the RabbitMQ source configuration to read from multiple queues simultaneously within a single job.
   
   *Previous behavior:* The RabbitMQ source only supported a single queue/schema configuration at the root level.

   *New behavior:* Supports `tables_configs` for multi-table mode while remaining fully backward compatible with the legacy single-queue configuration.
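
The choice between `tables_configs` and the legacy root-level options can be sketched as follows. This is a simplified illustration under stated assumptions: plain `Map`s stand in for `ReadonlyConfig`, `TableSpec` stands in for `CatalogTable`, `queue_name` is assumed to be the root-level queue option, and the `table_name` key with its default-to-queue-name behavior is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain Maps stand in for ReadonlyConfig, TableSpec for CatalogTable.
final class TableConfigResolver {

    // Illustrative stand-in for a CatalogTable: the queue to consume and the table it feeds.
    static final class TableSpec {
        final String queueName;
        final String tableName;

        TableSpec(String queueName, String tableName) {
            this.queueName = queueName;
            this.tableName = tableName;
        }
    }

    @SuppressWarnings("unchecked")
    static List<TableSpec> resolve(Map<String, Object> config) {
        Object multi = config.get("tables_configs");
        List<TableSpec> tables = new ArrayList<>();
        if (multi != null) {
            // Exclusive constraint: multi-table mode must not be mixed with root-level queue options.
            if (config.containsKey("queue_name")) {
                throw new IllegalArgumentException("tables_configs cannot be combined with root-level queue_name");
            }
            // Multi-table mode: each entry describes one queue/table.
            for (Object entry : (List<Object>) multi) {
                tables.add(toSpec((Map<String, Object>) entry));
            }
        } else {
            // Legacy fallback: the root-level options describe a single queue/table.
            tables.add(toSpec(config));
        }
        return tables;
    }

    private static TableSpec toSpec(Map<String, Object> options) {
        String queue = (String) options.get("queue_name");
        if (queue == null) {
            throw new IllegalArgumentException("queue_name is required");
        }
        // Hypothetical table_name key; defaults to the queue name when absent.
        Object table = options.get("table_name");
        return new TableSpec(queue, table != null ? (String) table : queue);
    }
}
```

The sketch rejects a root-level `queue_name` alongside `tables_configs` with a hand-written check; the PR instead declares this exclusivity in `RabbitmqSourceFactory#optionRule()`.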
   
   ### How was this patch tested?
   
   - Added `RabbitmqSourceTest` to verify config parsing, fallback logic, and table identifier generation.
   - Added `RabbitmqSplitEnumeratorTest` to verify multi-table split discovery and assignment logic.
   - Added `RabbitmqSourceFactoryTest` to ensure validation rules and option keys are correctly registered.
   - Updated the E2E integration test (`RabbitmqIT.java`) by adding `testRabbitMQMultiTableE2E`. It exercises the complete pipeline by generating data for two different schemas/queues, consuming them concurrently, and validating the row distribution per table with the Assert Sink (using `rule_value` assertions).
   
   ### Check list
   
   * [ ] If any new JAR binary package is added in your PR, please add a License Notice according to the [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If necessary, please update `incompatible-changes.md` to describe the incompatibility caused by this PR.
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add ci label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add e2e testcase in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
   
   *(Note: Since this PR enhances an existing connector rather than creating a new one from scratch, steps 1, 2, 3, and 5 in the checklist above are not applicable. Step 4 has been fulfilled by updating the existing RabbitMQ E2E tests.)*

