Author: jstrachan
Date: Mon Apr 21 08:35:35 2008
New Revision: 650171
URL: http://svn.apache.org/viewvc?rev=650171&view=rev
Log:
initial implementation of https://issues.apache.org/activemq/browse/CAMEL-342
to support temporary destinations nicely in camel-jms
Added:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationEndpoint.java
(with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationTransformProcessor.java
(with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProviderMetadata.java
(with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
(with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
(with props)
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsProviderTest.java
(with props)
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryQueueRouteTest.java
- copied, changed from r649546,
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryTopicRouteTest.java
(with props)
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
Added:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationEndpoint.java?rev=650171&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationEndpoint.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationEndpoint.java
Mon Apr 21 08:35:35 2008
@@ -0,0 +1,32 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.jms.JMSException;
+
+/**
+ * An optional interface that a [EMAIL PROTECTED] JmsEndpoint} may implement
to return the underlying
+ * [EMAIL PROTECTED] Destination} object
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface DestinationEndpoint {
+ Destination getJmsDestination(Session session) throws JMSException;
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationTransformProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationTransformProcessor.java?rev=650171&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationTransformProcessor.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationTransformProcessor.java
Mon Apr 21 08:35:35 2008
@@ -0,0 +1,26 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+/**
+ * A processor which is capable of transforming the
+ *
+ * @version $Revision: 1.1 $
+ */
+public class DestinationTransformProcessor {
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DestinationTransformProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=650171&r1=650170&r2=650171&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
Mon Apr 21 08:35:35 2008
@@ -51,6 +51,9 @@
public static final String QUEUE_PREFIX = "queue:";
public static final String TOPIC_PREFIX = "topic:";
+ public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
+ public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
+
private static final transient Log LOG =
LogFactory.getLog(JmsComponent.class);
private static final String DEFAULT_QUEUE_BROWSE_STRATEGY =
"org.apache.camel.component.jms.DefaultQueueBrowseStrategy";
private JmsConfiguration configuration;
@@ -361,12 +364,21 @@
throws Exception {
boolean pubSubDomain = false;
+ boolean tempDestination = false;
if (remaining.startsWith(QUEUE_PREFIX)) {
pubSubDomain = false;
remaining =
removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
} else if (remaining.startsWith(TOPIC_PREFIX)) {
pubSubDomain = true;
remaining =
removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
+ } else if (remaining.startsWith(TEMP_QUEUE_PREFIX)) {
+ pubSubDomain = false;
+ tempDestination = true;
+ remaining =
removeStartingCharacters(remaining.substring(TEMP_QUEUE_PREFIX.length()), '/');
+ } else if (remaining.startsWith(TEMP_TOPIC_PREFIX)) {
+ pubSubDomain = true;
+ tempDestination = true;
+ remaining =
removeStartingCharacters(remaining.substring(TEMP_TOPIC_PREFIX.length()), '/');
}
final String subject = convertPathToActualDestination(remaining,
parameters);
@@ -376,10 +388,18 @@
JmsConfiguration newConfiguration = getConfiguration().copy();
JmsEndpoint endpoint;
QueueBrowseStrategy strategy = getQueueBrowseStrategy();
- if (pubSubDomain || strategy == null) {
- endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain,
newConfiguration);
+ if (pubSubDomain) {
+ if (tempDestination) {
+ endpoint = new JmsTemporaryTopicEndpoint(uri, this, subject,
newConfiguration);
+ } else {
+ endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain,
newConfiguration);
+ }
} else {
- endpoint = new JmsQueueEndpoint(uri, this, subject,
newConfiguration, strategy);
+ if (tempDestination) {
+ endpoint = new JmsTemporaryQueueEndpoint(uri, this, subject,
newConfiguration, strategy);
+ } else {
+ endpoint = new JmsQueueEndpoint(uri, this, subject,
newConfiguration, strategy);
+ }
}
String selector = (String)parameters.remove("selector");
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=650171&r1=650170&r2=650171&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Mon Apr 21 08:35:35 2008
@@ -27,7 +27,6 @@
import javax.jms.TopicPublisher;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.PackageHelper;
import org.apache.commons.logging.Log;
@@ -112,6 +111,8 @@
// Always make a JMS message copy when it's passed to Producer
private boolean alwaysCopyMessage;
private boolean useMessageIDAsCorrelationID;
+ private JmsProviderMetadata providerMetadata = new JmsProviderMetadata();
+ private JmsOperations metadataJmsOperations;
public JmsConfiguration() {
}
@@ -131,19 +132,6 @@
}
}
- /**
- * Creates a JmsOperations object used for request/response using a request
- * timeout value
- */
- public JmsOperations createInOutTemplate(boolean pubSubDomain, String
destination, long requestTimeout) {
- JmsOperations answer = createInOnlyTemplate(pubSubDomain, destination);
- if (answer instanceof JmsTemplate && requestTimeout > 0) {
- JmsTemplate jmsTemplate = (JmsTemplate)answer;
- jmsTemplate.setExplicitQosEnabled(true);
- jmsTemplate.setTimeToLive(requestTimeout);
- }
- return answer;
- }
public static interface MessageSentCallback {
void sent(Message message);
@@ -281,7 +269,25 @@
}
}
- public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String
destination) {
+
+ /**
+ * Creates a [EMAIL PROTECTED] JmsOperations} object used for
request/response using a request
+ * timeout value
+ */
+ public JmsOperations createInOutTemplate(JmsEndpoint endpoint, boolean
pubSubDomain, String destination, long requestTimeout) {
+ JmsOperations answer = createInOnlyTemplate(endpoint, pubSubDomain,
destination);
+ if (answer instanceof JmsTemplate && requestTimeout > 0) {
+ JmsTemplate jmsTemplate = (JmsTemplate)answer;
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setTimeToLive(requestTimeout);
+ }
+ return answer;
+ }
+
+ /**
+ * Creates a [EMAIL PROTECTED] JmsOperations} object used for one way
messaging
+ */
+ public JmsOperations createInOnlyTemplate(JmsEndpoint endpoint, boolean
pubSubDomain, String destination) {
if (jmsOperations != null) {
return jmsOperations;
@@ -296,6 +302,12 @@
template.setPubSubDomain(pubSubDomain);
if (destinationResolver != null) {
template.setDestinationResolver(destinationResolver);
+ if (endpoint instanceof DestinationEndpoint) {
+ LOG.debug("You are overloading the destinationResolver
property on a DestinationEndpoint; are you sure you want to do that?");
+ }
+ } else if (endpoint instanceof DestinationEndpoint) {
+ DestinationEndpoint destinationEndpoint = (DestinationEndpoint)
endpoint;
+
template.setDestinationResolver(createDestinationResolver(destinationEndpoint));
}
template.setDefaultDestinationName(destination);
@@ -388,7 +400,7 @@
/**
* Sets the connection factory to be used for sending messages via the
- * [EMAIL PROTECTED] JmsTemplate} via [EMAIL PROTECTED]
#createInOnlyTemplate(boolean, String)}
+ * [EMAIL PROTECTED] JmsTemplate} via [EMAIL PROTECTED]
#createInOnlyTemplate(JmsEndpoint,boolean, String)}
*
* @param templateConnectionFactory the connection factory for sending
* messages
@@ -746,15 +758,58 @@
public void setRequestMapPurgePollTimeMillis(long
requestMapPurgePollTimeMillis) {
this.requestMapPurgePollTimeMillis = requestMapPurgePollTimeMillis;
}
+ public JmsProviderMetadata getProviderMetadata() {
+ return providerMetadata;
+ }
+
+ /**
+ * Allows the provider metadata to be explicitly configured. Typically
this is not required
+ * and Camel will auto-detect the provider metadata from the underlying
provider.
+ */
+ public void setProviderMetadata(JmsProviderMetadata providerMetadata) {
+ this.providerMetadata = providerMetadata;
+ }
+
+ public JmsOperations getMetadataJmsOperations(JmsEndpoint endpoint) {
+ if (metadataJmsOperations == null) {
+ metadataJmsOperations = getJmsOperations();
+ if (metadataJmsOperations == null) {
+ metadataJmsOperations = createInOnlyTemplate(endpoint, false,
null);
+ }
+ }
+ return metadataJmsOperations;
+ }
+
+ /**
+ * Sets the [EMAIL PROTECTED] JmsOperations} used to deduce the [EMAIL
PROTECTED] JmsProviderMetadata} details which if none
+ * is customized one is lazily created on demand
+ *
+ * @param metadataJmsOperations
+ */
+ public void setMetadataJmsOperations(JmsOperations metadataJmsOperations) {
+ this.metadataJmsOperations = metadataJmsOperations;
+ }
// Implementation methods
//
-------------------------------------------------------------------------
+ public static DestinationResolver createDestinationResolver(final
DestinationEndpoint destinationEndpoint) {
+ return new DestinationResolver() {
+ public Destination resolveDestinationName(Session session, String
destinationName, boolean pubSubDomain) throws JMSException {
+ return destinationEndpoint.getJmsDestination(session);
+ }
+ };
+ }
+
+
protected void
configureMessageListenerContainer(AbstractMessageListenerContainer container,
JmsEndpoint endpoint) {
container.setConnectionFactory(getListenerConnectionFactory());
- if (destinationResolver != null) {
+ if (endpoint instanceof DestinationEndpoint) {
+
container.setDestinationResolver(createDestinationResolver((DestinationEndpoint)
endpoint));
+ }
+ else if (destinationResolver != null) {
container.setDestinationResolver(destinationResolver);
}
if (autoStartup) {
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=650171&r1=650170&r2=650171&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Mon Apr 21 08:35:35 2008
@@ -17,6 +17,8 @@
package org.apache.camel.component.jms;
import javax.jms.Message;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
import org.apache.camel.ExchangePattern;
import org.apache.camel.PollingConsumer;
@@ -109,14 +111,14 @@
* Factory method for creating a new template for InOnly message exchanges
*/
public JmsOperations createInOnlyTemplate() {
- return configuration.createInOnlyTemplate(pubSubDomain, destination);
+ return configuration.createInOnlyTemplate(this, pubSubDomain,
destination);
}
/**
* Factory method for creating a new template for InOut message exchanges
*/
public JmsOperations createInOutTemplate() {
- return configuration.createInOutTemplate(pubSubDomain, destination,
getRequestTimeout());
+ return configuration.createInOutTemplate(this, pubSubDomain,
destination, getRequestTimeout());
}
// Properties
@@ -178,7 +180,7 @@
/**
* Sets the timeout in milliseconds which requests should timeout after
- *
+ *
* @param requestTimeout
*/
public void setRequestTimeout(long requestTimeout) {
@@ -189,6 +191,54 @@
return pubSubDomain;
}
- // Implementation methods
- //-------------------------------------------------------------------------
+ /**
+ * Lazily loads the temporary queue type if one has not been explicitly
configured
+ * via calling the [EMAIL PROTECTED]
JmsProviderMetadata#setTemporaryQueueType(Class)}
+ * on the [EMAIL PROTECTED] #getConfiguration()} instance
+ */
+ public Class<? extends TemporaryQueue> getTemporaryQueueType() {
+ JmsProviderMetadata metadata = getProviderMetadata();
+ JmsOperations template = getMetadataJmsOperations();
+ return metadata.getTemporaryQueueType(template);
+ }
+
+ /**
+ * Lazily loads the temporary topic type if one has not been explicitly
configured
+ * via calling the [EMAIL PROTECTED]
JmsProviderMetadata#setTemporaryTopicType(Class)}
+ * on the [EMAIL PROTECTED] #getConfiguration()} instance
+ */
+ public Class<? extends TemporaryTopic> getTemporaryTopicType() {
+ JmsOperations template = getMetadataJmsOperations();
+ JmsProviderMetadata metadata = getProviderMetadata();
+ return metadata.getTemporaryTopicType(template);
+ }
+
+ /**
+ * Returns the provider metadata
+ */
+ protected JmsProviderMetadata getProviderMetadata() {
+ JmsConfiguration conf = getConfiguration();
+ JmsProviderMetadata metadata = conf.getProviderMetadata();
+ return metadata;
+ }
+
+ /**
+ * Returns the [EMAIL PROTECTED] JmsOperations} used for metadata
operations such as creating temporary destinations
+ */
+ protected JmsOperations getMetadataJmsOperations() {
+ JmsOperations template =
getConfiguration().getMetadataJmsOperations(this);
+ if (template == null) {
+ throw new IllegalArgumentException("No Metadata JmsTemplate
supplied!");
+ }
+ return template;
+ }
+
+ public void checkValidTemplate(JmsTemplate template) {
+ if (template.getDestinationResolver() == null) {
+ if (this instanceof DestinationEndpoint) {
+ final DestinationEndpoint destinationEndpoint =
(DestinationEndpoint) this;
+
template.setDestinationResolver(JmsConfiguration.createDestinationResolver(destinationEndpoint));
+ }
+ }
+ }
}
Added:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProviderMetadata.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProviderMetadata.java?rev=650171&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProviderMetadata.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProviderMetadata.java
Mon Apr 21 08:35:35 2008
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.SessionCallback;
+
+/**
+ * A class which represents some metadata about the underlying JMS provider
+ * so that we can properly bridge JMS providers such as for dealing with
temporary destinations.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JmsProviderMetadata {
+ private Class<? extends TemporaryQueue> temporaryQueueType;
+ private Class<? extends TemporaryTopic> temporaryTopicType;
+
+ /**
+ * Lazily loads the temporary queue type if one has not been explicitly
configured
+ * via calling the [EMAIL PROTECTED] #setTemporaryQueueType(Class)}
+ */
+ public Class<? extends TemporaryQueue> getTemporaryQueueType(JmsOperations
template) {
+ Class<? extends TemporaryQueue> answer = getTemporaryQueueType();
+ if (answer == null) {
+ loadTemporaryDestinationTypes(template);
+ answer = getTemporaryQueueType();
+ }
+ return answer;
+ }
+
+ /**
+ * Lazily loads the temporary topic type if one has not been explicitly
configured
+ * via calling the [EMAIL PROTECTED] #setTemporaryTopicType(Class)}
+ */
+ public Class<? extends TemporaryTopic> getTemporaryTopicType(JmsOperations
template) {
+ Class<? extends TemporaryTopic> answer = getTemporaryTopicType();
+ if (answer == null) {
+ loadTemporaryDestinationTypes(template);
+ answer = getTemporaryTopicType();
+ }
+ return answer;
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+
+ public Class<? extends TemporaryQueue> getTemporaryQueueType() {
+ return temporaryQueueType;
+ }
+
+ public void setTemporaryQueueType(Class<? extends TemporaryQueue>
temporaryQueueType) {
+ this.temporaryQueueType = temporaryQueueType;
+ }
+
+ public Class<? extends TemporaryTopic> getTemporaryTopicType() {
+ return temporaryTopicType;
+ }
+
+ public void setTemporaryTopicType(Class<? extends TemporaryTopic>
temporaryTopicType) {
+ this.temporaryTopicType = temporaryTopicType;
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ protected void loadTemporaryDestinationTypes(JmsOperations template) {
+ if (template == null) {
+ throw new IllegalArgumentException("No JmsTemplate supplied!");
+ }
+ template.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ TemporaryQueue queue = session.createTemporaryQueue();
+ setTemporaryQueueType(queue.getClass());
+
+ TemporaryTopic topic = session.createTemporaryTopic();
+ setTemporaryTopicType(topic.getClass());
+
+ queue.delete();
+ topic.delete();
+ return null;
+ }
+ });
+ }
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProviderMetadata.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=650171&r1=650170&r2=650171&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
Mon Apr 21 08:35:35 2008
@@ -17,9 +17,12 @@
package org.apache.camel.component.jms;
import java.util.List;
+import java.util.Collections;
import org.apache.camel.Exchange;
import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsOperations;
/**
@@ -28,19 +31,24 @@
* @version $Revision$
*/
public class JmsQueueEndpoint extends JmsEndpoint implements
BrowsableEndpoint<JmsExchange> {
+ private static final transient Log LOG =
LogFactory.getLog(JmsQueueEndpoint.class);
+
private int maximumBrowseSize = -1;
private final QueueBrowseStrategy queueBrowseStrategy;
public JmsQueueEndpoint(String uri, JmsComponent component, String
destination,
JmsConfiguration configuration) {
- this(uri, component, destination, configuration,
createQueueBrowseStrategy());
+ this(uri, component, destination, configuration, null);
}
public JmsQueueEndpoint(String uri, JmsComponent component, String
destination,
JmsConfiguration configuration, QueueBrowseStrategy
queueBrowseStrategy) {
super(uri, component, destination, false, configuration);
this.queueBrowseStrategy = queueBrowseStrategy;
+ if (queueBrowseStrategy == null) {
+ queueBrowseStrategy = createQueueBrowseStrategy();
+ }
}
public int getMaximumBrowseSize() {
@@ -56,8 +64,11 @@
}
public List<Exchange> getExchanges() {
+ if (queueBrowseStrategy == null) {
+ return Collections.EMPTY_LIST;
+ }
String queue = getDestination();
- JmsOperations template =
getConfiguration().createInOnlyTemplate(false, queue);
+ JmsOperations template = getConfiguration().createInOnlyTemplate(this,
false, queue);
return queueBrowseStrategy.browse(template, queue, this);
}
@@ -66,10 +77,11 @@
try {
answer = JmsComponent.tryCreateDefaultQueueBrowseStrategy();
} catch (Throwable e) {
- throw new IllegalArgumentException("Could not create a
QueueBrowseStrategy, maybe you are using spring 2.0.x? Cause: " + e, e);
+ LOG.debug("Caught exception trying to create default
QueueBrowseStrategy. " +
+ "This could be due to spring 2.0.x on classpath? Cause: "
+ e, e);
}
if (answer == null) {
- throw new IllegalArgumentException("Could not create a
QueueBrowseStrategy, maybe you are using spring 2.0.x?");
+ LOG.warn("Cannot browse queues as no QueueBrowseStrategy
specified. Are you using Spring 2.0.x by any chance? If you upgrade to 2.5.x or
later then queue browsing is supported");
}
return answer;
}
Added:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java?rev=650171&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
Mon Apr 21 08:35:35 2008
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.SessionCallback;
+
+/**
+ * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
+ * for working with a [EMAIL PROTECTED] TemporaryQueue}
+ *
+ * @version $Revision: 1.1 $
+ */
+// TODO need to be really careful to always use the same Connection otherwise
the destination goes stale
+public class JmsTemporaryQueueEndpoint extends JmsQueueEndpoint implements
DestinationEndpoint {
+ private Destination jmsDestination;
+
+ public JmsTemporaryQueueEndpoint(String uri, JmsComponent component,
String destination, JmsConfiguration configuration) {
+ super(uri, component, destination, configuration);
+ }
+
+ public JmsTemporaryQueueEndpoint(String uri, JmsComponent component,
String destination, JmsConfiguration configuration, QueueBrowseStrategy
queueBrowseStrategy) {
+ super(uri, component, destination, configuration, queueBrowseStrategy);
+ }
+
+ /**
+ * This endpoint is a singleton so that the temporary destination
instances are shared across all
+ * producers and consumers of the same endpoint URI
+ *
+ * @return true
+ */
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public synchronized Destination getJmsDestination(Session session) throws
JMSException {
+ if (jmsDestination == null) {
+ jmsDestination = createJmsDestination(session);
+ }
+ return jmsDestination;
+ }
+
+ protected Destination createJmsDestination(Session session) throws
JMSException {
+ return session.createTemporaryQueue();
+ }
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryQueueEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java?rev=650171&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
Mon Apr 21 08:35:35 2008
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.SessionCallback;
+
+/**
+ * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
+ * for working with a [EMAIL PROTECTED] TemporaryTopic}
+ *
+ * @version $Revision: 1.1 $
+ */
+// TODO need to be really careful to always use the same Connection otherwise
the destination goes stale
+public class JmsTemporaryTopicEndpoint extends JmsEndpoint implements
DestinationEndpoint {
+ private Destination jmsDestination;
+
+ public JmsTemporaryTopicEndpoint(String uri, JmsComponent component,
String destination, JmsConfiguration configuration) {
+ super(uri, component, destination, true, configuration);
+ }
+
+ /**
+ * This endpoint is a singleton so that the temporary destination
instances are shared across all
+ * producers and consumers of the same endpoint URI
+ *
+ * @return true
+ */
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public synchronized Destination getJmsDestination(Session session) throws
JMSException {
+ if (jmsDestination == null) {
+ jmsDestination = createJmsDestination(session);
+ }
+ return jmsDestination;
+ }
+
+ protected Destination createJmsDestination(Session session) throws
JMSException {
+ return session.createTemporaryTopic();
+ }
+
+
+}
\ No newline at end of file
Propchange:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsTemporaryTopicEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsProviderTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsProviderTest.java?rev=650171&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsProviderTest.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsProviderTest.java
Mon Apr 21 08:35:35 2008
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.temp;
+
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.naming.Context;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.component.jms.JmsEndpoint;
+import org.apache.camel.component.jms.JmsProviderMetadata;
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class JmsProviderTest extends ContextTestSupport {
+ public void testTemporaryDestinationTypes() throws Exception {
+ JmsEndpoint endpoint = getMandatoryEndpoint("activemq:test.queue",
JmsEndpoint.class);
+ JmsConfiguration configuration = endpoint.getConfiguration();
+ JmsProviderMetadata providerMetadata =
configuration.getProviderMetadata();
+ assertNotNull("provider", providerMetadata);
+
+ Class<? extends TemporaryQueue> queueType =
endpoint.getTemporaryQueueType();
+ Class<? extends TemporaryTopic> topicType =
endpoint.getTemporaryTopicType();
+
+ log.info("Found queue type: " + queueType);
+ log.info("Found topic type: " + topicType);
+
+ assertNotNull("queueType", queueType);
+ assertNotNull("topicType", topicType);
+
+ assertEquals("queueType", ActiveMQTempQueue.class, queueType);
+ assertEquals("topicType", ActiveMQTempTopic.class, topicType);
+ }
+
+ @Override
+ protected Context createJndiContext() throws Exception {
+ Context context = super.createJndiContext();
+ context.bind("activemq",
ActiveMQComponent.activeMQComponent("vm://localhost"));
+ return context;
+ }
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/JmsProviderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryQueueRouteTest.java
(from r649546,
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryQueueRouteTest.java?p2=activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryQueueRouteTest.java&p1=activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java&r1=649546&r2=650171&rev=650171&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryQueueRouteTest.java
Mon Apr 21 08:35:35 2008
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.jms;
+package org.apache.camel.component.jms.temp;
import java.util.List;
@@ -22,61 +22,56 @@
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static
org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
-
+import org.apache.camel.component.jms.JmsQueueEndpoint;
+import org.apache.camel.component.jms.BrowsableQueueTest;
/**
* @version $Revision$
*/
-public class BrowsableQueueTest extends ContextTestSupport {
- private static final transient Log LOG =
LogFactory.getLog(BrowsableQueueTest.class);
+public class TemporaryQueueRouteTest extends ContextTestSupport {
+ private static final transient Log LOG =
LogFactory.getLog(TemporaryQueueRouteTest.class);
- protected MockEndpoint resultEndpoint;
- protected String componentName = "activemq";
- protected String startEndpointUri;
- protected int counter;
- protected Object[] expectedBodies = {"body1", "body2"};
-
- public void testSendMessagesThenBrowseQueue() throws Exception {
- // send some messages
- for (int i = 0; i < expectedBodies.length; i++) {
- Object expectedBody = expectedBodies[i];
- template.sendBodyAndHeader("activemq:test.b", expectedBody,
"counter", i);
- }
+ protected String endpointUri = "activemq:temp:queue:cheese";
+ protected Object expectedBody = "<hello>world!</hello>";
+ protected MyBean myBean = new MyBean();
- // now lets browse the queue
- JmsQueueEndpoint endpoint = getMandatoryEndpoint("activemq:test.b",
JmsQueueEndpoint.class);
- List<Exchange> list = endpoint.getExchanges();
- LOG.debug("Received: " + list);
- assertEquals("Size of list", 2, list.size());
- int index = -1;
- for (Exchange exchange : list) {
- String actual = exchange.getIn().getBody(String.class);
- LOG.debug("Received body: " + actual);
+ public void testSendMessage() throws Exception {
+ MockEndpoint endpoint = getMockEndpoint("mock:result");
+ endpoint.expectedBodiesReceived("Result");
- Object expected = expectedBodies[++index];
- assertEquals("Body: " + index, expected, actual);
- }
- }
+ template.sendBody(endpointUri, expectedBody);
- protected void sendExchange(final Object expectedBody) {
- template.sendBodyAndHeader(startEndpointUri, expectedBody, "counter",
++counter);
+ endpoint.assertIsSatisfied();
+
+ Message message = myBean.getMessage();
+ assertNotNull("should have received a message", message);
+
+ LOG.info("Received: " + message);
+ Object header = message.getHeader("JMSDestination");
+ isValidDestination(header);
}
+ protected void isValidDestination(Object header) {
+ ActiveMQTempQueue destination =
assertIsInstanceOf(ActiveMQTempQueue.class, header);
+ LOG.info("Received message has a temporary queue: " + destination);
+ }
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- camelContext.addComponent(componentName,
jmsComponentClientAcknowledge(connectionFactory));
+ camelContext.addComponent("activemq",
jmsComponentClientAcknowledge(connectionFactory));
return camelContext;
}
@@ -84,8 +79,22 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("activemq:test.a").to("activemq:test.b");
+ from(endpointUri).bean(myBean).to("mock:result");
}
};
+ }
+
+ public static class MyBean {
+ private Message message;
+
+ public String onMessage(Message message) {
+ this.message = message;
+ LOG.info("Invoked bean with: " + message);
+ return "Result";
+ }
+
+ public Message getMessage() {
+ return message;
+ }
}
}
Added:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryTopicRouteTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryTopicRouteTest.java?rev=650171&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryTopicRouteTest.java
(added)
+++
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryTopicRouteTest.java
Mon Apr 21 08:35:35 2008
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.temp;
+
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class TemporaryTopicRouteTest extends TemporaryQueueRouteTest {
+ private static final transient Log LOG =
LogFactory.getLog(TemporaryQueueRouteTest.class);
+
+ @Override
+ protected void setUp() throws Exception {
+ endpointUri = "activemq:temp:topic:cheese";
+ super.setUp();
+ }
+
+ @Override
+ protected void isValidDestination(Object header) {
+ ActiveMQTempTopic destination =
assertIsInstanceOf(ActiveMQTempTopic.class, header);
+ LOG.info("Received message has a temporary topic: " + destination);
+ }
+}
Propchange:
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/temp/TemporaryTopicRouteTest.java
------------------------------------------------------------------------------
svn:eol-style = native