Author: cmacnaug
Date: Thu Jun 18 03:36:41 2009
New Revision: 785887

URL: http://svn.apache.org/viewvc?rev=785887&view=rev
Log:
Updating OpenwireProtocolHandler to use a count-based window limiter rather than 
a size-based one.

This allows the protocol handler to work with the activemq-client.

Modified:
    
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java

Modified: 
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
 Thu Jun 18 03:36:41 2009
@@ -478,7 +478,6 @@
 
     private Broker createBroker(String name, String bindURI, String 
connectUri) throws Exception {
         Broker broker = new Broker();
-        broker.setDefaultVirtualHost(new VirtualHost(name));
         broker.addTransportServer(TransportFactory.bind(new URI(bindURI)));
         broker.addConnectUri(connectUri);
         broker.setDispatcher(dispatcher);

Modified: 
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
 Thu Jun 18 03:36:41 2009
@@ -193,7 +193,7 @@
                     }
                 }
             }
-            
+
             if (deleteAllMessages) {
                 getJournal().start();
                 journal.delete();
@@ -247,7 +247,6 @@
         try {
             open();
 
-            
             store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + 
new Date())), null);
         } finally {
             indexLock.writeLock().unlock();
@@ -441,7 +440,7 @@
         try {
             indexLock.writeLock().lock();
             long start = System.currentTimeMillis();
-            
+
             try {
                 if (!opened.get()) {
                     return;
@@ -718,11 +717,13 @@
 
         public final void rollback() {
             try {
-                if (updateCount > 1) {
-                    journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
-                }
                 if (tx != null) {
+                    if (updateCount > 1) {
+                        journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
+                    }
                     tx.rollback();
+                } else {
+                    throw new IllegalStateException("Not in Transaction");
                 }
             } catch (IOException e) {
                 throw new FatalStoreException(e);

Modified: 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
 Thu Jun 18 03:36:41 2009
@@ -103,6 +103,7 @@
 
     public OpenwireProtocolHandler() {
         setStoreWireFormat(new OpenWireFormat());
+        
         visitor = new CommandVisitor() {
 
             // 
/////////////////////////////////////////////////////////////////
@@ -333,10 +334,7 @@
         Command command = (Command) o;
         boolean responseRequired = command.isResponseRequired();
         try {
-            
             command.visit(visitor);
-            
-            
         } catch (Exception e) {
             if (responseRequired) {
                 ExceptionResponse response = new ExceptionResponse(e);
@@ -449,7 +447,7 @@
             limiter = new WindowLimiter<MessageDelivery>(true, flow, 
info.getPrefetchSize(), info.getPrefetchSize() / 2) {
                 @Override
                 public int getElementSize(MessageDelivery m) {
-                    return m.getFlowLimiterSize();
+                    return 1;
                 }
             };
 

Modified: 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
 Thu Jun 18 03:36:41 2009
@@ -39,40 +39,48 @@
     private ConsumerInfo consumerInfo;
 
     private Message lastMessage;
-    
+
     protected void initialize() {
+        inputWindowSize = 1000;
+        inputResumeThreshold = 500;
         // Setup the input processing..
-        final Flow flow = new Flow("client-"+name+"-inbound", false);
-        inputResumeThreshold = inputWindowSize/2;
+        final Flow flow = new Flow("client-" + name + "-inbound", false);
+        inputResumeThreshold = inputWindowSize / 2;
         WindowLimiter<MessageDelivery> limiter = new 
WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, 
inputResumeThreshold) {
             @Override
             protected void sendCredit(int credit) {
                 MessageAck ack = OpenwireSupport.createAck(consumerInfo, 
lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);
                 write(ack);
             }
+
+            public int getElementSize(MessageDelivery md) {
+                return 1;
+            }
         };
         inboundController = new FlowController<MessageDelivery>(new 
FlowControllable<MessageDelivery>() {
             public void flowElemAccepted(ISourceController<MessageDelivery> 
controller, MessageDelivery elem) {
                 messageReceived(controller, elem);
             }
+
             public String toString() {
                 return flow.getFlowName();
             }
+
             public IFlowResource getFlowResource() {
                 return null;
             }
         }, flow, limiter, inboundMutex);
         
inboundController.setExecutor(getDispatcher().createPriorityExecutor(getDispatcher().getDispatchPriorities()
 - 1));
-        
+
     }
-    
+
     protected void setupSubscription() throws Exception, IOException {
-        if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+        if (destination.getDomain().equals(Router.QUEUE_DOMAIN)) {
             activemqDestination = new 
ActiveMQQueue(destination.getName().toString());
         } else {
             activemqDestination = new 
ActiveMQTopic(destination.getName().toString());
         }
-        
+
         connectionInfo = createConnectionInfo(name);
         transport.oneway(connectionInfo);
         sessionInfo = createSessionInfo(connectionInfo);
@@ -81,7 +89,7 @@
         consumerInfo.setPrefetchSize(inputWindowSize);
         transport.oneway(consumerInfo);
     }
-    
+
     public void onCommand(Object command) {
         try {
             if (command.getClass() == WireFormatInfo.class) {


Reply via email to