Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java?rev=1405942&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java Mon Nov 5 20:32:47 2012 @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.apollo.amqp.hawtdispatch; + +import org.apache.qpid.proton.codec.WritableBuffer; + +import java.nio.ByteBuffer; + +public class DroppingWritableBuffer implements WritableBuffer +{ + int pos = 0; + + @Override + public boolean hasRemaining() { + return true; + } + + @Override + public void put(byte b) { + pos += 1; + } + + @Override + public void putFloat(float f) { + pos += 4; + } + + @Override + public void putDouble(double d) { + pos += 8; + } + + @Override + public void put(byte[] src, int offset, int length) { + pos += length; + } + + @Override + public void putShort(short s) { + pos += 2; + } + + @Override + public void putInt(int i) { + pos += 4; + } + + @Override + public void putLong(long l) { + pos += 8; + } + + @Override + public int remaining() { + return Integer.MAX_VALUE - pos; + } + + @Override + public int position() { + return pos; + } + + @Override + public void position(int position) { + pos = position; + } + + @Override + public void put(ByteBuffer payload) { + pos += payload.remaining(); + payload.position(payload.limit()); + } +}
Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala (from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala&r1=1405941&r2=1405942&rev=1405942&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala Mon Nov 5 20:32:47 2012 @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.activemq.apollo.amqp +package org.apache.activemq.apollo.amqp.test import com.swiftmq.amqp.AMQPContext import com.swiftmq.amqp.v100.client.Connection @@ -94,4 +94,4 @@ class SwiftMQClientTest extends AmqpTest } } -} \ No newline at end of file +} Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties (from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index&r1=1405941&r2=1405942&rev=1405942&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties Mon Nov 5 20:32:47 2012 @@ -5,13 +5,16 @@ ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at -## +## ## http://www.apache.org/licenses/LICENSE-2.0 -## +## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- -org.apache.activemq.apollo.amqp.AmqpProtocolFactory \ No newline at end of file + +# This config file is used by the joram jms tests. +# +jms.provider.admin.class=org.apache.activemq.apollo.amqp.joram.ApolloAdmin \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java?rev=1405942&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java Mon Nov 5 20:32:47 2012 @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.apollo.amqp.hawtdispatch; + +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.Task; +import org.fusesource.hawtdispatch.transport.*; +import org.junit.After; +import org.junit.Test; + +import java.net.URI; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.fusesource.hawtdispatch.Dispatch.NOOP; +import static org.fusesource.hawtdispatch.Dispatch.createQueue; +import static org.junit.Assert.*; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class TransportConnectionTest { + + // Using a single dispatch queue for everything should simplify debugging issues. + final DispatchQueue queue = createQueue(); + + @Test + public void testOpenSession() throws Exception { + final CountDownLatch done = new CountDownLatch(1); + + // Setup a little server... + final TcpTransportServer server = startServer(new AmqpListener() { + public void proccessSessionOpen(Session session, Task onComplete) { + System.out.println("session opened.."); + session.open(); + done.countDown(); + } + }); + final String address = server.getBoundAddress(); + + // Start a client.. + queue.execute(new Task() { + public void run() { + try { + System.out.println("Creating a client connection."); + AmqpConnection c = startClient(address); + Session session = c.getProtonConnection().session(); + session.open(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + assertTrue(done.await(3, TimeUnit.SECONDS)); + } + + final ArrayList<AmqpConnection> clients = new ArrayList<AmqpConnection>(); + synchronized private AmqpConnection startClient(String address) throws Exception { + TcpTransport transport = new TcpTransport(); + transport.setDispatchQueue(queue); + transport.connecting(new URI(address), null); + final AmqpConnection clientConnection = new AmqpConnection(); + clientConnection.setListener(new AmqpListener(){ + @Override + public void processTransportConnected() { + clientConnection.pumpOut(); + } + }); + clientConnection.bind(transport); + clients.add(clientConnection); + clientConnection.start(NOOP); + clientConnection.getProtonConnection().open(); + return clientConnection; + } + + @After + synchronized public void stopClients() throws Exception { + for (AmqpConnection client : clients) { + stop(client); + } + clients.clear(); + } + + final ArrayList<TransportServer> servers = new ArrayList<TransportServer>(); + synchronized protected TcpTransportServer startServer(final AmqpListener serverHandler) throws Exception { + final TcpTransportServer server = new TcpTransportServer(new URI("tcp://localhost:0")); + server.setDispatchQueue(queue); + server.setTransportServerListener(new TransportServerListener() { + public void onAccept(Transport transport) throws Exception { + System.out.println("Server accepted a client connection."); + transport.setDispatchQueue(queue); + AmqpConnection serverConnection = new AmqpConnection(); + serverConnection.bind(transport); + serverConnection.setListener(serverHandler); + serverConnection.start(NOOP); + } + + public void onAcceptError(Exception error) { + error.printStackTrace(); + } + }); + start(server); + servers.add(server); + return server; + } + + @After + synchronized public void stopServers() throws Exception { + for (TransportServer server : servers) { + stop(server); + } + servers.clear(); + } + + private void start(TransportServer transport) throws Exception { + final CountDownLatch done = new CountDownLatch(1); + transport.start(new Task() { + @Override + public void run() { + done.countDown(); + } + }); + done.await(); + } + private void stop(TransportServer transport) throws Exception { + final CountDownLatch done = new CountDownLatch(1); + transport.stop(new Task() { + @Override + public void run() { + done.countDown(); + } + }); + done.await(); + } + + private void start(AmqpConnection transport) throws Exception { + final CountDownLatch done = new CountDownLatch(1); + transport.start(new Task() { + @Override + public void run() { + done.countDown(); + } + }); + done.await(); + } + private void stop(AmqpConnection transport) throws Exception { + final CountDownLatch done = new CountDownLatch(1); + transport.stop(new Task() { + @Override + public void run() { + done.countDown(); + } + }); + done.await(); + } +} Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java?rev=1405942&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java Mon Nov 5 20:32:47 2012 @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.amqp.joram; + +import org.apache.activemq.apollo.broker.Broker; +import org.apache.activemq.apollo.broker.BrokerFactory; +import org.apache.activemq.apollo.util.ServiceControl; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; +import org.objectweb.jtests.jms.admin.Admin; + +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.util.Hashtable; +import java.util.logging.*; + +/** + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class ApolloAdmin implements Admin { + + Context context; + { + // enableJMSFrameTracing(); + try { + // Use the jetty JNDI context since it's mutable. + final Hashtable<String, String> env = new Hashtable<String, String>(); + env.put("java.naming.factory.initial", "org.eclipse.jetty.jndi.InitialContextFactory"); + env.put("java.naming.factory.url.pkgs", "org.eclipse.jetty.jndi");; + context = new InitialContext(env); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + static public void enableJMSFrameTracing() throws FileNotFoundException { + final SimpleFormatter formatter = new SimpleFormatter(); + final PrintStream out = new PrintStream(new FileOutputStream(new File("/tmp/amqp-trace.txt"))); + Handler handler = new Handler() { + @Override + public void publish(LogRecord r) { + out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage())); + } + + @Override + public void flush() { + out.flush(); + } + + @Override + public void close() throws SecurityException { + } + }; + + Logger log = Logger.getLogger("FRM"); + log.addHandler(handler); + log.setLevel(Level.FINEST); + } + + public String brokerConfig = "xml:classpath:apollo-amqp.xml"; + + public String getName() { + return getClass().getName(); + } + + static Broker broker; + static int port; + + public void startServer() throws Exception { +// if( broker!=null ) { +// stopServer(); +// } + broker = createBroker(); + ServiceControl.start(broker); + port = ((InetSocketAddress) broker.get_socket_address()).getPort(); + } + + protected Broker createBroker() throws Exception { + if (System.getProperty("basedir") == null) { + File file = new File("."); + System.setProperty("basedir", file.getAbsolutePath()); + } + return BrokerFactory.createBroker(brokerConfig); + } + + public void stopServer() throws Exception { + ServiceControl.stop(broker); + broker = null; + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + + public Context createContext() throws NamingException { + return context; + } + + public void createQueue(String name) { + try { + context.bind(name, new QueueImpl("/queue/"+name)); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + public void createTopic(String name) { + try { + context.bind(name, new TopicImpl("/topic/"+name)); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + public void deleteQueue(String name) { + // BrokerTestSupport.delete_queue((Broker)base.broker, name); + try { + context.unbind(name); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + public void deleteTopic(String name) { + try { + context.unbind(name); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + public void createConnectionFactory(String name) { + try { + final ConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, null, null); + context.bind(name, factory); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + public void deleteConnectionFactory(String name) { + try { + context.unbind(name); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + public void createQueueConnectionFactory(String name) { + createConnectionFactory(name); + } + public void createTopicConnectionFactory(String name) { + createConnectionFactory(name); + } + public void deleteQueueConnectionFactory(String name) { + deleteConnectionFactory(name); + } + public void deleteTopicConnectionFactory(String name) { + deleteConnectionFactory(name); + } + +} Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java?rev=1405942&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java Mon Nov 5 20:32:47 2012 @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.amqp.joram; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; +import org.objectweb.jtests.jms.conform.connection.ConnectionTest; +import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest; +import org.objectweb.jtests.jms.conform.message.MessageBodyTest; +import org.objectweb.jtests.jms.conform.message.MessageDefaultTest; +import org.objectweb.jtests.jms.conform.message.MessageTypeTest; +import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest; +import org.objectweb.jtests.jms.conform.message.properties.JMSXPropertyTest; +import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest; +import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest; +import org.objectweb.jtests.jms.conform.queue.QueueBrowserTest; +import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest; +import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest; +import org.objectweb.jtests.jms.conform.selector.SelectorTest; +import org.objectweb.jtests.jms.conform.session.QueueSessionTest; +import org.objectweb.jtests.jms.conform.session.SessionTest; +import org.objectweb.jtests.jms.conform.session.TopicSessionTest; +import org.objectweb.jtests.jms.conform.session.UnifiedSessionTest; +import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest; + +import java.io.File; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class JoramJmsTest extends TestCase { + + public static Test suite() { + if (System.getProperty("basedir") == null) { + File file = new File("."); + System.setProperty("basedir", file.getAbsolutePath()); + } + + TestSuite suite = new TestSuite(); + + // TODO: Fix these tests.. + if (false) { + // Fails due to durable subs not being implemented. + suite.addTestSuite(TopicSessionTest.class); + // Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl vs QueueImpl mapping issues + suite.addTestSuite(MessageHeaderTest.class); + // Fails due to inconsistent Message mapping in the JMS client. + suite.addTestSuite(MessageTypeTest.class); + suite.addTestSuite(QueueBrowserTest.class); + + } + + // TODO: enable once QPID 0.19 is released + if(false) { + suite.addTestSuite(UnifiedSessionTest.class); + suite.addTestSuite(TemporaryTopicTest.class); + suite.addTestSuite(TopicConnectionTest.class); + } + + if( false ) { + suite.addTestSuite(SelectorSyntaxTest.class); + suite.addTestSuite(QueueSessionTest.class); + suite.addTestSuite(SelectorTest.class); + suite.addTestSuite(TemporaryQueueTest.class); + suite.addTestSuite(SessionTest.class); + } + + // Passing tests + if( false ) { + suite.addTestSuite(ConnectionTest.class); + suite.addTestSuite(JMSXPropertyTest.class); + suite.addTestSuite(MessageBodyTest.class); + suite.addTestSuite(MessageDefaultTest.class); + suite.addTestSuite(MessagePropertyConversionTest.class); + suite.addTestSuite(MessagePropertyTest.class); + } + + return suite; + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala?rev=1405942&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala Mon Nov 5 20:32:47 2012 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.amqp.test + +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterEach +import org.apache.activemq.apollo.broker._ +import org.junit.Test +import org.apache.qpid.amqp_1_0.jms.impl.{ConnectionFactoryImpl, QueueImpl} +import javax.jms._ + +class AmqpTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach { + override def broker_config_uri = "xml:classpath:apollo-amqp.xml" +} + Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala?rev=1405942&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala Mon Nov 5 20:32:47 2012 @@ -0,0 +1,95 @@ +package org.apache.activemq.apollo.amqp.test + +import org.apache.qpid.amqp_1_0.jms.impl.{ConnectionFactoryImpl, QueueImpl} +import javax.jms._ + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ + +class QpidJmsTest extends AmqpTestSupport { + + def createConnection: Connection = { + val factory = new ConnectionFactoryImpl("localhost", port, "admin", "password") + val connection = factory.createConnection + connection.setExceptionListener(new ExceptionListener { + def onException(exception: JMSException) { + exception.printStackTrace + } + }) + connection.start + return connection + } + + +// test("browse") { +// val queue = new QueueImpl("queue://txqueue") +// val connection = createConnection +// val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) +// val p = session.createProducer(queue) +// val msg = session.createTextMessage("Hello World") +// msg.setObjectProperty("x", 1) +// p.send(msg) +// val browser = session.createBrowser(queue) +// val enumeration = browser.getEnumeration +// while (enumeration.hasMoreElements) { +// System.out.println("BROWSE " + enumeration.nextElement) +// } +// connection.close +// } + + test("Send Nack Receive") { + val queue = new QueueImpl("/queue/testqueue") + val nMsgs = 1 + val dataFormat: String = "%01024d" + + var connection = createConnection + var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val p = session.createProducer(queue) + var i = 0 + while (i < nMsgs) { + System.out.println("Sending " + i) + p.send(session.createTextMessage(dataFormat.format(i))) + i += 1 + } + connection.close + + System.out.println("=======================================================================================") + System.out.println(" failing a receive ") + System.out.println("=======================================================================================") + connection = createConnection + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE) + var c = session.createConsumer(queue) + i = 0 + while (i < 1) { + val msg: TextMessage = c.receive.asInstanceOf[TextMessage] + if (msg != null) { + val s: String = msg.getText + s should be(dataFormat.format(i)) + System.out.println("Received: " + i) + i += 1 + } + } + connection.close + + System.out.println("=======================================================================================") + System.out.println(" receiving ") + System.out.println("=======================================================================================") + connection = createConnection + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + c = session.createConsumer(queue) + i = 0 + while (i < nMsgs) { + val msg = c.receive.asInstanceOf[TextMessage] + if (msg != null) { + val s = msg.getText + s should be(dataFormat.format(i)) + System.out.println("Received: " + i) + i += 1 + } + } + connection.close + } + + +} \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1405942&r1=1405941&r2=1405942&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Nov 5 20:32:47 2012 @@ -905,47 +905,63 @@ class QueueEntry(val queue:Queue, val se } override def toString = { "swapped_range:{ swapping_in: "+loading+", count: "+count+", size: "+size+"}" } - override def swap_in(space:MemorySpace) = { + override def swap_in(space:MemorySpace):Unit = { if( !loading ) { loading = true - queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records => - queue.dispatch_queue { - loading = false - assert(isLinked) - - var item_count=0 - var size_count=0 - - val tmpList = new LinkedNodeList[QueueEntry]() - records.foreach { record => - val entry = new QueueEntry(queue, record.entry_seq).init(record) - tmpList.addLast(entry) - item_count += 1 - size_count += record.size - } - // we may need to adjust the enqueue count if entries - // were dropped at the store level - var item_delta = (count - item_count) - val size_delta: Int = size - size_count + def complete_load(attempt_counter:Int, records:Seq[QueueEntryRecord]):Unit = { + assert(isLinked) + + var item_count=0 + var size_count=0 + + val tmpList = new LinkedNodeList[QueueEntry]() + records.foreach { record => + val entry = new QueueEntry(queue, record.entry_seq).init(record) + tmpList.addLast(entry) + item_count += 1 + size_count += record.size + } - if ( item_delta!=0 || size_delta!=0 ) { + // we may need to adjust the enqueue count if entries + // were dropped at the store level + var item_delta = (count - item_count) + val size_delta: Int = size - size_count + + if ( item_delta!=0 || size_delta!=0 ) { + if ( attempt_counter < 10) { + warn("Retrying "+attempt_counter+" load do to Queue '%s' detected store change in range [%d:%d]. %d message(s) and %d bytes", queue.id, seq, last, item_delta, size_delta) + attempt_load(attempt_counter+1) + return + } else { warn("Queue '%s' detected store change in range [%d:%d]. %d message(s) and %d bytes", queue.id, seq, last, item_delta, size_delta) queue.enqueue_item_counter += item_delta queue.enqueue_size_counter += size_delta } + } else if( attempt_counter > 1 ) { + warn("Recoved!!!! @ "+attempt_counter) + } - linkAfter(tmpList) - val next = getNext + loading = false + linkAfter(tmpList) + val next = getNext + + // move the subs to the first entry that we just loaded. + parked.foreach(_.advance(next)) + next :::= parked + queue.trigger_swap - // move the subs to the first entry that we just loaded. - parked.foreach(_.advance(next)) - next :::= parked - queue.trigger_swap + unlink + } - unlink + def attempt_load(attempt_counter:Int):Unit = { + queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records => + queue.dispatch_queue { + complete_load(attempt_counter, records) + } } } + attempt_load(1) } }
