Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties?rev=1352265&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties Wed Jun 20 18:57:01 2012 @@ -0,0 +1,45 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +# +# Setup the default logging levels +# +log4j.rootLogger=INFO, console, logfile +#log4j.logger.org.apache.activemq.apollo=INFO + +# +# Uncomment one of the following to enable debug logging +# +# log4j.logger.org.apache.activemq.apollo=DEBUG +# log4j.logger.org.apache.activemq.apollo.broker=DEBUG +# log4j.logger.org.apache.activemq.apollo.web=DEBUG +# log4j.logger.org.apache.activemq.apollo.cli=DEBUG + +# Console Settings +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%-5p | %m%n +log4j.appender.console.threshold=TRACE + +# File Settings +log4j.appender.logfile=org.apache.log4j.RollingFileAppender +log4j.appender.logfile.file=apollo.log +log4j.appender.logfile.maxFileSize=5MB +log4j.appender.logfile.maxBackupIndex=5 +log4j.appender.logfile.append=true +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config?rev=1352265&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config Wed Jun 20 18:57:01 2012 @@ -0,0 +1,41 @@ +// --------------------------------------------------------------------------- +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// --------------------------------------------------------------------------- +AmqpSecurityTest { + + org.apache.activemq.apollo.broker.security.SocketAddressLoginModule requisite; + + org.apache.activemq.apollo.broker.security.FileUserLoginModule optional + file="users.properties"; + + // + // For testing purposes, we do a funny thing where we set the user + // file to also be used as the groups file. This only works for the + // test since user==password==group for our tests. 
+ // + org.apache.activemq.apollo.broker.security.FileGroupLoginModule optional + file="users.properties"; + +}; + +AmqpSslSecurityTest { + org.apache.activemq.apollo.broker.security.CertificateLoginModule optional; + + org.apache.activemq.apollo.broker.security.FileGroupLoginModule optional + match="javax.security.auth.x500.X500Principal" + file="users.properties"; + +}; \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml?rev=1352265&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml Wed Jun 20 18:57:01 2012 @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<broker xmlns="http://activemq.apache.org/schema/activemq/apollo"> + <virtual_host id="vh-local"> + </virtual_host> + + <connector bind="tcp://0.0.0.0:61616" id="port-61616"> + <amqp add_user_header="JMSXUserID"> + <add_user_header>GroupId</add_user_header> + <add_user_header kind="UserPrincipal">UserId</add_user_header> + </amqp> + </connector> +</broker> Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties?rev=1352265&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties Wed Jun 20 18:57:01 2012 @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +connect_group=CN=ssl_user|can_only_connect|can_send_create_queue|can_send_queue|can_receive_queue|can_consume_queue|can_send_create_topic|can_send_topic|can_recieve_topic|can_consume_create_ds|can_consume_ds|can_send_create_consume_queue + +guest=guest +can_not_connect=can_not_connect +can_only_connect=can_only_connect +connector_restricted=connector_restricted + +# +# Users with specific roles related to queues +# +can_send_create_queue=can_send_create_queue +can_send_queue=can_send_queue +can_receive_queue=can_receive_queue +can_consume_queue=can_consume_queue +can_send_create_consume_queue=can_send_create_consume_queue + +# +# Users with specific roles related to topics +# +can_send_create_topic=can_send_create_topic +can_send_topic=can_send_topic +can_recieve_topic=can_recieve_topic +can_consume_create_ds=can_consume_create_ds +can_consume_ds=can_consume_ds Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala?rev=1352265&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala Wed Jun 20 18:57:01 2012 @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.amqp + +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterEach +import org.apache.activemq.apollo.broker._ +import java.io.FileInputStream +import org.apache.activemq.apollo.util.FileSupport._ +import org.fusesource.amqp.codec.AMQPProtocolCodec +import com.swiftmq.amqp.v100.client.ExceptionListener +import java.lang.Exception + +class AmqpTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach { + + override def broker_config_uri = "xml:classpath:apollo-amqp.xml" + +// var client = new AmqpClient +// var clients = List[AmqpClient]() +// +// override protected def afterEach() = { +// super.afterEach +// clients.foreach(_.close) +// clients = Nil +// } +// +// def connect_request(version:String, c: AmqpClient, headers:String="", connector:String=null) = { +// val p = connector_port(connector).getOrElse(port) +// c.open("localhost", p) +// version match { +// case "1.0"=> +// c.write( +// "CONNECT\n" + +// headers + +// "\n") +// case "1.1"=> +// c.write( +// "CONNECT\n" + +// "accept-version:1.1\n" + +// "host:localhost\n" + +// headers + +// "\n") +// case x=> throw new RuntimeException("invalid version: %f".format(x)) +// } +// clients ::= c +// c.receive() +// } +// +// def connect(version:String, c: AmqpClient = client, headers:String="", connector:String=null) = { +// val frame = connect_request(version, c, headers, connector) +// frame should startWith("CONNECTED\n") +// frame should include regex("""session:.+?\n""") +// frame should 
include("version:"+version+"\n") +// c +// } +// +// val receipt_counter = new AtomicLong() +// +// def sync_send(dest:String, body:Any, headers:String="", c:AmqpClient = client) = { +// val rid = receipt_counter.incrementAndGet() +// c.write( +// "SEND\n" + +// "destination:"+dest+"\n" + +// "receipt:"+rid+"\n" + +// headers+ +// "\n" + +// body) +// wait_for_receipt(""+rid, c) +// } +// +// def async_send(dest:String, body:Any, headers:String="", c: AmqpClient = client) = { +// c.write( +// "SEND\n" + +// "destination:"+dest+"\n" + +// headers+ +// "\n" + +// body) +// } +// +// def subscribe(id:String, dest:String, mode:String="auto", persistent:Boolean=false, headers:String="", sync:Boolean=true, c: AmqpClient = client) = { +// val rid = receipt_counter.incrementAndGet() +// c.write( +// "SUBSCRIBE\n" + +// "destination:"+dest+"\n" + +// "id:"+id+"\n" + +// (if(persistent) "persistent:true\n" else "")+ +// "ack:"+mode+"\n"+ +// (if(sync) "receipt:"+rid+"\n" else "") + +// headers+ +// "\n") +// if(sync) { +// wait_for_receipt(""+rid, c) +// } +// } +// +// def unsubscribe(id:String, headers:String="", c: AmqpClient=client) = { +// val rid = receipt_counter.incrementAndGet() +// client.write( +// "UNSUBSCRIBE\n" + +// "id:"+id+"\n" + +// "receipt:"+rid+"\n" + +// headers+ +// "\n") +// wait_for_receipt(""+rid, c) +// } +// +// def assert_received(body:Any, sub:String=null, c: AmqpClient=client):(Boolean)=>Unit = { +// val frame = c.receive() +// frame should startWith("MESSAGE\n") +// if(sub!=null) { +// frame should include ("subscription:"+sub+"\n") +// } +// body match { +// case null => +// case body:scala.util.matching.Regex => frame should endWith regex(body) +// case body => frame should endWith("\n\n"+body) +// } +// // return a func that can ack the message. 
+// (ack:Boolean)=> { +// val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r +// val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r +// val sub_regex(sub) = frame +// val msgid_regex(msgid) = frame +// c.write( +// (if(ack) "ACK\n" else "NACK\n") + +// "subscription:"+sub+"\n" + +// "message-id:"+msgid+"\n" + +// "\n") +// } +// } +// +// def wait_for_receipt(id:String, c: AmqpClient = client, discard_others:Boolean=false): Unit = { +// if( !discard_others ) { +// val frame = c.receive() +// frame should startWith("RECEIPT\n") +// frame should include("receipt-id:"+id+"\n") +// } else { +// while(true) { +// val frame = c.receive() +// if( frame.startsWith("RECEIPT\n") && frame.indexOf("receipt-id:"+id+"\n")>=0 ) { +// return +// } +// } +// } +// } +} + +import com.swiftmq.amqp.AMQPContext +import com.swiftmq.amqp.v100.client.Connection +import com.swiftmq.amqp.v100.client.QoS +import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue +import com.swiftmq.amqp.v100.types.AMQPString + +object PrintAMQPStream { + def main(args: Array[String]) { + for( arg <- args ) { + println("--------------------------------------------------------") + println(" File: "+arg) + println("--------------------------------------------------------") + using(new FileInputStream(arg)) { is => + val codec = new AMQPProtocolCodec + codec.setReadableByteChannel(is.getChannel) + // codec.skipProtocolHeader() + + var pos = 0L + var frame = codec.read() + var counter = 0 + while( frame !=null ) { + var next_pos = codec.getReadCounter - codec.getReadBytesPendingDecode + counter += 1 + println("@"+pos+" "+frame) + pos = next_pos; + frame = try { + codec.read() + } catch { + case e:java.io.EOFException => null + } + } + } + } + } +} + +class AmqpTest extends AmqpTestSupport { + + test("broker") { + +// val port = 5672 +// val queue = "testqueue" + + val queue = "/queue/testqueue" + + val nMsgs = 1 + val qos = QoS.AT_MOST_ONCE + val ctx = new 
AMQPContext(AMQPContext.CLIENT); + + try { + + val connection = new Connection(ctx, "127.0.0.1", port, false) + connection.setContainerId("client") + connection.setIdleTimeout(-1) + connection.setMaxFrameSize(1024*4) + connection.setExceptionListener(new ExceptionListener(){ + def onException(e: Exception) { + e.printStackTrace(); + } + }) + connection.connect; + { + var data = "x" * 10 // 1024*20 + + var session = connection.createSession(10, 10) + var p = { + session.createProducer(queue, qos) + } + for (i <- 0 until nMsgs) { + var msg = new com.swiftmq.amqp.v100.messaging.AMQPMessage + var s = "Message #" + (i + 1) + println("Sending " + s) + msg.setAmqpValue(new AmqpValue(new AMQPString(s+", data: "+data))) + p.send(msg) + } + p.close() + session.close() + } + { + var session = connection.createSession(10, 10) + val c = session.createConsumer(queue, 100, qos, true, null); + + // Receive messages non-transacted + for (i <- 0 until nMsgs) + { + val msg = c.receive(); + if (msg == null) // guard was inverted: the match below used to run only for a null message + throw new IllegalStateException("no message received") + msg.getAmqpValue().getValue match { + case value:AMQPString => + println("Received: " + value.getValue()); + } + if (!msg.isSettled()) + msg.accept(); + } + c.close() + session.close() + } + connection.close() + } catch { + case e: Exception => { + e.printStackTrace + } + } + + } +} \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java?rev=1352265&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java Wed Jun 20 18:57:01 2012 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.amqp.dto; + +import org.apache.activemq.apollo.dto.*; +import org.junit.Test; + +import java.io.InputStream; +import java.util.List; + +import static junit.framework.Assert.*; +import static junit.framework.Assert.assertEquals; + + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ + +public class XmlCodecTest { + + private InputStream resource(String path) { + return getClass().getResourceAsStream(path); + } + + @Test + public void unmarshalling() throws Exception { + BrokerDTO dto = XmlCodec.decode(BrokerDTO.class, resource("simple.xml")); + assertNotNull(dto); + + assertEquals(1, dto.connectors.size()); + AcceptingConnectorDTO connector = (AcceptingConnectorDTO)dto.connectors.get(0); + assertEquals(1, connector.protocols.size()); + ProtocolDTO amqp = connector.protocols.get(0); + assertTrue(amqp instanceof AmqpDTO); + assertEquals("JMSXUserID", ((AmqpDTO) amqp).add_user_header); + + List<AddUserHeaderDTO> add_user_headers = ((AmqpDTO) amqp).add_user_headers; + assertEquals(2, add_user_headers.size()); + assertEquals("GroupId", add_user_headers.get(0).name); + assertEquals("UserId", add_user_headers.get(1).name); + 
assertEquals("UserPrincipal", add_user_headers.get(1).kind); + + } + + +} Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1352265&r1=1352264&r2=1352265&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jun 20 18:57:01 2012 @@ -90,7 +90,7 @@ class VirtualHost(val broker: Broker, va var config:VirtualHostDTO = _ val router:Router = new LocalRouter(this) - var names:List[String] = Nil; + def names:List[String] = config.host_names.toList; var store:Store = null val queue_id_counter = new LongCounter() Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1352265&r1=1352264&r2=1352265&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Wed Jun 20 18:57:01 2012 @@ -183,7 +183,7 @@ class AnyProtocolHandler extends Protoco connection.transport.suspendRead protocol_handler.set_connection(connection); - protocol_handler.on_transport_connected + connection.transport.getTransportListener.onTransportConnected() } override def on_transport_connected = { 
Modified: activemq/activemq-apollo/trunk/pom.xml URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1352265&r1=1352264&r2=1352265&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/pom.xml (original) +++ activemq/activemq-apollo/trunk/pom.xml Wed Jun 20 18:57:01 2012 @@ -176,6 +176,7 @@ <module>apollo-network</module> <module>apollo-openwire-generator</module> <module>apollo-openwire</module> + <module>apollo-amqp</module> <module>apollo-distro</module> </modules>
