Author: robbie
Date: Mon Feb 13 00:56:23 2012
New Revision: 1243384
URL: http://svn.apache.org/viewvc?rev=1243384&view=rev
Log:
QPID-3831: use AcquireMode=PRE_ACQUIRED when using server-side selectors and
consuming from Queues. Remove unused method parameter for selector filter.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1243384&r1=1243383&r2=1243384&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Feb 13 00:56:23 2012
@@ -47,7 +47,6 @@ import org.apache.qpid.client.message.Un
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -2581,7 +2580,7 @@ public abstract class AMQSession<C exten
* @param queueName
*/
private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean
nowait, MessageFilter messageSelector) throws AMQException, FailoverException
+ AMQProtocolHandler protocolHandler, boolean
nowait) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2598,7 +2597,7 @@ public abstract class AMQSession<C exten
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait,
messageSelector, tagId);
+ sendConsume(consumer, queueName, protocolHandler, nowait, tagId);
}
catch (AMQException e)
{
@@ -2609,7 +2608,7 @@ public abstract class AMQSession<C exten
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler,
boolean nowait, MessageFilter messageSelector, int tag) throws AMQException,
FailoverException;
+ AMQProtocolHandler protocolHandler,
boolean nowait, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final boolean
mandatory, final boolean immediate)
throws JMSException
@@ -2954,7 +2953,7 @@ public abstract class AMQSession<C exten
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait,
consumer.getMessageSelectorFilter());
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait);
}
catch (FailoverException e)
{
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1243384&r1=1243383&r2=1243384&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Feb 13 00:56:23 2012
@@ -599,7 +599,7 @@ public class AMQSession_0_10 extends AMQ
* Registers the consumer with the broker
*/
public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString
queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, MessageFilter messageSelector, int
tag)
+ boolean nowait, int tag)
throws AMQException, FailoverException
{
boolean preAcquire = consumer.isPreAcquire();
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1243384&r1=1243383&r2=1243384&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Feb 13 00:56:23 2012
@@ -364,7 +364,6 @@ public class AMQSession_0_8 extends AMQS
AMQShortString queueName,
AMQProtocolHandler protocolHandler,
boolean nowait,
- MessageFilter messageSelector,
int tag) throws AMQException,
FailoverException
{
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1243384&r1=1243383&r2=1243384&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Feb 13 00:56:23 2012
@@ -91,11 +91,10 @@ public class BasicMessageConsumer_0_10 e
rawSelector, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
- _preAcquire = evaluatePreAcquire(browseOnly, destination);
-
- _capacity = evaluateCapacity(destination);
_serverJmsSelectorSupport =
connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ _preAcquire = evaluatePreAcquire(browseOnly, destination,
_serverJmsSelectorSupport);
+ _capacity = evaluateCapacity(destination);
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE ==
destination.getAddressType())
{
@@ -222,7 +221,7 @@ public class BasicMessageConsumer_0_10 e
boolean messageOk = true;
try
{
- if (getMessageSelectorFilter() != null &&
!_serverJmsSelectorSupport)
+ if (!_serverJmsSelectorSupport && getMessageSelectorFilter() !=
null)
{
messageOk = getMessageSelectorFilter().matches(message);
}
@@ -525,7 +524,7 @@ public class BasicMessageConsumer_0_10 e
return _preAcquire;
}
- private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination
destination)
+ private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination
destination, boolean serverJmsSelectorSupport)
{
boolean preAcquire;
if (browseOnly)
@@ -535,7 +534,7 @@ public class BasicMessageConsumer_0_10 e
else
{
boolean isQueue = (destination instanceof AMQQueue ||
getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
- if (isQueue && getMessageSelectorFilter() != null)
+ if (!serverJmsSelectorSupport && isQueue &&
getMessageSelectorFilter() != null)
{
preAcquire = false;
}
Modified:
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1243384&r1=1243383&r2=1243384&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Mon Feb 13 00:56:23 2012
@@ -276,7 +276,7 @@ public class AMQSession_0_10Test extends
{
BasicMessageConsumer_0_10 consumer =
session.createMessageConsumer(createDestination(), 1, 1, true, false,
null, null, false, true);
- session.sendConsume(consumer, new AMQShortString("test"), null,
true, null, 1);
+ session.sendConsume(consumer, new AMQShortString("test"), null,
true, 1);
}
catch (Exception e)
{
Modified:
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1243384&r1=1243383&r2=1243384&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Mon Feb 13 00:56:23 2012
@@ -29,7 +29,6 @@ import org.apache.qpid.client.BasicMessa
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -125,7 +124,7 @@ public class TestAMQSession extends AMQS
return false;
}
- public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString
queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter
messageSelector, int tag) throws AMQException, FailoverException
+ public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString
queueName, AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws
AMQException, FailoverException
{
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]