Anthippi opened a new pull request, #10557:
URL: https://github.com/apache/seatunnel/pull/10557
### Purpose of this pull request
This PR introduces Multi-Table read support for the RabbitMQ Source
Connector.
Part of #10425
**Implementation Details:**
1. **`SupportMultipleTable` Implementation:** Followed the official
contribution guide to implement the multi-table API:
   - Added the `TABLE_CONFIGS` option to
     `RabbitmqSourceFactory#optionRule()` with exclusive constraints against legacy
     single-table options.
   - Refactored `RabbitmqSource` initialization to parse the
     `ReadonlyConfig` and construct a `List<CatalogTable>`. It iterates over
     `tables_configs` for multi-table setups, while maintaining a fallback block to
     support legacy single-table configurations.
   - Overrode `getProducedCatalogTables()` to return this pre-computed
     `List<CatalogTable>`, properly exposing the multi-table metadata (schemas and
     table identifiers) to the SeaTunnel engine.
2. **Reader Initialization & Message Origin Tracking:** Since a single
`RabbitmqSourceReader` can now consume from multiple queues/tables
concurrently, we introduced the `DeliveryMessage` wrapper class.
`QueueingConsumer` and `RabbitmqClient` were updated to tag every incoming
RabbitMQ `Delivery` with its originating `splitId` (queue name) before placing
it in the reader's internal `BlockingQueue`.
3. **Exact TableId Binding:** To prevent rows from losing their identifiers
in multi-table mode, `RabbitmqSourceReader` caches the exact `TablePath`
(Catalog/Database/Table) generated by `CatalogTableUtil` during initialization.
At runtime, it uses the `splitId` to inject the correct `TableId` into each
`SeaTunnelRow` via `row.setTableId()`.
4. **Split Distribution & Graceful Shutdown:** `RabbitmqSplitEnumerator` now
discovers and assigns splits per table/queue. Implemented `signalNoMoreSplits`
in the enumerator and `handleNoMoreSplits` in the reader to prevent the job
from hanging in bounded/E2E environments when parallelism is higher than the
available splits.
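For item 1, a minimal, self-contained sketch of the config-resolution flow: use the multi-table list when present, fall back to the legacy root-level options otherwise, and reject a mix of both. It assumes a plain `Map<String, Object>` in place of SeaTunnel's `ReadonlyConfig` and a small hypothetical `TableSpec` in place of `CatalogTable`; `MultiTableConfigParser`, the per-table `queue_name`/`table` keys, and the default-table-name rule are illustrative assumptions, not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one resolved table (the real code builds a CatalogTable).
final class TableSpec {
    final String queueName;
    final String tableName;

    TableSpec(String queueName, String tableName) {
        this.queueName = queueName;
        this.tableName = tableName;
    }
}

final class MultiTableConfigParser {
    // Option name taken from this PR; "queue_name" and "table" are assumed per-table keys.
    static final String TABLES_CONFIGS = "tables_configs";
    static final String QUEUE_NAME = "queue_name";
    static final String TABLE = "table";

    @SuppressWarnings("unchecked")
    static List<TableSpec> parse(Map<String, Object> config) {
        Object multi = config.get(TABLES_CONFIGS);
        if (multi != null && config.containsKey(QUEUE_NAME)) {
            // Mirrors an exclusive rule: multi-table and legacy root options cannot be mixed.
            throw new IllegalArgumentException(TABLES_CONFIGS + " cannot be combined with " + QUEUE_NAME);
        }
        List<TableSpec> specs = new ArrayList<>();
        if (multi != null) {
            for (Map<String, Object> entry : (List<Map<String, Object>>) multi) {
                specs.add(toSpec(entry));
            }
        } else {
            // Legacy fallback: the root-level options describe exactly one table.
            specs.add(toSpec(config));
        }
        return Collections.unmodifiableList(specs);
    }

    private static TableSpec toSpec(Map<String, Object> options) {
        Object queue = options.get(QUEUE_NAME);
        if (queue == null) {
            throw new IllegalArgumentException(QUEUE_NAME + " is required");
        }
        // Assumption: the table name defaults to the queue name when not given.
        Object table = options.get(TABLE);
        return new TableSpec(queue.toString(), table != null ? table.toString() : queue.toString());
    }
}

final class ConfigParserDemo {
    public static void main(String[] args) {
        Map<String, Object> orders = new HashMap<>();
        orders.put(MultiTableConfigParser.QUEUE_NAME, "orders_queue");
        Map<String, Object> users = new HashMap<>();
        users.put(MultiTableConfigParser.QUEUE_NAME, "users_queue");
        users.put(MultiTableConfigParser.TABLE, "users");

        Map<String, Object> root = new HashMap<>();
        root.put(MultiTableConfigParser.TABLES_CONFIGS, Arrays.asList(orders, users));

        for (TableSpec spec : MultiTableConfigParser.parse(root)) {
            System.out.println(spec.queueName + " -> " + spec.tableName);
        }
    }
}
```

Returning an unmodifiable list fits the idea of computing the table list once during initialization and handing the same value back from `getProducedCatalogTables()`.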
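For item 2, a sketch of how per-queue consumers can tag each delivery with its split ID before handing it to a single shared `BlockingQueue` drained by the reader. `TaggedDelivery` and `TaggingConsumer` are hypothetical stand-ins for the PR's `DeliveryMessage` and the updated `QueueingConsumer`; their fields and methods are assumptions, and the RabbitMQ client itself is left out so the example runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: a message body tagged with the split (queue) it was read from.
final class TaggedDelivery {
    final String splitId;
    final long deliveryTag;
    final byte[] body;

    TaggedDelivery(String splitId, long deliveryTag, byte[] body) {
        this.splitId = splitId;
        this.deliveryTag = deliveryTag;
        this.body = body;
    }
}

// Hypothetical per-queue consumer; one instance per split, all sharing one reader queue.
final class TaggingConsumer {
    private final String splitId;
    private final BlockingQueue<TaggedDelivery> sharedQueue;

    TaggingConsumer(String splitId, BlockingQueue<TaggedDelivery> sharedQueue) {
        this.splitId = splitId;
        this.sharedQueue = sharedQueue;
    }

    // With the RabbitMQ Java client this would be driven from Consumer#handleDelivery.
    void onDelivery(long deliveryTag, byte[] body) {
        try {
            // put() blocks when a bounded queue is full, applying back-pressure.
            sharedQueue.put(new TaggedDelivery(splitId, deliveryTag, body));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueuing from " + splitId, e);
        }
    }
}

final class TaggingDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<TaggedDelivery> shared = new LinkedBlockingQueue<>(1024);
        new TaggingConsumer("orders_queue", shared).onDelivery(1L, "o1".getBytes(StandardCharsets.UTF_8));
        new TaggingConsumer("users_queue", shared).onDelivery(1L, "u1".getBytes(StandardCharsets.UTF_8));

        TaggedDelivery d;
        while ((d = shared.poll(100, TimeUnit.MILLISECONDS)) != null) {
            // The reader can now route each body by its originating split.
            System.out.println(d.splitId + ": " + new String(d.body, StandardCharsets.UTF_8));
        }
    }
}
```

The delivery tag is kept next to the split ID because RabbitMQ delivery tags are scoped to the channel that delivered the message. In the demo both queues deliver tag `1`, so an acknowledgement has to be routed back to the originating consumer's channel rather than identified by tag alone.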
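For item 3, a sketch of the split-to-table binding: the lookup map is built once when the reader starts, and every row is tagged by its split ID at emit time. `Row` is a minimal stand-in for `SeaTunnelRow`, and `TableIdBinder` plus the `default.orders`-style identifiers are hypothetical; the real identifier is whatever string the cached `TablePath` produces.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a row that carries a table identifier (not SeaTunnelRow itself).
final class Row {
    private final Object[] fields;
    private String tableId;

    Row(Object... fields) {
        this.fields = fields;
    }

    void setTableId(String tableId) {
        this.tableId = tableId;
    }

    String getTableId() {
        return tableId;
    }

    int arity() {
        return fields.length;
    }
}

// Hypothetical binder: the splitId -> tableId map is built once, at reader start-up.
final class TableIdBinder {
    private final Map<String, String> tableIdBySplit;

    TableIdBinder(Map<String, String> tableIdBySplit) {
        // Defensive copy so later changes to the caller's map cannot affect routing.
        this.tableIdBySplit = Collections.unmodifiableMap(new HashMap<>(tableIdBySplit));
    }

    Row bind(String splitId, Row row) {
        String tableId = tableIdBySplit.get(splitId);
        if (tableId == null) {
            // Fail fast rather than emit a row the engine cannot route to a table.
            throw new IllegalStateException("no table registered for split " + splitId);
        }
        row.setTableId(tableId);
        return row;
    }
}

final class BinderDemo {
    public static void main(String[] args) {
        Map<String, String> ids = new HashMap<>();
        // Hypothetical identifiers; the real string comes from the cached TablePath.
        ids.put("orders_queue", "default.orders");
        ids.put("users_queue", "default.users");
        TableIdBinder binder = new TableIdBinder(ids);

        Row row = binder.bind("users_queue", new Row(42, "alice"));
        System.out.println(row.getTableId() + " (" + row.arity() + " fields)");
    }
}
```

Resolving the identifier once and reusing it avoids rebuilding strings per row and guarantees every row from a queue carries exactly the identifier the engine saw in the produced table list.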
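For item 4, a sketch of discovering one split per queue and spreading the splits over reader subtasks, showing why some readers end up with nothing to read when parallelism exceeds the number of queues. `QueueSplitAssigner` and the round-robin strategy are assumptions (the PR does not state which mapping it uses). In the connector, the enumerator signals no more splits once assignment is complete; the idle readers listed here are the ones that would otherwise wait indefinitely in a bounded job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical enumerator logic: one split per queue, spread over reader subtasks.
final class QueueSplitAssigner {

    // Returns readerIndex -> assigned queues; readers beyond the split count get an empty list.
    static Map<Integer, List<String>> assign(List<String> queues, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int reader = 0; reader < parallelism; reader++) {
            assignment.put(reader, new ArrayList<>());
        }
        for (int i = 0; i < queues.size(); i++) {
            // Round-robin is an assumption; any deterministic mapping works.
            assignment.get(i % parallelism).add(queues.get(i));
        }
        return assignment;
    }

    // Readers with no split block forever in a bounded job unless told no more splits are coming.
    static List<Integer> idleReaders(Map<Integer, List<String>> assignment) {
        List<Integer> idle = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : assignment.entrySet()) {
            if (entry.getValue().isEmpty()) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }
}

final class AssignerDemo {
    public static void main(String[] args) {
        Map<Integer, List<String>> plan =
                QueueSplitAssigner.assign(Arrays.asList("orders_queue", "users_queue"), 4);
        System.out.println("assignment: " + plan);
        System.out.println("idle readers: " + QueueSplitAssigner.idleReaders(plan));
    }
}
```

With two queues and a parallelism of 4, readers 2 and 3 receive no split, which is the situation the `signalNoMoreSplits` / `handleNoMoreSplits` pair is meant to resolve.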
### Does this PR introduce _any_ user-facing change?
Yes. Users can now use the `tables_configs` array in the RabbitMQ source
configuration to read from multiple queues simultaneously within a single job.
*Previous behavior:* The RabbitMQ source only supported a single
queue/schema configuration at the root level.
*New behavior:* Supports `tables_configs` for multi-table mode, while
remaining fully backward compatible with the legacy single-queue configuration
logic.
### How was this patch tested?
- Added `RabbitmqSourceTest` to verify config parsing, fallback logic, and
table identifier generation.
- Added `RabbitmqSplitEnumeratorTest` to verify multi-table split discovery
and assignment logic.
- Added `RabbitmqSourceFactoryTest` to ensure validation rules and option
keys are correctly registered.
- Updated the E2E integration test (`RabbitmqIT.java`) by adding
`testRabbitMQMultiTableE2E`. This tests the complete pipeline by generating
data for two different schemas/queues, consuming them concurrently, and
validating the correct row distribution per table using the Assert Sink (with
`rule_value` assertions).
### Check list
* [ ] If any new JAR binary package is added in your PR, please add a License
Notice according to the
[New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [x] If necessary, please update the documentation to describe the new
feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If necessary, please update `incompatible-changes.md` to describe the
incompatibility caused by this PR.
* [ ] If you are contributing the connector code, please check that the
following files are updated:
1. Update
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
and add new connector information in it
2. Update the pom file of
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
3. Add CI label in
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
4. Add e2e testcase in
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
5. Update connector
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
*(Note: Since this PR enhances an existing connector rather than creating a
new one from scratch, steps 1, 2, 3, and 5 in the checklist above are not
applicable. Step 4 has been fulfilled by updating the existing RabbitMQ E2E
tests).*
--
This is an automated message from the Apache Git Service.