Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Sun Dec 25 22:55:13 2016 @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.exchange; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anySet; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; @@ -27,6 +28,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -42,16 +44,23 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -84,6 +93,7 @@ public class HeadersExchangeTest extends when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class); when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor); when(_virtualHost.getChildExecutor()).thenReturn(_taskExecutor); + when(_virtualHost.getState()).thenReturn(State.ACTIVE); _factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(_virtualHost.getObjectFactory()).thenReturn(_factory); @@ -106,7 +116,7 @@ public class HeadersExchangeTest extends protected void routeAndTest(ServerMessage msg, Queue<?>... expected) throws Exception { - List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY); + List<? extends BaseQueue> results = routeToQueues(msg, "", InstanceProperties.EMPTY); List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results); unexpected.removeAll(Arrays.asList(expected)); assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); @@ -116,6 +126,88 @@ public class HeadersExchangeTest extends assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size()); } + private List<? extends BaseQueue> routeToQueues(final ServerMessage message, + final String routingAddress, + final InstanceProperties instanceProperties) + { + RoutingResult result = _exchange.route(message, routingAddress, instanceProperties); + final List<BaseQueue> resultQueues = new ArrayList<>(); + result.send(new ServerTransaction() + { + @Override + public long getTransactionStartTime() + { + return 0; + } + + @Override + public long getTransactionUpdateTime() + { + return 0; + } + + @Override + public void addPostTransactionAction(final Action postTransactionAction) + { + + } + + @Override + public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction) + { + + } + + @Override + public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction) + { + + } + + @Override + public void enqueue(final TransactionLogResource queue, + final EnqueueableMessage message, + final EnqueueAction postTransactionAction) + { + resultQueues.add((BaseQueue) queue); + } + + @Override + public void enqueue(final Collection<? extends BaseQueue> queues, + final EnqueueableMessage message, + final EnqueueAction postTransactionAction) + { + resultQueues.addAll(queues); + } + + @Override + public void commit() + { + + } + + @Override + public void commit(final Runnable immediatePostTransactionAction) + { + + } + + @Override + public void rollback() + { + + } + + @Override + public boolean isTransactional() + { + return false; + } + }, null); + + return resultQueues; + } + private Queue<?> createAndBind(final String name, String... arguments) throws Exception @@ -169,6 +261,9 @@ public class HeadersExchangeTest extends when(q.getTaskExecutor()).thenReturn(taskExecutor); when(q.getChildExecutor()).thenReturn(taskExecutor); when(_virtualHost.getAttainedQueue(name)).thenReturn(q); + final RoutingResult routingResult = new RoutingResult(null); + routingResult.addQueue(q); + when(q.route(any(ServerMessage.class), anyString(), any(InstanceProperties.class))).thenReturn(routingResult); return q; } @@ -282,6 +377,7 @@ public class HeadersExchangeTest extends }); final ServerMessage serverMessage = mock(ServerMessage.class); when(serverMessage.getMessageHeader()).thenReturn(header); + when(serverMessage.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); return serverMessage; }
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Sun Dec 25 22:55:13 2016 @@ -25,6 +25,8 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -34,15 +36,20 @@ import org.junit.Assert; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.BrokerTestHelper; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.test.utils.QpidTestCase; public class TopicExchangeTest extends QpidTestCase @@ -353,24 +360,31 @@ public class TopicExchangeTest extends Q _exchange.bind(queue.getName(), bindingKey, bindArgs, false); ServerMessage matchMsg1 = mock(ServerMessage.class); + when(matchMsg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); AMQMessageHeader msgHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); when(matchMsg1.getMessageHeader()).thenReturn(msgHeader1); routeMessage(matchMsg1, bindingKey, 1); Assert.assertEquals("First message should be routed to queue", 1, queue.getQueueDepthMessages()); ServerMessage nonmatchMsg2 = mock(ServerMessage.class); + when(nonmatchMsg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); + AMQMessageHeader msgHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 5)); when(nonmatchMsg2.getMessageHeader()).thenReturn(msgHeader2); routeMessage(nonmatchMsg2, bindingKey, 2); Assert.assertEquals("Second message should not be routed to queue", 1, queue.getQueueDepthMessages()); ServerMessage nonmatchMsg3 = mock(ServerMessage.class); + when(nonmatchMsg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); + AMQMessageHeader msgHeader3 = createMessageHeader(Collections.<String, Object>emptyMap()); when(nonmatchMsg3.getMessageHeader()).thenReturn(msgHeader3); routeMessage(nonmatchMsg3, bindingKey, 3); Assert.assertEquals("Third message should not be routed to queue", 1, queue.getQueueDepthMessages()); ServerMessage matchMsg4 = mock(ServerMessage.class); + when(matchMsg4.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); + AMQMessageHeader msgHeader4 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7)); when(matchMsg4.getMessageHeader()).thenReturn(msgHeader4); routeMessage(matchMsg4, bindingKey, 4); @@ -388,6 +402,7 @@ public class TopicExchangeTest extends Q AMQMessageHeader mgsHeader1 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); ServerMessage msg1 = mock(ServerMessage.class); + when(msg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); when(msg1.getMessageHeader()).thenReturn(mgsHeader1); routeMessage(msg1, bindingKey, 1); @@ -400,6 +415,7 @@ public class TopicExchangeTest extends Q // Message that would have matched the original selector but not the new AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); ServerMessage msg2 = mock(ServerMessage.class); + when(msg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); when(msg2.getMessageHeader()).thenReturn(mgsHeader2); routeMessage(msg2, bindingKey, 2); @@ -408,6 +424,7 @@ public class TopicExchangeTest extends Q // Message that matches only the second AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7)); ServerMessage msg3 = mock(ServerMessage.class); + when(msg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); when(msg3.getMessageHeader()).thenReturn(mgsHeader3); routeMessage(msg3, bindingKey, 2); @@ -425,7 +442,7 @@ public class TopicExchangeTest extends Q _exchange.bind(queue.getName(), bindingKey, null, false); ServerMessage msg1 = mock(ServerMessage.class); - + when(msg1.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); routeMessage(msg1, bindingKey, 1); Assert.assertEquals(1, queue.getQueueDepthMessages()); @@ -436,6 +453,7 @@ public class TopicExchangeTest extends Q // Message that does not match the new selector AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 6)); ServerMessage msg2 = mock(ServerMessage.class); + when(msg2.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); when(msg2.getMessageHeader()).thenReturn(mgsHeader2); routeMessage(msg2, bindingKey, 2); @@ -444,6 +462,8 @@ public class TopicExchangeTest extends Q // Message that matches the selector AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, Object>singletonMap("arg", 7)); ServerMessage msg3 = mock(ServerMessage.class); + when(msg3.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); + when(msg3.getMessageHeader()).thenReturn(mgsHeader3); routeMessage(msg3, bindingKey, 2); @@ -455,13 +475,14 @@ public class TopicExchangeTest extends Q private int routeMessage(String routingKey, long messageNumber) { ServerMessage message = mock(ServerMessage.class); + when(message.isResourceAcceptable(any(TransactionLogResource.class))).thenReturn(true); return routeMessage(message, routingKey, messageNumber); } private int routeMessage(ServerMessage message, String routingKey, long messageNumber) { when(message.getInitialRoutingAddress()).thenReturn(routingKey); - List<? extends BaseQueue> queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY); + List<? extends BaseQueue> queues = routeToQueues(message, routingKey, InstanceProperties.EMPTY); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); @@ -475,6 +496,88 @@ public class TopicExchangeTest extends Q return queues.size(); } + private List<? extends BaseQueue> routeToQueues(final ServerMessage message, + final String routingAddress, + final InstanceProperties instanceProperties) + { + RoutingResult result = _exchange.route(message, routingAddress, instanceProperties); + final List<BaseQueue> resultQueues = new ArrayList<>(); + result.send(new ServerTransaction() + { + @Override + public long getTransactionStartTime() + { + return 0; + } + + @Override + public long getTransactionUpdateTime() + { + return 0; + } + + @Override + public void addPostTransactionAction(final Action postTransactionAction) + { + + } + + @Override + public void dequeue(final MessageEnqueueRecord record, final Action postTransactionAction) + { + + } + + @Override + public void dequeue(final Collection<MessageInstance> messages, final Action postTransactionAction) + { + + } + + @Override + public void enqueue(final TransactionLogResource queue, + final EnqueueableMessage message, + final EnqueueAction postTransactionAction) + { + resultQueues.add((BaseQueue) queue); + } + + @Override + public void enqueue(final Collection<? extends BaseQueue> queues, + final EnqueueableMessage message, + final EnqueueAction postTransactionAction) + { + resultQueues.addAll(queues); + } + + @Override + public void commit() + { + + } + + @Override + public void commit(final Runnable immediatePostTransactionAction) + { + + } + + @Override + public void rollback() + { + + } + + @Override + public boolean isTransactional() + { + return false; + } + }, null); + + return resultQueues; + } + private AMQMessageHeader createMessageHeader(Map<String, Object> headers) { AMQMessageHeader messageHeader = mock(AMQMessageHeader.class); Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Sun Dec 25 22:55:13 2016 @@ -54,7 +54,6 @@ public class AsyncAutoCommitTransactionT when(_messageStore.newTransaction()).thenReturn(_storeTransaction); when(_storeTransaction.commitTranAsync((Void) null)).thenReturn(_future); - when(_queue.isDurable()).thenReturn(true); when(_queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Sun Dec 25 22:55:13 2016 @@ -424,7 +424,6 @@ public class AutoCommitTransactionTest e private BaseQueue createTestAMQQueue(final boolean durable) { BaseQueue queue = mock(BaseQueue.class); - when(queue.isDurable()).thenReturn(durable); when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original) +++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Sun Dec 25 22:55:13 2016 @@ -658,7 +658,6 @@ public class LocalTransactionTest extend private BaseQueue createQueue(final boolean durable) { BaseQueue queue = mock(BaseQueue.class); - when(queue.isDurable()).thenReturn(durable); when(queue.getMessageDurability()).thenReturn(durable ? MessageDurability.DEFAULT : MessageDurability.NEVER); return queue; } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Dec 25 22:55:13 2016 @@ -55,7 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.connection.SessionPrincipal; -import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -65,6 +64,7 @@ import org.apache.qpid.server.message.In import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageInstanceConsumer; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.model.AbstractConfigurationChangeListener; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfigurationChangeListener; @@ -98,22 +98,7 @@ import org.apache.qpid.server.txn.Unknow import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlow; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageSetFlowMode; -import org.apache.qpid.transport.MessageStop; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.RangeSetFactory; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.Xid; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.Ticker; public class ServerSession extends Session @@ -298,10 +283,9 @@ public class ServerSession extends Sessi _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } - int enqueues = exchange.send(message, - message.getInitialRoutingAddress(), - instanceProperties, _transaction, _checkCapacityAction - ); + final RoutingResult<MessageTransferMessage> result = + exchange.route(message, message.getInitialRoutingAddress(), instanceProperties); + int enqueues = result.send(_transaction, _checkCapacityAction); getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime()); incrementOutstandingTxnsIfNecessary(); incrementUncommittedMessageSize(message.getStoredMessage()); Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Dec 25 22:55:13 2016 @@ -81,6 +81,7 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageInstanceConsumer; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -494,11 +495,12 @@ public class AMQChannel } }; - int enqueues = destination.send(amqMessage, - amqMessage.getInitialRoutingAddress(), - instanceProperties, _transaction, - immediate ? _immediateAction : _capacityCheckAction - ); + final RoutingResult<AMQMessage> result = + destination.route(amqMessage, + amqMessage.getInitialRoutingAddress(), + instanceProperties); + + int enqueues = result.send(_transaction, immediate ? _immediateAction : _capacityCheckAction); if (enqueues == 0) { finallyAction = handleUnroutableMessage(amqMessage); Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Sun Dec 25 22:55:13 2016 @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol. import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -37,6 +38,10 @@ import java.util.Set; import javax.security.auth.Subject; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.MethodRegistry; @@ -46,6 +51,7 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -61,8 +67,6 @@ import org.apache.qpid.server.store.Mess import org.apache.qpid.server.store.NullMessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMemoryMessage; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -213,7 +217,16 @@ public class AMQChannelTest extends Qpid return messageHandle; } }); - + final ArgumentCaptor<ServerMessage> messageCaptor = ArgumentCaptor.forClass(ServerMessage.class); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ServerMessage message = messageCaptor.getValue(); + return new RoutingResult(message); + } + }).when(_messageDestination).route(messageCaptor.capture(), eq(ROUTING_KEY.toString()), any(InstanceProperties.class)); AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore()); BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); @@ -221,10 +234,8 @@ public class AMQChannelTest extends Qpid channel.receiveBasicPublish(AMQShortString.EMPTY_STRING, ROUTING_KEY, false, false); channel.receiveMessageHeader(properties, 0); - verify(_messageDestination).send((ServerMessage) any(), + verify(_messageDestination).route((ServerMessage) any(), eq(ROUTING_KEY.toString()), - any(InstanceProperties.class), - any(ServerTransaction.class), - any(Action.class) ); + any(InstanceProperties.class)); } } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Sun Dec 25 22:55:13 2016 @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.protocol.v1_0.type.Outcome; @@ -105,11 +106,10 @@ public class ExchangeDestination impleme return null; }}; - int enqueues = _exchange.send(message, - routingAddress, - instanceProperties, - txn, - action); + final RoutingResult result = _exchange.route(message, + routingAddress, + instanceProperties); + int enqueues = result.send(txn, action); if(enqueues == 0) { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Sun Dec 25 22:55:13 2016 @@ -28,6 +28,7 @@ import org.apache.qpid.server.logging.me import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.protocol.v1_0.type.Outcome; @@ -104,7 +105,8 @@ public class NodeReceivingDestination im return null; }}; - int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, action); + RoutingResult result = _destination.route(message, routingAddress, instanceProperties); + int enqueues = result.send(txn, action); if(enqueues == 0) { Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original) +++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Sun Dec 25 22:55:13 2016 @@ -43,9 +43,9 @@ import org.apache.qpid.exchange.Exchange import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; -import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSender; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Connection; @@ -64,7 +64,6 @@ import org.apache.qpid.server.store.Stor import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.txn.DtxNotSupportedException; import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode; @@ -354,20 +353,18 @@ public class ManagementAddressSpace impl } @Override - public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final String routingAddress, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message, + final String routingAddress, + final InstanceProperties instanceProperties) { MessageDestination destination = getAttainedMessageDestination(routingAddress); if(destination == null || destination == this) { - return 0; + return new RoutingResult<>(message); } else { - return destination.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); + return destination.route(message, routingAddress, instanceProperties); } } Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original) +++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Dec 25 22:55:13 2016 @@ -64,6 +64,7 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageInstanceConsumer; import org.apache.qpid.server.message.MessageSender; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.message.internal.InternalMessageHeader; @@ -81,6 +82,7 @@ import org.apache.qpid.server.model.Publ import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityToken; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.MessageEnqueueRecord; @@ -92,7 +94,7 @@ import org.apache.qpid.server.util.Actio import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; -class ManagementNode implements MessageSource, MessageDestination +class ManagementNode implements MessageSource, MessageDestination, BaseQueue { private static final Logger LOGGER = LoggerFactory.getLogger(ManagementNode.class); @@ -173,6 +175,15 @@ class ManagementNode implements MessageS private final ManagementInputConverter _managementInputConverter; + private static final InstanceProperties CONSUMED_INSTANCE_PROPERTIES = new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) + { + return null; + } + }; + ManagementNode(final NamedAddressSpace addressSpace, final ConfiguredObject<?> configuredObject) { @@ -332,12 +343,11 @@ class ManagementNode implements MessageS } @Override - public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final String routingAddress, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message, + final String routingAddress, + final InstanceProperties instanceProperties) { + final RoutingResult<M> result = new RoutingResult<>(message); if(message.isResourceAcceptable(this)) { @SuppressWarnings("unchecked") @@ -347,33 +357,12 @@ class ManagementNode implements MessageS if (converter != null) { - final InternalMessage msg = converter.convert(message, _addressSpace); - txn.addPostTransactionAction(new ServerTransaction.Action() - { - @Override - public void postCommit() - { - enqueue(msg, instanceProperties, postEnqueueAction); - } - - @Override - public void onRollback() - { - - } - }); - - return 1; + result.addQueue(this); } - else - { - return 0; - } - } - else - { - return 0; + } + return result; + } @Override @@ -408,7 +397,7 @@ class ManagementNode implements MessageS String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE); String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE); String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER); - + LOGGER.debug("Management Node identity: {}, type: {}, operation {}", id, type, operation); InternalMessage response; // TODO - handle runtime exceptions @@ -433,6 +422,26 @@ class ManagementNode implements MessageS } + @Override + public void enqueue(final ServerMessage message, + final Action<? super MessageInstance> action, + final MessageEnqueueRecord record) + { + MessageConverter<ServerMessage, InternalMessage> converter = + (MessageConverter<ServerMessage, InternalMessage>) MessageConverterRegistry.getConverter((message.getClass()), InternalMessage.class); + + final InternalMessage msg = converter.convert(message, _addressSpace); + + enqueue(msg, CONSUMED_INSTANCE_PROPERTIES, action); + + } + + @Override + public boolean isDeleted() + { + return false; + } + private interface StandardOperation { String getName(); @@ -982,11 +991,9 @@ class ManagementNode implements MessageS String replyTo = message.getMessageHeader().getReplyTo(); response.setInitialRoutingAddress(replyTo); - - getResponseDestination(replyTo).send(response, - replyTo, InstanceProperties.EMPTY, - new AutoCommitTransaction(_addressSpace.getMessageStore()), - null); + final MessageDestination responseDestination = getResponseDestination(replyTo); + RoutingResult<InternalMessage> result = responseDestination.route(response, replyTo, InstanceProperties.EMPTY); + result.send(new AutoCommitTransaction(_addressSpace.getMessageStore()), null); } @@ -1533,14 +1540,13 @@ class ManagementNode implements MessageS private class ConsumedMessageInstance implements MessageInstance { + private final ServerMessage _message; - private final InstanceProperties _properties; ConsumedMessageInstance(final ServerMessage message, final InstanceProperties properties) { _message = message; - _properties = properties; } @Override @@ -1741,7 +1747,7 @@ class ManagementNode implements MessageS @Override public InstanceProperties getInstanceProperties() { - return _properties; + return CONSUMED_INSTANCE_PROPERTIES; } @Override Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original) +++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Sun Dec 25 22:55:13 2016 @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.InstanceProperties; @@ -32,24 +33,28 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageInstanceConsumer; import org.apache.qpid.server.message.MessageSender; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.PublishingLink; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.message.MessageContainer; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityToken; +import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; -class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, MessageDestination +class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, MessageDestination, + BaseQueue { private final ManagementNode _managementNode; private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>()); private final T _target; private final String _name; - private final Object _identifier = new Object(); + private final UUID _identifier = UUID.randomUUID(); public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, T target) @@ -142,14 +147,25 @@ class ManagementNodeConsumer<T extends C } @Override - public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final String routingAddress, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + public UUID getId() { - send((InternalMessage)message); - return 1; + return _identifier; + } + + @Override + public MessageDurability getMessageDurability() + { + return MessageDurability.NEVER; + } + + @Override + public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message, + final String routingAddress, + final InstanceProperties instanceProperties) + { + RoutingResult<M> result = new RoutingResult<>(message); + result.addQueue(this); + return result; } @Override @@ -181,10 +197,30 @@ class ManagementNodeConsumer<T extends C return _managementNode; } - void send(final InternalMessage response) + void send(ManagementResponse responseEntry) { - final ManagementResponse responseEntry = new ManagementResponse(this, response); _queue.add(responseEntry); _target.notifyWork(); } + + @Override + public void enqueue(final ServerMessage message, + final Action<? super MessageInstance> action, + final MessageEnqueueRecord record) + { + final InternalMessage internalMessage = (InternalMessage) message; + final ManagementResponse responseEntry = new ManagementResponse(this, internalMessage); + + send(responseEntry); + if(action != null) + { + action.performAction(responseEntry); + } + } + + @Override + public boolean isDeleted() + { + return isClosed(); + } } Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java (original) +++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java Sun Dec 25 22:55:13 2016 @@ -47,6 +47,7 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageInstanceConsumer; import org.apache.qpid.server.message.MessageSender; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.NamedAddressSpace; @@ -55,8 +56,6 @@ import org.apache.qpid.server.protocol.A import org.apache.qpid.server.security.SecurityToken; import org.apache.qpid.server.store.MessageDurability; import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; public class ProxyMessageSource implements MessageSource, MessageDestination { @@ -94,13 +93,11 @@ public class ProxyMessageSource implemen } @Override - public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, - final String routingAddress, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action<? super MessageInstance> postEnqueueAction) + public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message, + final String routingAddress, + final InstanceProperties instanceProperties) { - return 0; + return new RoutingResult<>(message); } @Override Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1776037&r1=1776036&r2=1776037&view=diff ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (original) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java Sun Dec 25 22:55:13 2016 @@ -48,6 +48,7 @@ import org.apache.qpid.server.configurat import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.RoutingResult; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -615,7 +616,8 @@ public class VirtualHostMessageStoreTest ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore()); - exchange.send(currentMessage, routingKey, InstanceProperties.EMPTY, trans, null); + RoutingResult result = exchange.route(currentMessage, routingKey, InstanceProperties.EMPTY); + result.send(trans, null); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
