This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a72743aa7946c4c0ce196abaf2504bc693093111
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 17 05:16:28 2025 +0200

    [improve][io] Replace Qpid in tests with RabbitMQ in Testcontainers and upgrade RabbitMQ client version (#25085)
    
    (cherry picked from commit d80401faba37d3707c5e1eb92fff29f8040c2a64)
---
 pom.xml                                            |  2 +-
 pulsar-io/rabbitmq/pom.xml                         | 19 +-----
 .../pulsar/io/rabbitmq/RabbitMQBrokerManager.java  | 42 +++++++------
 .../pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java  |  8 +--
 .../io/rabbitmq/source/RabbitMQSourceTest.java     |  8 +--
 pulsar-io/rabbitmq/src/test/resources/qpid.json    | 68 ----------------------
 6 files changed, 31 insertions(+), 116 deletions(-)

diff --git a/pom.xml b/pom.xml
index ea04ffe534a..b736bbaceb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,7 +226,7 @@ flexible messaging model and an intuitive client API.</description>
     <cassandra.version>3.11.2</cassandra.version>
     <aerospike-client.version>4.5.0</aerospike-client.version>
     <kafka-client.version>3.9.0</kafka-client.version>
-    <rabbitmq-client.version>5.18.0</rabbitmq-client.version>
+    <rabbitmq-client.version>5.28.0</rabbitmq-client.version>
     <aws-sdk.version>1.12.788</aws-sdk.version>
     <aws-sdk2.version>2.32.28</aws-sdk2.version>
     <avro.version>1.11.4</avro.version>
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 39b2b3f8105..6f0ea7d1ef8 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -81,24 +81,9 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.qpid</groupId>
-      <artifactId>qpid-broker</artifactId>
-      <version>9.2.0</version>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>rabbitmq</artifactId>
       <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.qpid</groupId>
-          <artifactId>qpid-bdbstore</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.qpid</groupId>
-          <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.qpid</groupId>
-          <artifactId>qpid-broker-plugins-derby-store</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.awaitility</groupId>
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
index 4ff8c61e4f4..ef69608eec5 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
@@ -18,36 +18,34 @@
  */
 package org.apache.pulsar.io.rabbitmq;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.qpid.server.SystemLauncher;
-import org.apache.qpid.server.model.SystemConfig;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
 
 public class RabbitMQBrokerManager {
+    private RabbitMQContainer rabbitMQContainer;
 
-    private final SystemLauncher systemLauncher = new SystemLauncher();
-
-    public void startBroker(String port) throws Exception {
-        Map<String, Object> brokerOptions = getBrokerOptions(port);
-        systemLauncher.startup(brokerOptions);
+    public void startBroker() throws Exception {
+        rabbitMQContainer = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"));
+        rabbitMQContainer.withVhost("default");
+        rabbitMQContainer.start();
     }
 
     public void stopBroker() {
-        systemLauncher.shutdown();
+        if (rabbitMQContainer != null) {
+            rabbitMQContainer.stop();
+            rabbitMQContainer = null;
+        }
+    }
+
+    public int getPort() {
+        return rabbitMQContainer.getAmqpPort();
     }
 
-    Map<String, Object> getBrokerOptions(String port) throws Exception {
-        Path tmpFolder = Files.createTempDirectory("qpidWork");
-        Map<String, Object> config = new HashMap<>();
-        config.put("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
-        config.put("qpid.amqp_port", port);
+    public String getUser() {
+        return rabbitMQContainer.getAdminUsername();
+    }
 
-        Map<String, Object> context = new HashMap<>();
-        context.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:qpid.json");
-        context.put(SystemConfig.TYPE, "Memory");
-        context.put(SystemConfig.CONTEXT, config);
-        return context;
+    public String getPassword() {
+        return rabbitMQContainer.getAdminPassword();
     }
 }
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
index f03a36ce114..51d09163829 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
@@ -40,7 +40,7 @@ public class RabbitMQSinkTest {
     @BeforeMethod
     public void setUp() throws Exception {
         rabbitMQBrokerManager = new RabbitMQBrokerManager();
-        rabbitMQBrokerManager.startBroker("5673");
+        rabbitMQBrokerManager.startBroker();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -52,10 +52,10 @@ public class RabbitMQSinkTest {
     public void testOpenAndWriteSink() throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put("host", "localhost");
-        configs.put("port", "5673");
+        configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort()));
         configs.put("virtualHost", "default");
-        configs.put("username", "guest");
-        configs.put("password", "guest");
+        configs.put("username", rabbitMQBrokerManager.getUser());
+        configs.put("password", rabbitMQBrokerManager.getPassword());
         configs.put("connectionName", "test-connection");
         configs.put("requestedChannelMax", "0");
         configs.put("requestedFrameMax", "0");
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
index c798179f60e..4f139880ec1 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
@@ -37,7 +37,7 @@ public class RabbitMQSourceTest {
     @BeforeMethod
     public void setUp() throws Exception {
         rabbitMQBrokerManager = new RabbitMQBrokerManager();
-        rabbitMQBrokerManager.startBroker("5672");
+        rabbitMQBrokerManager.startBroker();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -49,10 +49,10 @@ public class RabbitMQSourceTest {
     public void testOpenAndWriteSink() throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put("host", "localhost");
-        configs.put("port", "5672");
+        configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort()));
         configs.put("virtualHost", "default");
-        configs.put("username", "guest");
-        configs.put("password", "guest");
+        configs.put("username", rabbitMQBrokerManager.getUser());
+        configs.put("password", rabbitMQBrokerManager.getPassword());
         configs.put("queueName", "test-queue");
         configs.put("connectionName", "test-connection");
         configs.put("requestedChannelMax", "0");
diff --git a/pulsar-io/rabbitmq/src/test/resources/qpid.json b/pulsar-io/rabbitmq/src/test/resources/qpid.json
deleted file mode 100644
index 419e9cc1e4a..00000000000
--- a/pulsar-io/rabbitmq/src/test/resources/qpid.json
+++ /dev/null
@@ -1,68 +0,0 @@
-{
-  "name": "${broker.name}",
-  "modelVersion": "2.0",
-  "authenticationproviders": [
-    {
-      "name": "plain",
-      "type": "Plain",
-      "secureOnlyMechanisms": [],
-      "users": [
-        {
-          "name": "guest",
-          "password": "guest",
-          "type": "managed"
-        }
-      ]
-    }
-  ],
-  "brokerloggers": [
-    {
-      "name": "console",
-      "type": "Console",
-      "brokerloginclusionrules": [
-        {
-          "name": "Root",
-          "type": "NameAndLevel",
-          "level": "WARN",
-          "loggerName": "ROOT"
-        },
-        {
-          "name": "Qpid",
-          "type": "NameAndLevel",
-          "level": "INFO",
-          "loggerName": "org.apache.qpid.*"
-        },
-        {
-          "name": "Operational",
-          "type": "NameAndLevel",
-          "level": "INFO",
-          "loggerName": "qpid.message.*"
-        },
-        {
-          "name": "Statistics",
-          "type": "NameAndLevel",
-          "level": "INFO",
-          "loggerName": "qpid.statistics.*"
-        }
-      ]
-    }
-  ],
-  "ports": [
-    {
-      "name": "AMQP",
-      "port": "${qpid.amqp_port}",
-      "authenticationProvider": "plain",
-      "protocols": [
-        "AMQP_0_9_1"
-      ]
-    }
-  ],
-  "virtualhostnodes": [
-    {
-      "name": "default",
-      "type": "Memory",
-      "defaultVirtualHostNode": "true",
-      "virtualHostInitialConfiguration": "{\"type\": \"Memory\"}"
-    }
-  ]
-}
\ No newline at end of file

Reply via email to