yzeng1618 commented on code in PR #10557:
URL: https://github.com/apache/seatunnel/pull/10557#discussion_r2887166145
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java:
##########
@@ -255,17 +258,71 @@ public void testRabbitMQUSingDefaultConfig(TestContainer container) throws Excep
// init consumer client before executeJob start in every testContainer
RabbitmqClient sinkRabbitmqClient = getRabbitmqClient(sinkQueueName);
+ BlockingQueue<DeliveryMessage> queue = new LinkedBlockingQueue<>();
+ DefaultConsumer consumer = sinkRabbitmqClient.getQueueingConsumer(queue, sinkQueueName);
- Handover handover = new Handover<>();
- DefaultConsumer consumer = sinkRabbitmqClient.getQueueingConsumer(handover);
+ // Pre-start consumption to prevent message loss in fast-finishing Batch jobs
sinkRabbitmqClient.getChannel().basicConsume(sinkQueueName, true, consumer);
- // assert execute Job code
- Container.ExecResult execResult = null;
- try {
- execResult = container.executeJob("/rabbitmq-to-rabbitmq-using-default-config.conf");
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
+
+ Container.ExecResult execResult =
+ container.executeJob("/rabbitmq-to-rabbitmq-using-default-config.conf");
Assertions.assertEquals(0, execResult.getExitCode());
+
+ sinkRabbitmqClient.close();
+ }
+
+ @TestTemplate
+ public void testRabbitMQMultiTableE2E(TestContainer container) throws Exception {
+ // Define schemas for two different tables/queues
+ SeaTunnelRowType type1 =
+ new SeaTunnelRowType(
+ new String[] {"id", "name"},
+ new SeaTunnelDataType[] {BasicType.LONG_TYPE, BasicType.STRING_TYPE});
+ SeaTunnelRowType type2 =
+ new SeaTunnelRowType(
+ new String[] {"id", "age"},
+ new SeaTunnelDataType[] {BasicType.LONG_TYPE, BasicType.INT_TYPE});
+
+ // Send 10 records to each unique RabbitMQ queue
+ sendData("multi_table_1", type1, 10);
+ sendData("multi_table_2", type2, 10);
+
+ // Wait for messages to be fully persisted in RabbitMQ broker
+ Thread.sleep(3000);
+
+ // Execute the SeaTunnel synchronization job
+ // The job uses a multi-table configuration to consume from both queues simultaneously
+ Container.ExecResult execResult = container.executeJob("/rabbitmq_multitable.conf");
+
+ // Validate that the job finished successfully (exit code 0)
+ // If the multi-table routing or schema mapping fails, the job will crash with exit code 1
+ Assertions.assertEquals(
+ 0, execResult.getExitCode(), "The SeaTunnel job should finish with exit code 0.");
Review Comment:
The E2E test only checks the exit code. Because both tables share a single sink
queue, per-table routing correctness (e.g., correct tableId injection) is never
verified: a job that misroutes rows can still exit with code 0. I suggest using
a separate Assert sink per table.
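
To illustrate the kind of per-table check this refers to, here is a minimal sketch in plain Java. It is not SeaTunnel or RabbitMQ API: `PerTableRoutingCheck`, `ReceivedRecord`, and `verify` are hypothetical names, and the sketch assumes the test has already collected the sink records together with the table ID each one carries. The table names and field names are taken from the test above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: checks row counts and field names per table on collected sink records. */
public class PerTableRoutingCheck {

    /** Hypothetical view of one sink record: the table it was routed to and its field names. */
    public static final class ReceivedRecord {
        final String tableId;
        final List<String> fieldNames;

        public ReceivedRecord(String tableId, List<String> fieldNames) {
            this.tableId = tableId;
            this.fieldNames = fieldNames;
        }
    }

    /** Counts how many records were received for each table ID. */
    public static Map<String, Integer> countByTable(List<ReceivedRecord> records) {
        Map<String, Integer> counts = new HashMap<>();
        for (ReceivedRecord r : records) {
            counts.merge(r.tableId, 1, Integer::sum);
        }
        return counts;
    }

    /** Throws AssertionError on an unknown table, a schema mismatch, or a wrong row count. */
    public static void verify(
            List<ReceivedRecord> records,
            Map<String, List<String>> expectedFields,
            int expectedRowsPerTable) {
        for (ReceivedRecord r : records) {
            List<String> expected = expectedFields.get(r.tableId);
            if (expected == null) {
                throw new AssertionError("Unexpected table: " + r.tableId);
            }
            if (!expected.equals(r.fieldNames)) {
                throw new AssertionError(
                        "Table " + r.tableId + " received fields " + r.fieldNames);
            }
        }
        Map<String, Integer> counts = countByTable(records);
        for (String tableId : expectedFields.keySet()) {
            int actual = counts.getOrDefault(tableId, 0);
            if (actual != expectedRowsPerTable) {
                throw new AssertionError(
                        "Table " + tableId + " received " + actual + " rows");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> expected = new HashMap<>();
        expected.put("multi_table_1", Arrays.asList("id", "name"));
        expected.put("multi_table_2", Arrays.asList("id", "age"));

        // One record per table keeps the demo short; the test above sends 10 per queue.
        List<ReceivedRecord> records =
                Arrays.asList(
                        new ReceivedRecord("multi_table_1", Arrays.asList("id", "name")),
                        new ReceivedRecord("multi_table_2", Arrays.asList("id", "age")));

        verify(records, expected, 1);
        System.out.println("per-table routing check passed");
    }
}
```

A check along these lines fails when a row with the `multi_table_2` schema arrives under `multi_table_1`, which the exit-code assertion alone would not catch. A per-table Assert sink in the job config would express the same expectation without extra test code.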