Author: rajdavies
Date: Tue Dec 26 08:14:14 2006
New Revision: 490319

URL: http://svn.apache.org/viewvc?view=rev&rev=490319
Log:
Use cases submitted by Brian Diesenhaus

Added:
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
   (with props)
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
   (with props)

Added: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java?view=auto&rev=490319
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
 Tue Dec 26 08:14:14 2006
@@ -0,0 +1,218 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQFailoverIssue extends TestCase{
+
+    private static final String URL1="tcp://localhost:61616";
+    private static final String QUEUE1_NAME="test.queue.1";
+    private static final int MAX_CONSUMERS=10;
+    private static final int MAX_PRODUCERS=5;
+    private static final int NUM_MESSAGE_TO_SEND=10000;
+    private static final int TOTAL_MESSAGES=MAX_PRODUCERS * 
NUM_MESSAGE_TO_SEND;
+    private static final boolean USE_FAILOVER=true;
+    private AtomicInteger messageCount=new AtomicInteger();
+    private CountDownLatch doneLatch;
+
+    /**
+     * Intentionally empty: this test case keeps no shared fixture between test methods.
+     */
+    @Override
+    public void setUp() throws Exception{
+        // nothing to prepare
+    }
+
+    /**
+     * Intentionally empty: there is no shared fixture to release here.
+     */
+    @Override
+    public void tearDown() throws Exception{
+        // nothing to release
+    }
+
+    /**
+     * Starts an embedded broker on URL1, attaches MAX_CONSUMERS concurrent listeners to QUEUE1_NAME
+     * and lets MAX_PRODUCERS pooled producers send NUM_MESSAGE_TO_SEND messages each. The test
+     * passes only if all TOTAL_MESSAGES messages are consumed within 45 seconds.
+     * <p>
+     * This should fail with incubator-activemq-fuse-4.1.0.5.
+     *
+     * @throws Exception if the broker, the listener container or the wait itself fails
+     */
+    public void testFailoverIssue() throws Exception{
+        BrokerService brokerService1=null;
+        ActiveMQConnectionFactory acf=null;
+        PooledConnectionFactory pcf=null;
+        DefaultMessageListenerContainer container1=null;
+        ExecutorService executor=null;
+        try{
+            brokerService1=createBrokerService("broker1",URL1,null);
+            brokerService1.start();
+            acf=createConnectionFactory(URL1,USE_FAILOVER);
+            pcf=new PooledConnectionFactory(acf);
+            // One latch count per expected message; the listener counts down on every delivery.
+            doneLatch=new CountDownLatch(TOTAL_MESSAGES);
+            container1=createDefaultMessageListenerContainer(acf,new TestMessageListener1(0),QUEUE1_NAME);
+            container1.afterPropertiesSet();
+            // Presumably gives the consumers time to subscribe before the producers start.
+            Thread.sleep(5000);
+            executor=Executors.newCachedThreadPool();
+            for(int i=0;i<MAX_PRODUCERS;i++){
+                executor.submit(new PooledProducerTask(pcf,QUEUE1_NAME));
+            }
+            // Wait for all messages to arrive.
+            assertTrue("Timed out after receiving "+messageCount.get()+" of "+TOTAL_MESSAGES+" messages",
+                    doneLatch.await(45,TimeUnit.SECONDS));
+            executor.shutdown();
+            Assert.assertEquals(TOTAL_MESSAGES,messageCount.get());
+        }finally{
+            // Each resource is null-checked: when start-up fails part-way, an unconditional stop()
+            // would throw a NullPointerException that hides the original failure.
+            try{
+                if(executor!=null){
+                    // Interrupts producers that are still running after a timeout or failed assertion.
+                    executor.shutdownNow();
+                }
+                if(container1!=null){
+                    container1.stop();
+                    container1.destroy();
+                }
+            }finally{
+                // Stop the broker even if shutting down the container threw.
+                if(brokerService1!=null){
+                    brokerService1.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds, but does not start, a non-persistent broker with JMX enabled.
+     * <p>
+     * The broker gets a memory manager limited to 5000000 (presumably bytes), one destination
+     * policy with a memory limit of 1 for the queue pattern "&gt;", and a transport connector
+     * listening on uri1. A network connector is added only when uri2 is given.
+     *
+     * @param brokerName name of the broker, also used to derive the connector names
+     * @param uri1 URI the transport connector listens on
+     * @param uri2 URI of a peer broker to bridge to through a "static:" discovery URI, or null for none
+     * @return the configured broker, not yet started
+     * @throws Exception if a URI is malformed or a connector cannot be added
+     */
+    private BrokerService createBrokerService(final String brokerName,final String uri1,final String uri2)
+            throws Exception{
+        final BrokerService broker=new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+
+        // Broker-wide memory limit.
+        final UsageManager usage=new UsageManager();
+        usage.setLimit(5000000);
+        broker.setMemoryManager(usage);
+
+        // Single policy entry for the ">" queue pattern; setQueue(QUEUE1_NAME) would target only
+        // the test queue instead.
+        final PolicyEntry queuePolicy=new PolicyEntry();
+        queuePolicy.setQueue(">");
+        queuePolicy.setMemoryLimit(1);
+        final ArrayList<PolicyEntry> entries=new ArrayList<PolicyEntry>();
+        entries.add(queuePolicy);
+        final PolicyMap policies=new PolicyMap();
+        policies.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policies);
+
+        // Transport connector that clients connect to.
+        final TransportConnector transport=new TransportConnector();
+        transport.setUri(new URI(uri1));
+        transport.setBrokerName(brokerName);
+        transport.setName(brokerName+".transportConnector");
+        broker.addConnector(transport);
+
+        if(uri2==null){
+            // No peer requested: stand-alone broker.
+            return broker;
+        }
+
+        // Bridge to the peer broker at uri2.
+        final NetworkConnector bridge=new DiscoveryNetworkConnector(new URI("static:"+uri2));
+        bridge.setBridgeTempDestinations(true);
+        bridge.setBrokerName(brokerName);
+        bridge.setName(brokerName+".nc");
+        bridge.setPrefetchSize(1);
+        broker.addNetworkConnector(bridge);
+        return broker;
+    }
+
+    /**
+     * Creates a listener container that consumes from the named destination with MAX_CONSUMERS
+     * concurrent consumers, using non-transacted sessions in AUTO_ACKNOWLEDGE mode.
+     * <p>
+     * The container is only configured here; the caller still has to call afterPropertiesSet().
+     *
+     * @param acf connection factory the container obtains its connections from
+     * @param listener listener that receives every consumed message
+     * @param queue name of the destination to consume from
+     * @return the configured container
+     */
+    public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf,
+            final MessageListener listener,final String queue){
+        final DefaultMessageListenerContainer listenerContainer=new DefaultMessageListenerContainer();
+
+        // Where to consume from and who handles the messages.
+        listenerContainer.setConnectionFactory(acf);
+        listenerContainer.setDestinationName(queue);
+        listenerContainer.setMessageListener(listener);
+
+        // Session settings: no transactions, automatic acknowledgement.
+        listenerContainer.setSessionTransacted(false);
+        listenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+
+        listenerContainer.setConcurrentConsumers(MAX_CONSUMERS);
+        return listenerContainer;
+    }
+
+    /**
+     * Creates an ActiveMQ connection factory for the given broker URL.
+     * <p>
+     * The factory is set up with synchronous sends, asynchronous dispatch, optimized message
+     * dispatch, and with copy-on-send, compression and optimized acknowledgement switched off.
+     *
+     * @param url broker URL, e.g. "tcp://localhost:61616"
+     * @param useFailover when true the URL is wrapped as "failover:(url)"; otherwise it is used as is
+     * @return the configured connection factory
+     */
+    public ActiveMQConnectionFactory createConnectionFactory(final String url,final boolean useFailover){
+        // Build the failover form only when it is actually requested.
+        final String brokerUrl=useFailover?"failover:("+url+")":url;
+        final ActiveMQConnectionFactory acf=new ActiveMQConnectionFactory(brokerUrl);
+        acf.setCopyMessageOnSend(false);
+        acf.setUseAsyncSend(false);
+        acf.setDispatchAsync(true);
+        acf.setUseCompression(false);
+        acf.setOptimizeAcknowledge(false);
+        acf.setOptimizedMessageDispatch(true);
+        return acf;
+    }
+
+    /**
+     * Listener that records every delivery in the enclosing test's messageCount and doneLatch and
+     * then sleeps for a fixed time to simulate processing.
+     */
+    private class TestMessageListener1 implements MessageListener{
+
+        /** Time in milliseconds to sleep after each message has been counted. */
+        private final long waitTime;
+
+        /**
+         * @param waitTime time in milliseconds to sleep after each message; 0 for no delay
+         */
+        public TestMessageListener1(long waitTime){
+            this.waitTime=waitTime;
+        }
+
+        /**
+         * Counts the message, releases one latch count and then sleeps for waitTime.
+         *
+         * @param msg the delivered message; its content is not inspected
+         */
+        public void onMessage(Message msg){
+            messageCount.incrementAndGet();
+            doneLatch.countDown();
+            try{
+                Thread.sleep(waitTime);
+            }catch(InterruptedException e){
+                // Restore the interrupt status so the delivering thread can still see the
+                // interruption instead of having it silently swallowed here.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Producer task that sends NUM_MESSAGE_TO_SEND non-persistent bytes messages to one queue
+     * through a JmsTemplate backed by the pooled connection factory.
+     */
+    private class PooledProducerTask implements Runnable{
+
+        private final String queueName;
+        private final PooledConnectionFactory pcf;
+
+        /**
+         * @param pcf pooled connection factory the messages are sent through
+         * @param queueName name of the queue to send to
+         */
+        public PooledProducerTask(final PooledConnectionFactory pcf,final String queueName){
+            this.pcf=pcf;
+            this.queueName=queueName;
+        }
+
+        /**
+         * Waits two seconds and then sends the messages. Each one carries the same random 2048-byte
+         * payload, an increasing "count" property starting at 1 and a "producer" property of
+         * "pooled". Any failure is printed to standard error and ends the task.
+         */
+        public void run(){
+            try{
+                final JmsTemplate template=new JmsTemplate(pcf);
+                template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                template.setExplicitQosEnabled(true);
+                template.setMessageIdEnabled(false);
+                template.setMessageTimestampEnabled(false);
+                template.afterPropertiesSet();
+
+                // One random payload, generated once and reused for every message.
+                final byte[] payload=new byte[2048];
+                new Random().nextBytes(payload);
+
+                Thread.sleep(2000);
+
+                final AtomicInteger sequence=new AtomicInteger();
+                final MessageCreator creator=new MessageCreator(){
+
+                    public Message createMessage(Session session) throws JMSException{
+                        final BytesMessage out=session.createBytesMessage();
+                        out.writeBytes(payload);
+                        out.setIntProperty("count",sequence.incrementAndGet());
+                        out.setStringProperty("producer","pooled");
+                        return out;
+                    }
+                };
+                for(int sent=0;sent<NUM_MESSAGE_TO_SEND;sent++){
+                    template.send(queueName,creator);
+                }
+            }catch(final Throwable failure){
+                failure.printStackTrace();
+            }
+        }
+    }
+}

Propchange: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java?view=auto&rev=490319
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
 Tue Dec 26 08:14:14 2006
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+
+public class AMQStackOverFlowTest extends TestCase {
+
+       private static final String URL1 = "tcp://localhost:61616";
+
+       private static final String URL2 = "tcp://localhost:61617";
+
+       /**
+        * Starts two embedded brokers that are networked to each other and checks that a text
+        * message sent through one broker can be received through the other, in both directions.
+        *
+        * @throws Exception if a broker cannot be started or a send/receive fails
+        */
+       public void testStackOverflow() throws Exception {
+               BrokerService brokerService1 = null;
+               BrokerService brokerService2 = null;
+
+               try {
+                       brokerService1 = createBrokerService("broker1", URL1, URL2);
+                       brokerService1.start();
+                       brokerService2 = createBrokerService("broker2", URL2, URL1);
+                       brokerService2.start();
+
+                       final ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory(URL1);
+                       cf1.setUseAsyncSend(false);
+
+                       final ActiveMQConnectionFactory cf2 = new ActiveMQConnectionFactory(URL2);
+                       cf2.setUseAsyncSend(false);
+
+                       // Direction 1: send via broker1, receive via broker2.
+                       final JmsTemplate template1 = new JmsTemplate(cf1);
+                       template1.setReceiveTimeout(10000);
+
+                       template1.send("test.q", new MessageCreator() {
+
+                               public Message createMessage(Session session)
+                                               throws JMSException {
+                                       return session.createTextMessage("test");
+                               }
+
+                       });
+
+                       final JmsTemplate template2 = new JmsTemplate(cf2);
+                       template2.setReceiveTimeout(10000);
+
+                       final Message m = template2.receive("test.q");
+                       assertTrue("Expected a TextMessage on test.q but got: " + m,
+                                       m instanceof TextMessage);
+
+                       final TextMessage tm = (TextMessage) m;
+
+                       Assert.assertEquals("test", tm.getText());
+
+                       // Direction 2: send via broker2, receive via broker1.
+                       template2.send("test2.q", new MessageCreator() {
+
+                               public Message createMessage(Session session)
+                                               throws JMSException {
+                                       return session.createTextMessage("test2");
+                               }
+
+                       });
+
+                       final Message m2 = template1.receive("test2.q");
+
+                       assertTrue("Expected a TextMessage on test2.q but got: " + m2,
+                                       m2 instanceof TextMessage);
+
+                       final TextMessage tm2 = (TextMessage) m2;
+
+                       Assert.assertEquals("test2", tm2.getText());
+
+               } finally {
+                       // Null checks keep a start-up failure from being replaced by a
+                       // NullPointerException; the nested finally stops broker2 even when
+                       // stopping broker1 throws.
+                       try {
+                               if (brokerService1 != null) {
+                                       brokerService1.stop();
+                               }
+                       } finally {
+                               if (brokerService2 != null) {
+                                       brokerService2.stop();
+                               }
+                       }
+               }
+
+       }
+
+       /**
+        * Builds, but does not start, a non-persistent broker with JMX enabled.
+        * <p>
+        * The broker gets a memory manager limited to 10, one destination policy with a memory
+        * limit of 1 for the queue pattern "&gt;", and a transport connector listening on uri1.
+        * A network connector is added only when uri2 is given.
+        *
+        * @param brokerName name of the broker, also used to derive the connector names
+        * @param uri1 URI the transport connector listens on
+        * @param uri2 URI of a peer broker to bridge to through a "static:" discovery URI,
+        *             or null for none
+        * @return the configured broker, not yet started
+        * @throws Exception if a URI is malformed or a connector cannot be added
+        */
+       private BrokerService createBrokerService(final String brokerName,
+                       final String uri1, final String uri2) throws Exception {
+               final BrokerService broker = new BrokerService();
+               broker.setBrokerName(brokerName);
+               broker.setPersistent(false);
+               broker.setUseJmx(true);
+
+               // Broker-wide memory limit.
+               final UsageManager usage = new UsageManager();
+               usage.setLimit(10);
+               broker.setMemoryManager(usage);
+
+               // Single policy entry for the ">" queue pattern.
+               final PolicyEntry queuePolicy = new PolicyEntry();
+               queuePolicy.setQueue(">");
+               queuePolicy.setMemoryLimit(1);
+               final ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
+               entries.add(queuePolicy);
+               final PolicyMap policies = new PolicyMap();
+               policies.setPolicyEntries(entries);
+               broker.setDestinationPolicy(policies);
+
+               // Transport connector that clients connect to.
+               final TransportConnector transport = new TransportConnector();
+               transport.setUri(new URI(uri1));
+               transport.setBrokerName(brokerName);
+               transport.setName(brokerName + ".transportConnector");
+               broker.addConnector(transport);
+
+               if (uri2 == null) {
+                       // No peer requested: stand-alone broker.
+                       return broker;
+               }
+
+               // Bridge to the peer broker at uri2.
+               final NetworkConnector bridge = new DiscoveryNetworkConnector(
+                               new URI("static:" + uri2));
+               bridge.setBridgeTempDestinations(true);
+               bridge.setBrokerName(brokerName);
+               bridge.setName(brokerName + ".nc");
+               bridge.setPrefetchSize(1);
+               broker.addNetworkConnector(bridge);
+               return broker;
+       }
+}

Propchange: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to