http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java new file mode 100644 index 0000000..f4b54a6 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsMessageIntegrityTest.java @@ -0,0 +1,497 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Enumeration; +import java.util.HashMap; +import java.util.UUID; +import java.util.Vector; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageEOFException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests that messages sent and received don't lose data and have expected + * JMS Message property values. + */ +public class JmsMessageIntegrityTest extends AmqpTestSupport { + + private Connection connection; + + @Override + public void setUp() throws Exception { + super.setUp(); + connection = createAmqpConnection(); + } + + @Test + public void testTextMessage() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + { + TextMessage message = session.createTextMessage(); + message.setText("Hi"); + producer.send(message); + } + { + TextMessage message = (TextMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals("Hi", message.getText()); + } + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testBytesMessageLength() throws Exception { + connection.start(); + Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + { + BytesMessage message = session.createBytesMessage(); + message.writeInt(1); + message.writeInt(2); + message.writeInt(3); + message.writeInt(4); + producer.send(message); + } + { + BytesMessage message = (BytesMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals(16, message.getBodyLength()); + } + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testObjectMessage() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + UUID payload = UUID.randomUUID(); + + { + ObjectMessage message = session.createObjectMessage(); + message.setObject(payload); + producer.send(message); + } + { + ObjectMessage message = (ObjectMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals(payload, message.getObject()); + } + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testBytesMessage() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + { + BytesMessage message = session.createBytesMessage(); + message.writeBoolean(true); + producer.send(message); + } + { + BytesMessage message = (BytesMessage)consumer.receive(1000); + assertNotNull(message); + assertTrue(message.readBoolean()); + + try { + message.readByte(); + fail("Expected exception not 
thrown."); + } catch (MessageEOFException e) { + } + } + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testStreamMessage() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + { + StreamMessage message = session.createStreamMessage(); + message.writeString("This is a test to see how it works."); + producer.send(message); + } + { + StreamMessage message = (StreamMessage)consumer.receive(1000); + assertNotNull(message); + + // Invalid conversion should throw exception and not move the stream position. + try { + message.readByte(); + fail("Should have received NumberFormatException"); + } catch (NumberFormatException e) { + } + + assertEquals("This is a test to see how it works.", message.readString()); + + // Invalid conversion should throw exception and not move the stream position. 
+ try { + message.readByte(); + fail("Should have received MessageEOFException"); + } catch (MessageEOFException e) { + } + } + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testMapMessage() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + { + MapMessage message = session.createMapMessage(); + message.setBoolean("boolKey", true); + producer.send(message); + } + { + MapMessage message = (MapMessage)consumer.receive(1000); + assertNotNull(message); + assertTrue(message.getBoolean("boolKey")); + } + assertNull(consumer.receiveNoWait()); + } + + static class ForeignMessage implements TextMessage { + + public int deliveryMode; + + private String messageId; + private long timestamp; + private String correlationId; + private Destination replyTo; + private Destination destination; + private boolean redelivered; + private String type; + private long expiration; + private int priority; + private String text; + private final HashMap<String, Object> props = new HashMap<String, Object>(); + + @Override + public String getJMSMessageID() throws JMSException { + return messageId; + } + + @Override + public void setJMSMessageID(String arg0) throws JMSException { + messageId = arg0; + } + + @Override + public long getJMSTimestamp() throws JMSException { + return timestamp; + } + + @Override + public void setJMSTimestamp(long arg0) throws JMSException { + timestamp = arg0; + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() throws JMSException { + return null; + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException { + } + + @Override + public void setJMSCorrelationID(String arg0) throws JMSException { + correlationId = arg0; + } + + @Override + 
public String getJMSCorrelationID() throws JMSException { + return correlationId; + } + + @Override + public Destination getJMSReplyTo() throws JMSException { + return replyTo; + } + + @Override + public void setJMSReplyTo(Destination arg0) throws JMSException { + replyTo = arg0; + } + + @Override + public Destination getJMSDestination() throws JMSException { + return destination; + } + + @Override + public void setJMSDestination(Destination arg0) throws JMSException { + destination = arg0; + } + + @Override + public int getJMSDeliveryMode() throws JMSException { + return deliveryMode; + } + + @Override + public void setJMSDeliveryMode(int arg0) throws JMSException { + deliveryMode = arg0; + } + + @Override + public boolean getJMSRedelivered() throws JMSException { + return redelivered; + } + + @Override + public void setJMSRedelivered(boolean arg0) throws JMSException { + redelivered = arg0; + } + + @Override + public String getJMSType() throws JMSException { + return type; + } + + @Override + public void setJMSType(String arg0) throws JMSException { + type = arg0; + } + + @Override + public long getJMSExpiration() throws JMSException { + return expiration; + } + + @Override + public void setJMSExpiration(long arg0) throws JMSException { + expiration = arg0; + } + + @Override + public int getJMSPriority() throws JMSException { + return priority; + } + + @Override + public void setJMSPriority(int arg0) throws JMSException { + priority = arg0; + } + + @Override + public void clearProperties() throws JMSException { + } + + @Override + public boolean propertyExists(String arg0) throws JMSException { + return false; + } + + @Override + public boolean getBooleanProperty(String arg0) throws JMSException { + return false; + } + + @Override + public byte getByteProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public short getShortProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public int getIntProperty(String arg0) 
throws JMSException { + return 0; + } + + @Override + public long getLongProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public float getFloatProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public double getDoubleProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public String getStringProperty(String arg0) throws JMSException { + return (String)props.get(arg0); + } + + @Override + public Object getObjectProperty(String arg0) throws JMSException { + return props.get(arg0); + } + + @Override + public Enumeration<?> getPropertyNames() throws JMSException { + return new Vector<String>(props.keySet()).elements(); + } + + @Override + public void setBooleanProperty(String arg0, boolean arg1) throws JMSException { + } + + @Override + public void setByteProperty(String arg0, byte arg1) throws JMSException { + } + + @Override + public void setShortProperty(String arg0, short arg1) throws JMSException { + } + + @Override + public void setIntProperty(String arg0, int arg1) throws JMSException { + } + + @Override + public void setLongProperty(String arg0, long arg1) throws JMSException { + } + + @Override + public void setFloatProperty(String arg0, float arg1) throws JMSException { + } + + @Override + public void setDoubleProperty(String arg0, double arg1) throws JMSException { + } + + @Override + public void setStringProperty(String arg0, String arg1) throws JMSException { + props.put(arg0, arg1); + } + + @Override + public void setObjectProperty(String arg0, Object arg1) throws JMSException { + props.put(arg0, arg1); + } + + @Override + public void acknowledge() throws JMSException { + } + + @Override + public void clearBody() throws JMSException { + } + + @Override + public void setText(String arg0) throws JMSException { + text = arg0; + } + + @Override + public String getText() throws JMSException { + return text; + } + } + + // TODO - implement proper handling of foreign JMS Message and 
Destination types. + @Ignore("ActiveMQ is dropping messages as expired with current proton lib") + @Test + public void testForeignMessage() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + { + ForeignMessage message = new ForeignMessage(); + message.text = "Hello"; + message.setStringProperty("test", "value"); + long timeToLive = 10000L; + long start = System.currentTimeMillis(); + producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive); + long end = System.currentTimeMillis(); + + // validate jms spec 1.1 section 3.4.11 table 3.1 + // JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, and JMSTimestamp + // must be set by sending a message. + + assertNotNull(message.getJMSDestination()); + assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode()); + assertTrue(start + timeToLive <= message.getJMSExpiration()); + assertTrue(end + timeToLive >= message.getJMSExpiration()); + assertEquals(7, message.getJMSPriority()); + assertNotNull(message.getJMSMessageID()); + assertTrue(start <= message.getJMSTimestamp()); + assertTrue(end >= message.getJMSTimestamp()); + } + { + TextMessage message = (TextMessage)consumer.receive(10000); + assertNotNull(message); + assertEquals("Hello", message.getText()); + assertEquals("value", message.getStringProperty("test")); + } + + assertNull(consumer.receiveNoWait()); + } +}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java new file mode 100644 index 0000000..36253c6 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsQueueConnectionTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import static org.junit.Assert.assertNotNull; + +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; +import javax.jms.QueueConnection; + +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test basic QueueConnection creation etc. 
+ */ +public class JmsQueueConnectionTest extends AmqpTestSupport { + + @Test + public void testCreateQueueConnection() throws JMSException { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + QueueConnection connection = factory.createQueueConnection(); + assertNotNull(connection); + connection.close(); + } + + @Test(timeout=30000) + public void testCreateConnectionAsSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + factory.setUsername("system"); + factory.setPassword("manager"); + QueueConnection connection = factory.createQueueConnection(); + assertNotNull(connection); + connection.start(); + connection.close(); + } + + @Test(timeout=30000, expected = JMSSecurityException.class) + public void testCreateConnectionAsUnknwonUser() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + factory.setUsername("unknown"); + factory.setPassword("unknown"); + QueueConnection connection = factory.createQueueConnection(); + assertNotNull(connection); + connection.start(); + connection.close(); + } + + @Test(timeout=30000) + public void testCreateConnectionCallSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + QueueConnection connection = factory.createQueueConnection("system", "manager"); + assertNotNull(connection); + connection.start(); + connection.close(); + } + + @Test(timeout=30000, expected = JMSSecurityException.class) + public void testCreateConnectionCallUnknwonUser() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + QueueConnection connection = factory.createQueueConnection("unknown", "unknown"); + assertNotNull(connection); + connection.start(); + connection.close(); + } +} 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java new file mode 100644 index 0000000..0e61d39 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSSLConnectionTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import static org.junit.Assert.assertNotNull; + +import java.net.URI; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test that we can connect to a broker over SSL. 
+ */ +public class JmsSSLConnectionTest { + + private BrokerService brokerService; + + public static final String PASSWORD = "password"; + public static final String KEYSTORE = "src/test/resources/keystore"; + public static final String KEYSTORE_TYPE = "jks"; + + private URI connectionURI; + + @Before + public void setUp() throws Exception { + System.setProperty("javax.net.ssl.trustStore", KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", KEYSTORE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setUseJmx(true); + + TransportConnector connector = brokerService.addConnector("amqp+ssl://localhost:0"); + brokerService.start(); + brokerService.waitUntilStarted(); + + connectionURI = connector.getPublishableConnectURI(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + public String getConnectionURI() throws Exception { + return "amqps://" + connectionURI.getHost() + ":" + connectionURI.getPort(); + } + + @Test(timeout=30000) + public void testCreateConnection() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI()); + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertNotNull(connection); + connection.close(); + } + + @Test(timeout=30000) + public void testCreateConnectionAndStart() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getConnectionURI()); + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertNotNull(connection); + connection.start(); + 
connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java new file mode 100644 index 0000000..efb3df3 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionClosedTest.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Validates all Session contracts following a close() call. 
+ */ +public class JmsSessionClosedTest extends AmqpTestSupport { + + protected Connection connection; + + protected Session createSession() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.close(); + return session; + } + + @Override + public void tearDown() throws Exception { + connection.close(); + super.tearDown(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateMessageFails() throws Exception { + Session session = createSession(); + session.createMessage(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateTextMessageFails() throws Exception { + Session session = createSession(); + session.createTextMessage(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateTextMessageWithTextFails() throws Exception { + Session session = createSession(); + session.createTextMessage("TEST"); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateMapMessageFails() throws Exception { + Session session = createSession(); + session.createMapMessage(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateStreamMessageFails() throws Exception { + Session session = createSession(); + session.createStreamMessage(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateBytesMessageFails() throws Exception { + Session session = createSession(); + session.createBytesMessage(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateObjectMessageFails() throws Exception { + Session session = createSession(); + session.createObjectMessage(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateObjectMessageWithObjectFails() throws Exception { + Session session = createSession(); + session.createObjectMessage("TEST"); + } + + @Test(timeout=30000, 
expected=JMSException.class) + public void testGetTransactedFails() throws Exception { + Session session = createSession(); + session.getTransacted(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetAcknowledgeModeFails() throws Exception { + Session session = createSession(); + session.getAcknowledgeMode(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCommitFails() throws Exception { + Session session = createSession(); + session.commit(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testRollbackFails() throws Exception { + Session session = createSession(); + session.rollback(); + } + + @Test(timeout=30000) + public void testCloseDoesNotFail() throws Exception { + Session session = createSession(); + session.close(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testRecoverFails() throws Exception { + Session session = createSession(); + session.recover(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetMessageListenerFails() throws Exception { + Session session = createSession(); + session.getMessageListener(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSetMessageListenerFails() throws Exception { + Session session = createSession(); + MessageListener listener = new MessageListener() { + @Override + public void onMessage(Message message) { + } + }; + session.setMessageListener(listener); + } + + @Test(timeout=30000, expected=RuntimeException.class) + public void testRunFails() throws Exception { + Session session = createSession(); + session.run(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateProducerFails() throws Exception { + Session session = createSession(); + Destination destination = session.createQueue("test"); + session.createProducer(destination); + } + + @Test(timeout=30000, expected=JMSException.class) + public void 
testCreateConsumerDestinatioFails() throws Exception { + Session session = createSession(); + Destination destination = session.createQueue("test"); + session.createConsumer(destination); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateConsumerDestinatioSelectorFails() throws Exception { + Session session = createSession(); + Destination destination = session.createQueue("test"); + session.createConsumer(destination, "a = b"); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateConsumerDestinatioSelectorBooleanFails() throws Exception { + Session session = createSession(); + Destination destination = session.createQueue("test"); + session.createConsumer(destination, "a = b", true); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateQueueFails() throws Exception { + Session session = createSession(); + session.createQueue("TEST"); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateTopicFails() throws Exception { + Session session = createSession(); + session.createTopic("TEST"); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateTemporaryQueueFails() throws Exception { + Session session = createSession(); + session.createTemporaryQueue(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateTemporaryTopicFails() throws Exception { + Session session = createSession(); + session.createTemporaryQueue(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateDurableSubscriberFails() throws Exception { + Session session = createSession(); + Topic destination = session.createTopic("TEST"); + session.createDurableSubscriber(destination, "test"); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateDurableSubscriberSelectorBooleanFails() throws Exception { + Session session = createSession(); + Topic destination = 
session.createTopic("TEST"); + session.createDurableSubscriber(destination, "test", "a = b", false); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateQueueBrowserFails() throws Exception { + Session session = createSession(); + Queue destination = session.createQueue("test"); + session.createBrowser(destination); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testCreateQueueBrowserWithSelectorFails() throws Exception { + Session session = createSession(); + Queue destination = session.createQueue("test"); + session.createBrowser(destination, "a = b"); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testUnsubscribeFails() throws Exception { + Session session = createSession(); + session.unsubscribe("test"); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java new file mode 100644 index 0000000..5e6846c --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionFailedTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.support.Wait; + + +/** + * Tests the Session method contracts when the underlying connection is lost. + */ +public class JmsSessionFailedTest extends JmsSessionClosedTest { + + @Override + protected Session createSession() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + latch.countDown(); + } + }); + connection.start(); + stopPrimaryBroker(); + assertTrue(latch.await(20, TimeUnit.SECONDS)); + final JmsConnection jmsConnection = (JmsConnection) connection; + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !jmsConnection.isConnected(); + } + })); + return session; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
new file mode 100644
index 0000000..6c3ca35
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -0,0 +1,82 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.jms;

import static org.junit.Assert.assertNotNull;

import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.qpid.jms.support.AmqpTestSupport;
import org.junit.Test;

/**
 * Test basic Session functionality: creating a Session, creating a producer
 * and a consumer from it, and closing it more than once.
 */
public class JmsSessionTest extends AmqpTestSupport {

    /**
     * A started connection must hand out a non-null Session that can be closed.
     */
    @Test(timeout = 60000)
    public void testCreateSession() throws Exception {
        connection = createAmqpConnection();
        assertNotNull(connection);
        connection.start();

        Session created = openAutoAckSession();
        assertNotNull(created);

        created.close();
    }

    /**
     * A producer can be created on a queue and closed before its Session.
     */
    @Test(timeout=30000)
    public void testSessionCreateProducer() throws Exception {
        connection = createAmqpConnection();
        assertNotNull(connection);

        Session owner = openAutoAckSession();
        assertNotNull(owner);

        Queue target = owner.createQueue("test.queue");
        MessageProducer sender = owner.createProducer(target);

        sender.close();
        owner.close();
    }

    /**
     * A consumer can be created on a queue and closed before its Session.
     */
    @Test(timeout=30000)
    public void testSessionCreateConsumer() throws Exception {
        connection = createAmqpConnection();
        assertNotNull(connection);

        Session owner = openAutoAckSession();
        assertNotNull(owner);

        Queue source = owner.createQueue("test.queue");
        MessageConsumer receiver = owner.createConsumer(source);

        receiver.close();
        owner.close();
    }

    /**
     * Closing an already closed Session must not throw.
     */
    @Test(timeout=30000)
    public void testSessionDoubleCloseWithoutException() throws Exception {
        connection = createAmqpConnection();
        connection.start();

        Session closedTwice = openAutoAckSession();
        closedTwice.close();
        closedTwice.close();
    }

    /**
     * Opens a non-transacted, auto-acknowledge Session on the connection
     * currently assigned to the test.
     */
    private Session openAutoAckSession() throws Exception {
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java
new file mode 100644
index 0000000..e423139
--- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsTopicConnectionTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +import static org.junit.Assert.assertNotNull; + +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; +import javax.jms.TopicConnection; + +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * + */ +public class JmsTopicConnectionTest extends AmqpTestSupport { + + @Test + public void testCreateQueueConnection() throws JMSException { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + TopicConnection connection = factory.createTopicConnection(); + assertNotNull(connection); + connection.close(); + } + + @Test(timeout=30000) + public void testCreateConnectionAsSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + factory.setUsername("system"); + factory.setPassword("manager"); + TopicConnection connection = factory.createTopicConnection(); + assertNotNull(connection); + 
connection.start(); + connection.close(); + } + + @Test(timeout=30000) + public void testCreateConnectionCallSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + TopicConnection connection = factory.createTopicConnection("system", "manager"); + assertNotNull(connection); + connection.start(); + connection.close(); + } + + @Test(timeout=30000, expected = JMSSecurityException.class) + public void testCreateConnectionAsUnknwonUser() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + factory.setUsername("unknown"); + factory.setPassword("unknown"); + TopicConnection connection = factory.createTopicConnection(); + assertNotNull(connection); + connection.start(); + connection.close(); + } + + @Test(timeout=30000, expected = JMSSecurityException.class) + public void testCreateConnectionCallUnknownUser() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + TopicConnection connection = factory.createTopicConnection("unknown", "unknown"); + assertNotNull(connection); + connection.start(); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java new file mode 100644 index 0000000..90d7358 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ConsumeFromAMQPTest.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.jms.bench;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.qpid.jms.support.AmqpTestSupport;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Collects basic timing data for consuming messages from a queue, using both
 * blocking receive calls and an asynchronous MessageListener.
 *
 * The class is marked {@code @Ignore}; it is a manual benchmark rather than
 * a regression test.
 */
@Ignore
public class ConsumeFromAMQPTest extends AmqpTestSupport {

    /** Number of messages produced and consumed in every run. */
    private static final int MSG_COUNT = 50 * 1000;

    /** Number of measured runs that are averaged. */
    private static final int NUM_RUNS = 10;

    @Override
    protected boolean isForceAsyncSends() {
        return true;
    }

    @Override
    protected boolean isAlwaysSyncSend() {
        return false;
    }

    @Override
    protected String getAmqpTransformer() {
        return "raw";
    }

    @Override
    protected boolean isMessagePrioritySupported() {
        return false;
    }

    @Override
    protected boolean isSendAcksAsync() {
        return true;
    }

    @Override
    public String getAmqpConnectionURIOptions() {
        return "provider.presettleProducers=true&provider.presettleConsumers=true";
    }

    /**
     * Sends and consumes a single message; intended as a small, stable
     * scenario to attach a profiler to.
     */
    @Test
    public void oneConsumedForProfile() throws Exception {
        connection = createAmqpConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(getDestinationName());
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        TextMessage message = session.createTextMessage();
        message.setText("hello");
        producer.send(message);
        producer.close();

        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
        assertEquals("Queue should have a message", 1, queueView.getQueueSize());

        MessageConsumer consumer = session.createConsumer(queue);
        Message received = consumer.receive(7000);
        assertNotNull(received);
        consumer.close();

        assertEquals("Queue should have no messages", 0, queueView.getQueueSize());
    }

    /**
     * Measures the time taken to drain {@link #MSG_COUNT} messages with
     * blocking receive calls, averaged over {@link #NUM_RUNS} runs.
     */
    @Test
    public void testConsumeRateFromQueue() throws Exception {
        connection = createAmqpConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(getDestinationName());

        // Warm Up the broker.
        produceMessages(queue, MSG_COUNT);
        consumerMessages(queue, MSG_COUNT);
        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
        queueView.purge();

        List<Long> consumeTimes = new ArrayList<Long>();
        long cumulative = 0;

        for (int i = 0; i < NUM_RUNS; ++i) {
            produceMessages(queue, MSG_COUNT);
            long result = consumerMessages(queue, MSG_COUNT);
            consumeTimes.add(result);
            cumulative += result;
            LOG.info("Time to consume {} queue messages: {} ms", MSG_COUNT, result);
            queueView.purge();
        }

        logSmoothedConsumeTime(cumulative, consumeTimes);
    }

    /**
     * Measures the time taken to drain {@link #MSG_COUNT} messages with a
     * MessageListener, averaged over {@link #NUM_RUNS} runs.
     */
    @Test
    public void testConsumeRateFromQueueAsync() throws Exception {
        connection = createAmqpConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(getDestinationName());

        // Warm Up the broker.
        produceMessages(queue, MSG_COUNT);
        consumerMessagesAsync(queue, MSG_COUNT);

        QueueViewMBean queueView = getProxyToQueue(getDestinationName());

        List<Long> consumeTimes = new ArrayList<Long>();
        long cumulative = 0;

        for (int i = 0; i < NUM_RUNS; ++i) {
            produceMessages(queue, MSG_COUNT);
            long result = consumerMessagesAsync(queue, MSG_COUNT);
            consumeTimes.add(result);
            cumulative += result;
            LOG.info("Time to consume {} queue messages: {} ms", MSG_COUNT, result);
            queueView.purge();
        }

        logSmoothedConsumeTime(cumulative, consumeTimes);
    }

    /**
     * Receives {@code msgCount} messages with blocking receive calls.
     *
     * @param destination the destination to consume from.
     * @param msgCount the number of messages that must be received.
     *
     * @return the elapsed wall-clock time in milliseconds.
     *
     * @throws Exception if a JMS operation fails; an AssertionError is raised
     *         when a receive call returns nothing within 7 seconds.
     */
    protected long consumerMessages(Destination destination, int msgCount) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageConsumer consumer = session.createConsumer(destination);

            long startTime = System.currentTimeMillis();
            for (int i = 0; i < msgCount; ++i) {
                Message message = consumer.receive(7000);
                assertNotNull("Failed to receive message " + i, message);
            }
            long result = (System.currentTimeMillis() - startTime);

            consumer.close();
            return result;
        } finally {
            // One session is created per call; close it so repeated runs do
            // not accumulate open sessions on the shared connection.
            session.close();
        }
    }

    /**
     * Receives {@code msgCount} messages through a MessageListener.
     *
     * @param destination the destination to consume from.
     * @param msgCount the number of messages that must be delivered.
     *
     * @return the elapsed wall-clock time in milliseconds.
     *
     * @throws Exception if a JMS operation fails; an AssertionError is raised
     *         when not all messages arrive within 60 seconds.
     */
    protected long consumerMessagesAsync(Destination destination, int msgCount) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageConsumer consumer = session.createConsumer(destination);

            // Sized from the parameter: the original used MSG_COUNT here and
            // silently ignored the count requested by the caller.
            final CountDownLatch doneLatch = new CountDownLatch(msgCount);
            long startTime = System.currentTimeMillis();
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    doneLatch.countDown();
                }
            });
            assertTrue(doneLatch.await(60, TimeUnit.SECONDS));
            long result = (System.currentTimeMillis() - startTime);

            consumer.close();
            return result;
        } finally {
            session.close();
        }
    }

    /**
     * Sends {@code msgCount} non-persistent text messages to the destination.
     *
     * @param destination the destination to send to.
     * @param msgCount the number of messages to send.
     *
     * @throws Exception if a JMS operation fails.
     */
    protected void produceMessages(Destination destination, int msgCount) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            TextMessage message = session.createTextMessage();
            message.setText("hello");

            for (int i = 0; i < msgCount; ++i) {
                producer.send(message);
            }

            producer.close();
        } finally {
            session.close();
        }
    }

    @Override
    protected void configureBrokerPolicies(BrokerService broker) {
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        policyEntry.setPrioritizedMessages(false);
        policyEntry.setExpireMessagesPeriod(0);
        policyEntry.setEnableAudit(false);
        policyEntry.setOptimizedDispatch(false);
        policyEntry.setQueuePrefetch(1000);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        broker.setDestinationPolicy(policyMap);
    }

    /**
     * Logs the mean consume time together with the individual run times.
     */
    private void logSmoothedConsumeTime(long cumulative, List<Long> consumeTimes) {
        long smoothed = cumulative / NUM_RUNS;
        LOG.info("Smoothed consume time for {} messages: {} ms (runs: {})", MSG_COUNT, smoothed, consumeTimes);
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java
---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java new file mode 100644 index 0000000..9077f08 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToAMQPTest.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.bench; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Collect some basic throughput data on message producer. + */ +@Ignore +public class ProduceToAMQPTest extends AmqpTestSupport { + + private final int MSG_COUNT = 50 * 1000; + private final int NUM_RUNS = 20; + + @Override + protected boolean isForceAsyncSends() { + return true; + } + + @Override + protected boolean isAlwaysSyncSend() { + return false; + } + + @Override + protected String getAmqpTransformer() { + return "raw"; + } + + @Override + public String getAmqpConnectionURIOptions() { + return "provider.presettle=true"; + } + + @Test + public void singleSendProfile() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getDestinationName()); + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + producer.send(message); + producer.close(); + } + + @Test + public void testProduceRateToTopic() throws Exception { + + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic 
= session.createTopic(getDestinationName()); + + // Warm Up the broker. + produceMessages(topic, MSG_COUNT); + + List<Long> sendTimes = new ArrayList<Long>(); + long cumulative = 0; + + for (int i = 0; i < NUM_RUNS; ++i) { + long result = produceMessages(topic, MSG_COUNT); + sendTimes.add(result); + cumulative += result; + LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result); + } + + long smoothed = cumulative / NUM_RUNS; + LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed); + TimeUnit.SECONDS.sleep(1); + } + + @Test + public void testProduceRateToQueue() throws Exception { + + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + // Warm Up the broker. + produceMessages(queue, MSG_COUNT); + + QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + queueView.purge(); + + List<Long> sendTimes = new ArrayList<Long>(); + long cumulative = 0; + + for (int i = 0; i < NUM_RUNS; ++i) { + long result = produceMessages(queue, MSG_COUNT); + sendTimes.add(result); + cumulative += result; + LOG.info("Time to send {} queue messages: {} ms", MSG_COUNT, result); + queueView.purge(); + } + + long smoothed = cumulative / NUM_RUNS; + LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, smoothed); + TimeUnit.SECONDS.sleep(1); + } + + protected long produceMessages(Destination destination, int msgCount) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + + long startTime = System.currentTimeMillis(); + for (int i = 0; i < msgCount; ++i) { + producer.send(message); + } + long result = (System.currentTimeMillis() - startTime); + + producer.close(); + return 
result; + } + + @Override + protected void configureBrokerPolicies(BrokerService broker) { + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + policyEntry.setPrioritizedMessages(false); + policyEntry.setExpireMessagesPeriod(0); + policyEntry.setEnableAudit(false); + policyEntry.setOptimizedDispatch(true); + policyEntry.setQueuePrefetch(100); + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java new file mode 100644 index 0000000..3fa8fd5 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProduceToOpenWireTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.jms.bench;

import java.util.ArrayList;
import java.util.List;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.qpid.jms.support.AmqpTestSupport;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects basic producer throughput data over a connection obtained from
 * {@code createActiveMQConnection()}, as a baseline for the AMQP producer
 * benchmark.
 *
 * The class is marked {@code @Ignore}; it is a manual benchmark rather than
 * a regression test.
 */
@Ignore
public class ProduceToOpenWireTest extends AmqpTestSupport {

    protected static final Logger LOG = LoggerFactory.getLogger(ProduceToOpenWireTest.class);

    /** Number of messages sent in every run. */
    private static final int MSG_COUNT = 50 * 1000;

    /** Number of measured runs that are averaged. */
    private static final int NUM_RUNS = 40;

    /**
     * Sends a single message to a topic; intended as a small, stable
     * scenario to attach a profiler to.
     */
    @Test
    public void singleSendProfile() throws Exception {
        connection = createActiveMQConnection();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            Topic topic = session.createTopic(getDestinationName());
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            TextMessage message = session.createTextMessage();
            message.setText("hello");
            producer.send(message);
            producer.close();
        } finally {
            // The original left both the producer and the session open.
            session.close();
        }
    }

    /**
     * Measures the time taken to send {@link #MSG_COUNT} messages to a topic,
     * averaged over {@link #NUM_RUNS} runs.
     */
    @Test
    public void testProduceRateToTopic() throws Exception {

        connection = createActiveMQConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(getDestinationName());

        // Warm Up the broker.
        produceMessages(topic, MSG_COUNT);

        List<Long> sendTimes = new ArrayList<Long>();
        long cumulative = 0;

        for (int i = 0; i < NUM_RUNS; ++i) {
            long result = produceMessages(topic, MSG_COUNT);
            sendTimes.add(result);
            cumulative += result;
            LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, result);
        }

        logSmoothedSendTime(cumulative, sendTimes);
    }

    /**
     * Measures the time taken to send {@link #MSG_COUNT} messages to a queue,
     * averaged over {@link #NUM_RUNS} runs.  The queue is purged after every
     * run so that its depth does not grow between measurements.
     */
    @Test
    public void testProduceRateToQueue() throws Exception {

        connection = createActiveMQConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(getDestinationName());

        // Warm Up the broker.
        produceMessages(queue, MSG_COUNT);

        QueueViewMBean queueView = getProxyToQueue(getDestinationName());
        queueView.purge();

        List<Long> sendTimes = new ArrayList<Long>();
        long cumulative = 0;

        for (int i = 0; i < NUM_RUNS; ++i) {
            long result = produceMessages(queue, MSG_COUNT);
            sendTimes.add(result);
            cumulative += result;
            LOG.info("Time to send {} queue messages: {} ms", MSG_COUNT, result);
            queueView.purge();
        }

        logSmoothedSendTime(cumulative, sendTimes);
    }

    /**
     * Sends {@code msgCount} non-persistent text messages to the destination.
     *
     * @param destination the destination to send to.
     * @param msgCount the number of messages to send.
     *
     * @return the elapsed wall-clock time of the send loop in milliseconds.
     *
     * @throws Exception if a JMS operation fails.
     */
    protected long produceMessages(Destination destination, int msgCount) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            TextMessage message = session.createTextMessage();
            message.setText("hello");

            long startTime = System.currentTimeMillis();
            for (int i = 0; i < msgCount; ++i) {
                producer.send(message);
            }

            long result = (System.currentTimeMillis() - startTime);

            producer.close();
            return result;
        } finally {
            // One session is created per call; close it so repeated runs do
            // not accumulate open sessions on the shared connection.
            session.close();
        }
    }

    /**
     * Logs the mean send time together with the individual run times.
     */
    private void logSmoothedSendTime(long cumulative, List<Long> sendTimes) {
        long smoothed = cumulative / NUM_RUNS;
        LOG.info("Smoothed send time for {} messages: {} ms (runs: {})", MSG_COUNT, smoothed, sendTimes);
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java new file mode 100644 index 0000000..0dacc84 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/bench/ProducerAndConsumerBench.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.bench; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +@Ignore +public class ProducerAndConsumerBench extends AmqpTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(ProducerAndConsumerBench.class); + + public static final int payload = 64 * 1024; + public static final int ioBuffer = 2 * payload; + public static final int socketBuffer = 64 * payload; + + private final String payloadString = new String(new byte[payload]); + private final int parallelProducer = 1; + private final int parallelConsumer = 1; + private final Vector<Throwable> exceptions = new Vector<Throwable>(); + private ConnectionFactory factory; + + private final long NUM_SENDS = 30000; + + @Test + public void testProduceConsume() throws Exception { + this.factory = createAmqpConnectionFactory(); + + final AtomicLong sharedSendCount = new AtomicLong(NUM_SENDS); + final AtomicLong sharedReceiveCount = new AtomicLong(NUM_SENDS); + + 
Thread.sleep(2000); + + long start = System.currentTimeMillis(); + ExecutorService executorService = Executors.newFixedThreadPool(parallelConsumer + parallelProducer); + + for (int i = 0; i < parallelConsumer; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + consumeMessages(sharedReceiveCount); + } catch (Throwable e) { + exceptions.add(e); + } + } + }); + } + for (int i = 0; i < parallelProducer; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + publishMessages(sharedSendCount); + } catch (Throwable e) { + exceptions.add(e); + } + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.MINUTES); + assertTrue("Producers done in time", executorService.isTerminated()); + assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); + + double duration = System.currentTimeMillis() - start; + LOG.info("Duration: " + duration + "ms"); + LOG.info("Rate: " + (NUM_SENDS * 1000 / duration) + "m/s"); + } + + private void consumeMessages(AtomicLong count) throws Exception { + JmsConnection connection = (JmsConnection) factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + MessageConsumer consumer = session.createConsumer(queue); + long v; + while ((v = count.decrementAndGet()) > 0) { + assertNotNull("got message " + v, consumer.receive(15000)); + } + consumer.close(); + } + + private void publishMessages(AtomicLong count) throws Exception { + JmsConnection connection = (JmsConnection) factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + Message message = 
session.createBytesMessage(); + ((BytesMessage) message).writeBytes(payloadString.getBytes()); + + while (count.getAndDecrement() > 0) { + producer.send(message); + } + producer.close(); + connection.close(); + } + + @Override + protected void configureBrokerPolicies(BrokerService broker) { + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + policyEntry.setPrioritizedMessages(false); + policyEntry.setExpireMessagesPeriod(0); + policyEntry.setEnableAudit(false); + policyEntry.setOptimizedDispatch(true); + policyEntry.setQueuePrefetch(1); // ensure no contention on add with + // matched producer/consumer + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + } + + @Override + protected boolean isForceAsyncSends() { + return true; + } + + @Override + protected boolean isAlwaysSyncSend() { + return false; + } + + @Override + protected String getAmqpTransformer() { + return "raw"; + } + + @Override + protected boolean isMessagePrioritySupported() { + return false; + } + + @Override + protected boolean isSendAcksAsync() { + return true; + } + + @Override + public String getAmqpConnectionURIOptions() { + return "provider.presettleProducers=true&provider.presettleConsumers=true"; + } + + @Override + protected int getSocketBufferSize() { + return socketBuffer; + } + + @Override + protected int getIOBufferSize() { + return ioBuffer; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java new file mode 
100644 index 0000000..db2a104 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsAutoAckTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsAutoAckTest extends AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsAutoAckTest.class); + + @Test(timeout = 60000) + public void testAckedMessageAreConsumed() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = 
session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + sendToAmqQueue(1); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + assertNotNull("Failed to receive any message.", consumer.receive(2000)); + + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testAckedMessageAreConsumedAsync() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + sendToAmqQueue(1); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + LOG.debug("Received async message: {}", message); + } + }); + + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
