This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 4af6f40 AMQ-6494 is related, fix intermittent failure of
RedeliveryPolicyTest related to vm transport server being shut down while in use
via async onException handler
4af6f40 is described below
commit 4af6f4018656e01989136042a8ad4ec1ab64c137
Author: gtully <[email protected]>
AuthorDate: Thu Oct 3 11:08:05 2019 +0100
AMQ-6494 is related, fix intermittent failure of RedeliveryPolicyTest
related to vm transport server being shut down while in use via async
onException handler
---
.../activemq/transport/vm/VMTransportServer.java | 9 +++--
.../org/apache/activemq/RedeliveryPolicyTest.java | 39 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index 2f3d519..8bef1cc 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.BrokerInfo;
@@ -35,7 +36,7 @@ public class VMTransportServer implements TransportServer {
private TransportAcceptListener acceptListener;
private final URI location;
- private boolean disposed;
+ private AtomicBoolean disposed = new AtomicBoolean(false);
private final AtomicInteger connectionCount = new AtomicInteger(0);
private final boolean disposeOnDisconnect;
@@ -64,7 +65,7 @@ public class VMTransportServer implements TransportServer {
public VMTransport connect() throws IOException {
TransportAcceptListener al;
synchronized (this) {
- if (disposed) {
+ if (disposed.get()) {
throw new IOException("Server has been disposed.");
}
al = acceptListener;
@@ -117,7 +118,9 @@ public class VMTransportServer implements TransportServer {
}
public void stop() throws IOException {
- VMTransportFactory.stopped(this);
+ if (disposed.compareAndSet(false, true)) {
+ VMTransportFactory.stopped(this);
+ }
}
public URI getConnectURI() {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index 5af3a37..a0a1ca8 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -600,6 +600,45 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
+ public void testRepeatedServerClose() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+
+ // Send the messages
+ producer.send(session.createTextMessage("1st"));
+ session.commit();
+
+ final int maxRedeliveries = 10000;
+ for (int i=0;i<=maxRedeliveries + 1;i++) {
+
+ final ActiveMQConnection toTest =
(ActiveMQConnection)factory.createConnection(userName, password);
+ toTest.start();
+
+ // abortive close via broker
+ for (VMTransportServer transportServer :
VMTransportFactory.SERVERS.values()) {
+ transportServer.stop();
+ }
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return toTest.isTransportFailed();
+ }
+ },10000, 100 );
+
+ try {
+ toTest.close();
+ } catch (Exception expected) {
+ } finally {
+ }
+ }
+ }
+
+
+
public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
connection.start();