Author: chirino
Date: Mon Jun 29 15:33:41 2009
New Revision: 789361

URL: http://svn.apache.org/viewvc?rev=789361&view=rev
Log:
BrokerDatabase now exposed 2 properties:
 * flushDelay 
 * storeBypass - Allows you to disable canceling database operations.
 

Modified:
    
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java

Modified: 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=789361&r1=789360&r2=789361&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
 Mon Jun 29 15:33:41 2009
@@ -58,6 +58,7 @@
 public class BrokerDatabase extends 
AbstractLimitedFlowResource<BrokerDatabase.OperationBase> implements Service, 
DispatcherAware {
 
     private static final boolean DEBUG = false;
+    
     private final Store store;
     private final Flow databaseFlow = new Flow("database", false);
 
@@ -80,8 +81,10 @@
     // num scheduled for delay
     private long delayedFlushPointer = 0; // The last delayable sequence num
     // requested.
-    private final long FLUSH_DELAY_MS = 10;
-    private final Runnable flushDelayCallback;
+    private long flushDelay = 10;
+
+       private final Runnable flushDelayCallback;
+    private boolean storeBypass = true;
 
     public interface DatabaseListener {
         /**
@@ -283,7 +286,7 @@
             op.opSequenceNumber = opSequenceNumber++;
             opQueue.addLast(op);
             if (op.flushRequested || storeLimiter.getThrottled()) {
-                if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
+                if (op.isDelayable() && flushDelay > 0) {
                     scheduleDelayedFlush(op.opSequenceNumber);
                 } else {
                     updateFlushPointer(op.opSequenceNumber);
@@ -313,7 +316,7 @@
 
         if (requestedDelayedFlushPointer == -1) {
             requestedDelayedFlushPointer = delayedFlushPointer;
-            dispatcher.schedule(flushDelayCallback, FLUSH_DELAY_MS, 
TimeUnit.MILLISECONDS);
+            dispatcher.schedule(flushDelayCallback, flushDelay, 
TimeUnit.MILLISECONDS);
         }
 
     }
@@ -705,15 +708,17 @@
         public static final int BASE_MEM_SIZE = 20;
 
         public boolean cancel() {
-            if (executePending.compareAndSet(true, false)) {
-                cancelled.set(true);
-                // System.out.println("Cancelled: " + this);
-                synchronized (opQueue) {
-                    unlink();
-                    storeController.elementDispatched(this);
-                }
-                return true;
-            }
+               if( storeBypass ) {
+                   if (executePending.compareAndSet(true, false)) {
+                       cancelled.set(true);
+                       // System.out.println("Cancelled: " + this);
+                       synchronized (opQueue) {
+                           unlink();
+                           storeController.elementDispatched(this);
+                       }
+                       return true;
+                   }
+               }
             return cancelled.get();
         }
 
@@ -1276,4 +1281,35 @@
             return "AddTxOpOperation " + record.getKey() + super.toString();
         }
     }
+    
+    public long getFlushDelay() {
+               return flushDelay;
+       }
+
+       public void setFlushDelay(long flushDelay) {
+               this.flushDelay = flushDelay;
+       }
+
+       /**
+        * @return true if operations are allowed to bypass the store.
+        */
+       public boolean isStoreBypass() {
+               return storeBypass;
+       }
+
+       /**
+        * Sets if persistent operations should be allowed to bypass the store.
+        * Defaults to true, as this will give you the best performance.  In 
some  
+        * cases, you want to disable this as the store being used will double
+        * as an audit log and you do not want any persistent operations
+        * to bypass the store.
+        * 
+        * When store bypass is disabled, all {...@link Operation#cancel()} 
requests
+        * will return false.
+        * 
+        * @param enable if true will enable store bypass
+        */
+       public void setStoreBypass(boolean enable) {
+               this.storeBypass = enable;
+       }
 }

Modified: 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=789361&r1=789360&r2=789361&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
 Mon Jun 29 15:33:41 2009
@@ -24,11 +24,14 @@
 
 import javax.jms.JMSException;
 
+import junit.framework.TestCase;
+
+import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerDatabase;
 import org.apache.activemq.apollo.broker.BrokerQueueStore;
-import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
+import 
org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -58,20 +61,19 @@
 import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
-
-import junit.framework.TestCase;
 
 public class SharedQueuePerfTest extends TestCase {
 
-    private static int PERFORMANCE_SAMPLES = 5;
+    private static int PERFORMANCE_SAMPLES = 500000;
 
     IDispatcher dispatcher;
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
-    private static final boolean PERSISTENT = false;
+    private static final boolean PERSISTENT = true;
     private static final boolean PURGE_STORE = true;
+    // Producers send sync and operations are never canceled. 
+    private static final boolean TEST_MAX_STORE_LATENCY = true;
     private static final int THREAD_POOL_SIZE = 
Runtime.getRuntime().availableProcessors();
 
     protected MetricAggregator totalProducerRate = new 
MetricAggregator().name("Aggregate Producer Rate").unit("items");
@@ -96,6 +98,10 @@
         dispatcher.start();
         database = new BrokerDatabase(createStore());
         database.setDispatcher(dispatcher);
+        if( TEST_MAX_STORE_LATENCY ) {
+               database.setFlushDelay(0);
+               database.setStoreBypass(false);
+        }
         database.start();
         queueStore = new BrokerQueueStore();
         queueStore.setDatabase(database);
@@ -286,7 +292,8 @@
     class Producer implements Dispatchable, 
FlowUnblockListener<OpenWireMessageDelivery> {
         private AtomicBoolean stopped = new AtomicBoolean(false);
         private String name;
-        protected final MetricCounter rate = new MetricCounter();
+        protected final MetricCounter sendRate = new MetricCounter();
+               AtomicBoolean waitingForAck = new AtomicBoolean();
         private final DispatchContext dispatchContext;
 
         protected IFlowController<OpenWireMessageDelivery> outboundController;
@@ -303,8 +310,8 @@
 
         public Producer(String name, IQueue<Long, MessageDelivery> 
targetQueue) {
             this.name = name;
-            rate.name("Producer " + name + " Rate");
-            totalProducerRate.add(rate);
+            sendRate.name("Producer " + name + " Rate");
+            totalProducerRate.add(sendRate);
             dispatchContext = dispatcher.register(this, name);
             // create a 1024 byte payload (2 bytes per char):
             payload = new String(new byte[512]);
@@ -356,9 +363,32 @@
         }
 
         public boolean dispatch() {
+            // If flow controlled stop until flow control is lifted.
+            if (outboundController.isSinkBlocked()) {
+                if (outboundController.addUnblockListener(this)) {
+                    return true;
+                }
+            }
+
+            if( TEST_MAX_STORE_LATENCY ) {
+               // We can't send again until we get persist ack.
+               if( waitingForAck.get() ) {
+                    return true;
+               }
+            }
+            
             if (next == null) {
                 try {
-                    createNextMessage();
+                       next = createNextMessage();
+                                       if (TEST_MAX_STORE_LATENCY) {
+                                               waitingForAck.set(true);
+                                               next.setPersistListener(new 
PersistListener() {
+                                                       public void 
onMessagePersisted(OpenWireMessageDelivery delivery) {
+                                                               
waitingForAck.set(false);
+                                                               
dispatchContext.requestDispatch();
+                                                       }
+                                               });
+                                       }
                 } catch (JMSException e) {
                     e.printStackTrace();
                     stopped.set(true);
@@ -366,20 +396,13 @@
                 }
             }
 
-            // If flow controlled stop until flow control is lifted.
-            if (outboundController.isSinkBlocked()) {
-                if (outboundController.addUnblockListener(this)) {
-                    return true;
-                }
-            }
-
+            sendRate.increment();
             outboundQueue.add(next, null);
-            rate.increment();
             next = null;
             return stopped.get();
         }
 
-        private void createNextMessage() throws JMSException {
+        private OpenWireMessageDelivery createNextMessage() throws 
JMSException {
             ActiveMQTextMessage message = new ActiveMQTextMessage();
             message.setJMSPriority(priority);
             message.setProducerId(producerId);
@@ -389,7 +412,7 @@
             if (payload != null) {
                 message.setText(payload);
             }
-            next = new OpenWireMessageDelivery(message);
+            return new OpenWireMessageDelivery(message);
         }
 
         public void onFlowUnblocked(ISinkController<OpenWireMessageDelivery> 
controller) {


Reply via email to