Author: ritchiem
Date: Mon Dec 7 16:47:53 2009
New Revision: 887994
URL: http://svn.apache.org/viewvc?rev=887994&view=rev
Log:
QPID-2242 : Update to the 0-8/9 code path to use the 0-10 static lookup tables
for the Destination type when JMS_QPID_DESTTYPE has not been set.
Added:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
qpid/trunk/qpid/java/test-profiles/08Excludes
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
Mon Dec 7 16:47:53 2009
@@ -22,7 +22,6 @@
package org.apache.qpid.client.message;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import javax.jms.Destination;
import javax.jms.JMSException;
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
Mon Dec 7 16:47:53 2009
@@ -41,12 +41,13 @@
import javax.jms.MessageFormatException;
import javax.jms.DeliveryMode;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import org.apache.qpid.exchange.ExchangeDefaults;
-public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
+/**
+ * This extends AbstractAMQMessageDelegate which contains common code between
+ * both the 0_8 and 0_10 Message types.
+ *
+ */
+public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
private static final Map<ReplyTo, Destination> _destinationCache =
Collections.synchronizedMap(new ReferenceMap());
@@ -64,27 +65,6 @@
private AMQSession _session;
private final long _deliveryTag;
- private static Map<AMQShortString,Integer> _exchangeTypeMap = new
ConcurrentHashMap<AMQShortString, Integer>();
- private static Map<String,Integer> _exchangeTypeStringMap = new
ConcurrentHashMap<String, Integer>();
- private static Map<String, Integer> _exchangeTypeToDestinationType = new
ConcurrentHashMap<String, Integer>();;
-
- static
- {
- _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put(AMQShortString.EMPTY_STRING,
AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME,
AMQDestination.TOPIC_TYPE);
- _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME,
AMQDestination.TOPIC_TYPE);
-
-
_exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(),
AMQDestination.QUEUE_TYPE);
- _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
-
_exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
-
_exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
-
-
-
_exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(),
AMQDestination.QUEUE_TYPE);
-
_exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
-
_exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
- }
protected AMQMessageDelegate_0_10()
{
@@ -92,80 +72,49 @@
_readableProperties = false;
}
- private AMQDestination generateDestination(AMQShortString exchange,
AMQShortString routingKey)
- {
- AMQDestination dest;
- switch(getExchangeType(exchange))
- {
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
-
- }
-
- return dest;
- }
-
- private int getExchangeType(AMQShortString exchange)
+ protected AMQMessageDelegate_0_10(MessageProperties messageProps,
DeliveryProperties deliveryProps, long deliveryTag)
{
- Integer type = _exchangeTypeMap.get(exchange == null ?
AMQShortString.EMPTY_STRING : exchange);
-
- if(type == null)
- {
- return AMQDestination.UNKNOWN_TYPE;
- }
+ _messageProps = messageProps;
+ _deliveryProps = deliveryProps;
+ _deliveryTag = deliveryTag;
+ _readableProperties = (_messageProps != null);
+ AMQDestination dest;
- return type;
+ dest = generateDestination(new
AMQShortString(_deliveryProps.getExchange()),
+ new
AMQShortString(_deliveryProps.getRoutingKey()));
+ setJMSDestination(dest);
}
-
+ /**
+ * Use the 0-10 ExchangeQuery call to validate the exchange type.
+ *
+ * This is used primarily to provide the correct JMSDestination value.
+ *
+ * The query is performed synchronously iff the map exchange is not already
+ * present in the exchange Map.
+ *
+ * @param header The message headers, from which the exchange name can be
extracted
+ * @param session The 0-10 session to use to call ExchangeQuery
+ */
public static void updateExchangeTypeMapping(Header header,
org.apache.qpid.transport.Session session)
{
DeliveryProperties deliveryProps =
header.get(DeliveryProperties.class);
- if(deliveryProps != null)
+ if (deliveryProps != null)
{
String exchange = deliveryProps.getExchange();
- if(exchange != null &&
!_exchangeTypeStringMap.containsKey(exchange))
+ if (exchange != null && !exchangeMapContains(exchange))
{
-
- AMQShortString exchangeShortString = new
AMQShortString(exchange);
Future<ExchangeQueryResult> future =
- session.exchangeQuery(exchange.toString());
+ session.exchangeQuery(exchange.toString());
ExchangeQueryResult res = future.get();
- Integer type =
_exchangeTypeToDestinationType.get(res.getType());
- if(type == null)
- {
- type = AMQDestination.UNKNOWN_TYPE;
- }
- _exchangeTypeStringMap.put(exchange, type);
- _exchangeTypeMap.put(exchangeShortString, type);
-
+ updateExchangeType(exchange, res.getType());
}
}
}
- protected AMQMessageDelegate_0_10(MessageProperties messageProps,
DeliveryProperties deliveryProps, long deliveryTag)
- {
- _messageProps = messageProps;
- _deliveryProps = deliveryProps;
- _deliveryTag = deliveryTag;
- _readableProperties = (_messageProps != null);
-
- AMQDestination dest;
-
- dest = generateDestination(new
AMQShortString(_deliveryProps.getExchange()),
- new
AMQShortString(_deliveryProps.getRoutingKey()));
- setJMSDestination(dest);
- }
-
public String getJMSMessageID() throws JMSException
{
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
Mon Dec 7 16:47:53 2009
@@ -46,7 +46,7 @@
import java.util.UUID;
import java.net.URISyntaxException;
-public class AMQMessageDelegate_0_8 implements AMQMessageDelegate
+public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
private static final Map _destinationCache =
Collections.synchronizedMap(new ReferenceMap());
@@ -65,6 +65,16 @@
private AMQSession _session;
private final long _deliveryTag;
+ // The base set of items that needs to be set.
+ private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties,
long deliveryTag)
+ {
+ _contentHeaderProperties = properties;
+ _deliveryTag = deliveryTag;
+ _readableProperties = (_contentHeaderProperties != null);
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)
_contentHeaderProperties).getHeaders());
+ }
+
+ // Used for the creation of new messages
protected AMQMessageDelegate_0_8()
{
this(new BasicContentHeaderProperties(), -1);
@@ -73,6 +83,7 @@
}
+ // Used when generating a received message object
protected AMQMessageDelegate_0_8(long deliveryTag,
BasicContentHeaderProperties contentHeader, AMQShortString exchange,
AMQShortString routingKey)
{
@@ -80,41 +91,33 @@
Integer type =
contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
- if(type == null)
+ AMQDestination dest = null;
+
+ // If we have a type set the attempt to use that.
+ if (type != null)
{
- type = AMQDestination.UNKNOWN_TYPE;
+ switch (type.intValue())
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+ default:
+ // Use the generateDestination method
+ dest = null;
+ }
}
- AMQDestination dest;
-
- switch(type.intValue())
+ if (dest == null)
{
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ dest = generateDestination(exchange, routingKey);
}
-
-
- // Destination dest = AMQDestination.createDestination(url);
setJMSDestination(dest);
-
-
-
}
- protected AMQMessageDelegate_0_8(BasicContentHeaderProperties properties,
long deliveryTag)
- {
- _contentHeaderProperties = properties;
- _deliveryTag = deliveryTag;
- _readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)
_contentHeaderProperties).getHeaders());
- }
public String getJMSMessageID() throws JMSException
Added:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=887994&view=auto
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
(added)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
Mon Dec 7 16:47:53 2009
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This abstract class provides exchange lookup functionality that is shared
+ * between all MessageDelegates. Update facilities are provided so that the
0-10
+ * code base can update the mappings. The 0-8 code base does not have the
+ * facility to update the exchange map so it can only use the default mappings.
+ *
+ * That said any updates that a 0-10 client performs will also benefit any 0-8
+ * connections in this VM.
+ *
+ */
+public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate
+{
+
+ private static Map<AMQShortString, Integer> _exchangeTypeMap = new
ConcurrentHashMap<AMQShortString, Integer>();
+ private static Map<String, Integer> _exchangeTypeStringMap = new
ConcurrentHashMap<String, Integer>();
+ private static Map<String, Integer> _exchangeTypeToDestinationType = new
ConcurrentHashMap<String, Integer>();
+
+ /**
+ * Add default Mappings for the Direct, Default, Topic and Fanout
exchanges.
+ */
+ static
+ {
+ _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME,
AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put(AMQShortString.EMPTY_STRING,
AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME,
AMQDestination.TOPIC_TYPE);
+ _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME,
AMQDestination.TOPIC_TYPE);
+
+
_exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(),
AMQDestination.QUEUE_TYPE);
+ _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
+
_exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
+
_exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
+
+
_exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(),
AMQDestination.QUEUE_TYPE);
+
_exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
+
_exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(),
AMQDestination.TOPIC_TYPE);
+ }
+
+ /**
+ * Called when a Destination is requried.
+ *
+ * This will create the AMQDestination that is the correct type and value
+ * based on the incomming values.
+ * @param exchange The exchange name
+ * @param routingKey The routing key to be used for the Destination
+ * @return AMQDestination of the correct subtype
+ */
+ protected AMQDestination generateDestination(AMQShortString exchange,
AMQShortString routingKey)
+ {
+ AMQDestination dest;
+ switch (getExchangeType(exchange))
+ {
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ }
+
+ return dest;
+ }
+
+ /**
+ * Update the exchange name to type mapping.
+ *
+ * If the newType is not known then an UNKNOWN_TYPE is created. Only if the
+ * exchange is of a known type: amq.direct, amq.topic, amq.fanout can we
+ * create a suitable AMQDestination representation
+ *
+ * @param exchange the name of the exchange
+ * @param newtype the AMQP exchange class name i.e. amq.direct
+ */
+ protected static void updateExchangeType(String exchange, String newtype)
+ {
+ Integer type = _exchangeTypeToDestinationType.get(newtype);
+ if (type == null)
+ {
+ type = AMQDestination.UNKNOWN_TYPE;
+ }
+ _exchangeTypeStringMap.put(exchange, type);
+ _exchangeTypeMap.put(new AMQShortString(exchange), type);
+ }
+
+ /**
+ * Accessor method to allow lookups of the given exchange name.
+ *
+ * This check allows the prevention of extra work required such as asking
+ * the broker for the exchange class name.
+ *
+ * @param exchange the exchange name to lookup
+ * @return true if there is a mapping for this exchange
+ */
+ protected static boolean exchangeMapContains(String exchange)
+ {
+ return _exchangeTypeStringMap.containsKey(exchange);
+ }
+
+ /**
+ * Returns an int representing the exchange type. This is used in the
+ * createDestination method to ensure the correct AMQDestiation is
created.
+ *
+ * @param exchange the exchange name to lookup
+ * @return int representing the Exchange type
+ */
+ private int getExchangeType(AMQShortString exchange)
+ {
+ Integer type = _exchangeTypeMap.get(exchange == null ?
AMQShortString.EMPTY_STRING : exchange);
+
+ if (type == null)
+ {
+ return AMQDestination.UNKNOWN_TYPE;
+ }
+
+ return type;
+ }
+
+}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
Mon Dec 7 16:47:53 2009
@@ -57,6 +57,9 @@
public void setUp() throws Exception
{
+ //Ensure JMX management is enabled for MovedToQueue test
+ setConfigurationProperty("management.enabled", "true");
+
super.setUp();
_connection = getConnection();
Modified: qpid/trunk/qpid/java/test-profiles/08Excludes
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/08Excludes?rev=887994&r1=887993&r2=887994&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/08Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/08Excludes Mon Dec 7 16:47:53 2009
@@ -18,5 +18,3 @@
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
-// QPID-2242 exclude till issue has been resolved
-org.apache.qpid.test.client.message.JMSDestinationTest#*
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]