This is an automated email from the ASF dual-hosted git repository.
ffang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 4a617b4 [CXF-6454] Handle InvalidClientIdException and allow to set
retryInterval
4a617b4 is described below
commit 4a617b492a2185285372594570b6706c9c9e8f50
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Apr 21 15:25:52 2016 +0200
[CXF-6454] Handle InvalidClientIdException and allow to set retryInterval
(cherry picked from commit 5e3ac2b252412b90d6c91dea855773a294c3a565)
Conflicts:
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
---
.../apache/cxf/transport/jms/JMSConfigFactory.java | 2 ++
.../apache/cxf/transport/jms/JMSConfiguration.java | 9 ++++++++
.../apache/cxf/transport/jms/JMSDestination.java | 12 +++++++---
.../apache/cxf/transport/jms/uri/JMSEndpoint.java | 11 +++++++++
.../jms/util/PollingMessageListenerContainer.java | 5 +++++
.../cxf/transport/jms/AbstractJMSTester.java | 6 ++---
.../cxf/transport/jms/JMSDestinationTest.java | 26 ++++++++++++++++++++++
7 files changed, 65 insertions(+), 6 deletions(-)
diff --git
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
index d8c588a..48953f5 100644
---
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
+++
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
@@ -130,6 +130,8 @@ public final class JMSConfigFactory {
String targetService = endpoint.getTargetService();
jmsConfig.setTargetService(targetService);
jmsConfig.setMessageSelector(endpoint.getMessageSelector());
+ int retryInterval = endpoint.getRetryInterval();
+ jmsConfig.setRetryInterval(retryInterval);
return jmsConfig;
}
diff --git
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index dc91253..64b21ad 100644
---
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -97,6 +97,7 @@ public class JMSConfiguration {
// For jms spec. Do not configure manually
private String targetService;
private String requestURI;
+ private int retryInterval;
public void ensureProperlyConfigured() {
ConnectionFactory cf = getConnectionFactory();
@@ -504,4 +505,12 @@ public class JMSConfiguration {
this.transactionManager = transactionManager;
}
+ public int getRetryInterval() {
+ return this.retryInterval;
+ }
+
+ public void setRetryInterval(int retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+
}
diff --git
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index ef2be92..b85411e 100644
---
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -27,6 +27,7 @@ import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -101,7 +102,10 @@ public class JMSDestination extends
AbstractMultiplexDestination implements Mess
try {
this.jmsListener = createTargetDestinationListener();
} catch (Exception e) {
- // If first connect fails we will try to establish the connection
in the background
+ if (e.getCause() != null &&
InvalidClientIDException.class.isInstance(e.getCause())) {
+ throw e;
+ }
+ // If first connect fails we will try to establish the connection
in the background
new Thread(new Runnable() {
@Override
@@ -148,6 +152,8 @@ public class JMSDestination extends
AbstractMultiplexDestination implements Mess
connection.start();
return container;
} catch (JMSException e) {
+ ResourceCloser.close(connection);
+ this.connection = null;
throw JMSUtil.convertJmsException(e);
} finally {
ResourceCloser.close(session);
@@ -171,9 +177,9 @@ public class JMSDestination extends
AbstractMultiplexDestination implements Mess
LOG.log(Level.WARNING, message);
}
try {
- Thread.sleep(5000);
+ Thread.sleep(jmsConfig.getRetryInterval());
} catch (InterruptedException e2) {
- // Ignore
+ shutdown = true;
}
}
} while (jmsListener == null && !shutdown);
diff --git
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index ad4bbf4..89c12fb 100644
---
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -87,6 +87,7 @@ public class JMSEndpoint {
private String username;
private int concurrentConsumers = 1;
private String messageSelector;
+ private int retryInterval = 5000;
/**
* @param uri
@@ -488,4 +489,14 @@ public class JMSEndpoint {
+ public int getRetryInterval() {
+ return retryInterval;
+ }
+ public void setRetryInterval(int retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+ public void setRetryInterval(String retryInterval) {
+ this.retryInterval = Integer.valueOf(retryInterval);
+ }
+
}
diff --git
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 80d5f89..9edd0da 100644
---
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -74,6 +74,11 @@ public class PollingMessageListenerContainer extends
AbstractMessageListenerCont
}
} catch (Throwable e) {
handleException(e);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ // Ignore
+ }
}
}
}
diff --git
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index 8746628..c373665 100644
---
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -32,7 +32,6 @@ import javax.xml.namespace.QName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
@@ -57,6 +56,7 @@ public abstract class AbstractJMSTester extends Assert {
protected static final int MAX_RECEIVE_TIME = 10;
protected static final String MESSAGE_CONTENT = "HelloWorld";
protected static Bus bus;
+ protected static ActiveMQConnectionFactory cf1;
protected static ConnectionFactory cf;
protected static BrokerService broker;
@@ -78,8 +78,8 @@ public abstract class AbstractJMSTester extends Assert {
broker.addConnector(brokerUri);
broker.start();
bus = BusFactory.getDefaultBus();
- ActiveMQConnectionFactory cf1 = new
ActiveMQConnectionFactory(brokerUri);
- cf = new PooledConnectionFactory(cf1);
+ cf1 = new ActiveMQConnectionFactory(brokerUri);
+ cf = cf1;
}
@AfterClass
diff --git
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 3f8f154..52e3f54 100644
---
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -25,8 +25,10 @@ import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Topic;
@@ -41,6 +43,7 @@ import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.MultiplexDestination;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.junit.Ignore;
import org.junit.Test;
@@ -76,6 +79,29 @@ public class JMSDestinationTest extends AbstractJMSTester {
conduit.close();
destination.shutdown();
}
+
+ @Test(expected = InvalidClientIDException.class)
+ public void testDurableInvalidClientId() throws Throwable {
+ Connection con = cf1.createConnection();
+ JMSDestination destination = null;
+ try {
+ con.setClientID("testClient");
+ con.start();
+ destMessage = null;
+ EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService",
"HelloWorldPubSubPort");
+ JMSConfiguration jmsConfig =
JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
+ jmsConfig.setDurableSubscriptionClientId("testClient");
+ jmsConfig.setDurableSubscriptionName("testsub");
+ jmsConfig.setConnectionFactory(cf);
+ destination = new JMSDestination(bus, ei, jmsConfig);
+ destination.setMessageObserver(createMessageObserver());
+ } catch (RuntimeException e) {
+ throw e.getCause();
+ } finally {
+ ResourceCloser.close(con);
+ destination.shutdown();
+ }
+ }
@Test
public void testOneWayDestination() throws Exception {
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].