Author: chirino
Date: Thu Mar 1 00:24:41 2012
New Revision: 1295355
URL: http://svn.apache.org/viewvc?rev=1295355&view=rev
Log:
Upgrade to next hawtdispatch version.. Add a UDP based transport and protocol.
We should be abel to take syslog events to deliver them to a topic.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/pom.xml
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
Thu Mar 1 00:24:41 2012
@@ -14,4 +14,6 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.protocol.AnyProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.protocol.AnyProtocolFactory
+org.apache.activemq.apollo.broker.protocol.UdpProtocolFactory
+org.apache.activemq.apollo.broker.protocol.RawProtocolFactory
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/resources/META-INF/services/org.apache.activemq.apollo/transport-factory.index
Thu Mar 1 00:24:41 2012
@@ -18,3 +18,4 @@ org.apache.activemq.apollo.broker.transp
org.apache.activemq.apollo.broker.transport.TcpTransportFactory
org.apache.activemq.apollo.broker.transport.SslTransportFactory
org.apache.activemq.apollo.broker.jetty.WebSocketTransportFactory
+org.apache.activemq.apollo.broker.transport.UdpTransportFactory
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala?rev=1295355&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
Thu Mar 1 00:24:41 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import java.nio.ByteBuffer
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker._
+import java.lang.{Class, String}
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class RawProtocolFactory extends ProtocolFactory {
+ def create() = RawProtocol
+ def create(config: String): Protocol = {
+ config match {
+ case "raw" => RawProtocol
+ case _ => null
+ }
+ }
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object RawProtocol extends Protocol {
+
+ val PROTOCOL_ID = new AsciiBuffer(id)
+ def id = "raw"
+
+ def encode(message: Message):MessageRecord = {
+ message match {
+ case message:RawMessage =>
+ val rc = new MessageRecord
+ rc.protocol = PROTOCOL_ID
+ rc.buffer = message.payload
+ rc
+ case _ => throw new RuntimeException("Invalid message type");
+ }
+ }
+
+ def decode(message: MessageRecord) = {
+ assert( message.protocol == PROTOCOL_ID )
+ RawMessage(message.buffer)
+ }
+
+ def createProtocolCodec = throw new UnsupportedOperationException()
+ def createProtocolHandler = throw new UnsupportedOperationException()
+ def isIdentifiable = false
+ def maxIdentificaionLength = throw new UnsupportedOperationException()
+ def matchesIdentification(buffer: Buffer) = throw new
UnsupportedOperationException()
+}
+
+case class RawMessage(payload:Buffer) extends Message {
+
+ def getBodyAs[T](toType : Class[T]) = {
+ if( toType.isAssignableFrom(classOf[Buffer]) ) {
+ toType.cast(payload)
+ } else if( toType == classOf[Array[Byte]] ) {
+ toType.cast(payload.toByteArray)
+ } else if( toType == classOf[ByteBuffer] ) {
+ toType.cast(payload.toByteBuffer)
+ } else {
+ null
+ }
+ }
+
+ def getLocalConnectionId = null
+ def getProperty(name: String) = null
+ def expiration = 0L
+ def persistent = false
+ def priority = 0
+ def protocol = RawProtocol
+ def release() {}
+ def retain() {}
+ def retained() = 0
+}
\ No newline at end of file
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1295355&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
Thu Mar 1 00:24:41 2012
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.protocol
+
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtdispatch.transport.ProtocolCodec
+import java.nio.ByteBuffer
+import org.fusesource.hawtdispatch._
+import java.nio.channels.{DatagramChannel, WritableByteChannel,
ReadableByteChannel}
+import java.net.SocketAddress
+import org.apache.activemq.apollo.dto.{UdpDTO, AcceptingConnectorDTO}
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import java.util.Map.Entry
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import java.lang.String
+
+
+/**
+ * <p>
+ * A protocol factory for the UDP protocol.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class UdpProtocolFactory extends ProtocolFactory {
+
+ def create() = UdpProtocol
+
+ def create(config: String): Protocol = {
+ if (config == "udp") {
+ return UdpProtocol
+ }
+ return null
+ }
+
+}
+
+/**
+ * <p>
+ * The UDP protocol made for handling the UDP transport.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object UdpProtocol extends Protocol {
+
+ def id = "udp"
+
+ def createProtocolCodec = new UdpProtocolCodec()
+ def createProtocolHandler = new UdpProtocolHandler
+ def encode(message: Message) = throw new UnsupportedOperationException
+ def decode(message: MessageRecord) = throw new UnsupportedOperationException
+ def isIdentifiable = false
+ def maxIdentificaionLength = throw new UnsupportedOperationException()
+ def matchesIdentification(buffer: Buffer) = throw new
UnsupportedOperationException()
+
+}
+
+case class UdpMessage(from:SocketAddress, buffer:ByteBuffer)
+
+class UdpProtocolCodec extends ProtocolCodec {
+
+ def protocol = UdpProtocol.id
+
+ var channel: DatagramChannel = null
+ def setReadableByteChannel(channel: ReadableByteChannel) = {
+ this.channel = channel.asInstanceOf[DatagramChannel]
+ }
+
+ var read_counter = 0L
+ var read_read_size = 0L
+
+ def read: AnyRef = {
+ if (channel == null) {
+ throw new IllegalStateException
+ }
+ val buffer = ByteBuffer.allocate(channel.socket().getReceiveBufferSize)
+ val from = channel.receive(buffer)
+ if( from == null ) {
+ null
+ } else {
+ buffer.flip()
+ read_read_size = buffer.remaining()
+ read_counter += read_read_size
+ UdpMessage(from, buffer)
+ }
+ }
+
+ def getLastReadSize = read_read_size
+ def getReadCounter = read_counter
+ def getReadBufferSize = channel.socket().getReceiveBufferSize
+
+ def unread(buffer: Array[Byte]) = throw new UnsupportedOperationException()
+
+ // This protocol only supports receiving..
+ def setWritableByteChannel(channel: WritableByteChannel) = {}
+ def write(value: AnyRef) = ProtocolCodec.BufferState.FULL
+ def full: Boolean = true
+ def flush = ProtocolCodec.BufferState.FULL
+ def getWriteCounter = 0L
+ def getLastWriteSize = 0
+ def getWriteBufferSize = 0
+
+}
+
+trait UdpDecoder {
+ def init(handler:UdpProtocolHandler)
+ def address(message:UdpMessage):AsciiBuffer
+ def decode_addresses(value:AsciiBuffer):Array[SimpleAddress]
+ def decode_delivery(message:UdpMessage):Delivery
+}
+
+class DefaultUdpDecoder extends UdpDecoder {
+
+ var topic_address:AsciiBuffer = _
+ var topic_address_decoded:Array[SimpleAddress] = _
+
+ def init(handler:UdpProtocolHandler) = {
+ val topic_name = Option(handler.config.topic).getOrElse("udp")
+ topic_address_decoded =
LocalRouter.destination_parser.decode_multi_destination(topic_name, (name)=>
LocalRouter.destination_parser.decode_single_destination("topic:"+name, null))
+ topic_address = new
AsciiBuffer(LocalRouter.destination_parser.encode_destination(topic_address_decoded))
+ }
+
+ def address(message: UdpMessage) = topic_address
+ def decode_addresses(value: AsciiBuffer) = topic_address_decoded
+ def decode_delivery(message: UdpMessage) = {
+ val delivery = new Delivery
+ delivery.size = message.buffer.remaining()
+ delivery.message = RawMessage(new Buffer(message.buffer))
+ delivery
+ }
+}
+
+object UdpProtocolHandler extends Log
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class UdpProtocolHandler extends ProtocolHandler {
+ import UdpProtocolHandler._
+
+ def protocol = UdpProtocol.id
+ def session_id = None
+
+ var decoder:UdpDecoder = _
+ var buffer_size = 0
+ var host:VirtualHost = _
+ var connection_log:Log = _
+ var config:UdpDTO = _
+
+ def broker = connection.connector.broker
+ def queue = connection.dispatch_queue
+
+ override def on_transport_connected = {
+ connection.transport.resumeRead
+ import collection.JavaConversions._
+
+ config = (connection.connector.config match {
+ case connector_config:AcceptingConnectorDTO =>
+ connector_config.protocols.flatMap{ _ match {
+ case x:UdpDTO => Some(x)
+ case _ => None
+ }}.headOption
+ case _ => None
+ }).getOrElse(new UdpDTO)
+
+ val decoder_name =
Option(config.decoder).getOrElse(classOf[DefaultUdpDecoder].getName)
+ decoder = try {
+
this.getClass.getClassLoader.loadClass(decoder_name).newInstance().asInstanceOf[UdpDecoder]
+ } catch {
+ case x =>
+ warn(x)
+ connection.stop()
+ new DefaultUdpDecoder
+ }
+ buffer_size =
MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
+ decoder.init(this)
+
+ broker.dispatch_queue {
+ var host = broker.get_default_virtual_host
+ queue {
+ this.host = host
+ connection_log = this.host.connection_log
+ connection.transport.resumeRead()
+ if(host==null) {
+ warn("Could not find default virtual host")
+ connection.stop()
+ }
+ }
+ }
+
+ }
+
+ var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](1000) {
+ override def onCacheEviction(eldest: Entry[AsciiBuffer,
StompProducerRoute]) = {
+ host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
+ }
+ }
+
+ override def on_transport_command(command: AnyRef) = {
+ val msg = command.asInstanceOf[UdpMessage]
+ val address = decoder.address(msg)
+ var route = producerRoutes.get(address);
+ if( route == null ) {
+ route = new StompProducerRoute(address)
+ producerRoutes.put(address, route)
+ val security_context = new SecurityContext
+ security_context.connector_id = connection.connector.id
+ security_context.local_address = connection.transport.getLocalAddress
+ host.dispatch_queue {
+ val rc = host.router.connect(route.addresses, route, security_context)
+ if( rc.isDefined ) {
+
+ }
+ }
+ }
+ route.send(msg);
+ }
+
+ class StompProducerRoute(dest: AsciiBuffer) extends
DeliveryProducerRoute(host.router) {
+ val addresses = decoder.decode_addresses(dest)
+ val key = addresses.toList
+
+ override def send_buffer_size = buffer_size
+ override def connection = Some(UdpProtocolHandler.this.connection)
+ override def dispatch_queue = queue
+
+ var inbound_queue_size = 0
+
+ val sink_switch = new MutableSink[Delivery]()
+
+ val inbound_queue = new OverflowSink[Delivery](sink_switch) {
+ override protected def onDelivered(value: Delivery) = {
+ inbound_queue_size -= value.size
+ }
+ }
+
+ override protected def on_connected = {
+ sink_switch.downstream = Some(this)
+ }
+
+ def send(frame:UdpMessage) = {
+ // Drop older entries to make room for this new one..
+ while( inbound_queue_size >= buffer_size ) {
+ inbound_queue.removeFirst
+ }
+
+ val delivery = decoder.decode_delivery(frame)
+ inbound_queue_size += delivery.size
+ inbound_queue.offer(delivery)
+ }
+ }
+
+}
+
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java?rev=1295355&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
(added)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/transport/UdpTransportFactory.java
Thu Mar 1 00:24:41 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.transport;
+
+import org.apache.activemq.apollo.util.IntrospectionSupport;
+import org.apache.activemq.apollo.util.URISupport;
+import org.fusesource.hawtdispatch.transport.Transport;
+import org.fusesource.hawtdispatch.transport.TransportServer;
+import org.fusesource.hawtdispatch.transport.UdpTransport;
+import org.fusesource.hawtdispatch.transport.UdpTransportServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class UdpTransportFactory implements TransportFactory.Provider {
+ private static final Logger LOG =
LoggerFactory.getLogger(UdpTransportFactory.class);
+
+ public TransportServer bind(String location) throws Exception {
+
+ URI uri = new URI(location);
+ Map<String, String> options = new HashMap<String,
String>(URISupport.parseParamters(uri));
+
+ UdpTransportServer server = createUdpTransportServer(uri, options);
+ if (server == null) return null;
+
+ Map<String, String> copy = new HashMap<String, String>(options);
+ IntrospectionSupport.setProperties(server, options);
+ return server;
+ }
+
+ public Transport connect(String location) throws Exception {
+ throw new UnsupportedOperationException() ;
+ }
+
+ protected UdpTransportServer createUdpTransportServer(final URI location,
final Map<String, String> options) throws IOException, URISyntaxException,
Exception {
+ if( !location.getScheme().equals("udp") ) {
+ return null;
+ }
+
+ return new UdpTransportServer(location) {
+ @Override
+ protected UdpTransport createTransport() {
+ UdpTransport transport = super.createTransport();
+ IntrospectionSupport.setProperties(transport, options);
+ return transport;
+ }
+ };
+ }
+}
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/Module.java
Thu Mar 1 00:24:41 2012
@@ -36,7 +36,8 @@ public class Module implements DtoModule
QueueDestinationDTO.class,
NullStoreDTO.class,
SimpleStoreStatusDTO.class,
- DetectDTO.class
+ DetectDTO.class,
+ UdpDTO.class
};
}
}
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java?rev=1295355&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
(added)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/UdpDTO.java
Thu Mar 1 00:24:41 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Configuration for the udp protocol.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="udp")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UdpDTO extends ProtocolDTO {
+
+ /**
+ * Class name of the decoder that will be used to interpret the
+ * UDP message
+ */
+ @XmlAttribute(name="decoder")
+ public String decoder;
+
+ @XmlAttribute(name="buffer_size")
+ public String buffer_size;
+
+ @XmlAttribute(name="topic")
+ public String topic;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+
+ UdpDTO detectDTO = (UdpDTO) o;
+
+ if (decoder != null ? !decoder.equals(detectDTO.decoder) :
detectDTO.decoder != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (decoder != null ? decoder.hashCode() : 0);
+ return result;
+ }
+}
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
Thu Mar 1 00:24:41 2012
@@ -66,4 +66,5 @@ TopicStatusDTO
ValueDTO
VirtualHostDTO
VirtualHostStatusDTO
-WebAdminDTO
\ No newline at end of file
+WebAdminDTO
+UdpDTO
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
Thu Mar 1 00:24:41 2012
@@ -27,5 +27,6 @@
</virtual_host>
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
+ <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
</broker>
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Thu Mar 1 00:24:41 2012
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo.stomp
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import java.lang.String
-import java.net.InetSocketAddress
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.broker.{LocalRouter, KeyStorage, Broker,
BrokerFactory}
import java.util.concurrent.TimeUnit._
@@ -27,6 +26,9 @@ import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.dto.{QueueStatusDTO, TopicStatusDTO,
KeyStorageDTO}
import java.util.concurrent.atomic.AtomicLong
import FileSupport._
+import java.net.{DatagramSocket, InetSocketAddress}
+import java.nio.channels.DatagramChannel
+import org.fusesource.hawtbuf.AsciiBuffer
class StompTestSupport extends FunSuiteSupport with ShouldMatchers with
BeforeAndAfterEach with Logging {
var broker: Broker = null
@@ -59,10 +61,12 @@ class StompTestSupport extends FunSuiteS
clients = Nil
}
+ def connector_port(connector:String):Option[Int] = Option(connector).map {
id =>
+
broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(port)
+ }
+
def connect_request(version:String, c: StompClient, headers:String="",
connector:String=null) = {
- val p = Option(connector).map{ id =>
-
broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(port)
- }.getOrElse(port)
+ val p = connector_port(connector).getOrElse(port)
c.open("localhost", p)
version match {
case "1.0"=>
@@ -2374,5 +2378,21 @@ class StompTempDestinationTest extends S
frame should startWith("MESSAGE\n")
frame should include("reply-to:sms:8139993334444\n")
}
+}
+
+class StompUdpInteropTest extends StompTestSupport {
+
+ test("UDP to STOMP interop") {
+
+ connect("1.1")
+ subscribe("0", "/topic/udp")
+
+ val udp_port:Int = connector_port("udp").get
+ val channel = DatagramChannel.open();
+ val target = new InetSocketAddress("127.0.0.1", udp_port)
+ channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
+
+ assert_received("Hello")
+ }
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1295355&r1=1295354&r2=1295355&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Thu Mar 1 00:24:41 2012
@@ -96,7 +96,7 @@
<xbean-version>3.4</xbean-version>
<felix-version>1.0.0</felix-version>
- <hawtdispatch-version>1.9</hawtdispatch-version>
+ <hawtdispatch-version>1.10-SNAPSHOT</hawtdispatch-version>
<hawtbuf-version>1.9</hawtbuf-version>
<stompjms-version>1.9</stompjms-version>