Author: chirino
Date: Fri Jun 29 21:28:46 2012
New Revision: 1355565
URL: http://svn.apache.org/viewvc?rev=1355565&view=rev
Log:
Pickup fusesource api updates and add use it for a client test.
Added:
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1355565&r1=1355564&r2=1355565&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Fri Jun 29 21:28:46 2012
@@ -35,17 +35,16 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.amqp.dto._
import org.fusesource.amqp._
-import org.fusesource.amqp.Callback
-import org.fusesource.amqp.codec.api.AnnotatedMessage
-import org.fusesource.amqp.codec.marshaller.MessageSupport
-import org.fusesource.amqp.codec.types._
+import org.fusesource.amqp.callback._
+import org.fusesource.amqp.callback.Callback
+import org.fusesource.amqp.types._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object AMQPMessage {
- def apply(annotated:AnnotatedMessage):AMQPMessage = {
+ def apply(annotated:Envelope):AMQPMessage = {
val payload = MessageSupport.toBuffer(annotated)
val rc = AMQPMessage(payload)
rc._annotated = annotated
@@ -53,14 +52,14 @@ object AMQPMessage {
}
}
-case class AMQPMessage(payload:Buffer) extends Message {
+case class AMQPMessage(payload:Buffer) extends
org.apache.activemq.apollo.broker.Message {
import AmqpProtocolHandler._
def protocol = AmqpProtocol
- var _annotated:AnnotatedMessage = _
+ var _annotated:Envelope = _
def annotated = {
if ( _annotated ==null ) {
- _annotated = MessageSupport.decodeAnnotatedMessage(payload)
+ _annotated = MessageSupport.decodeEnvelope(payload)
}
_annotated
}
@@ -196,8 +195,7 @@ class AmqpProtocolHandler extends Protoc
val connector_config =
connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.find(
_.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
- val options = new AMQPConnectionOptions
- options.setServer(true);
+ val options = new AMQPServerConnectionOptions
options.setTransport(connection.transport);
options.setMaxFrameSize(1024*4)
options.setIdleTimeout(-1);
@@ -213,7 +211,13 @@ class AmqpProtocolHandler extends Protoc
})
options.setListener(new AMQPConnection.Listener(){
- override def onBegin(begin: Begin) = new AMQPSessionOptions(100, 100,
session_listener)
+ override def onBegin(begin: Begin) = {
+ val rc = new AMQPServerSessionOptions
+ rc.setIncomingWindow(100)
+ rc.setOutgoingWindow(100)
+ rc.setListener(session_listener)
+ rc
+ }
override def onAccepted(session: AMQPSession) {
connection_log.info("accepted: "+session)
@@ -374,13 +378,12 @@ class AmqpProtocolHandler extends Protoc
var receiver: AMQPReceiver = null
// create the producer route...
val options = new AMQPReceiverOptions();
- options.source = attach.getSource.asInstanceOf[Source]
- options.target = attach.getTarget.asInstanceOf[Target]
- options.name = attach.getName
- options.senderSettleMode =
SenderSettleMode.valueOf(attach.getSndSettleMode)
- options.receiverSettleMode =
ReceiverSettleMode.valueOf(attach.getRcvSettleMode)
-
- options.maxMessageSize = 10 * 1024 * 1024;
+ options.setSource(attach.getSource.asInstanceOf[Source])
+ options.setTarget(attach.getTarget.asInstanceOf[Target])
+ options.setName(attach.getName)
+
options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
+
options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
+ options.setMaxMessageSize(10 * 1024 * 1024);
def pump = {
while (target.is_connected && !target.full && receiver.peek() != null) {
@@ -424,7 +427,7 @@ class AmqpProtocolHandler extends Protoc
target.refiller = ^ {
pump
}
- options.listener = new AMQPEndpoint.Listener {
+ options.setListener(new AMQPEndpoint.Listener {
override def onTransfer() = pump
override def onClosed(senderClosed: Boolean, error: Error) {
@@ -435,7 +438,7 @@ class AmqpProtocolHandler extends Protoc
host.router.disconnect(target.addresses, target)
}
}
- }
+ })
// start with 0 credit window so that we don't receive any messages
// until we have verified if that we can connect to the destination..
@@ -517,7 +520,7 @@ class AmqpProtocolHandler extends Protoc
var annotated = if( message.protocol eq AmqpProtocol ) {
val original = message.asInstanceOf[AMQPMessage].annotated
- var annotated = new AnnotatedMessageImpl
+ var annotated = new Envelope
annotated.setHeader(header)
annotated.setDeliveryAnnotations(original.getDeliveryAnnotations)
annotated.setMessageAnnotations(original.getMessageAnnotations)
@@ -531,14 +534,15 @@ class AmqpProtocolHandler extends Protoc
case _ => (message.encoded, "protocol/"+message.protocol.id())
}
- val bare = new ValueMessageImpl(new AMQPBinary(body))
+ val bare = new types.Message
+ bare.setData(new Data(body))
var properties = new Properties()
properties.setContentType(ascii(content_type))
if( delivery.expiration!= 0 ) {
properties.setAbsoluteExpiryTime(new Date(delivery.expiration))
}
bare.setProperties(properties)
- var annotated = new AnnotatedMessageImpl
+ var annotated = new Envelope
annotated.setHeader(header)
annotated.setMessage(bare)
annotated
@@ -555,7 +559,7 @@ class AmqpProtocolHandler extends Protoc
// AMQPEndpoint.Listener interface..
///////////////////////////////////////////////////////////////////
object endpoint_listener extends AMQPEndpoint.Listener {
- override def onTransfer = {
+ override def onTransfer = queue {
sink.refiller.run()
}
override def onClosed(senderClosed: Boolean, error: Error) {
@@ -601,25 +605,28 @@ class AmqpProtocolHandler extends Protoc
def attach_receiver(attach: Attach, address: String,
requested_addresses:Array[SimpleAddress], callback: Callback[AMQPEndpoint]) =
try {
val options = new AMQPSenderOptions();
- options.source = attach.getSource.asInstanceOf[Source]
- options.source.setDefaultOutcome(new Released())
+
+ val src = attach.getSource.asInstanceOf[Source]
+ src.setDefaultOutcome(new Released())
if (attach.getSndSettleMode == SenderSettleMode.SETTLED.getValue) {
// if we are settling... then no other outcomes are possible..
- options.source.setOutcomes(Array())
+ src.setOutcomes(Array())
} else {
- options.source.setOutcomes(Array(
+ src.setOutcomes(Array(
new AMQPSymbol(Accepted.SYMBOLIC_ID),
new AMQPSymbol(Rejected.SYMBOLIC_ID),
new AMQPSymbol(Released.SYMBOLIC_ID),
new AMQPSymbol(Modified.SYMBOLIC_ID)
))
}
- options.target = attach.getTarget.asInstanceOf[Target]
- options.name = attach.getName
- options.senderSettleMode =
SenderSettleMode.valueOf(attach.getSndSettleMode)
- options.receiverSettleMode =
ReceiverSettleMode.valueOf(attach.getRcvSettleMode)
- options.maxMessageSize = 10 * 1024 * 1024;
+ options.setSource(src)
+
+ options.setTarget(attach.getTarget.asInstanceOf[Target])
+ options.setName(attach.getName)
+
options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
+
options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
+ options.setMaxMessageSize(10 * 1024 * 1024);
val subscription_id = attach.getName
@@ -676,7 +683,7 @@ class AmqpProtocolHandler extends Protoc
val from_seq = from_seq_opt.getOrElse(0L)
val source = new AMQPConsumer(subscription_id, addresses, selector,
browser, exclusive, include_seq, from_seq, browser_end);
- options.listener = source.endpoint_listener
+ options.setListener(source.endpoint_listener)
source.sender = AMQP.createSender(options)
host.dispatch_queue {
Added:
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala?rev=1355565&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
Fri Jun 29 21:28:46 2012
@@ -0,0 +1,96 @@
+package org.apache.activemq.apollo.amqp
+
+import org.fusesource.amqp.blocking.AMQP
+import org.fusesource.amqp.types._
+import org.fusesource.amqp._
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+class FuseSourceClientTest extends AmqpTestSupport {
+
+ test("broker") {
+
+// val port = 5672
+// val queue = "testqueue"
+
+ val queue = "/queue/fstestqueue"
+
+ val nMsgs = 1
+// val qos = QoS.AT_MOST_ONCE
+
+ try {
+ val connect_options = new AMQPClientOptions
+ connect_options.setHost("127.0.0.1", port)
+ connect_options.setContainerId("client")
+ connect_options.setIdleTimeout(-1)
+ connect_options.setMaxFrameSize(1024*4)
+// connect_options.setListener(new AMQPConnection.Listener())
+ val connection = AMQP.open(connect_options);
+ {
+ var data = "x" * 10 // 1024*20
+
+ var session = connection.createSession(10, 10)
+ val sender_options = new AMQPSenderOptions
+ sender_options.setQoS(AMQPQoS.AT_MOST_ONCE)
+ sender_options.setTarget(queue);
+ var p = AMQP.createSender(sender_options)
+ p.attach(session);
+
+ for (i <- 0 until nMsgs) {
+ var s = "Message #" + (i + 1)
+ println("Sending " + s)
+ p.send(MessageSupport.message(s+", data: "+data))
+ }
+
+ p.close()
+ session.close()
+ }
+ {
+ var session = connection.createSession(10, 10)
+
+ val receiver_options = new AMQPReceiverOptions
+ receiver_options.setQoS(AMQPQoS.AT_MOST_ONCE)
+ receiver_options.setSource(queue);
+ var c = AMQP.createReceiver(receiver_options)
+ c.attach(session);
+
+ // Receive messages non-transacted
+ for (i <- 0 until nMsgs) {
+ val msg = c.receive();
+ if (msg == null)
+
+ msg.getMessage().getData match {
+ case value:AMQPString =>
+ println("Received: " + value.getValue());
+ }
+// if (!msg.isSettled())
+// msg.accept();
+ }
+ c.close()
+ session.close()
+ }
+ connection.close()
+ } catch {
+ case e: Exception => {
+ e.printStackTrace
+ }
+ }
+
+ }
+
+}
\ No newline at end of file