This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9a314406b6e CAMEL-19664: fix multiple concurrency issues in
camel-test-infra-artemis (#10854)
9a314406b6e is described below
commit 9a314406b6e55a00acef957cbf5831da73f2a7e3
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Thu Jul 27 12:57:22 2023 +0200
CAMEL-19664: fix multiple concurrency issues in camel-test-infra-artemis
(#10854)
---
.../services/AbstractArtemisEmbeddedService.java | 38 ++++++++++++----------
.../infra/artemis/services/ArtemisAMQPService.java | 2 +-
.../infra/artemis/services/ArtemisMQTTService.java | 2 +-
.../services/ArtemisPersistentVMService.java | 3 +-
.../services/ArtemisTCPAllProtocolsService.java | 5 +--
.../infra/artemis/services/ArtemisVMService.java | 10 ++++--
6 files changed, 34 insertions(+), 26 deletions(-)
diff --git
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java
index d2c33ecd586..8a8dc969013 100644
---
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java
+++
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java
@@ -41,37 +41,36 @@ import static org.junit.jupiter.api.Assertions.fail;
public abstract class AbstractArtemisEmbeddedService implements
ArtemisService, ConnectionFactoryAware {
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractArtemisEmbeddedService.class);
- protected static final LongAdder BROKER_COUNT = new LongAdder();
+ private static final LongAdder BROKER_COUNT = new LongAdder();
- protected int tcpPort = AvailablePortFinder.getNextAvailable();
- protected EmbeddedActiveMQ embeddedBrokerService;
- private Configuration artemisConfiguration;
+ protected final EmbeddedActiveMQ embeddedBrokerService;
+ private final Configuration artemisConfiguration;
public AbstractArtemisEmbeddedService() {
- defaultConfiguration();
-
-
embeddedBrokerService.setConfiguration(getConfiguration(artemisConfiguration,
AvailablePortFinder.getNextAvailable()));
+ this(AvailablePortFinder.getNextAvailable());
}
- public AbstractArtemisEmbeddedService(int port) {
- defaultConfiguration();
+ protected AbstractArtemisEmbeddedService(int port) {
+ embeddedBrokerService = new EmbeddedActiveMQ();
+ artemisConfiguration = new ConfigurationImpl();
-
embeddedBrokerService.setConfiguration(getConfiguration(artemisConfiguration,
port));
+ embeddedBrokerService.setConfiguration(configure(port));
}
- private void defaultConfiguration() {
- embeddedBrokerService = new EmbeddedActiveMQ();
+ private synchronized Configuration configure(int port) {
+ final int brokerId = BROKER_COUNT.intValue();
+ BROKER_COUNT.increment();
// Base configuration
- artemisConfiguration = new ConfigurationImpl();
artemisConfiguration.setSecurityEnabled(false);
- BROKER_COUNT.increment();
- artemisConfiguration.setBrokerInstance(new File("target", "artemis-" +
BROKER_COUNT.intValue()));
+ artemisConfiguration.setBrokerInstance(new File("target", "artemis-" +
brokerId));
artemisConfiguration.setJMXManagementEnabled(false);
artemisConfiguration.setMaxDiskUsage(98);
+
+ return configure(artemisConfiguration, port, brokerId);
}
- protected abstract Configuration getConfiguration(Configuration
artemisConfiguration, int port);
+ protected abstract Configuration configure(Configuration
artemisConfiguration, int port, int brokerId);
public void customConfiguration(Consumer<Configuration> configuration) {
configuration.accept(artemisConfiguration);
@@ -103,8 +102,11 @@ public abstract class AbstractArtemisEmbeddedService
implements ArtemisService,
@Override
public void initialize() {
try {
- embeddedBrokerService.start();
- embeddedBrokerService.getActiveMQServer().waitForActivation(20,
TimeUnit.SECONDS);
+ if (embeddedBrokerService.getActiveMQServer() == null ||
!embeddedBrokerService.getActiveMQServer().isStarted()) {
+ embeddedBrokerService.start();
+
+
embeddedBrokerService.getActiveMQServer().waitForActivation(20,
TimeUnit.SECONDS);
+ }
} catch (Exception e) {
LOG.warn("Unable to start embedded Artemis broker: {}",
e.getMessage(), e);
fail(e.getMessage());
diff --git
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java
index ab799052292..b5e9c023ae4 100644
---
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java
+++
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java
@@ -33,7 +33,7 @@ public class ArtemisAMQPService extends
AbstractArtemisEmbeddedService {
}
@Override
- protected Configuration getConfiguration(Configuration
artemisConfiguration, int port) {
+ protected Configuration configure(Configuration artemisConfiguration, int
port, int brokerId) {
amqpPort = port;
brokerURL = "tcp://0.0.0.0:" + amqpPort
+
"?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300";
diff --git
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java
index e0d38e3c268..b0989bde725 100644
---
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java
+++
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java
@@ -34,7 +34,7 @@ public class ArtemisMQTTService extends
AbstractArtemisEmbeddedService {
}
@Override
- protected Configuration getConfiguration(Configuration configuration, int
port) {
+ protected Configuration configure(Configuration configuration, int port,
int brokerId) {
this.port = port;
brokerURL = "tcp://0.0.0.0:" + port;
diff --git
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java
index e329966889c..2260e5db8e4 100644
---
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java
+++
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java
@@ -29,8 +29,7 @@ public class ArtemisPersistentVMService extends
AbstractArtemisEmbeddedService {
private String brokerURL;
@Override
- protected Configuration getConfiguration(Configuration configuration, int
port) {
- final int brokerId = super.BROKER_COUNT.intValue();
+ protected Configuration configure(Configuration configuration, int port,
int brokerId) {
brokerURL = "vm://" + brokerId;
configuration.setPersistenceEnabled(true);
diff --git
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java
index fbbb8ec0af5..faf84ecb17d 100644
---
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java
+++
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java
@@ -32,8 +32,9 @@ public class ArtemisTCPAllProtocolsService extends
AbstractArtemisEmbeddedServic
private int port;
@Override
- protected Configuration getConfiguration(Configuration configuration, int
port) {
- final int brokerId = super.BROKER_COUNT.intValue();
+ protected Configuration configure(Configuration configuration, int port,
int brokerId) {
+ this.port = port;
+
port = AvailablePortFinder.getNextAvailable();
brokerURL = "tcp://0.0.0.0:" + port;
diff --git
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java
index 0f08dd9a64b..77e5d96bcf9 100644
---
a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java
+++
b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java
@@ -20,19 +20,25 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.fail;
public class ArtemisVMService extends AbstractArtemisEmbeddedService {
+ private static final Logger LOG =
LoggerFactory.getLogger(ArtemisVMService.class);
private String brokerURL;
@Override
- protected Configuration getConfiguration(Configuration configuration, int
port) {
- final int brokerId = super.BROKER_COUNT.intValue();
+ protected Configuration configure(Configuration configuration, int port,
int brokerId) {
brokerURL = "vm://" + brokerId;
+ LOG.info("Creating a new Artemis VM-based broker");
configuration.setPersistenceEnabled(false);
+ configuration.setJournalMinFiles(10);
+ configuration.setSecurityEnabled(false);
+
try {
configuration.addAcceptorConfiguration("in-vm", "vm://" +
brokerId);
} catch (Exception e) {