Author: kwall Date: Thu Sep 22 16:46:26 2016 New Revision: 1761977 URL: http://svn.apache.org/viewvc?rev=1761977&view=rev Log: QPID-7366: [Java Broker] REST message publication
* added support system tests * guarded for messages published with no headers at all * allowed internal/0-10 converter to convert messageId, if it is a valid format UUID Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1761977&r1=1761976&r2=1761977&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Thu Sep 22 16:46:26 2016 @@ -209,7 +209,10 @@ public interface VirtualHost<X extends V @ManagedOperation(nonModifying = true) Connection<?> getConnection(@Param(name="name") String name); - @ManagedOperation(secure = true) + @ManagedOperation(secure = true, + description = "Publishes a message to a specified address. " + + "Returns the number of queues onto which it has been placed, " + + " or zero, if the address routes to no queues.") int publishMessage(@Param(name = "message")ManageableMessage message); @ManagedOperation(nonModifying = true, description = "Extract configuration", paramRequiringSecure = "includeSecureAttributes") Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1761977&r1=1761976&r2=1761977&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Sep 22 16:46:26 2016 @@ -1664,25 +1664,30 @@ public abstract class AbstractVirtualHos @Override public Object getHeader(final String name) { - return _message.getHeaders().get(name); + return getHeaders().get(name); } @Override public boolean containsHeaders(final Set<String> names) { - return _message.getHeaders().keySet().containsAll(names); + return getHeaders().keySet().containsAll(names); } @Override public boolean containsHeader(final String name) { - return _message.getHeaders().keySet().contains(name); + return getHeaders().keySet().contains(name); } @Override public Collection<String> getHeaderNames() { - return Collections.unmodifiableCollection(_message.getHeaders().keySet()); + return Collections.unmodifiableCollection(getHeaders().keySet()); + } + + private Map<String, Object> getHeaders() + { + return _message.getHeaders() == null ? Collections.<String, Object>emptyMap() : _message.getHeaders(); } } Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1761977&r1=1761976&r2=1761977&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original) +++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Thu Sep 22 16:46:26 2016 @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol. import java.util.Collection; import java.util.Collections; +import java.util.UUID; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.internal.InternalMessage; @@ -108,8 +109,6 @@ public class MessageConverter_Internal_t DeliveryProperties deliveryProps = new DeliveryProperties(); MessageProperties messageProps = new MessageProperties(); - - deliveryProps.setExpiration(serverMsg.getExpiration()); deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); @@ -118,11 +117,22 @@ public class MessageConverter_Internal_t messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); messageProps.setContentLength(size); messageProps.setContentType(bodyMimeType); - if(serverMsg.getMessageHeader().getCorrelationId() != null) + if (serverMsg.getMessageHeader().getCorrelationId() != null) { messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); } messageProps.setApplicationHeaders(serverMsg.getMessageHeader().getHeaderMap()); + if (serverMsg.getMessageHeader().getMessageId() != null) + { + try + { + messageProps.setMessageId(UUID.fromString(serverMsg.getMessageHeader().getMessageId())); + } + catch (IllegalArgumentException iae) + { + // ignore message id is not a UUID + } + } Header header = new Header(deliveryProps, messageProps, null); return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); } Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1761977&r1=1761976&r2=1761977&view=diff ============================================================================== --- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original) +++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Thu Sep 22 16:46:26 2016 @@ -398,6 +398,14 @@ public class RestTestHelper return readJsonResponse(connection, valueType); } + public <T> T postJson(String path, final Object data , final Class<T> valueType) throws IOException + { + HttpURLConnection connection = openManagementConnection(path, "POST"); + connection.connect(); + writeJsonRequest(connection, data); + return readJsonResponse(connection, valueType); + } + public void createNewGroupMember(String groupProviderName, String groupName, String memberName, int responseCode) throws IOException { HttpURLConnection connection = openManagementConnection( Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java?rev=1761977&view=auto ============================================================================== --- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java (added) +++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/PublishMessageRestTest.java Thu Sep 22 16:46:26 2016 @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.rest; + +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.servlet.http.HttpServletResponse; + +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.port.HttpPort; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +public class PublishMessageRestTest extends QpidRestTestCase +{ + + private Connection _connection; + private Session _session; + private String _queueName; + private MessageConsumer _consumer; + private String _publishMessageOpUrl; + private String _queueUrl; + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _connection = getConnection(); + _connection.start(); + + _session = _connection.createSession(true, Session.SESSION_TRANSACTED); + _queueName = getTestQueueName(); + Destination queue = _session.createQueue(_queueName); + _consumer = _session.createConsumer(queue); + + _publishMessageOpUrl = String.format("virtualhost/%s/%s/publishMessage", TEST1_VIRTUALHOST, TEST1_VIRTUALHOST); + _queueUrl = String.format("queue/%s/%s/", TEST1_VIRTUALHOST, TEST1_VIRTUALHOST); + } + + @Override + protected void customizeConfiguration() throws Exception + { + super.customizeConfiguration(); + getDefaultBrokerConfiguration().setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, + HttpPort.ALLOW_CONFIDENTIAL_OPERATIONS_ON_INSECURE_CHANNELS, + true); + } + + public void testPublishMinimalEmptyMessage() throws Exception + { + Map<String, Object> messageBody = new HashMap<>(); + messageBody.put("address", _queueName); + + getRestTestHelper().submitRequest(_publishMessageOpUrl, + "POST", + Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK); + + Message message = _consumer.receive(getLongReceiveTimeout()); + assertNotNull("Expected message not received", message); + assertNull("Unexpected JMSMessageID", message.getJMSMessageID()); + assertNull("Unexpected JMSCorrelationID", message.getJMSCorrelationID()); + assertEquals("Unexpected JMSExpiration", 0, message.getJMSExpiration()); + assertNotSame("Unexpected JMSTimestamp", 0, message.getJMSTimestamp()); + assertFalse("Unexpected number of mesage properties", message.getPropertyNames().hasMoreElements()); + } + + public void testPublishMessageWithPropertiesAndHeaders() throws Exception + { + final String messageId = UUID.randomUUID().toString(); + final long tomorrow = TimeUnit.DAYS.toMillis(1) + System.currentTimeMillis(); + final Map<String, Object> headers = new HashMap<>(); + headers.put("stringprop", "mystring"); + headers.put("intprop", Integer.MIN_VALUE); + headers.put("longprop", Long.MAX_VALUE); + //headers.put("", "mykeyisempty"); // 0-8..0-91 Causes Broker to die (MessageConverter_Internal_to_v0_8), 0-10 fails in JMS client on receipt + //headers.put("nullpropvalue", null); // 0-8..0-91 Causes Broker failure (MessageConverter_Internal_to_v0_8), 0-10 JMS client ignores property + + final Map<String, Object> messageBody = new HashMap<>(); + messageBody.put("messageId", messageId); + messageBody.put("address", _queueName); + messageBody.put("expiration", tomorrow); + messageBody.put("headers", headers); + + getRestTestHelper().submitRequest(_publishMessageOpUrl, + "POST", + Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK); + + Message message = _consumer.receive(getLongReceiveTimeout()); + assertNotNull("Expected message not received", message); + final String jmsMessageID = message.getJMSMessageID().replaceFirst("ID:", ""); + assertEquals("Unexpected JMSMessageID", messageId, jmsMessageID); + assertEquals("Unexpected JMSExpiration", tomorrow, message.getJMSExpiration()); + + final Enumeration propertyEnumeration = message.getPropertyNames(); + int count = 0; + while(propertyEnumeration.hasMoreElements()) + { + String key = (String) propertyEnumeration.nextElement(); + assertEquals("Unexpected property value fo key : " + key, headers.get(key), message.getObjectProperty(key)); + count++; + } + assertEquals("Unexpected number of properties", headers.size(), count); + } + + public void testPublishStringMessage() throws Exception + { + final String content = "Hello world"; + TextMessage message = publishMessageWithContent(content, TextMessage.class); + assertEquals("Unexpected message content", content, message.getText()); + } + + public void testPublishMapMessage() throws Exception + { + final Map<String, Object> content = new HashMap<>(); + content.put("key1", "astring"); + content.put("key2", Integer.MIN_VALUE); + content.put("key3", Long.MAX_VALUE); + content.put("key4", null); + MapMessage message = publishMessageWithContent(content, MapMessage.class); + final Enumeration mapNames = message.getMapNames(); + int entryCount = 0; + while(mapNames.hasMoreElements()) + { + String key = (String) mapNames.nextElement(); + assertEquals("Unexpected map content for key : " + key, content.get(key), message.getObject(key)); + entryCount++; + } + assertEquals("Unexpected number of key/value pairs in map message", content.size(), entryCount); + } + + public void testPublishRouting() throws Exception + { + final String queueName = UUID.randomUUID().toString(); + Map<String, Object> messageBody = Collections.<String, Object>singletonMap("address", queueName); + + int enqueues = getRestTestHelper().postJson(_publishMessageOpUrl, + Collections.singletonMap("message", messageBody), + Integer.class); + assertEquals("Unexpected number of enqueues", 0, enqueues); + + getRestTestHelper().submitRequest(_queueUrl, "POST", Collections.singletonMap(Queue.NAME, queueName), HttpServletResponse.SC_CREATED); + + enqueues = getRestTestHelper().postJson(_publishMessageOpUrl, + Collections.singletonMap("message", messageBody), + Integer.class); + + + assertEquals("Unexpected number of enqueues after queue creation", 1, enqueues); + } + + private <M extends Message> M publishMessageWithContent(final Object content, final Class<M> expectedMessageClass) throws Exception + { + Map<String, Object> messageBody = new HashMap<>(); + messageBody.put("address", _queueName); + messageBody.put("content", content); + + getRestTestHelper().submitRequest(_publishMessageOpUrl, + "POST", + Collections.singletonMap("message", messageBody), HttpServletResponse.SC_OK); + + M message = (M) _consumer.receive(getLongReceiveTimeout()); + assertNotNull("Expected message not received", message); + assertTrue(String.format("Unexpected message type. Expecting %s got %s", expectedMessageClass, message.getClass()), + expectedMessageClass.isAssignableFrom(message.getClass())); + return message; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org