Repository: qpid-broker-j Updated Branches: refs/heads/master 50b462d7a -> c80aceab2
[Broker-J][AMQP 0-8..0-10] Queue declare arguments that match known attributes should be acceptable Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c80aceab Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c80aceab Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c80aceab Branch: refs/heads/master Commit: c80aceab29fc8c6c835c0d83554286bbe2ed8e36 Parents: 50b462d Author: Alex Rudyy <[email protected]> Authored: Tue Dec 4 18:04:34 2018 +0000 Committer: Alex Rudyy <[email protected]> Committed: Tue Dec 4 18:04:34 2018 +0000 ---------------------------------------------------------------------- .../server/queue/QueueArgumentsConverter.java | 20 ++- .../protocol/v0_10/ServerSessionDelegate.java | 4 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 2 +- .../qpid/tests/protocol/v0_10/Interaction.java | 22 +++ .../tests/protocol/v0_10/QueueInteraction.java | 8 + .../v0_10/LargeApplicationHeadersTest.java | 37 +---- .../protocol/v0_10/LargeMessageBodyTest.java | 36 +---- .../qpid/tests/protocol/v0_10/MessageTest.java | 81 +++------- .../v0_10/extensions/queue/QueueTest.java | 152 +++++++++++++++++ .../v0_8/extension/queue/QueueTest.java | 161 +++++++++++++++++++ 10 files changed, 397 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 65be871..e67e9a3 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -20,16 +20,21 @@ */ package org.apache.qpid.server.queue; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.model.AlternateBinding; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectAttribute; +import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry; +import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.OverflowPolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -130,11 +135,24 @@ public class QueueArgumentsConverter public static Map<String,Object> convertWireArgsToModel(final String queueName, - Map<String, Object> wireArguments) + final Map<String, Object> wireArguments, + final Model model) { Map<String,Object> modelArguments = new HashMap<>(); if(wireArguments != null) { + final ConfiguredObjectTypeRegistry typeRegistry = model.getTypeRegistry(); + final Map<String, ConfiguredObjectAttribute<?, ?>> attributeTypes = + new HashMap<>(typeRegistry.getAttributeTypes(Queue.class)); + typeRegistry.getTypeSpecialisations(Queue.class) + .forEach(type -> typeRegistry.getTypeSpecificAttributes(type) + .forEach(t -> attributeTypes.put(t.getName(), t))); + wireArguments.entrySet() + .stream() + .filter(entry -> attributeTypes.containsKey(entry.getKey()) + && !attributeTypes.get(entry.getKey()).isDerived()) + .forEach(entry -> modelArguments.put(entry.getKey(), entry.getValue())); + for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet()) { if(wireArguments.containsKey(entry.getKey())) 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 505ea89..5dda560 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -1557,7 +1557,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme try { final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(queueName, - method.getArguments()); + method.getArguments(), + session.getAMQPConnection() + .getModel()); final String alternateExchangeName = method.getAlternateExchange(); if (method.hasAlternateExchange() && !nameNullOrEmpty(alternateExchangeName)) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 4b4f9c7..e2048a0 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -2985,7 +2985,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, 
ConsumerTarget_0 } Map<String, Object> attributes = - QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments); + QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments, getModel()); attributes.put(Queue.NAME, queueNameString); attributes.put(Queue.DURABLE, durable); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java index 03deef5..8804c31 100644 --- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java @@ -21,6 +21,7 @@ package org.apache.qpid.tests.protocol.v0_10; import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder; import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder; @@ -34,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_10.transport.Method; import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached; import org.apache.qpid.tests.protocol.AbstractFrameTransport; import org.apache.qpid.tests.protocol.AbstractInteraction; +import org.apache.qpid.tests.protocol.Response; public class Interaction extends AbstractInteraction<Interaction> { @@ -204,4 +206,24 @@ public class Interaction extends AbstractInteraction<Interaction> { return _exchangeInteraction; } + + public <T extends Method> T consume(final Class<T> expected, + final Class<? extends Method>... ignore) + throws Exception + { + final Class<? 
extends Method>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1); + expectedResponses[ignore.length] = expected; + + T completed = null; + do + { + Response<?> response = consumeResponse(expectedResponses).getLatestResponse(); + if (expected.isAssignableFrom(response.getBody().getClass())) + { + completed = (T) response.getBody(); + } + } + while (completed == null); + return completed; + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java index ca3edee..6171466 100644 --- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java +++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.tests.protocol.v0_10; +import java.util.Map; + import org.apache.qpid.server.protocol.v0_10.transport.QueueDeclare; import org.apache.qpid.server.protocol.v0_10.transport.QueueDelete; import org.apache.qpid.server.protocol.v0_10.transport.QueuePurge; @@ -143,4 +145,10 @@ public class QueueInteraction { return _interaction.sendPerformative(_query); } + + public QueueInteraction declareArguments(final Map<String, Object> arguments) + { + _declare.setArguments(arguments); + return this; + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java ---------------------------------------------------------------------- diff --git 
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java index 2872f46..51a376a 100644 --- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java @@ -24,22 +24,17 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.junit.Before; import org.junit.Test; import org.apache.qpid.server.protocol.v0_10.transport.*; -import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -113,12 +108,10 @@ public class LargeApplicationHeadersTest extends BrokerAdminUsingTestBase .consumeResponse() .getLatestResponse(SessionCompleted.class); - - MessageTransfer transfer = consumeResponse(interaction, - MessageTransfer.class, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class); + MessageTransfer transfer = interaction.consume(MessageTransfer.class, + SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class); assertThat(transfer.getBodySize(), is(0)); @@ -134,28 +127,6 @@ public class LargeApplicationHeadersTest extends BrokerAdminUsingTestBase } } - private <T extends Method> T consumeResponse(final Interaction 
interaction, - final Class<T> expected, - final Class<? extends Method>... ignore) - throws Exception - { - List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore)); - possibleResponses.add(expected); - - T completed = null; - do - { - interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()])); - Response<?> response = interaction.getLatestResponse(); - if (expected.isAssignableFrom(response.getBody().getClass())) - { - completed = (T) response.getBody(); - } - } - while (completed == null); - return completed; - } - private Map<String, Object> createApplicationHeadersThatExceedSingleFrame(final int headerPropertySize, final int maxFrameSize) { Map<String, Object> applicationHeaders = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java index cc2ef42..6d829d5 100644 --- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java @@ -26,9 +26,6 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.stream.IntStream; import org.junit.Before; @@ -41,11 +38,9 @@ import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune; import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit; import 
org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer; -import org.apache.qpid.server.protocol.v0_10.transport.Method; import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint; import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted; import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed; -import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -119,11 +114,10 @@ public class LargeMessageBodyTest extends BrokerAdminUsingTestBase .consumeResponse() .getLatestResponse(SessionCompleted.class); - MessageTransfer transfer = consumeResponse(interaction, - MessageTransfer.class, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class); + MessageTransfer transfer = interaction.consume(MessageTransfer.class, + SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class); assertThat(transfer.getBodySize(), is(equalTo(messageContent.length))); QpidByteBuffer receivedBody = transfer.getBody(); @@ -132,26 +126,4 @@ public class LargeMessageBodyTest extends BrokerAdminUsingTestBase assertThat(receivedBytes, is(equalTo(messageContent))); } } - - private <T extends Method> T consumeResponse(final Interaction interaction, - final Class<T> expected, - final Class<? extends Method>... ignore) - throws Exception - { - List<Class<? 
extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore)); - possibleResponses.add(expected); - - T completed = null; - do - { - interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()])); - Response<?> response = interaction.getLatestResponse(); - if (expected.isAssignableFrom(response.getBody().getClass())) - { - completed = (T) response.getBody(); - } - } - while (completed == null); - return completed; - } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java index 6747451..08baf22 100644 --- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java @@ -27,9 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.junit.Before; import org.junit.Test; @@ -41,14 +38,12 @@ import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode; import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode; import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit; import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer; -import org.apache.qpid.server.protocol.v0_10.transport.Method; import org.apache.qpid.server.protocol.v0_10.transport.Range; import org.apache.qpid.server.protocol.v0_10.transport.RangeSet; import 
org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint; import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted; import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed; import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush; -import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -115,10 +110,8 @@ public class MessageTest extends BrokerAdminUsingTestBase .flushCompleted() .flush(); - SessionCompleted completed = consumeResponse(interaction, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class); + SessionCompleted completed = + interaction.consume(SessionCompleted.class, SessionCommandPoint.class, SessionConfirmed.class); assertThat(completed.getCommands(), is(notNullValue())); assertThat(completed.getCommands().includes(0), is(equalTo(true))); @@ -160,11 +153,10 @@ public class MessageTest extends BrokerAdminUsingTestBase .flowValue(-1) .flow(); - MessageTransfer transfer = consumeResponse(interaction, - MessageTransfer.class, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class); + MessageTransfer transfer = interaction.consume(MessageTransfer.class, + SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class); try (QpidByteBuffer buffer = transfer.getBody()) { @@ -210,11 +202,10 @@ public class MessageTest extends BrokerAdminUsingTestBase .flowValue(-1) .flow(); - MessageTransfer transfer = consumeResponse(interaction, - MessageTransfer.class, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class); + MessageTransfer transfer = interaction.consume(MessageTransfer.class, + SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class); assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); @@ 
-224,11 +215,10 @@ public class MessageTest extends BrokerAdminUsingTestBase .flushCompleted() .flush(); - SessionCompleted completed = consumeResponse(interaction, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class, - SessionFlush.class); + SessionCompleted completed = interaction.consume(SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class, + SessionFlush.class); assertThat(completed.getCommands(), is(notNullValue())); assertThat(completed.getCommands().includes(3), is(equalTo(true))); @@ -273,12 +263,11 @@ public class MessageTest extends BrokerAdminUsingTestBase .flowValue(-1) .flow(); - MessageTransfer transfer = consumeResponse(interaction, - MessageTransfer.class, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class, - SessionFlush.class); + MessageTransfer transfer = interaction.consume(MessageTransfer.class, + SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class, + SessionFlush.class); assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); @@ -293,11 +282,10 @@ public class MessageTest extends BrokerAdminUsingTestBase .session().flushCompleted() .flush(); - SessionCompleted completed = consumeResponse(interaction, - SessionCompleted.class, - SessionCommandPoint.class, - SessionConfirmed.class, - SessionFlush.class); + SessionCompleted completed = interaction.consume(SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class, + SessionFlush.class); assertThat(completed.getCommands(), is(notNullValue())); assertThat(completed.getCommands().includes(4), is(equalTo(true))); @@ -305,27 +293,4 @@ public class MessageTest extends BrokerAdminUsingTestBase assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); } } - - private <T extends Method> T consumeResponse(final Interaction interaction, - final Class<T> expected, - final Class<? extends Method>... 
ignore) - throws Exception - { - List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore)); - possibleResponses.add(expected); - - T completed = null; - do - { - interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()])); - Response<?> response = interaction.getLatestResponse(); - if (expected.isAssignableFrom(response.getBody().getClass())) - { - completed = (T) response.getBody(); - } - } - while (completed == null); - return completed; - } - } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java new file mode 100644 index 0000000..f5216c2 --- /dev/null +++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_10.extensions.queue; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assume.assumeThat; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult; +import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit; +import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; +import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer; +import org.apache.qpid.server.protocol.v0_10.transport.QueueQueryResult; +import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint; +import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted; +import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.protocol.v0_10.FrameTransport; +import org.apache.qpid.tests.protocol.v0_10.Interaction; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; +import org.apache.qpid.tests.utils.BrokerSpecific; + +@BrokerSpecific(kind = KIND_BROKER_J) +public class QueueTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + private static final byte[] SESSION_NAME = "test".getBytes(UTF_8); + + @Before + 
public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "10.queue.declare", description = "This command creates or checks a queue.") + public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + SessionCompleted completed = interaction.openAnonymousConnection() + .channelId(1) + .attachSession(SESSION_NAME) + .queue() + .declareQueue(BrokerAdmin.TEST_QUEUE_NAME) + .declareArguments(Collections.singletonMap("defaultFilters", + "{\"selector\":{\"x-filter-jms-selector\":[\"id=2\"]}}")) + .declareId(0) + .declare() + .session() + .flushCompleted() + .flush() + .consumeResponse() + .getLatestResponse(SessionCompleted.class); + + assertThat(completed.getCommands().includes(0), is(equalTo(true))); + assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); + + MessageProperties messageProperties1 = new MessageProperties(); + messageProperties1.setApplicationHeaders(Collections.singletonMap("id", 1)); + + interaction.message() + .transferDestination(BrokerAdmin.TEST_QUEUE_NAME) + .transferId(0) + .transferHeader(null, messageProperties1) + .transferBody("Test1".getBytes(StandardCharsets.UTF_8)) + .transfer() + .session() + .flushCompleted() + .flush() + .consumeResponse(); + + MessageProperties messageProperties2 = new MessageProperties(); + messageProperties2.setApplicationHeaders(Collections.singletonMap("id", 2)); + final String body2 = "Message 2 Content"; + interaction.message() + .transferDestination(BrokerAdmin.TEST_QUEUE_NAME) + .transferId(1) + .transferHeader(null, messageProperties2) + .transferBody(body2.getBytes(StandardCharsets.UTF_8)) + .transfer() + .session() + .flushCompleted() + .flush() + .consumeResponse(); + + final String subscriberName = 
"Test"; + interaction.message() + .subscribeDestination(subscriberName) + .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME) + .subscribeId(0) + .subscribe() + .message() + .flowId(1) + .flowDestination(subscriberName) + .flowUnit(MessageCreditUnit.MESSAGE) + .flowValue(1) + .flow() + .message() + .flowId(2) + .flowDestination(subscriberName) + .flowUnit(MessageCreditUnit.BYTE) + .flowValue(-1) + .flow(); + + MessageTransfer transfer = interaction.consume(MessageTransfer.class, + SessionCompleted.class, + SessionCommandPoint.class, + SessionConfirmed.class); + + try (QpidByteBuffer buffer = transfer.getBody()) + { + final byte[] dst = new byte[buffer.remaining()]; + buffer.get(dst); + assertThat(new String(dst, UTF_8), is(equalTo(body2))); + } + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c80aceab/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java new file mode 100644 index 0000000..af809e1 --- /dev/null +++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java @@ -0,0 +1,161 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tests.protocol.v0_8.extension.queue; + +import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody; +import 
org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.protocol.v0_8.FrameTransport; +import org.apache.qpid.tests.protocol.v0_8.Interaction; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; +import org.apache.qpid.tests.utils.BrokerSpecific; + +@BrokerSpecific(kind = KIND_BROKER_J) +public class QueueTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "1.7.2.1", description = "declare queue, create if needed") + public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + QueueDeclareOkBody response = interaction.openAnonymousConnection() + .channel().open().consumeResponse(ChannelOpenOkBody.class) + .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME) + .declareArguments(Collections.singletonMap("defaultFilters", + "{\"selector\":{\"x-filter-jms-selector\":[\"id=2\"]}}")) + .declare() + .consumeResponse().getLatestResponse(QueueDeclareOkBody.class); + + assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME)))); + assertThat(response.getMessageCount(), is(equalTo(0L))); + assertThat(response.getConsumerCount(), is(equalTo(0L))); + + // make sure that wire arguments took effect + // by publishing messages and consuming message matching filter + + String consumerTag = "test"; + interaction.basic().qosPrefetchCount(2) + .qos() + .consumeResponse(BasicQosOkBody.class) + .basic().consumeConsumerTag(consumerTag) + .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME) + .consume() + .consumeResponse(BasicConsumeOkBody.class) + .channel().flow(true) + 
.consumeResponse(ChannelFlowOkBody.class); + + String content2 = "Test Content 2"; + Map<String, Object> messageHeaders2 = Collections.singletonMap("id", 2); + String contentType = "text/plain"; + + // first message is not matching queue default filter + interaction.basic().publishExchange("") + .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME) + .publishMandatory(true) + .contentHeaderPropertiesContentType(contentType) + .contentHeaderPropertiesHeaders(Collections.singletonMap("id", 1)) + .content("Test1") + .publishMessage() + + // second message is matching queue default filter + .basic().publishExchange("") + .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME) + .publishMandatory(true) + .contentHeaderPropertiesContentType(contentType) + .contentHeaderPropertiesHeaders(messageHeaders2) + .content(content2) + .publishMessage(); + + // second message should be received + BasicDeliverBody delivery = + interaction.consumeResponse(BasicDeliverBody.class).getLatestResponse(BasicDeliverBody.class); + + assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag)))); + assertThat(delivery.getConsumerTag(), is(notNullValue())); + assertThat(delivery.getRedelivered(), is(equalTo(false))); + assertThat(delivery.getExchange(), is(nullValue())); + assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME)))); + + ContentHeaderBody header = + interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class); + + assertThat(header.getBodySize(), is(equalTo((long) content2.length()))); + BasicContentHeaderProperties properties = header.getProperties(); + Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders())); + assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders2)))); + assertThat(properties.getContentTypeAsString(), is(equalTo(contentType))); + + ContentBody content = 
interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class); + + QpidByteBuffer payload = content.getPayload(); + byte[] contentData = new byte[payload.remaining()]; + payload.get(contentData); + payload.dispose(); + String receivedContent = new String(contentData, StandardCharsets.UTF_8); + + assertThat(receivedContent, is(equalTo(receivedContent))); + assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2))); + + interaction.basic().ackDeliveryTag(delivery.getDeliveryTag()) + .ack() + .channel().close().consumeResponse(ChannelCloseOkBody.class); + assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
