yzeng1618 commented on code in PR #10557:
URL: https://github.com/apache/seatunnel/pull/10557#discussion_r2887166145


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java:
##########
@@ -255,17 +258,71 @@ public void testRabbitMQUSingDefaultConfig(TestContainer 
container) throws Excep
 
         // init consumer client before executeJob start in every testContainer
         RabbitmqClient sinkRabbitmqClient = getRabbitmqClient(sinkQueueName);
+        BlockingQueue<DeliveryMessage> queue = new LinkedBlockingQueue<>();
+        DefaultConsumer consumer = 
sinkRabbitmqClient.getQueueingConsumer(queue, sinkQueueName);
 
-        Handover handover = new Handover<>();
-        DefaultConsumer consumer = 
sinkRabbitmqClient.getQueueingConsumer(handover);
+        // Pre-start consumption to prevent message loss in fast-finishing 
Batch jobs
         sinkRabbitmqClient.getChannel().basicConsume(sinkQueueName, true, 
consumer);
-        // assert execute Job code
-        Container.ExecResult execResult = null;
-        try {
-            execResult = 
container.executeJob("/rabbitmq-to-rabbitmq-using-default-config.conf");
-        } catch (IOException | InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+
+        Container.ExecResult execResult =
+                
container.executeJob("/rabbitmq-to-rabbitmq-using-default-config.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
+
+        sinkRabbitmqClient.close();
+    }
+
+    @TestTemplate
+    public void testRabbitMQMultiTableE2E(TestContainer container) throws 
Exception {
+        // Define schemas for two different tables/queues
+        SeaTunnelRowType type1 =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name"},
+                        new SeaTunnelDataType[] {BasicType.LONG_TYPE, 
BasicType.STRING_TYPE});
+        SeaTunnelRowType type2 =
+                new SeaTunnelRowType(
+                        new String[] {"id", "age"},
+                        new SeaTunnelDataType[] {BasicType.LONG_TYPE, 
BasicType.INT_TYPE});
+
+        // Send 10 records to each unique RabbitMQ queue
+        sendData("multi_table_1", type1, 10);
+        sendData("multi_table_2", type2, 10);
+
+        // Wait for messages to be fully persisted in RabbitMQ broker
+        Thread.sleep(3000);
+
+        // Execute the SeaTunnel synchronization job
+        // The job uses a multi-table configuration to consume from both 
queues simultaneously
+        Container.ExecResult execResult = 
container.executeJob("/rabbitmq_multitable.conf");
+
+        // Validate that the job finished successfully (exit code 0)
+        // If the multi-table routing or schema mapping fails, the job will 
crash with exit code 1
+        Assertions.assertEquals(
+                0, execResult.getExitCode(), "The SeaTunnel job should finish 
with exit code 0.");

Review Comment:
   The E2E test only checks exit code. Since both tables share a single sink 
queue, per-table routing correctness (e.g., correct tableId injection) is never 
verified. Suggest using separate Assert Sinks per table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to