Author: aidan Date: Tue Apr 21 16:04:55 2009 New Revision: 767201 URL: http://svn.apache.org/viewvc?rev=767201&view=rev Log: QPID-1823: Allow recycling of channel IDs
AMQConnection.getNextChannelID: add method to abstract channel id assignment, allow max to be set. AMQConnectionDelegate*: add getMaxChannelID. AMQConnectionDelegate_0_10: use getNextChannelID for the session id. SessionCreateTest: add test that attempts to create 65555 sessions on one connection. AMQConnectionTest: add unit test for getNextChannelID. SessionCreateTest takes a very long time to run, so it is excluded by default. Added: qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java Modified: qpid/branches/0.5-release/qpid/java/010ExcludeList qpid/branches/0.5-release/qpid/java/08ExcludeList qpid/branches/0.5-release/qpid/java/08ExcludeList-nonvm qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Modified: qpid/branches/0.5-release/qpid/java/010ExcludeList URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/010ExcludeList?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/010ExcludeList (original) +++ qpid/branches/0.5-release/qpid/java/010ExcludeList Tue Apr 21 16:04:55 2009 @@ -69,3 +69,7 @@ //QPID-1818 : 0-10 Client code path does not correctly restore a transacted session after failover. 
org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* + +// QPID-1823: this takes ages to run +org.apache.qpid.client.SessionCreateTest#* + Modified: qpid/branches/0.5-release/qpid/java/08ExcludeList URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/08ExcludeList?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/08ExcludeList (original) +++ qpid/branches/0.5-release/qpid/java/08ExcludeList Tue Apr 21 16:04:55 2009 @@ -10,3 +10,6 @@ //QPID-1818 : Client code path does not correctly restore a transacted session after failover. org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* + +// QPID-1823: this takes ages to run +org.apache.qpid.client.SessionCreateTest#* Modified: qpid/branches/0.5-release/qpid/java/08ExcludeList-nonvm URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/08ExcludeList-nonvm?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/08ExcludeList-nonvm (original) +++ qpid/branches/0.5-release/qpid/java/08ExcludeList-nonvm Tue Apr 21 16:04:55 2009 @@ -36,3 +36,6 @@ org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* //QPID-1818 : Client code path does not correctly restore a transacted session after failover. 
org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* +// QPID-1823: this takes ages to run +org.apache.qpid.client.SessionCreateTest#* + Modified: qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original) +++ qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Apr 21 16:04:55 2009 @@ -90,6 +90,9 @@ private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); private int _size = 0; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private AtomicInteger _idFactory = new AtomicInteger(0); + private int _maxChannelID; + private boolean _cycledIds; public AMQSession get(int channelId) { @@ -179,11 +182,57 @@ _fastAccessSessions[i] = null; } } + + /* + * Synchronized on whole method so that we don't need to consider the + * increment-then-reset path in too much detail + */ + public synchronized int getNextChannelId() + { + int id = 0; + if (!_cycledIds) + { + id = _idFactory.incrementAndGet(); + if (id == _maxChannelID) + { + _cycledIds = true; + _idFactory.set(0); // Go back to the start + } + } + else + { + boolean done = false; + while (!done) + { + // Needs to work second time through + id = _idFactory.incrementAndGet(); + if (id > _maxChannelID) + { + _idFactory.set(0); + id = _idFactory.incrementAndGet(); + } + if ((id & FAST_CHANNEL_ACCESS_MASK) == 0) + { + done = (_fastAccessSessions[id] == null); + } + else + { + done = (!_slowAccessSessions.keySet().contains(id)); + } + } + } + + return id; + } + + public void setMaxChannelID(int 
maxChannelID) + { + _maxChannelID = maxChannelID; + } } private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); - protected AtomicInteger _idFactory = new AtomicInteger(0); /** * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be @@ -415,6 +464,7 @@ { _delegate = new AMQConnectionDelegate_0_10(this); } + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); if (_logger.isInfoEnabled()) { @@ -567,6 +617,7 @@ Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); + _sessions.setMaxChannelID(_delegate.getMaxChannelID()); } catch (ClassNotFoundException e) { @@ -1395,7 +1446,7 @@ _sessions.put(channelId, session); } - void deregisterSession(int channelId) + public void deregisterSession(int channelId) { _sessions.remove(channelId); } @@ -1540,4 +1591,9 @@ { _delegate.setIdleTimeout(l); } + + public int getNextChannelID() + { + return _sessions.getNextChannelId(); + } } Modified: qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original) +++ qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Tue Apr 21 16:04:55 2009 @@ -50,4 +50,6 @@ <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; void setIdleTimeout(long l); + + int getMaxChannelID(); } Modified: qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java 
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original) +++ qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Apr 21 16:04:55 2009 @@ -79,7 +79,7 @@ throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); AMQSession session; try { @@ -105,7 +105,7 @@ public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { _conn.checkNotClosed(); - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); XASessionImpl session; try { @@ -284,4 +284,10 @@ { _qpidConnection.setIdleTimeout(l); } + + @Override + public int getMaxChannelID() + { + return Integer.MAX_VALUE; + } } Modified: qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original) +++ qpid/branches/0.5-release/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Tue Apr 21 16:04:55 2009 @@ -138,7 +138,7 @@ { public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - int channelId = _conn._idFactory.incrementAndGet(); + int channelId = _conn.getNextChannelID(); if 
(_logger.isDebugEnabled()) { @@ -289,4 +289,10 @@ } public void setIdleTimeout(long l){} + + @Override + public int getMaxChannelID() + { + return (int) (Math.pow(2, 16)-1); + } } Added: qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java?rev=767201&view=auto ============================================================================== --- qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java (added) +++ qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java Tue Apr 21 16:04:55 2009 @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + * + */ +package org.apache.qpid.client; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.naming.Context; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class to check that session creation on a connection has no accidental limit + */ +public class SessionCreateTest extends QpidTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); + + Context _context; + + private Connection _clientConnection; + protected int maxSessions = 65555; + + public void testSessionCreationLimit() throws Exception + { + // Create Client + _clientConnection = getConnection("guest", "guest"); + + _clientConnection.start(); + + for (int i=0; i < maxSessions; i++) + { + Session sess = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(sess); + sess.close(); + System.out.println("created session: " + i); + } + + _clientConnection.close(); + + } + +} \ No newline at end of file Modified: qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=767201&r1=767200&r2=767201&view=diff ============================================================================== --- qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original) +++ qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Tue Apr 21 16:04:55 2009 @@ -30,6 +30,7 @@ import javax.jms.TopicSession; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionDelegate_0_10; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; @@ -244,6 
+245,24 @@ } } + public void testGetChannelID() + { + int maxChannelID = 65536; + if (isBroker010()) + { + maxChannelID = Integer.MAX_VALUE+1; + } + for (int j = 0; j < 3; j++) + { + for (int i = 1; i < maxChannelID; i++) + { + int id = _connection.getNextChannelID(); + assertEquals("On iterartion "+j, i, id); + _connection.deregisterSession(id); + } + } + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(AMQConnectionTest.class); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org