http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java new file mode 100644 index 0000000..d4cecab --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -0,0 +1,404 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.spring.SpringConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(ZeroPrefetchConsumerTest.class); + + protected Connection connection; + protected Queue queue; + protected Queue brokerZeroQueue = new ActiveMQQueue("brokerZeroConfig"); + + public void testCannotUseMessageListener() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + MessageListener listener = new SpringConsumer(); + try { + consumer.setMessageListener(listener); + fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch"); + } catch (JMSException e) { + LOG.info("Received expected exception : " + e); + } + } + + public void testPullConsumerWorks() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello World!")); + + // now lets receive it + MessageConsumer consumer = 
session.createConsumer(queue); + Message answer = consumer.receive(5000); + assertNotNull("Should have received a message!", answer); + // check if method will return at all and will return a null + answer = consumer.receive(1); + assertNull("Should have not received a message!", answer); + answer = consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + public void testIdleConsumer() throws Exception { + doTestIdleConsumer(false); + } + + public void testIdleConsumerTranscated() throws Exception { + doTestIdleConsumer(true); + } + + private void doTestIdleConsumer(boolean transacted) throws Exception { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + if (transacted) { + session.commit(); + } + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + + session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + if (transacted) { + session.commit(); + } + // this call would return null if prefetchSize > 0 + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + public void testRecvRecvCommit() throws Exception { + doTestRecvRecvCommit(false); + } + + public void testRecvRecvCommitTranscated() throws Exception { + doTestRecvRecvCommit(true); + } + + private void doTestRecvRecvCommit(boolean transacted) throws Exception { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + 
producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + if (transacted) { + session.commit(); + } + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer.receiveNoWait(); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + answer = (TextMessage)consumer.receiveNoWait(); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + public void testTwoConsumers() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + + // now lets receive it + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer1.receiveNoWait(); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + answer = (TextMessage)consumer2.receiveNoWait(); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + + answer = (TextMessage)consumer2.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + // https://issues.apache.org/activemq/browse/AMQ-2567 + public void testManyMessageConsumer() throws Exception { + doTestManyMessageConsumer(true); + } + + public void testManyMessageConsumerNoTransaction() throws Exception { + doTestManyMessageConsumer(false); + } + + private void doTestManyMessageConsumer(boolean transacted) throws Exception { + Session session = connection.createSession(transacted, transacted ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + producer.send(session.createTextMessage("Msg3")); + producer.send(session.createTextMessage("Msg4")); + producer.send(session.createTextMessage("Msg5")); + producer.send(session.createTextMessage("Msg6")); + producer.send(session.createTextMessage("Msg7")); + producer.send(session.createTextMessage("Msg8")); + if (transacted) { + session.commit(); + } + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg3"); + if (transacted) { + session.commit(); + } + // this call would return null if prefetchSize > 0 + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg4"); + if (transacted) { + session.commit(); + } + // Now using other consumer + // this call should return the next message (Msg5) still left on the queue + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg5"); + if (transacted) { + session.commit(); + } + // Now using other consumer + // this call should return the next message still left on the queue + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg6"); + // read one more message 
without commit + // this call should return the next message still left on the queue + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg7"); + if (transacted) { + session.commit(); + } + // Now using other consumer + // this call should return the next message (Msg5) still left on the queue + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg8"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + public void testManyMessageConsumerWithSend() throws Exception { + doTestManyMessageConsumerWithSend(true); + } + + public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception { + ((ActiveMQConnection)connection).setMessagePrioritySupported(true); + doTestManyMessageConsumerWithSend(true); + } + + public void testManyMessageConsumerWithSendNoTransaction() throws Exception { + doTestManyMessageConsumerWithSend(false); + } + + private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception { + Session session = connection.createSession(transacted, transacted ? 
Session.SESSION_TRANSACTED :Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + producer.send(session.createTextMessage("Msg3")); + producer.send(session.createTextMessage("Msg4")); + producer.send(session.createTextMessage("Msg5")); + producer.send(session.createTextMessage("Msg6")); + producer.send(session.createTextMessage("Msg7")); + producer.send(session.createTextMessage("Msg8")); + if (transacted) { + session.commit(); + } + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg3"); + if (transacted) { + session.commit(); + } + // Now using other consumer take 2 + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg4"); + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg5"); + + // ensure prefetch extension ok by sending another that could get dispatched + producer.send(session.createTextMessage("Msg9")); + if (transacted) { + session.commit(); + } + + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg6"); + // read one more message without commit + // and using other consumer + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a 
message!", answer.getText(), "Msg7"); + if (transacted) { + session.commit(); + } + + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg8"); + if (transacted) { + session.commit(); + } + + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg9"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + // https://issues.apache.org/jira/browse/AMQ-4224 + public void testBrokerZeroPrefetchConfig() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(brokerZeroQueue); + producer.send(session.createTextMessage("Msg1")); + // now lets receive it + MessageConsumer consumer = session.createConsumer(brokerZeroQueue); + + TextMessage answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + } + + // https://issues.apache.org/jira/browse/AMQ-4234 + // https://issues.apache.org/jira/browse/AMQ-4235 + public void testBrokerZeroPrefetchConfigWithConsumerControl() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue); + assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); + + // verify sub view broker + Subscription sub = + broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0); + assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); + + // manipulate Prefetch (like failover and stomp) + ConsumerControl consumerControl = new ConsumerControl(); + 
consumerControl.setConsumerId(consumer.info.getConsumerId()); + consumerControl.setDestination(ActiveMQDestination.transform(brokerZeroQueue)); + consumerControl.setPrefetch(1000); // default for a q + + Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl); + assertTrue("good request", !(reply instanceof ExceptionResponse)); + assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); + assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService brokerService = super.createBroker(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry zeroPrefetchPolicy = new PolicyEntry(); + zeroPrefetchPolicy.setQueuePrefetch(0); + policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy); + brokerService.setDestinationPolicy(policyMap); + return brokerService; + } + + @Override + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + super.setUp(); + + connection = createConnection(); + connection.start(); + queue = createQueue(); + } + + @Override + protected void startBroker() throws Exception { + super.startBroker(); + bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString(); + } + + @Override + protected void tearDown() throws Exception { + connection.close(); + super.tearDown(); + } + + protected Queue createQueue() { + return new ActiveMQQueue(getDestinationString() + "?consumer.prefetchSize=0"); + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java new file mode 100644 index 0000000..5e20f79 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.advisory; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.Topic; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; + +public class AdvisoryTempDestinationTests extends TestCase { + + protected static final int MESSAGE_COUNT = 2000; + protected BrokerService broker; + protected Connection connection; + protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; + protected int topicCount; + + + public void testNoSlowConsumerAdvisory() throws Exception { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = s.createTemporaryQueue(); + MessageConsumer consumer = s.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + } + }); + Topic advisoryTopic = AdvisorySupport + .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); + s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + // start throwing messages at the consumer + MessageProducer producer = s.createProducer(queue); + for (int i = 0; i < MESSAGE_COUNT; i++) { + 
BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + } + Message msg = advisoryConsumer.receive(1000); + assertNull(msg); + } + + public void testSlowConsumerAdvisory() throws Exception { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = s.createTemporaryQueue(); + MessageConsumer consumer = s.createConsumer(queue); + assertNotNull(consumer); + + Topic advisoryTopic = AdvisorySupport + .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue); + s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + // start throwing messages at the consumer + MessageProducer producer = s.createProducer(queue); + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + } + Message msg = advisoryConsumer.receive(1000); + assertNotNull(msg); + } + + public void testMessageDeliveryAdvisory() throws Exception { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = s.createTemporaryQueue(); + MessageConsumer consumer = s.createConsumer(queue); + assertNotNull(consumer); + + Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + //start throwing messages at the consumer + MessageProducer producer = s.createProducer(queue); + + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + + Message msg = advisoryConsumer.receive(1000); + assertNotNull(msg); + } + + public void testTempMessageConsumedAdvisory() throws Exception { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = s.createTemporaryQueue(); + MessageConsumer consumer = s.createConsumer(queue); + + Topic advisoryTopic = 
AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + //start throwing messages at the consumer + MessageProducer producer = s.createProducer(queue); + + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + String id = m.getJMSMessageID(); + Message msg = consumer.receive(1000); + assertNotNull(msg); + + msg = advisoryConsumer.receive(1000); + assertNotNull(msg); + + ActiveMQMessage message = (ActiveMQMessage) msg; + ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure(); + String originalId = payload.getJMSMessageID(); + assertEquals(originalId, id); + } + + public void testMessageExpiredAdvisory() throws Exception { + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = s.createQueue(getClass().getName()); + MessageConsumer consumer = s.createConsumer(queue); + assertNotNull(consumer); + + Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue); + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + //start throwing messages at the consumer + MessageProducer producer = s.createProducer(queue); + producer.setTimeToLive(1); + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage m = s.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + } + + Message msg = advisoryConsumer.receive(5000); + assertNotNull(msg); + } + + @Override + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + ConnectionFactory factory = createConnectionFactory(); + connection = factory.createConnection(); + connection.start(); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + connection.close(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() + throws Exception { + 
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( + ActiveMQConnection.DEFAULT_BROKER_URL); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); + strategy.setLimit(10); + PolicyEntry tempQueueEntry = createPolicyEntry(strategy); + tempQueueEntry.setTempQueue(true); + PolicyEntry tempTopicEntry = createPolicyEntry(strategy); + tempTopicEntry.setTempTopic(true); + + PolicyMap pMap = new PolicyMap(); + final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>(); + policyEntries.add(tempQueueEntry); + policyEntries.add(tempTopicEntry); + pMap.setPolicyEntries(policyEntries); + + answer.setDestinationPolicy(pMap); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } + + private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) { + PolicyEntry policy = new PolicyEntry(); + policy.setAdvisoryForFastProducers(true); + policy.setAdvisoryForConsumed(true); + policy.setAdvisoryForDelivery(true); + policy.setAdvisoryForDiscardingMessages(true); + policy.setAdvisoryForSlowConsumers(true); + policy.setAdvisoryWhenFull(true); + policy.setProducerFlowControl(false); + policy.setPendingMessageLimitStrategy(strategy); + + return policy; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java new file mode 
100644 index 0000000..4bb9053 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.advisory;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;

/**
 * Checks which advisory messages a broker publishes for a destination.
 *
 * <p>Each test runs against a non-persistent {@link BrokerService} started in
 * {@link #setUp()} whose default destination policy (see
 * {@link #configureBroker(BrokerService)}) switches every advisory flag on and
 * producer flow control off.
 */
public class AdvisoryTests extends TestCase {
    protected static final int MESSAGE_COUNT = 2000;

    /** Size in bytes of the payload written into each test message. */
    private static final int PAYLOAD_SIZE = 1024;

    protected BrokerService broker;
    protected Connection connection;
    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
    protected int topicCount;

    /**
     * A queue consumer with a registered (no-op) listener is flooded with
     * {@link #MESSAGE_COUNT} messages; no slow-consumer advisory is expected
     * within one second.
     */
    public void testNoSlowConsumerAdvisory() throws Exception {
        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = s.createQueue(getClass().getName());
        MessageConsumer consumer = s.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
            }
        });
        Topic advisoryTopic = AdvisorySupport
                .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
        // A second session is used for the advisory consumer and the producer.
        s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
        // start throwing messages at the consumer
        MessageProducer producer = s.createProducer(queue);
        sendBytesMessages(s, producer, MESSAGE_COUNT);

        Message msg = advisoryConsumer.receive(1000);
        assertNull(msg);
    }

    /**
     * A queue consumer that never receives is flooded with
     * {@link #MESSAGE_COUNT} messages; a slow-consumer advisory is expected
     * within one second.
     */
    public void testSlowConsumerAdvisory() throws Exception {
        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = s.createQueue(getClass().getName());
        MessageConsumer consumer = s.createConsumer(queue);
        assertNotNull(consumer);

        Topic advisoryTopic = AdvisorySupport
                .getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
        s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
        // start throwing messages at the consumer
        MessageProducer producer = s.createProducer(queue);
        sendBytesMessages(s, producer, MESSAGE_COUNT);

        Message msg = advisoryConsumer.receive(1000);
        assertNotNull(msg);
    }

    /**
     * Sends a single message to a queue that has a consumer and expects a
     * message-delivered advisory within one second.
     */
    public void testMessageDeliveryAdvisory() throws Exception {
        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = s.createQueue(getClass().getName());
        MessageConsumer consumer = s.createConsumer(queue);
        assertNotNull(consumer);

        Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
        // start throwing messages at the consumer
        MessageProducer producer = s.createProducer(queue);

        BytesMessage m = s.createBytesMessage();
        m.writeBytes(new byte[PAYLOAD_SIZE]);
        producer.send(m);

        Message msg = advisoryConsumer.receive(1000);
        assertNotNull(msg);
    }

    /**
     * Sends and consumes a single message, then expects a message-consumed
     * advisory whose data structure carries the id of the message that was sent.
     */
    public void testMessageConsumedAdvisory() throws Exception {
        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = s.createQueue(getClass().getName());
        MessageConsumer consumer = s.createConsumer(queue);

        Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
        // start throwing messages at the consumer
        MessageProducer producer = s.createProducer(queue);

        BytesMessage m = s.createBytesMessage();
        m.writeBytes(new byte[PAYLOAD_SIZE]);
        producer.send(m);
        String id = m.getJMSMessageID();
        Message msg = consumer.receive(1000);
        assertNotNull(msg);

        msg = advisoryConsumer.receive(1000);
        assertNotNull(msg);

        ActiveMQMessage message = (ActiveMQMessage) msg;
        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
        String originalId = payload.getJMSMessageID();
        // JUnit convention: expected value first, so a failure report reads correctly.
        assertEquals(id, originalId);
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages with a time-to-live of 1 ms to a
     * queue whose consumer never receives, and expects an expired-message
     * advisory within two seconds.
     */
    public void testMessageExpiredAdvisory() throws Exception {
        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = s.createQueue(getClass().getName());
        MessageConsumer consumer = s.createConsumer(queue);
        assertNotNull(consumer);

        Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
        // start throwing messages at the consumer
        MessageProducer producer = s.createProducer(queue);
        producer.setTimeToLive(1);
        sendBytesMessages(s, producer, MESSAGE_COUNT);

        Message msg = advisoryConsumer.receive(2000);
        assertNotNull(msg);
    }

    /**
     * Disabled (the {@code x} prefix hides it from the JUnit 3 runner).
     * Publishes twice the default topic prefetch of empty messages to a topic
     * with an idle consumer and expects a message-discarded advisory.
     */
    public void xtestMessageDiscardedAdvisory() throws Exception {
        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = s.createTopic(getClass().getName());
        MessageConsumer consumer = s.createConsumer(topic);
        assertNotNull(consumer);

        Topic advisoryTopic = AdvisorySupport.getMessageDiscardedAdvisoryTopic((ActiveMQDestination) topic);
        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
        // start throwing messages at the consumer
        MessageProducer producer = s.createProducer(topic);
        int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2);
        for (int i = 0; i < count; i++) {
            BytesMessage m = s.createBytesMessage();
            producer.send(m);
        }

        Message msg = advisoryConsumer.receive(1000);
        assertNotNull(msg);
    }

    /** Starts the broker (if not already created) and opens a started connection to it. */
    @Override
    protected void setUp() throws Exception {
        if (broker == null) {
            broker = createBroker();
        }
        ConnectionFactory factory = createConnectionFactory();
        connection = factory.createConnection();
        connection.start();
        super.setUp();
    }

    /**
     * Closes the connection and stops the broker.
     *
     * <p>The connection is null-checked and the broker is stopped in a
     * {@code finally} so that a failure while closing the connection cannot
     * leave a broker running.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    /** Creates a factory pointing at {@link ActiveMQConnection#DEFAULT_BROKER_URL}. */
    protected ActiveMQConnectionFactory createConnectionFactory()
            throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_BROKER_URL);
        return cf;
    }

    /** Creates, configures and starts the broker used by the tests. */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        configureBroker(answer);
        answer.start();
        return answer;
    }

    /**
     * Configures a non-persistent broker whose default policy entry enables all
     * advisories, disables producer flow control and limits pending messages
     * to 10, then adds a connector on {@link #bindAddress}.
     */
    protected void configureBroker(BrokerService answer) throws Exception {
        answer.setPersistent(false);
        PolicyEntry policy = new PolicyEntry();
        policy.setAdvisoryForFastProducers(true);
        policy.setAdvisoryForConsumed(true);
        policy.setAdvisoryForDelivery(true);
        policy.setAdvisoryForDiscardingMessages(true);
        policy.setAdvisoryForSlowConsumers(true);
        policy.setAdvisoryWhenFull(true);
        policy.setProducerFlowControl(false);
        ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
        strategy.setLimit(10);
        policy.setPendingMessageLimitStrategy(strategy);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);

        answer.setDestinationPolicy(pMap);
        answer.addConnector(bindAddress);
        answer.setDeleteAllMessagesOnStartup(true);
    }

    /** Sends {@code count} bytes messages of {@link #PAYLOAD_SIZE} bytes each through {@code producer}. */
    private static void sendBytesMessages(Session session, MessageProducer producer, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            BytesMessage m = session.createBytesMessage();
            m.writeBytes(new byte[PAYLOAD_SIZE]);
            producer.send(m);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java new file mode 100644 index 0000000..2c5f9cd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.advisory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies the {@link ConsumerEvent}s a {@link ConsumerEventSource} reports as
 * consumers on a destination come and go. The test registers itself as the
 * {@link ConsumerListener} and buffers every event in {@link #eventQueue}.
 */
public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerListenerTest.class);

    protected Session consumerSession1;
    protected Session consumerSession2;
    protected int consumerCounter;
    protected ConsumerEventSource consumerEventSource;
    protected BlockingQueue<ConsumerEvent> eventQueue = new ArrayBlockingQueue<ConsumerEvent>(1000);
    private Connection connection;

    /** Starts listening first, then adds two consumers and removes them again. */
    public void testConsumerEvents() throws Exception {
        consumerEventSource.start();

        // Each new consumer bumps the reported count.
        consumerSession1 = createConsumer();
        assertConsumerEvent(1, true);
        consumerSession2 = createConsumer();
        assertConsumerEvent(2, true);

        // Each closed session lowers it again.
        consumerSession1.close();
        consumerSession1 = null;
        assertConsumerEvent(1, false);
        consumerSession2.close();
        consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /** Adds two consumers before listening starts; two "started" events with count 2 are expected. */
    public void testListenWhileAlreadyConsumersActive() throws Exception {
        consumerSession1 = createConsumer();
        consumerSession2 = createConsumer();

        consumerEventSource.start();
        for (int i = 0; i < 2; i++) {
            assertConsumerEvent(2, true);
        }

        consumerSession1.close();
        consumerSession1 = null;
        assertConsumerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /** Same add/remove check against a temporary topic or queue (chosen by {@code useTopic}). */
    public void testConsumerEventsOnTemporaryDestination() throws Exception {
        Session s = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination dest;
        if (useTopic) {
            dest = s.createTemporaryTopic();
        } else {
            dest = s.createTemporaryQueue();
        }
        consumerEventSource = new ConsumerEventSource(connection, dest);
        consumerEventSource.setConsumerListener(this);
        consumerEventSource.start();

        MessageConsumer consumer = s.createConsumer(dest);
        assertConsumerEvent(1, true);
        consumer.close();
        assertConsumerEvent(0, false);
    }

    /** Listener callback: buffers the event for the test to pick up. */
    @Override
    public void onConsumerEvent(ConsumerEvent event) {
        eventQueue.add(event);
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();

        connection = createConnection();
        connection.start();

        ConsumerEventSource source = new ConsumerEventSource(connection, destination);
        source.setConsumerListener(this);
        consumerEventSource = source;
    }

    @Override
    protected void tearDown() throws Exception {
        // Release in reverse order of creation: event source, sessions, connection.
        if (consumerEventSource != null) {
            consumerEventSource.stop();
        }
        if (consumerSession2 != null) {
            consumerSession2.close();
        }
        if (consumerSession1 != null) {
            consumerSession1.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }

    /** Takes the next buffered event and checks its consumer count and started flag. */
    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
        ConsumerEvent next = waitForConsumerEvent();
        assertEquals("Consumer count", count, next.getConsumerCount());
        assertEquals("started", started, next.isStarted());
    }

    /**
     * Opens a new session with a logging-only listener consuming from
     * {@code destination}.
     *
     * @return the session that owns the new consumer
     */
    protected Session createConsumer() throws JMSException {
        consumerCounter++;
        final String consumerText = "Consumer: " + consumerCounter;
        LOG.info("Creating consumer: " + consumerText + " on destination: " + destination);

        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        answer.createConsumer(destination).setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                LOG.info("Received message by: " + consumerText + " message: " + message);
            }
        });
        return answer;
    }

    /** Waits up to 100 seconds for the next event and fails if none arrives. */
    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
        ConsumerEvent answer = eventQueue.poll(100, TimeUnit.SECONDS);
        assertNotNull("Should have received a consumer event!", answer);
        return answer;
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java new file mode 100644 index 0000000..01dc443 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/DestinationListenerTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.advisory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import javax.jms.Session;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

/**
 * Verifies that a connection's {@link DestinationSource} knows the broker's
 * initial destinations and notifies a {@link DestinationListener} when a
 * consumer or producer causes a new destination to appear.
 */
public class DestinationListenerTest extends EmbeddedBrokerTestSupport implements DestinationListener {
    private static final Logger LOG = LoggerFactory.getLogger(DestinationListenerTest.class);
    protected ActiveMQConnection connection;
    protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar");
    protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese");
    // Written by onDestinationEvent and read by the test methods. The tests sleep
    // while waiting for events, so the callback presumably runs on another thread
    // (TODO confirm); a synchronized list keeps those accesses safe either way.
    protected List<ActiveMQDestination> newDestinations =
            Collections.synchronizedList(new ArrayList<ActiveMQDestination>());

    /**
     * After a one second pause the destination source should list the queue and
     * topic the broker was created with.
     */
    public void testDestiationSourceHasInitialDestinations() throws Exception {
        Thread.sleep(1000);

        DestinationSource destinationSource = connection.getDestinationSource();
        Set<ActiveMQQueue> queues = destinationSource.getQueues();
        Set<ActiveMQTopic> topics = destinationSource.getTopics();

        LOG.info("Queues: " + queues);
        LOG.info("Topics: " + topics);

        assertFalse("The queues should not be empty!", queues.isEmpty());
        assertFalse("The topics should not be empty!", topics.isEmpty());

        assertTrue("queues contains initial queue: " + queues, queues.contains(sampleQueue));
        // Report the topics (not the queues) when the topic check fails.
        assertTrue("topics contains initial topic: " + topics, topics.contains(sampleTopic));
    }

    /** Creating a consumer on an unknown queue should produce an "add" event for it. */
    public void testConsumerForcesNotificationOfNewDestination() throws Exception {
        // now let's cause a destination to be created
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQQueue newQueue = new ActiveMQQueue("Test.Cheese");
        session.createConsumer(newQueue);

        // Give the event time to arrive.
        Thread.sleep(3000);

        assertThat(newQueue, isIn(newDestinations));

        LOG.info("New destinations are: " + newDestinations);
    }

    /** Sending to an unknown queue should produce an "add" event for it. */
    public void testProducerForcesNotificationOfNewDestination() throws Exception {
        // now let's cause a destination to be created
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ActiveMQQueue newQueue = new ActiveMQQueue("Test.Beer");
        MessageProducer producer = session.createProducer(newQueue);
        TextMessage message = session.createTextMessage("<hello>world</hello>");
        producer.send(message);

        // Give the event time to arrive.
        Thread.sleep(3000);

        assertThat(newQueue, isIn(newDestinations));

        LOG.info("New destinations are: " + newDestinations);
    }

    /** Listener callback: tracks added destinations and forgets removed ones. */
    @Override
    public void onDestinationEvent(DestinationEvent event) {
        ActiveMQDestination destination = event.getDestination();
        if (event.isAddOperation()) {
            LOG.info("Added: " + destination);
            newDestinations.add(destination);
        } else {
            LOG.info("Removed: " + destination);
            newDestinations.remove(destination);
        }
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();

        connection = (ActiveMQConnection) createConnection();
        connection.start();
        connection.getDestinationSource().setDestinationListener(this);
    }

    /** Adds {@link #sampleQueue} and {@link #sampleTopic} as the broker's initial destinations. */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = super.createBroker();
        broker.setDestinations(new ActiveMQDestination[]{
            sampleQueue,
            sampleTopic
        });
        return broker;
    }

    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java new file mode 100644 index 0000000..dfa1b5e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.advisory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies the {@link ProducerEvent}s a {@link ProducerEventSource} reports as
 * producers on a destination come and go. The test registers itself as the
 * {@link ProducerListener} and buffers every event in {@link #eventQueue}.
 *
 * <p>Several member names still say "consumer"; they are kept unchanged so
 * subclasses and test reports are unaffected. Only log and assertion texts
 * were corrected to say "producer".
 */
public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements ProducerListener {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerListenerTest.class);

    // Sessions that own the producers created by the tests.
    protected Session consumerSession1;
    protected Session consumerSession2;
    // Counts producers created so far; used only to label log output.
    protected int consumerCounter;
    protected ProducerEventSource producerEventSource;
    protected BlockingQueue<ProducerEvent> eventQueue = new ArrayBlockingQueue<ProducerEvent>(1000);
    private Connection connection;

    /** Starts listening first, then adds two producers and removes them again. */
    public void testProducerEvents() throws Exception {
        producerEventSource.start();

        consumerSession1 = createProducer();
        assertProducerEvent(1, true);

        consumerSession2 = createProducer();
        assertProducerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertProducerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertProducerEvent(0, false);
    }

    /** Adds two producers before listening starts; two "started" events with count 2 are expected. */
    public void testListenWhileAlreadyConsumersActive() throws Exception {
        consumerSession1 = createProducer();
        consumerSession2 = createProducer();

        producerEventSource.start();
        assertProducerEvent(2, true);
        assertProducerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertProducerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertProducerEvent(0, false);
    }

    /** Same add/remove check against a temporary topic or queue (chosen by {@code useTopic}). */
    public void testConsumerEventsOnTemporaryDestination() throws Exception {
        Session s = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue();
        producerEventSource = new ProducerEventSource(connection, dest);
        producerEventSource.setProducerListener(this);
        producerEventSource.start();
        MessageProducer producer = s.createProducer(dest);
        assertProducerEvent(1, true);
        producer.close();
        assertProducerEvent(0, false);
    }

    /** Listener callback: buffers the event for the test to pick up. */
    @Override
    public void onProducerEvent(ProducerEvent event) {
        eventQueue.add(event);
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();

        connection = createConnection();
        connection.start();
        producerEventSource = new ProducerEventSource(connection, destination);
        producerEventSource.setProducerListener(this);
    }

    @Override
    protected void tearDown() throws Exception {
        if (producerEventSource != null) {
            producerEventSource.stop();
        }
        if (consumerSession2 != null) {
            consumerSession2.close();
        }
        if (consumerSession1 != null) {
            consumerSession1.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }

    /** Takes the next buffered event and checks its producer count and started flag. */
    protected void assertProducerEvent(int count, boolean started) throws InterruptedException {
        ProducerEvent event = waitForProducerEvent();
        assertEquals("Producer count", count, event.getProducerCount());
        assertEquals("started", started, event.isStarted());
    }

    /**
     * Opens a new session with a producer on {@code destination}.
     *
     * @return the session that owns the new producer
     */
    protected Session createProducer() throws JMSException {
        final String producerText = "Producer: " + (++consumerCounter);
        LOG.info("Creating producer: " + producerText + " on destination: " + destination);

        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = answer.createProducer(destination);
        assertNotNull(producer);

        return answer;
    }

    /** Waits up to 100000 ms for the next event and fails if none arrives. */
    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
        ProducerEvent answer = eventQueue.poll(100000, TimeUnit.MILLISECONDS);
        assertNotNull("Should have received a producer event!", answer);
        return answer;
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java new file mode 100644 index 0000000..123c778 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestDeleteTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.advisory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that deleting a temporary topic or queue also removes the consumer
 * advisory topic that belongs to it from the broker's regions.
 */
public class TempDestDeleteTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
    private static final Logger LOG = LoggerFactory.getLogger(TempDestDeleteTest.class);

    protected int consumerCounter;
    protected ConsumerEventSource topicConsumerEventSource;
    protected BlockingQueue<ConsumerEvent> eventQueue = new ArrayBlockingQueue<ConsumerEvent>(1000);

    private ConsumerEventSource queueConsumerEventSource;
    private Connection connection;
    private Session session;
    private ActiveMQTempTopic tempTopic;
    private ActiveMQTempQueue tempQueue;

    /** The consumer advisory topic of a temp topic must disappear with the temp topic. */
    public void testDeleteTempTopicDeletesAvisoryTopics() throws Exception {
        topicConsumerEventSource.start();

        MessageConsumer topicConsumer = createConsumer(tempTopic);
        assertConsumerEvent(1, true);

        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempTopic);
        boolean existsBeforeDelete = destinationExists(advisoryTopic);
        assertTrue(existsBeforeDelete);

        topicConsumer.close();

        // Deleting the temp topic is expected to take its advisory topic with it.
        tempTopic.delete();

        boolean existsAfterDelete = destinationExists(advisoryTopic);
        assertFalse(existsAfterDelete);
    }

    /** The consumer advisory topic of a temp queue must disappear with the temp queue. */
    public void testDeleteTempQueueDeletesAvisoryTopics() throws Exception {
        queueConsumerEventSource.start();

        MessageConsumer queueConsumer = createConsumer(tempQueue);
        assertConsumerEvent(1, true);

        Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempQueue);
        boolean existsBeforeDelete = destinationExists(advisoryTopic);
        assertTrue(existsBeforeDelete);

        queueConsumer.close();

        // Deleting the temp queue is expected to take its advisory topic with it.
        tempQueue.delete();

        boolean existsAfterDelete = destinationExists(advisoryTopic);
        assertFalse(existsAfterDelete);
    }

    /**
     * Reports whether {@code dest} is a key in the destination map of any of the
     * broker's four regions, checked in the order topic, queue, temp topic,
     * temp queue.
     */
    private boolean destinationExists(Destination dest) throws Exception {
        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
        if (rb.getTopicRegion().getDestinationMap().containsKey(dest)) {
            return true;
        }
        if (rb.getQueueRegion().getDestinationMap().containsKey(dest)) {
            return true;
        }
        if (rb.getTempTopicRegion().getDestinationMap().containsKey(dest)) {
            return true;
        }
        return rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
    }

    /** Listener callback: buffers the event for the test to pick up. */
    @Override
    public void onConsumerEvent(ConsumerEvent event) {
        eventQueue.add(event);
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        connection = createConnection();
        connection.start();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // One temporary destination of each kind, each with its own (not yet started) event source.
        tempTopic = (ActiveMQTempTopic) session.createTemporaryTopic();
        ConsumerEventSource topicSource = new ConsumerEventSource(connection, tempTopic);
        topicSource.setConsumerListener(this);
        topicConsumerEventSource = topicSource;

        tempQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
        ConsumerEventSource queueSource = new ConsumerEventSource(connection, tempQueue);
        queueSource.setConsumerListener(this);
        queueConsumerEventSource = queueSource;
    }

    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }

    /** Takes the next buffered event and checks its consumer count and started flag. */
    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
        ConsumerEvent next = waitForConsumerEvent();
        assertEquals("Consumer count", count, next.getConsumerCount());
        assertEquals("started", started, next.isStarted());
    }

    /** Creates a consumer on {@code dest} whose listener only logs what it receives. */
    protected MessageConsumer createConsumer(Destination dest) throws JMSException {
        consumerCounter++;
        final String consumerText = "Consumer: " + consumerCounter;
        LOG.info("Creating consumer: " + consumerText + " on destination: " + dest);

        MessageConsumer created = session.createConsumer(dest);
        created.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                LOG.info("Received message by: " + consumerText + " message: " + message);
            }
        });
        return created;
    }

    /** Waits up to one second for the next event and fails if none arrives. */
    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
        ConsumerEvent answer = eventQueue.poll(1, TimeUnit.SECONDS);
        assertNotNull("Should have received a consumer event!", answer);
        return answer;
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java new file mode 100644 index 0000000..cab4e59 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempDestLoadTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.advisory;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates and deletes many temporary destinations (each with one consumer and
 * one producer) and then checks that the {@link AdvisoryBroker} holds no
 * leftover advisory destinations, consumers or producers.
 */
public class TempDestLoadTest extends EmbeddedBrokerTestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(TempDestLoadTest.class);

    protected int consumerCounter;
    private Connection connection;
    private Session session;
    // Number of temporary destinations each test creates and deletes.
    private static final int MESSAGE_COUNT = 2000;

    /** Cycles {@link #MESSAGE_COUNT} temporary queues and checks nothing is left behind. */
    public void testLoadTempAdvisoryQueues() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            TemporaryQueue tempQueue = session.createTemporaryQueue();
            MessageConsumer consumer = session.createConsumer(tempQueue);
            MessageProducer producer = session.createProducer(tempQueue);
            consumer.close();
            producer.close();
            tempQueue.delete();
        }

        assertAdvisoryStateReleased();
    }

    /** Cycles {@link #MESSAGE_COUNT} temporary topics and checks nothing is left behind. */
    public void testLoadTempAdvisoryTopics() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            TemporaryTopic tempTopic = session.createTemporaryTopic();
            MessageConsumer consumer = session.createConsumer(tempTopic);
            MessageProducer producer = session.createProducer(tempTopic);
            consumer.close();
            producer.close();
            tempTopic.delete();
        }

        assertAdvisoryStateReleased();
    }

    /**
     * Shared verification for both tests: the advisory broker must track no
     * destinations, consumers or producers any more, while the region broker
     * must still hold at least two destinations.
     */
    private void assertAdvisoryStateReleased() throws Exception {
        AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(
                AdvisoryBroker.class);

        // assertEquals reports the actual size on failure, unlike assertTrue(size == 0).
        assertEquals("advisory destinations", 0, ab.getAdvisoryDestinations().size());
        assertEquals("advisory consumers", 0, ab.getAdvisoryConsumers().size());
        assertEquals("advisory producers", 0, ab.getAdvisoryProducers().size());

        RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);

        for (Destination dest : rb.getDestinationMap().values()) {
            LOG.debug("Destination: {}", dest);
        }

        // there should be at least 2 destinations - advisories -
        // 1 for the connection + 1 generic one
        // (the check previously used "> 2", which contradicted this message)
        assertTrue("Should be at least 2 destinations", rb.getDestinationMap().size() >= 2);
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        connection = createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java new file mode 100644 index 0000000..9bf8ed1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.advisory; + +import java.util.Vector; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; + +public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { + + protected Connection serverConnection; + protected Session serverSession; + protected Connection clientConnection; + protected Session clientSession; + protected Destination serverDestination; + protected int messagesToSend = 10; + protected boolean deleteTempQueue = true; + protected boolean serverTransactional = false; + protected boolean clientTransactional = false; + protected int numConsumers = 1; + protected int numProducers = 1; + + public void testConcurrentProducerRequestReply() throws Exception { + numProducers = 10; + testLoadRequestReply(); + } + + public void testLoadRequestReply() throws Exception { + for (int i = 0; i < numConsumers; i++) { + serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() { + @Override + public void onMessage(Message msg) { + try { + Destination replyTo = msg.getJMSReplyTo(); + MessageProducer producer = serverSession.createProducer(replyTo); + producer.send(replyTo, msg); + if (serverTransactional) { + serverSession.commit(); + } + producer.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + class Producer extends Thread { + private final int numToSend; + + public Producer(int numToSend) { + this.numToSend = numToSend; + } + + @Override + public void run() { + try { + Session session = 
clientConnection.createSession(clientTransactional, clientTransactional ? + Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(serverDestination); + + for (int i = 0; i < numToSend; i++) { + TemporaryQueue replyTo = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(replyTo); + Message msg = session.createMessage(); + msg.setJMSReplyTo(replyTo); + producer.send(msg); + if (clientTransactional) { + session.commit(); + } + consumer.receive(); + if (clientTransactional) { + session.commit(); + } + consumer.close(); + if (deleteTempQueue) { + replyTo.delete(); + } else { + // temp queue will be cleaned up on clientConnection.close + } + } + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + Vector<Thread> threads = new Vector<Thread>(numProducers); + for (int i = 0; i < numProducers; i++) { + threads.add(new Producer(messagesToSend / numProducers)); + } + startAndJoinThreads(threads); + + clientSession.close(); + serverSession.close(); + clientConnection.close(); + serverConnection.close(); + + AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(AdvisoryBroker.class); + + // The server destination will be left + assertTrue(ab.getAdvisoryDestinations().size() == 1); + + assertTrue("should be zero but is " + ab.getAdvisoryConsumers().size(), ab.getAdvisoryConsumers().size() == 0); + assertTrue("should be zero but is " + ab.getAdvisoryProducers().size(), ab.getAdvisoryProducers().size() == 0); + + RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class); + + assertTrue(rb.getDestinationMap().size() >= 6); + } + + private void startAndJoinThreads(Vector<Thread> threads) throws Exception { + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + serverConnection = createConnection(); + 
serverConnection.start(); + serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + clientConnection = createConnection(); + clientConnection.start(); + clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + serverDestination = createDestination(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + serverTransactional = clientTransactional = false; + numConsumers = numProducers = 1; + messagesToSend = 2000; + } + + @Override + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue(getClass().getName()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java new file mode 100644 index 0000000..4040569 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/BlobTransferPolicyUriTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.blob; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; + +/** + * + */ +public class BlobTransferPolicyUriTest extends TestCase { + public void testBlobTransferPolicyIsConfiguredViaUri() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=http://foo.com"); + BlobTransferPolicy policy = factory.getBlobTransferPolicy(); + assertEquals("http://foo.com", policy.getDefaultUploadUrl()); + assertEquals("http://foo.com", policy.getUploadUrl()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java new file mode 100644 index 0000000..0875a5b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobDownloadStrategyTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.blob; + +import java.io.File; +import java.io.FileWriter; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQBlobMessage; + +public class FTPBlobDownloadStrategyTest extends FTPTestSupport { + + final int FILE_SIZE = Short.MAX_VALUE * 10; + + public void testDownload() throws Exception { + setConnection(); + + // create file + File uploadFile = new File(ftpHomeDirFile, "test.txt"); + FileWriter wrt = new FileWriter(uploadFile); + + wrt.write("hello world"); + + for(int ix = 0; ix < FILE_SIZE; ++ix ) { + wrt.write("a"); + } + + wrt.close(); + + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy()); + InputStream stream; + try { + message.setURL(new URL(ftpUrl + "test.txt")); + stream = strategy.getInputStream(message); + int i = stream.read(); + StringBuilder sb = new StringBuilder(2048); + while(i != -1) { + sb.append((char)i); + i = stream.read(); + } + assertEquals("hello world", sb.toString().substring(0, "hello world".length())); + assertEquals(FILE_SIZE, sb.toString().substring("hello world".length()).length()); + + assertTrue(uploadFile.exists()); + strategy.deleteFile(message); + assertFalse(uploadFile.exists()); + + } catch (Exception e) { + e.printStackTrace(); + assertTrue(false); + } + } + + public void testWrongAuthentification() throws MalformedURLException { + ActiveMQBlobMessage message = new 
ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy()); + try { + message.setURL(new URL("ftp://" + userNamePass + "_wrong:" + userNamePass + "@localhost:" + ftpPort + "/ftptest/")); + strategy.getInputStream(message); + } catch(JMSException e) { + assertEquals("Wrong Exception", "Cant Authentificate to FTP-Server", e.getMessage()); + return; + } catch(Exception e) { + System.out.println(e); + assertTrue("Wrong Exception "+ e, false); + return; + } + + assertTrue("Expect Exception", false); + } + + public void testWrongFTPPort() throws MalformedURLException { + ActiveMQBlobMessage message = new ActiveMQBlobMessage(); + BlobDownloadStrategy strategy = new FTPBlobDownloadStrategy(new BlobTransferPolicy()); + try { + message.setURL(new URL("ftp://" + userNamePass + ":" + userNamePass + "@localhost:" + 422 + "/ftptest/")); + strategy.getInputStream(message); + } catch(JMSException e) { + assertEquals("Wrong Exception", "Problem connecting the FTP-server", e.getMessage()); + return; + } catch(Exception e) { + e.printStackTrace(); + assertTrue("Wrong Exception "+ e, false); + return; + } + + assertTrue("Expect Exception", false); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java new file mode 100644 index 0000000..4aecc09 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.blob; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.InputStream; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.BlobMessage; +import org.apache.activemq.command.ActiveMQBlobMessage; + +public class FTPBlobTest extends FTPTestSupport { + + public void testBlobFile() throws Exception { + setConnection(); + // first create Message + File file = File.createTempFile("amq-data-file-", ".dat"); + // lets write some data + String content = "hello world " + System.currentTimeMillis(); + BufferedWriter writer = new BufferedWriter(new FileWriter(file)); + writer.append(content); + writer.close(); + + ActiveMQSession session = (ActiveMQSession) connection.createSession( + false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + BlobMessage message = session.createBlobMessage(file); + message.setName("fileName"); + + producer.send(message); + Thread.sleep(1000); + + // check message send + Message msg = consumer.receive(1000); + assertTrue(msg instanceof ActiveMQBlobMessage); + + assertEquals("name is correct", "fileName", 
((ActiveMQBlobMessage)msg).getName()); + InputStream input = ((ActiveMQBlobMessage) msg).getInputStream(); + StringBuilder b = new StringBuilder(); + int i = input.read(); + while (i != -1) { + b.append((char) i); + i = input.read(); + } + input.close(); + File uploaded = new File(ftpHomeDirFile, msg.getJMSMessageID().toString().replace(":", "_")); + assertEquals(content, b.toString()); + assertTrue(uploaded.exists()); + ((ActiveMQBlobMessage)msg).deleteFile(); + assertFalse(uploaded.exists()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java new file mode 100644 index 0000000..cac9a0a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPBlobUploadStrategyTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.blob;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQBlobMessage;


/**
 * Exercises the upload that happens in {@code ActiveMQBlobMessage.onSend()}
 * against the FTP fixture: once expecting the file to arrive, once expecting
 * a {@link JMSException} after switching {@code userNamePass} to "guest".
 * {@code connection}, {@code ftpUrl}, {@code ftpHomeDirFile},
 * {@code userNamePass} and {@code setConnection()} are presumably provided by
 * {@link FTPTestSupport}, which is not part of this file.
 */
public class FTPBlobUploadStrategyTest extends FTPTestSupport {

    /** Uploads a blob message and checks its URL and the uploaded file. */
    public void testFileUpload() throws Exception {
        setConnection();
        ActiveMQBlobMessage blob = newBlobMessage(writeDataFile());

        blob.onSend();

        assertEquals(ftpUrl + "ID_testmessage", blob.getURL().toString());
        File uploaded = new File(ftpHomeDirFile, "ID_testmessage");
        assertTrue("File doesn't exists", uploaded.exists());
    }

    /** Expects {@code onSend()} to throw a {@link JMSException} for the "guest" user. */
    public void testWriteDenied() throws Exception {
        userNamePass = "guest";
        setConnection();
        ActiveMQBlobMessage blob = newBlobMessage(writeDataFile());

        boolean rejected = false;
        try {
            blob.onSend();
        } catch (JMSException denied) {
            denied.printStackTrace();
            rejected = true;
        }

        if (!rejected) {
            fail("Should have failed with permission denied exception!");
        }
    }

    /** Creates a temp file containing "hello world". */
    private File writeDataFile() throws Exception {
        File dataFile = File.createTempFile("amq-data-file-", ".dat");
        // lets write some data
        BufferedWriter out = new BufferedWriter(new FileWriter(dataFile));
        out.append("hello world");
        out.close();
        return dataFile;
    }

    /**
     * Opens a session, disables copy-on-send on the connection and returns a
     * blob message for {@code dataFile} with the message id "testmessage".
     */
    private ActiveMQBlobMessage newBlobMessage(File dataFile) throws Exception {
        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ((ActiveMQConnection) connection).setCopyMessageOnSend(false);

        ActiveMQSession amqSession = (ActiveMQSession) jmsSession;
        ActiveMQBlobMessage blob = (ActiveMQBlobMessage) amqSession.createBlobMessage(dataFile);
        blob.setJMSMessageID("testmessage");
        return blob;
    }

}
