Author: rajith
Date: Fri Jun 15 17:21:19 2012
New Revision: 1350704
URL: http://svn.apache.org/viewvc?rev=1350704&view=rev
Log:
Added a finally block for deleting the C++ objects when close is called
to ensure that the underlying c++ objects don't leak.
Filled in blank methods and another round of bug fixing.
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
Fri Jun 15 17:21:19 2012
@@ -51,8 +51,14 @@ public class CppConnection implements Co
@Override
public void close() throws MessagingException
{
- _cppConn.close();
- _cppConn.delete(); //clean up the c++ object
+ try
+ {
+ _cppConn.close();
+ }
+ finally
+ {
+ _cppConn.delete(); //clean up the c++ object
+ }
}
@Override
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
Fri Jun 15 17:21:19 2012
@@ -18,87 +18,91 @@
package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Session;
public class CppReceiver implements Receiver
{
- org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
-
- public CppReceiver(org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
+ private CppSession _ssn;
+ private org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
+
+ public CppReceiver(CppSession ssn,
+ org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
{
+ _ssn = ssn;
_cppReceiver = cppReceiver;
}
@Override
- public Message get(long timeout)
+ public Message get(long timeout) throws MessagingException
{
- org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get();
+ org.apache.qpid.messaging.cpp.jni.Message m =
_cppReceiver.get(CppDuration.getDuration(timeout));
return new TextMessage(m.getContent());
-
+
}
@Override
- public Message fetch(long timeout)
+ public Message fetch(long timeout) throws MessagingException
{
- org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch();
+ org.apache.qpid.messaging.cpp.jni.Message m =
_cppReceiver.fetch(CppDuration.getDuration(timeout));
return new TextMessage(m);
}
@Override
- public void setCapacity(int capacity)
+ public void setCapacity(int capacity) throws MessagingException
{
- // TODO Auto-generated method stub
-
+ _cppReceiver.setCapacity(capacity);
}
@Override
- public int getCapacity()
+ public int getCapacity() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppReceiver.getCapacity();
}
@Override
- public int getAvailable()
+ public int getAvailable() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppReceiver.getAvailable();
}
@Override
- public int getUnsettled()
+ public int getUnsettled() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppReceiver.getUnsettled();
}
@Override
- public void close()
+ public void close() throws MessagingException
{
- // TODO Auto-generated method stub
-
+ try
+ {
+ _cppReceiver.close();
+ }
+ finally
+ {
+ _cppReceiver.delete();
+ }
}
@Override
public boolean isClosed()
{
- // TODO Auto-generated method stub
- return false;
+ return _cppReceiver.isClosed();
}
@Override
public String getName()
{
- // TODO Auto-generated method stub
- return null;
+ return _cppReceiver.getName();
}
@Override
- public Session getSession()
+ public Session getSession() throws MessagingException
{
- // TODO Auto-generated method stub
- return null;
+ _ssn.checkError();
+ return _ssn;
}
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
Fri Jun 15 17:21:19 2012
@@ -18,77 +18,84 @@
package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
public class CppSender implements Sender
{
- org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
-
- public CppSender(org.apache.qpid.messaging.cpp.jni.Sender cppSender)
+ private CppSession _ssn;
+ private org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
+
+ public CppSender(CppSession ssn,
+ org.apache.qpid.messaging.cpp.jni.Sender cppSender)
{
+ _ssn = ssn;
_cppSender = cppSender;
}
@Override
- public void send(Message message, boolean sync)
+ public void send(Message message, boolean sync) throws MessagingException
{
_cppSender.send(((TextMessage)message).getCppMessage(),true);
}
@Override
- public void close()
+ public void close() throws MessagingException
{
- // TODO Auto-generated method stub
-
+ try
+ {
+ _cppSender.close();
+ }
+ finally
+ {
+ _cppSender.delete();
+ }
}
@Override
- public void setCapacity(int capacity)
+ public void setCapacity(int capacity) throws MessagingException
{
- //_cppSender.setCapacity(arg0)
+ _cppSender.setCapacity(capacity);
}
@Override
- public int getCapacity()
+ public int getCapacity() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppSender.getCapacity();
}
@Override
- public int getAvailable()
+ public int getAvailable() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppSender.getAvailable();
}
@Override
- public int getUnsettled()
+ public int getUnsettled() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppSender.getUnsettled();
}
@Override
public boolean isClosed()
{
- // TODO Auto-generated method stub
- return false;
+ // The C++ version does not support it.
+ // Needs to be supported at a higher level.
+ throw new UnsupportedOperationException("Not supported by the
underlying c++ client");
}
@Override
- public String getName()
+ public String getName() throws MessagingException
{
- // TODO Auto-generated method stub
- return null;
+ return _cppSender.getName();
}
@Override
- public Session getSession()
+ public Session getSession() throws MessagingException
{
- // TODO Auto-generated method stub
- return null;
+ _ssn.checkError();
+ return _ssn;
}
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
Fri Jun 15 17:21:19 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.messaging.Messagi
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.cpp.jni.Duration;
/**
* This class relies on the SessionManagementDecorator for
@@ -43,7 +44,6 @@ public class CppSession implements Sessi
_conn = conn;
}
-
@Override
public boolean isClosed()
{
@@ -53,8 +53,14 @@ public class CppSession implements Sessi
@Override
public void close() throws MessagingException
{
- _cppSession.close();
- _cppSession.delete(); // delete c++ object.
+ try
+ {
+ _cppSession.close();
+ }
+ finally
+ {
+ _cppSession.delete(); // delete c++ object.
+ }
}
@Override
@@ -114,36 +120,32 @@ public class CppSession implements Sessi
@Override
public Receiver nextReceiver(long timeout) throws MessagingException
{
- // This is not correct ..need to revist
- return new CppReceiver(_cppSession.nextReceiver(new
org.apache.qpid.messaging.cpp.jni.Duration(timeout)));
+ // This needs to be revisited.
+ return new
CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout)));
}
@Override
public Sender createSender(Address address) throws MessagingException
{
- return new CppSender(_cppSession
- .createSender(new org.apache.qpid.messaging.cpp.jni.Address(
- address.toString())));
+ return new CppSender(this,
_cppSession.createSender(address.toString()));
}
@Override
public Sender createSender(String address) throws MessagingException
{
- return new CppSender(_cppSession.createSender(address));
+ return new CppSender(this,_cppSession.createSender(address));
}
@Override
public Receiver createReceiver(Address address) throws MessagingException
{
- return new CppReceiver(_cppSession
- .createReceiver(new org.apache.qpid.messaging.cpp.jni.Address(
- address.toString())));
+ return new CppReceiver(this,
_cppSession.createReceiver(address.toString()));
}
@Override
public Receiver createReceiver(String address) throws MessagingException
{
- return new CppReceiver(_cppSession.createReceiver(address));
+ return new CppReceiver(this,_cppSession.createReceiver(address));
}
@Override
@@ -165,5 +167,4 @@ public class CppSession implements Sessi
{
_cppSession.checkError();
}
-
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
Fri Jun 15 17:21:19 2012
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.qpid.messaging.Connection;
import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
/**
@@ -32,10 +33,12 @@ public interface ConnectionExt extends C
public void removeConnectionStateListener(ConnectionStateListener l)
throws ConnectionException;
- public List<Session> getSessions() throws ConnectionException;
+ public List<SessionExt> getSessions() throws ConnectionException;
public void exception(ConnectionException e);
+ public void recreate() throws MessagingException;
+
/**
* The per connection lock that is used by the connection
* and it's child objects. A single lock is used to prevent
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
Fri Jun 15 17:21:19 2012
@@ -30,6 +30,7 @@ import org.apache.qpid.messaging.Session
import org.apache.qpid.messaging.SessionException;
import org.apache.qpid.messaging.ext.ConnectionExt;
import org.apache.qpid.messaging.ext.ConnectionStateListener;
+import org.apache.qpid.messaging.ext.SessionExt;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public class ConnectionManagementDecorat
private Connection _delegate;
private ConnectionState _state = ConnectionState.UNDEFINED;
private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
- private Map<String, Session> _sessions = new
ConcurrentHashMap<String,Session>();
+ private Map<String, SessionExt> _sessions = new
ConcurrentHashMap<String,SessionExt>();
private ConnectionException _lastException;
private List<ConnectionStateListener> _stateListeners = new
ArrayList<ConnectionStateListener>();
@@ -136,7 +137,7 @@ public class ConnectionManagementDecorat
try
{
if (name == null || name.isEmpty()) { name =
generateSessionName(); }
- Session ssn = new
SessionManagementDecorator(this,_delegate.createSession(name));
+ SessionExt ssn = new
SessionManagementDecorator(this,_delegate.createSession(name));
_sessions.put(name, ssn);
return ssn;
}
@@ -157,7 +158,7 @@ public class ConnectionManagementDecorat
try
{
if (name == null || name.isEmpty()) { name =
generateSessionName(); }
- Session ssn = new
SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
+ SessionExt ssn = new
SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
_sessions.put(name, ssn);
return ssn;
}
@@ -198,10 +199,10 @@ public class ConnectionManagementDecorat
}
@Override
- public List<Session> getSessions() throws ConnectionException
+ public List<SessionExt> getSessions() throws ConnectionException
{
checkClosedAndThrowException();
- return new ArrayList<Session>(_sessions.values());
+ return new ArrayList<SessionExt>(_sessions.values());
}
@Override // Called by the delegate or a a session created by this
connection.
@@ -282,4 +283,11 @@ public class ConnectionManagementDecorat
// TODO add local IP and pid to the beginning;
return _ssnNameGenerator.generate().toString();
}
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
Fri Jun 15 17:21:19 2012
@@ -30,6 +30,9 @@ import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.SessionException;
import org.apache.qpid.messaging.ext.ConnectionExt;
+import org.apache.qpid.messaging.ext.ReceiverExt;
+import org.apache.qpid.messaging.ext.SenderExt;
+import org.apache.qpid.messaging.ext.SessionExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +66,8 @@ import org.slf4j.LoggerFactory;
* <ol>
* <li>The application (normal close)</li>
* <li>By the parent via failover (error)</li>
- * <li>By the connection object, if not failover(error)</li>
+ * <li>By the connection object, if no failover(error)</li>
+ * <li>By itself if it receives and exception (error)</li>
* </ol>
* </i>
*
@@ -71,7 +75,7 @@ import org.slf4j.LoggerFactory;
* For the time being, anytime a session exception is received, the session
will be marked CLOSED.
* We need to revisit this.
*/
-public class SessionManagementDecorator implements Session
+public class SessionManagementDecorator implements SessionExt
{
private static Logger _logger =
LoggerFactory.getLogger(SessionManagementDecorator.class);
@@ -80,8 +84,8 @@ public class SessionManagementDecorator
private ConnectionExt _conn;
private Session _delegate;
SessionState _state = SessionState.UNDEFINED;
- private List<Receiver> _receivers = new ArrayList<Receiver>();
- private List<Sender> _senders = new ArrayList<Sender>();
+ private List<ReceiverExt> _receivers = new ArrayList<ReceiverExt>();
+ private List<SenderExt> _senders = new ArrayList<SenderExt>();
private final Object _connectionLock; // global per connection lock
public SessionManagementDecorator(ConnectionExt conn, Session delegate)
@@ -306,7 +310,7 @@ public class SessionManagementDecorator
checkClosedAndThrowException();
try
{
- Sender sender = _delegate.createSender(address);
+ SenderExt sender = new
SenderManagementDecorator(this,_delegate.createSender(address));
_senders.add(sender);
return sender;
}
@@ -326,7 +330,7 @@ public class SessionManagementDecorator
checkClosedAndThrowException();
try
{
- Sender sender = _delegate.createSender(address);
+ SenderExt sender = new
SenderManagementDecorator(this,_delegate.createSender(address));
_senders.add(sender);
return sender;
}
@@ -346,7 +350,7 @@ public class SessionManagementDecorator
checkClosedAndThrowException();
try
{
- Receiver receiver = _delegate.createReceiver(address);
+ ReceiverExt receiver = new
ReceiverManagementDecorator(this,_delegate.createReceiver(address));
_receivers.add(receiver);
return receiver;
}
@@ -366,7 +370,7 @@ public class SessionManagementDecorator
checkClosedAndThrowException();
try
{
- Receiver receiver = _delegate.createReceiver(address);
+ ReceiverExt receiver = new
ReceiverManagementDecorator(this,_delegate.createReceiver(address));
_receivers.add(receiver);
return receiver;
}
@@ -412,6 +416,32 @@ public class SessionManagementDecorator
}
}
+ @Override
+ public void exception(MessagingException e)
+ {
+ if (e instanceof ConnectionException)
+ {
+ handleConnectionException((ConnectionException)e);
+ }
+ else if (e instanceof SessionException)
+ {
+ handleSessionException((SessionException)e);
+ }
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ConnectionExt getConnectionExt()
+ {
+ return _conn;
+ }
+
private void checkClosedAndThrowException() throws SessionException
{
checkClosedAndThrowException("Session is closed. You cannot invoke
methods on a closed sesion");
@@ -470,7 +500,15 @@ public class SessionManagementDecorator
{
synchronized (_connectionLock)
{
- _state = SessionState.CLOSED;
+ try
+ {
+ close();
+ }
+ catch(MessagingException ex)
+ {
+ // Should not throw an exception here.
+ // Even if it did, does't matter as are closing.
+ }
}
return new SessionException("Session has been closed",e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]