Author: rgodfrey
Date: Sat Nov 12 18:05:45 2016
New Revision: 1769396
URL: http://svn.apache.org/viewvc?rev=1769396&view=rev
Log:
QPID-7509 : Remove unnecessary RecordDeliveryMethod interface
Removed:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1769396&r1=1769395&r2=1769396&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Nov 12 18:05:45 2016
@@ -305,16 +305,6 @@ public class AMQChannel
final GetDeliveryMethod getDeliveryMethod =
new GetDeliveryMethod(singleMessageCredit, queue);
- final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
- {
-
- public void recordMessageDelivery(final ConsumerImpl sub,
- final MessageInstance entry,
- final long deliveryTag)
- {
- addUnacknowledgedMessage(entry, deliveryTag, null);
- }
- };
ConsumerTarget_0_8 target;
EnumSet<ConsumerImpl.Option> options =
EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
@@ -324,13 +314,13 @@ public class AMQChannel
target = ConsumerTarget_0_8.createAckTarget(this,
AMQShortString.EMPTY_STRING, null,
- singleMessageCredit,
getDeliveryMethod, getRecordMethod);
+ singleMessageCredit,
getDeliveryMethod);
}
else
{
target = ConsumerTarget_0_8.createGetNoAckTarget(this,
AMQShortString.EMPTY_STRING, null,
-
singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
singleMessageCredit, getDeliveryMethod);
}
ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class,
"", options, null);
@@ -1151,27 +1141,6 @@ public class AMQChannel
/**
- * Acknowledge one or more messages.
- *
- * @param deliveryTag the last delivery tag
- * @param multiple if true will acknowledge all messages up to an
including the delivery tag. if false only
- * acknowledges the single message specified by the
delivery tag
- *
- */
- private void acknowledgeMessage(long deliveryTag, boolean multiple)
- {
- Collection<MessageInstance> ackedMessages =
getAckedMessages(deliveryTag, multiple);
- _transaction.dequeue(ackedMessages, new
MessageAcknowledgeAction(ackedMessages));
- }
-
- private Collection<MessageInstance> getAckedMessages(long deliveryTag,
boolean multiple)
- {
-
- return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
-
- }
-
- /**
* Used only for testing purposes.
*
* @return the map of unacknowledged messages
@@ -1386,20 +1355,6 @@ public class AMQChannel
return _clientDeliveryMethod;
}
- private final RecordDeliveryMethod _recordDeliveryMethod = new
RecordDeliveryMethod()
- {
-
- public void recordMessageDelivery(final ConsumerImpl sub, final
MessageInstance entry, final long deliveryTag)
- {
- addUnacknowledgedMessage(entry, deliveryTag, sub);
- }
- };
-
- public RecordDeliveryMethod getRecordDeliveryMethod()
- {
- return _recordDeliveryMethod;
- }
-
private AMQMessage createAMQMessage(StoredMessage<MessageMetaData> handle)
{
@@ -2063,7 +2018,8 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag:
" + deliveryTag + " multiple: " + multiple + " ]");
}
- acknowledgeMessage(deliveryTag, multiple);
+ Collection<MessageInstance> ackedMessages =
_unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
+ _transaction.dequeue(ackedMessages, new
MessageAcknowledgeAction(ackedMessages));
}
@Override
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769396&r1=1769395&r2=1769396&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sat Nov 12 18:05:45 2016
@@ -35,7 +35,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -50,7 +49,6 @@ public abstract class ConsumerTarget_0_8
{
private final ClientDeliveryMethod _deliveryMethod;
- private final RecordDeliveryMethod _recordMethod;
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
@@ -62,7 +60,7 @@ public abstract class ConsumerTarget_0_8
AMQShortString
consumerTag, FieldTable filters,
FlowCreditManager
creditManager, final boolean multiQueue)
{
- return new BrowserConsumer(channel, consumerTag, filters,
creditManager, channel.getClientDeliveryMethod(),
channel.getRecordDeliveryMethod(),
+ return new BrowserConsumer(channel, consumerTag, filters,
creditManager, channel.getClientDeliveryMethod(),
multiQueue);
}
@@ -70,10 +68,9 @@ public abstract class ConsumerTarget_0_8
final AMQShortString
consumerTag,
final FieldTable
filters,
final
FlowCreditManager creditManager,
- final
ClientDeliveryMethod deliveryMethod,
- final
RecordDeliveryMethod recordMethod)
+ final
ClientDeliveryMethod deliveryMethod)
{
- return new GetNoAckConsumer(channel, consumerTag, filters,
creditManager, deliveryMethod, recordMethod);
+ return new GetNoAckConsumer(channel, consumerTag, filters,
creditManager, deliveryMethod);
}
static final class BrowserConsumer extends ConsumerTarget_0_8
@@ -83,11 +80,10 @@ public abstract class ConsumerTarget_0_8
FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
boolean multiQueue)
{
super(channel, consumerTag,
- filters, creditManager, deliveryMethod, recordMethod,
multiQueue);
+ filters, creditManager, deliveryMethod, multiQueue);
}
/**
@@ -122,7 +118,7 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager
creditManager,
boolean multiQueue)
{
- return new NoAckConsumer(channel, consumerTag, filters, creditManager,
channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod(),
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager,
channel.getClientDeliveryMethod(),
multiQueue);
}
@@ -135,10 +131,9 @@ public abstract class ConsumerTarget_0_8
FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
boolean multiQueue)
{
- super(channel, consumerTag, filters, creditManager,
deliveryMethod, recordMethod, multiQueue);
+ super(channel, consumerTag, filters, creditManager,
deliveryMethod, multiQueue);
_txn = new
AutoCommitTransaction(channel.getAddressSpace().getMessageStore());
}
@@ -205,10 +200,9 @@ public abstract class ConsumerTarget_0_8
public GetNoAckConsumer(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ ClientDeliveryMethod deliveryMethod)
{
- super(channel, consumerTag, filters, creditManager,
deliveryMethod, recordMethod, false);
+ super(channel, consumerTag, filters, creditManager,
deliveryMethod, false);
}
}
@@ -224,18 +218,16 @@ public abstract class ConsumerTarget_0_8
consumerTag,
filters, creditManager,
channel.getClientDeliveryMethod(),
- channel.getRecordDeliveryMethod(),
multiQueue);
}
public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
- AMQShortString
consumerTag, FieldTable filters,
- FlowCreditManager
creditManager,
- ClientDeliveryMethod
deliveryMethod,
- RecordDeliveryMethod
recordMethod)
+ AMQShortString
consumerTag, FieldTable filters,
+ FlowCreditManager
creditManager,
+ ClientDeliveryMethod
deliveryMethod)
{
- return new AckConsumer(channel, consumerTag, filters, creditManager,
deliveryMethod, recordMethod, false);
+ return new AckConsumer(channel, consumerTag, filters, creditManager,
deliveryMethod, false);
}
static final class AckConsumer extends ConsumerTarget_0_8
@@ -244,10 +236,9 @@ public abstract class ConsumerTarget_0_8
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
boolean multiQueue)
{
- super(channel, consumerTag, filters, creditManager,
deliveryMethod, recordMethod, multiQueue);
+ super(channel, consumerTag, filters, creditManager,
deliveryMethod, multiQueue);
}
/**
@@ -270,7 +261,7 @@ public abstract class ConsumerTarget_0_8
long deliveryTag = getChannel().getNextDeliveryTag();
addUnacknowledgedMessage(entry);
- recordMessageDelivery(consumer, entry, deliveryTag);
+ getChannel().addUnacknowledgedMessage(entry, deliveryTag,
consumer);
long size = sendToClient(consumer, entry.getMessage(),
entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
}
@@ -302,7 +293,6 @@ public abstract class ConsumerTarget_0_8
FieldTable arguments,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod,
boolean multiQueue)
{
super(State.ACTIVE, isPullOnly(arguments), multiQueue,
channel.getAMQPConnection());
@@ -314,7 +304,6 @@ public abstract class ConsumerTarget_0_8
creditManager.addStateListener(this);
_deliveryMethod = deliveryMethod;
- _recordMethod = recordMethod;
if (arguments != null)
{
@@ -454,14 +443,6 @@ public abstract class ConsumerTarget_0_8
}
- protected void recordMessageDelivery(final ConsumerImpl consumer,
- final MessageInstance entry,
- final long deliveryTag)
- {
- _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag);
- }
-
-
public void confirmAutoClose()
{
ProtocolOutputConverter converter =
getChannel().getConnection().getProtocolOutputConverter();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1769396&r1=1769395&r2=1769396&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Sat Nov 12 18:05:45 2016
@@ -29,7 +29,7 @@ import org.apache.qpid.server.message.Me
public interface UnacknowledgedMessageMap
{
- public interface Visitor
+ interface Visitor
{
/**
* @param deliveryTag
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org