Author: rgodfrey
Date: Sat Oct 13 13:06:54 2012
New Revision: 1397825
URL: http://svn.apache.org/viewvc?rev=1397825&view=rev
Log:
PROTON-72 : [Proton-j] correctly allocate channels
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1397825&r1=1397824&r2=1397825&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Sat Oct 13 13:06:54 2012
@@ -715,13 +715,9 @@ public class TransportImpl extends Endpo
TransportSession transportSession =
getTransportState(session);
if(session.getLocalState() != EndpointState.UNINITIALIZED
&& !transportSession.beginSent())
{
- int channelId = allocateLocalChannel();
- transportSession.setLocalChannel(channelId);
- _localSessions[channelId] = transportSession;
-
+ int channelId = allocateLocalChannel(transportSession);
Begin begin = new Begin();
-
if(session.getRemoteState() !=
EndpointState.UNINITIALIZED)
{
begin.setRemoteChannel(UnsignedShort.valueOf((short)
transportSession.getRemoteChannel()));
@@ -768,9 +764,26 @@ public class TransportImpl extends Endpo
return transportLink;
}
- private int allocateLocalChannel()
+ private int allocateLocalChannel(TransportSession transportSession)
{
- return 0; //TODO - Implement
+ for( int i=0; i < _localSessions.length; i++)
+ {
+ if( _localSessions[i] == null )
+ {
+ _localSessions[i] = transportSession;
+ transportSession.setLocalChannel(i);
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private int freeLocalChannel(TransportSession transportSession)
+ {
+ final int channel = transportSession.getLocalChannel();
+ _localSessions[channel] = null;
+ transportSession.freeLocalChannel();
+ return channel;
}
private int processEnd(WritableBuffer buffer)
@@ -789,17 +802,11 @@ public class TransportImpl extends Endpo
&& (transportSession =
session.getTransportSession()).isLocalChannelSet()
&& !hasSendableMessages(session))
{
- int channel = transportSession.getLocalChannel();
- transportSession.freeLocalChannel();
- _localSessions[channel] = null;
-
-
+ int channel = freeLocalChannel(transportSession);
End end = new End();
-
int frameBytes = writeFrame(buffer, channel, end, null,
null);
written += frameBytes;
endpoint.clearModified();
-
}
endpoint = endpoint.transportNext();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org