Author: robbie
Date: Thu Mar 18 16:23:56 2010
New Revision: 924879
URL: http://svn.apache.org/viewvc?rev=924879&view=rev
Log:
QPID-2379: add Binding.msgMatched() support to the HeadersExchange
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=924879&r1=924878&r2=924879&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
Thu Mar 18 16:23:56 2010
@@ -37,7 +37,7 @@ public class Binding
private final UUID _id;
private final AtomicLong _matches = new AtomicLong();
- Binding(UUID id, final String bindingKey, final AMQQueue queue, final
Exchange exchange, final Map<String, Object> arguments)
+ public Binding(UUID id, final String bindingKey, final AMQQueue queue,
final Exchange exchange, final Map<String, Object> arguments)
{
_id = id;
_bindingKey = bindingKey;
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=924879&r1=924878&r2=924879&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
Thu Mar 18 16:23:56 2010
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessageHeader;
/**
@@ -38,69 +39,35 @@ class HeadersBinding
private static final Logger _logger =
Logger.getLogger(HeadersBinding.class);
private final FieldTable _mappings;
+ private final Binding _binding;
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
- private final class MatchesOrProcessor implements
FieldTable.FieldTableElementProcessor
- {
- private Boolean _result = Boolean.FALSE;
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- if((value != null) && (value.getValue() != null) &&
value.getValue().equals(matches.get(propertyName)))
- {
- _result = Boolean.TRUE;
- return false;
- }
- return true;
- }
-
- public Object getResult()
- {
- return _result;
- }
- }
-
- private final class RequiredOrProcessor implements
FieldTable.FieldTableElementProcessor
- {
- Boolean _result = Boolean.FALSE;
-
- public boolean processElement(String propertyName, AMQTypedValue value)
- {
- if(required.contains(propertyName))
- {
- _result = Boolean.TRUE;
- return false;
- }
- return true;
- }
-
- public Object getResult()
- {
- return _result;
- }
- }
-
-
-
/**
- * Creates a binding for a set of mappings. Those mappings whose value is
+ * Creates a header binding for a set of mappings. Those mappings whose
value is
* null or the empty string are assumed only to be required headers, with
* no constraint on the value. Those with a non-null value are assumed to
* define a required match of value.
- * @param mappings the defined mappings this binding should use
+ *
+ * @param binding the binding to create a header binding using
*/
-
- HeadersBinding(FieldTable mappings)
+ public HeadersBinding(Binding binding)
{
- _mappings = mappings;
- initMappings();
+ _binding = binding;
+ if(_binding !=null)
+ {
+ _mappings =
FieldTable.convertToFieldTable(_binding.getArguments());
+ initMappings();
+ }
+ else
+ {
+ _mappings = null;
+ }
}
-
+
private void initMappings()
{
-
_mappings.processOverElements(new
FieldTable.FieldTableElementProcessor()
{
@@ -133,6 +100,11 @@ class HeadersBinding
{
return _mappings;
}
+
+ public Binding getBinding()
+ {
+ return _binding;
+ }
/**
* Checks whether the supplied headers match the requirements of this
binding
@@ -250,4 +222,39 @@ class HeadersBinding
{
return key.startsWith("X-") || key.startsWith("x-");
}
-}
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+
+ if (o == null)
+ {
+ return false;
+ }
+
+ if (!(o instanceof HeadersBinding))
+ {
+ return false;
+ }
+
+ final HeadersBinding hb = (HeadersBinding) o;
+
+ if(_binding == null)
+ {
+ if(hb.getBinding() != null)
+ {
+ return false;
+ }
+ }
+ else if (!_binding.equals(hb.getBinding()))
+ {
+ return false;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=924879&r1=924878&r2=924879&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
Thu Mar 18 16:23:56 2010
@@ -24,8 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -36,10 +34,11 @@ import org.apache.qpid.server.binding.Bi
import javax.management.JMException;
import java.util.ArrayList;
-import java.util.List;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
/**
* An exchange that binds queues based on a set of required headers and header
values
@@ -72,7 +71,14 @@ public class HeadersExchange extends Abs
{
private static final Logger _logger =
Logger.getLogger(HeadersExchange.class);
+
+ private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>
_bindingsByKey =
+ new ConcurrentHashMap<String,
CopyOnWriteArraySet<Binding>>();
+
+ private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers =
+ new CopyOnWriteArrayList<HeadersBinding>();
+
public static final ExchangeType<HeadersExchange> TYPE = new
ExchangeType<HeadersExchange>()
{
@@ -102,34 +108,12 @@ public class HeadersExchange extends Abs
}
};
-
- private final List<Registration> _bindings = new
CopyOnWriteArrayList<Registration>();
- private Map<AMQShortString, Registration> _bindingByKey = new
ConcurrentHashMap<AMQShortString, Registration>();
-
-
public HeadersExchange()
{
super(TYPE);
}
+
- public void registerQueue(String routingKey, AMQQueue queue,
Map<String,Object> args)
- {
- registerQueue(new AMQShortString(routingKey), queue,
FieldTable.convertToFieldTable(args));
- }
-
- public void registerQueue(AMQShortString routingKey, AMQQueue queue,
FieldTable args)
- {
- _logger.debug("Exchange " + getNameShortString() + ": Binding " +
queue.getNameShortString() + " with " + args);
-
- Registration registration = new Registration(new HeadersBinding(args),
queue, routingKey);
- _bindings.add(registration);
-
- }
-
- public void deregisterQueue(String routingKey, AMQQueue queue,
Map<String,Object> args)
- {
- _bindings.remove(new Registration(args == null ? null : new
HeadersBinding(FieldTable.convertToFieldTable(args)), queue, new
AMQShortString(routingKey)));
- }
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -138,24 +122,27 @@ public class HeadersExchange extends Abs
{
_logger.debug("Exchange " + getNameShortString() + ": routing
message with headers " + header);
}
- boolean routed = false;
- ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
- for (Registration e : _bindings)
+
+ LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>();
+
+ for (HeadersBinding hb : _bindingHeaderMatchers)
{
-
- if (e.binding.matches(header))
+ if (hb.matches(header))
{
+ Binding b = hb.getBinding();
+
+ b.incrementMatches();
+
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getNameShortString() + ":
delivering message with headers " +
- header + " to " +
e.queue.getNameShortString());
+ header + " to " +
b.getQueue().getNameShortString());
}
- queues.add(e.queue);
-
- routed = true;
+ queues.add(b.getQueue());
}
}
- return queues;
+
+ return new ArrayList<BaseQueue>(queues);
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments,
AMQQueue queue)
@@ -166,38 +153,49 @@ public class HeadersExchange extends Abs
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- return isBound(queue);
+ String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+
+ if(bindings != null)
+ {
+ for(Binding binding : bindings)
+ {
+ if(binding.getQueue().equals(queue))
+ {
+ return true;
+ }
+ }
+ }
+
+ return false;
}
public boolean isBound(AMQShortString routingKey)
{
- return hasBindings();
+ String bindingKey = (routingKey == null) ? "" : routingKey.toString();
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+ return bindings != null && !bindings.isEmpty();
}
public boolean isBound(AMQQueue queue)
{
- for (Registration r : _bindings)
+ for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
{
- if (r.queue.equals(queue))
+ for(Binding binding : bindings)
{
- return true;
+ if(binding.getQueue().equals(queue))
+ {
+ return true;
+ }
}
}
+
return false;
}
public boolean hasBindings()
{
- return !_bindings.isEmpty();
- }
-
-
-
- protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
- {
- //what if the content type is not 'basic'? 'file' and 'stream' content
classes also define headers,
- //but these are not yet implemented.
- return ((BasicContentHeaderProperties)
contentHeaderFrame.properties).getHeaders();
+ return !getBindings().isEmpty();
}
protected AbstractExchangeMBean createMBean() throws JMException
@@ -210,59 +208,51 @@ public class HeadersExchange extends Abs
return _logger;
}
-
- static class Registration
+ protected void onBind(final Binding binding)
{
- private final HeadersBinding binding;
- private final AMQQueue queue;
- private final AMQShortString routingKey;
+ String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getQueue();
+ AMQShortString routingKey = AMQShortString.valueOf(bindingKey);
+ Map<String,Object> args = binding.getArguments();
- Registration(HeadersBinding binding, AMQQueue queue, AMQShortString
routingKey)
- {
- this.binding = binding;
- this.queue = queue;
- this.routingKey = routingKey;
- }
-
- public int hashCode()
- {
- int queueHash = queue.hashCode();
- int routingHash = routingKey == null ? 0 : routingKey.hashCode();
- return queueHash + routingHash;
- }
+ assert queue != null;
+ assert routingKey != null;
- public boolean equals(Object o)
- {
- return o instanceof Registration
- && ((Registration) o).queue.equals(queue)
- && (routingKey == null ? ((Registration)o).routingKey ==
null
- :
routingKey.equals(((Registration)o).routingKey));
- }
+ CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
- public HeadersBinding getBinding()
+ if(bindings == null)
{
- return binding;
+ bindings = new CopyOnWriteArraySet<Binding>();
+ CopyOnWriteArraySet<Binding> newBindings;
+ if((newBindings = _bindingsByKey.putIfAbsent(bindingKey,
bindings)) != null)
+ {
+ bindings = newBindings;
+ }
}
-
- public AMQQueue getQueue()
+
+ if(_logger.isDebugEnabled())
{
- return queue;
+ _logger.debug("Exchange " + getNameShortString() + ": Binding " +
queue.getNameShortString() +
+ " with binding key '" +bindingKey + "' and args: " +
args);
}
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- }
+ _bindingHeaderMatchers.add(new HeadersBinding(binding));
+ bindings.add(binding);
- protected void onBind(final Binding binding)
- {
- registerQueue(binding.getBindingKey(), binding.getQueue(),
binding.getArguments());
}
protected void onUnbind(final Binding binding)
{
- deregisterQueue(binding.getBindingKey(), binding.getQueue(),
binding.getArguments());
+ assert binding != null;
+
+ CopyOnWriteArraySet<Binding> bindings =
_bindingsByKey.get(binding.getBindingKey());
+ if(bindings != null)
+ {
+ bindings.remove(binding);
+ }
+
+ _logger.debug("===============");
+ _logger.debug("Removing Binding: " + _bindingHeaderMatchers.remove(new
HeadersBinding(binding)));
}
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=924879&r1=924878&r2=924879&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Thu Mar 18 16:23:56 2010
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -53,8 +54,10 @@ import org.apache.qpid.server.subscripti
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -92,31 +95,31 @@ public class AbstractHeadersExchangeTest
protected TestQueue bindDefault(String... bindings) throws AMQException
{
- return bind("Queue" + (++count), bindings);
- }
+ String queueName = "Queue" + (++count);
- protected TestQueue bind(String queueName, String... bindings) throws
AMQException
- {
- return bind(queueName, getHeaders(bindings));
+ return bind(queueName, queueName, getHeadersMap(bindings));
}
-
- protected TestQueue bind(String queue, FieldTable bindings) throws
AMQException
+
+ protected void unbind(TestQueue queue, String... bindings) throws
AMQException
{
- return bind(new TestQueue(new AMQShortString(queue)), bindings);
+ String queueName = queue.getName();
+ //TODO - check this
+ exchange.onUnbind(new Binding(null,queueName, queue, exchange,
getHeadersMap(bindings)));
}
-
- protected TestQueue bind(TestQueue queue, String... bindings) throws
AMQException
+
+ protected int getCount()
{
- return bind(queue, getHeaders(bindings));
+ return count;
}
- protected TestQueue bind(TestQueue queue, FieldTable bindings) throws
AMQException
+ private TestQueue bind(String key, String queueName, Map<String,Object>
args) throws AMQException
{
+ TestQueue queue = new TestQueue(new AMQShortString(queueName));
queues.add(queue);
- exchange.registerQueue(null, queue, bindings);
+ exchange.onBind(new Binding(null,key, queue, exchange, args));
return queue;
}
-
+
protected int route(Message m) throws AMQException
{
@@ -171,6 +174,23 @@ public class AbstractHeadersExchangeTest
}
}
+
+ static Map<String,Object> getHeadersMap(String... entries)
+ {
+ if(entries == null)
+ {
+ return null;
+ }
+
+ Map<String,Object> headers = new HashMap<String,Object>();
+
+ for (String s : entries)
+ {
+ String[] parts = s.split("=", 2);
+ headers.put(parts[0], parts.length > 1 ? parts[1] : "");
+ }
+ return headers;
+ }
static FieldTable getHeaders(String... entries)
{
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=924879&r1=924878&r2=924879&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
Thu Mar 18 16:23:56 2010
@@ -26,7 +26,9 @@ import java.util.Set;
import junit.framework.TestCase;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.queue.MockAMQQueue;
/**
*/
@@ -119,166 +121,193 @@ public class HeadersBindingTest extends
}
}
- private FieldTable bindHeaders = new FieldTable();
+ private Map<String,Object> bindHeaders = new HashMap<String,Object>();
private MockHeader matchHeaders = new MockHeader();
+ private int _count = 0;
+ private MockAMQQueue _queue;
+
+ protected void setUp()
+ {
+ _count++;
+ _queue = new MockAMQQueue(getQueueName());
+ }
+
+ protected String getQueueName()
+ {
+ return "Queue" + _count;
+ }
public void testDefault_1()
{
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testDefault_2()
{
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testDefault_3()
{
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Altered value of A");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_1()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_2()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_3()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_4()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
matchHeaders.setString("C", "Value of C");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAll_5()
{
- bindHeaders.setString("X-match", "all");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "all");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_1()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_2()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_3()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_4()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Value of B");
matchHeaders.setString("C", "Value of C");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_5()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Value of A");
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- assertTrue(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertTrue(new HeadersBinding(b).matches(matchHeaders));
}
public void testAny_6()
{
- bindHeaders.setString("X-match", "any");
- bindHeaders.setString("A", "Value of A");
- bindHeaders.setString("B", "Value of B");
+ bindHeaders.put("X-match", "any");
+ bindHeaders.put("A", "Value of A");
+ bindHeaders.put("B", "Value of B");
matchHeaders.setString("A", "Altered value of A");
matchHeaders.setString("B", "Altered value of B");
matchHeaders.setString("C", "Value of C");
- assertFalse(new HeadersBinding(bindHeaders).matches(matchHeaders));
+ Binding b = new Binding(null, getQueueName(), _queue, null,
bindHeaders);
+ assertFalse(new HeadersBinding(b).matches(matchHeaders));
}
public static junit.framework.Test suite()
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=924879&r1=924878&r2=924879&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
Thu Mar 18 16:23:56 2010
@@ -106,6 +106,22 @@ public class HeadersExchangeTest extends
pb2.setMandatory(true);
routeAndTest(m1,true);
}
+
+ public void testOnUnbind() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ TestQueue q2 = bindDefault("F0000=Aardvark");
+ TestQueue q3 = bindDefault("F0001");
+
+ routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1);
+ routeAndTest(new Message(_protocolSession, "Message2",
"F0000=Aardvark"), q1, q2);
+ routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3);
+
+ unbind(q1,"F0000");
+ routeAndTest(new Message(_protocolSession, "Message4", "F0000"));
+ routeAndTest(new Message(_protocolSession, "Message5",
"F0000=Aardvark"), q2);
+ }
+
public static junit.framework.Test suite()
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]