Author: tabish
Date: Fri Jun 3 19:39:32 2011
New Revision: 1131161
URL: http://svn.apache.org/viewvc?rev=1131161&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30
Add support for sending the Queue Browse end MessageDispatch command.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1131161&r1=1131160&r2=1131161&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Fri Jun 3 19:39:32 2011
@@ -17,14 +17,12 @@
package org.apache.activemq.apollo.openwire
-import dto.OpenwireConnectionStatusDTO
import OpenwireConstants._
import org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
import collection.mutable.{ListBuffer, HashMap}
-import AsciiBuffer._
import org.apache.activemq.apollo.broker._
import BufferConversions._
import java.io.IOException
@@ -533,14 +531,43 @@ class OpenwireProtocolHandler extends Pr
def producer = p
def consumer = ConsumerContext.this
+ var closed = false
val outbound_session = outbound_sessions.open(producer.dispatch_queue)
def downstream = outbound_session
def close = {
- outbound_sessions.close(outbound_session)
- release
+
+ assert(producer.dispatch_queue.isExecuting)
+ if( !closed ) {
+ closed = true
+ if( browser ) {
+ // Then send the end of browse message.
+ var dispatch = new MessageDispatch
+ dispatch.setConsumerId(this.consumer.info.getConsumerId)
+ dispatch.setMessage(null)
+ dispatch.setDestination(null)
+
+ if( outbound_session.full ) {
+ // session is full, so use an overflow sink to hold the message,
+ // and then trigger closing the session once it empties out.
+ val sink = new OverflowSink(outbound_session)
+ sink.refiller = ^{
+ outbound_sessions.close(outbound_session)
+ release
+ }
+ sink.offer(dispatch)
+ } else {
+ outbound_session.offer(dispatch)
+ outbound_sessions.close(outbound_session)
+ release
+ }
+ } else {
+ outbound_sessions.close(outbound_session)
+ release
+ }
+ }
}
def remaining_capacity = outbound_session.remaining_capacity