Author: gtully
Date: Thu Feb  5 13:07:32 2009
New Revision: 741096

URL: http://svn.apache.org/viewvc?rev=741096&view=rev
Log:
update simple enqueue rate verifier for kaha store

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741096&r1=741095&r2=741096&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java Thu Feb  5 13:07:32 2009
@@ -19,54 +19,40 @@
 import java.io.File;
 import java.text.DateFormat;
 import java.util.Date;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
 
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 public class VerifySteadyEnqueueRate extends TestCase {
 
-    private static final Log LOG = LogFactory.getLog(VerifySteadyEnqueueRate.class);
+    private static final Log LOG = LogFactory
+            .getLog(VerifySteadyEnqueueRate.class);
 
-    private final CountDownLatch latch = new CountDownLatch(max_messages);
-    private static int max_messages = 10000000;
-    private static int messageCounter;
-    private String destinationName = getName()+"_Queue";
+    private static int max_messages = 1000000;
+    private String destinationName = getName() + "_Queue";
     private BrokerService broker;
-    private Connection receiverConnection;
-    private Connection producerConnection;
     final boolean useTopic = false;
-    
-    private boolean useAMQPStore=true;
+
+    private boolean useAMQPStore = false;
     protected static final String payload = new String(new byte[24]);
 
     public void setUp() throws Exception {
-        messageCounter = 0;
         startBroker();
-        receiverConnection = createConnection();
-        receiverConnection.start();
-        producerConnection = createConnection();
-        producerConnection.start();
     }
-    
+
     public void tearDown() throws Exception {
-        receiverConnection.close();
-        producerConnection.close();
         broker.stop();
     }
 
@@ -74,44 +60,62 @@
         if (true) {
             return;
         }
-        doTestForDataFileNotDeleted(false);
+        doTestEnqueue(false);
     }
-       
-    private void doTestForDataFileNotDeleted(boolean transacted) throws Exception {
+
+    private void doTestEnqueue(final boolean transacted) throws Exception {
         final long min = 100;
-        long max = 0;
+        final AtomicLong max = new AtomicLong(0);
         long reportTime = 0;
-        Receiver receiver = new Receiver() {
-            public void receive(String s) throws Exception {
-                messageCounter++; 
-                latch.countDown();
-            }
-        };
-        //buildReceiver(receiverConnection, destinationName, transacted, receiver, useTopic);
 
-        final MessageSender producer = new MessageSender(destinationName, producerConnection, transacted, useTopic);
-        for (int i=0; i< max_messages; i++) {
-            long startT = System.currentTimeMillis();
-            producer.send(payload );
-            long endT = System.currentTimeMillis();
-            long duration = endT - startT;
-            
-            if (duration > max) {
-                max = duration;
-            }
-            
-            if (duration > min) {
-                System.err.println(DateFormat.getTimeInstance().format(new Date(startT)) + " at message " + i + " send time=" + duration);    
+        Runnable runner = new Runnable() {
+
+            public void run() {
+                try {
+                    MessageSender producer = new MessageSender(destinationName,
+                            createConnection(), transacted, useTopic);
+
+                    for (int i = 0; i < max_messages; i++) {
+                        long startT = System.currentTimeMillis();
+                        producer.send(payload);
+                        long endT = System.currentTimeMillis();
+                        long duration = endT - startT;
+
+                        if (duration > max.get()) {
+                            max.set(duration);
+                        }
+
+                        if (duration > min) {
+                            System.err.println(Thread.currentThread().getName()
+                                    + " "
+                                    + DateFormat.getTimeInstance().format(
+                                            new Date(startT)) + " at message "
+                                    + i + " send time=" + duration);
+                        }
+                    }
+
+                } catch (Exception e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+                System.out.println("max = " + max);
             }
+        };
+        ExecutorService executor = Executors.newCachedThreadPool();
+        int numThreads = 6;
+        for (int i = 0; i < numThreads; i++) {
+            executor.execute(runner);
+        }
+        
+        executor.shutdown();
+        while(!executor.isTerminated()) {
+            executor.awaitTermination(10, TimeUnit.SECONDS);
         }
-        System.out.println("max = " + max);
-        //latch.await();
-        //assertEquals(max_messages, messageCounter);
-        //waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 30000, 2); 
     }
 
     private Connection createConnection() throws Exception {
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getConnectUri());
         return factory.createConnection();
     }
 
@@ -120,21 +124,23 @@
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setPersistent(true);
         broker.setUseJmx(true);
-        
-        if( useAMQPStore ) {
-            AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
-            // ensure there are a bunch of data files but multiple entries in each
-            //factory.setMaxFileLength(1024 * 20);
+
+        if (useAMQPStore) {
+            AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker
+                    .getPersistenceFactory();
+            // ensure there are a bunch of data files but multiple entries in
+            // each
+            // factory.setMaxFileLength(1024 * 20);
             // speed up the test case, checkpoint an cleanup early and often
-            //factory.setCheckpointInterval(500);
-            factory.setCleanupInterval(1000*60*30);
+            // factory.setCheckpointInterval(500);
+            factory.setCleanupInterval(1000 * 60 * 30);
             factory.setSyncOnWrite(false);
-            
-            //int indexBinSize=262144; // good for 6M
-            int indexBinSize=1024;
+
+            // int indexBinSize=262144; // good for 6M
+            int indexBinSize = 1024;
             factory.setIndexMaxBinSize(indexBinSize * 2);
             factory.setIndexBinSize(indexBinSize);
-            factory.setIndexPageSize(192*20);
+            factory.setIndexPageSize(192 * 20);
         } else {
             KahaDBStore kaha = new KahaDBStore();
             kaha.setDirectory(new File("target/activemq-data/kahadb"));
@@ -146,25 +152,4 @@
         broker.start();
         LOG.info("Starting broker..");
     }
-
-    private void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
-        final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName));
-        MessageListener messageListener = new MessageListener() {
-
-            public void onMessage(Message message) {
-                try {
-                    ObjectMessage objectMessage = (ObjectMessage)message;
-                    String s = (String)objectMessage.getObject();
-                    receiver.receive(s);
-                    if (session.getTransacted()) {
-                        session.commit();
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        };
-        inputMessageConsumer.setMessageListener(messageListener);
-    }
 }


Reply via email to