Author: rajith
Date: Fri Jun 15 17:21:19 2012
New Revision: 1350704

URL: http://svn.apache.org/viewvc?rev=1350704&view=rev
Log:
Added a finally block that deletes the C++ objects when close() is called,
to ensure that the underlying C++ objects don't leak.
Filled in blank methods and did another round of bug fixing.

Modified:
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java

Modified: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
 (original)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
 Fri Jun 15 17:21:19 2012
@@ -51,8 +51,14 @@ public class CppConnection implements Co
     @Override
     public void close() throws MessagingException
     {
-        _cppConn.close();
-        _cppConn.delete(); //clean up the c++ object
+        try
+        {
+            _cppConn.close();
+        }
+        finally
+        {
+            _cppConn.delete(); //clean up the c++ object
+        }
     }
 
     @Override

Modified: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
 (original)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
 Fri Jun 15 17:21:19 2012
@@ -18,87 +18,91 @@
 package org.apache.qpid.messaging.cpp;
 
 import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
 import org.apache.qpid.messaging.Receiver;
 import org.apache.qpid.messaging.Session;
 
 public class CppReceiver implements Receiver
 {
-    org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
-    
-    public CppReceiver(org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
+    private CppSession _ssn;
+    private org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
+
+    public CppReceiver(CppSession ssn,
+            org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
     {
+        _ssn = ssn;
         _cppReceiver = cppReceiver;
     }
 
     @Override
-    public Message get(long timeout)
+    public Message get(long timeout) throws MessagingException
     {
-        org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get();
+        org.apache.qpid.messaging.cpp.jni.Message m = 
_cppReceiver.get(CppDuration.getDuration(timeout));
         return new TextMessage(m.getContent());
-        
+
     }
 
     @Override
-    public Message fetch(long timeout)
+    public Message fetch(long timeout) throws MessagingException
     {
-        org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch();
+        org.apache.qpid.messaging.cpp.jni.Message m = 
_cppReceiver.fetch(CppDuration.getDuration(timeout));
         return new TextMessage(m);
     }
 
     @Override
-    public void setCapacity(int capacity)
+    public void setCapacity(int capacity) throws MessagingException
     {
-        // TODO Auto-generated method stub
-
+        _cppReceiver.setCapacity(capacity);
     }
 
     @Override
-    public int getCapacity()
+    public int getCapacity() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return 0;
+        return _cppReceiver.getCapacity();
     }
 
     @Override
-    public int getAvailable()
+    public int getAvailable() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return 0;
+        return _cppReceiver.getAvailable();
     }
 
     @Override
-    public int getUnsettled()
+    public int getUnsettled() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return 0;
+        return _cppReceiver.getUnsettled();
     }
 
     @Override
-    public void close()
+    public void close() throws MessagingException
     {
-        // TODO Auto-generated method stub
-
+        try
+        {
+            _cppReceiver.close();
+        }
+        finally
+        {
+            _cppReceiver.delete();
+        }
     }
 
     @Override
     public boolean isClosed()
     {
-        // TODO Auto-generated method stub
-        return false;
+        return _cppReceiver.isClosed();
     }
 
     @Override
     public String getName()
     {
-        // TODO Auto-generated method stub
-        return null;
+        return _cppReceiver.getName();
     }
 
     @Override
-    public Session getSession()
+    public Session getSession() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return null;
+        _ssn.checkError();
+        return _ssn;
     }
 
 }

Modified: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
 (original)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
 Fri Jun 15 17:21:19 2012
@@ -18,77 +18,84 @@
 package org.apache.qpid.messaging.cpp;
 
 import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
 import org.apache.qpid.messaging.Sender;
 import org.apache.qpid.messaging.Session;
 
 public class CppSender implements Sender
 {
-    org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
-    
-    public CppSender(org.apache.qpid.messaging.cpp.jni.Sender cppSender)
+    private CppSession _ssn;
+    private org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
+
+    public CppSender(CppSession ssn,
+            org.apache.qpid.messaging.cpp.jni.Sender cppSender)
     {
+        _ssn = ssn;
         _cppSender = cppSender;
     }
 
     @Override
-    public void send(Message message, boolean sync)
+    public void send(Message message, boolean sync) throws MessagingException
     {
         _cppSender.send(((TextMessage)message).getCppMessage(),true);
     }
 
     @Override
-    public void close()
+    public void close() throws MessagingException
     {
-        // TODO Auto-generated method stub
-
+        try
+        {
+            _cppSender.close();
+        }
+        finally
+        {
+            _cppSender.delete();
+        }
     }
 
     @Override
-    public void setCapacity(int capacity)
+    public void setCapacity(int capacity) throws MessagingException
     {
-        //_cppSender.setCapacity(arg0)
+        _cppSender.setCapacity(capacity);
     }
 
     @Override
-    public int getCapacity()
+    public int getCapacity() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return 0;
+        return _cppSender.getCapacity();
     }
 
     @Override
-    public int getAvailable()
+    public int getAvailable() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return 0;
+        return _cppSender.getAvailable();
     }
 
     @Override
-    public int getUnsettled()
+    public int getUnsettled() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return 0;
+        return _cppSender.getUnsettled();
     }
 
     @Override
     public boolean isClosed()
     {
-        // TODO Auto-generated method stub
-        return false;
+        // The C++ version does not support it.
+        // Needs to be supported at a higher level.
+        throw new UnsupportedOperationException("Not supported by the 
underlying c++ client");
     }
 
     @Override
-    public String getName()
+    public String getName() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return null;
+        return _cppSender.getName();
     }
 
     @Override
-    public Session getSession()
+    public Session getSession() throws MessagingException
     {
-        // TODO Auto-generated method stub
-        return null;
+        _ssn.checkError();
+        return _ssn;
     }
 
 }

Modified: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
 (original)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
 Fri Jun 15 17:21:19 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.messaging.Messagi
 import org.apache.qpid.messaging.Receiver;
 import org.apache.qpid.messaging.Sender;
 import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.cpp.jni.Duration;
 
 /**
  *  This class relies on the SessionManagementDecorator for
@@ -43,7 +44,6 @@ public class CppSession implements Sessi
         _conn = conn;
     }
 
-
     @Override
     public boolean isClosed()
     {
@@ -53,8 +53,14 @@ public class CppSession implements Sessi
     @Override
     public void close() throws MessagingException
     {
-        _cppSession.close();
-        _cppSession.delete(); // delete c++ object.
+        try
+        {
+            _cppSession.close();
+        }
+        finally
+        {
+            _cppSession.delete(); // delete c++ object.
+        }
     }
 
     @Override
@@ -114,36 +120,32 @@ public class CppSession implements Sessi
     @Override
     public Receiver nextReceiver(long timeout) throws MessagingException
     {
-        // This is not correct ..need to revist
-        return new CppReceiver(_cppSession.nextReceiver(new 
org.apache.qpid.messaging.cpp.jni.Duration(timeout)));
+        // This needs to be revisited.
+        return new 
CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout)));
     }
 
     @Override
     public Sender createSender(Address address) throws MessagingException
     {        
-        return new CppSender(_cppSession
-                .createSender(new org.apache.qpid.messaging.cpp.jni.Address(
-                        address.toString())));
+        return new CppSender(this, 
_cppSession.createSender(address.toString()));
     }
 
     @Override
     public Sender createSender(String address) throws MessagingException
     {
-        return new CppSender(_cppSession.createSender(address));
+        return new CppSender(this,_cppSession.createSender(address));
     }
 
     @Override
     public Receiver createReceiver(Address address) throws MessagingException
     {
-        return new CppReceiver(_cppSession
-                .createReceiver(new org.apache.qpid.messaging.cpp.jni.Address(
-                        address.toString())));
+        return new CppReceiver(this, 
_cppSession.createReceiver(address.toString()));
     }
 
     @Override
     public Receiver createReceiver(String address) throws MessagingException
     {
-        return new CppReceiver(_cppSession.createReceiver(address));
+        return new CppReceiver(this,_cppSession.createReceiver(address));
     }
 
     @Override
@@ -165,5 +167,4 @@ public class CppSession implements Sessi
     {
         _cppSession.checkError();
     }
-
 }

Modified: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
 (original)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
 Fri Jun 15 17:21:19 2012
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.qpid.messaging.Connection;
 import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.MessagingException;
 import org.apache.qpid.messaging.Session;
 
 /**
@@ -32,10 +33,12 @@ public interface ConnectionExt extends C
 
     public void removeConnectionStateListener(ConnectionStateListener l) 
throws ConnectionException;
 
-    public List<Session> getSessions() throws ConnectionException;
+    public List<SessionExt> getSessions() throws ConnectionException;
 
     public void exception(ConnectionException e);
 
+    public void recreate() throws MessagingException;
+
     /**
      *  The per connection lock that is used by the connection
      *  and it's child objects. A single lock is used to prevent

Modified: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
 (original)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
 Fri Jun 15 17:21:19 2012
@@ -30,6 +30,7 @@ import org.apache.qpid.messaging.Session
 import org.apache.qpid.messaging.SessionException;
 import org.apache.qpid.messaging.ext.ConnectionExt;
 import org.apache.qpid.messaging.ext.ConnectionStateListener;
+import org.apache.qpid.messaging.ext.SessionExt;
 import org.apache.qpid.util.UUIDGen;
 import org.apache.qpid.util.UUIDs;
 import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public class ConnectionManagementDecorat
     private Connection _delegate;
     private ConnectionState _state = ConnectionState.UNDEFINED;
     private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
-    private Map<String, Session> _sessions = new 
ConcurrentHashMap<String,Session>();
+    private Map<String, SessionExt> _sessions = new 
ConcurrentHashMap<String,SessionExt>();
     private ConnectionException _lastException;
     private List<ConnectionStateListener> _stateListeners = new 
ArrayList<ConnectionStateListener>();
 
@@ -136,7 +137,7 @@ public class ConnectionManagementDecorat
         try
         {
             if (name == null || name.isEmpty()) { name = 
generateSessionName(); }
-            Session ssn =  new 
SessionManagementDecorator(this,_delegate.createSession(name));
+            SessionExt ssn =  new 
SessionManagementDecorator(this,_delegate.createSession(name));
             _sessions.put(name, ssn);
             return ssn;
         }
@@ -157,7 +158,7 @@ public class ConnectionManagementDecorat
         try
         {
             if (name == null || name.isEmpty()) { name = 
generateSessionName(); }
-            Session ssn = new 
SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
+            SessionExt ssn = new 
SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
             _sessions.put(name, ssn);
             return ssn;
         }
@@ -198,10 +199,10 @@ public class ConnectionManagementDecorat
     }
 
     @Override
-    public List<Session> getSessions() throws ConnectionException
+    public List<SessionExt> getSessions() throws ConnectionException
     {
         checkClosedAndThrowException();
-        return new ArrayList<Session>(_sessions.values());
+        return new ArrayList<SessionExt>(_sessions.values());
     }
 
     @Override // Called by the delegate or a a session created by this 
connection.
@@ -282,4 +283,11 @@ public class ConnectionManagementDecorat
         // TODO add local IP and pid to the beginning;
         return _ssnNameGenerator.generate().toString();
     }
+
+    @Override
+    public void recreate() throws MessagingException
+    {
+        // TODO Auto-generated method stub
+
+    }
 }

Modified: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java?rev=1350704&r1=1350703&r2=1350704&view=diff
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
 (original)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
 Fri Jun 15 17:21:19 2012
@@ -30,6 +30,9 @@ import org.apache.qpid.messaging.Sender;
 import org.apache.qpid.messaging.Session;
 import org.apache.qpid.messaging.SessionException;
 import org.apache.qpid.messaging.ext.ConnectionExt;
+import org.apache.qpid.messaging.ext.ReceiverExt;
+import org.apache.qpid.messaging.ext.SenderExt;
+import org.apache.qpid.messaging.ext.SessionExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +66,8 @@ import org.slf4j.LoggerFactory;
  *      <ol>
  *       <li>The application (normal close)</li>
  *       <li>By the parent via failover (error)</li>
- *       <li>By the connection object, if not failover(error)</li>
+ *       <li>By the connection object, if no failover(error)</li>
+ *       <li>By itself if it receives an exception (error)</li>
  *      </ol>
  * </i>
  *
@@ -71,7 +75,7 @@ import org.slf4j.LoggerFactory;
  * For the time being, anytime a session exception is received, the session 
will be marked CLOSED.
  * We need to revisit this.
  */
-public class SessionManagementDecorator implements Session
+public class SessionManagementDecorator implements SessionExt
 {
     private static Logger _logger = 
LoggerFactory.getLogger(SessionManagementDecorator.class);
 
@@ -80,8 +84,8 @@ public class SessionManagementDecorator 
     private ConnectionExt _conn;
     private Session _delegate;
     SessionState _state = SessionState.UNDEFINED;
-    private List<Receiver> _receivers = new ArrayList<Receiver>();
-    private List<Sender> _senders = new ArrayList<Sender>();
+    private List<ReceiverExt> _receivers = new ArrayList<ReceiverExt>();
+    private List<SenderExt> _senders = new ArrayList<SenderExt>();
     private final Object _connectionLock;  // global per connection lock
 
     public SessionManagementDecorator(ConnectionExt conn, Session delegate)
@@ -306,7 +310,7 @@ public class SessionManagementDecorator 
         checkClosedAndThrowException();
         try
         {
-            Sender sender = _delegate.createSender(address);
+            SenderExt sender = new 
SenderManagementDecorator(this,_delegate.createSender(address));
             _senders.add(sender);
             return sender;
         }
@@ -326,7 +330,7 @@ public class SessionManagementDecorator 
         checkClosedAndThrowException();
         try
         {
-            Sender sender = _delegate.createSender(address);
+            SenderExt sender = new 
SenderManagementDecorator(this,_delegate.createSender(address));
             _senders.add(sender);
             return sender;
         }
@@ -346,7 +350,7 @@ public class SessionManagementDecorator 
         checkClosedAndThrowException();
         try
         {
-            Receiver receiver = _delegate.createReceiver(address);
+            ReceiverExt receiver = new 
ReceiverManagementDecorator(this,_delegate.createReceiver(address));
             _receivers.add(receiver);
             return receiver;
         }
@@ -366,7 +370,7 @@ public class SessionManagementDecorator 
         checkClosedAndThrowException();
         try
         {
-            Receiver receiver = _delegate.createReceiver(address);
+            ReceiverExt receiver = new 
ReceiverManagementDecorator(this,_delegate.createReceiver(address));
             _receivers.add(receiver);
             return receiver;
         }
@@ -412,6 +416,32 @@ public class SessionManagementDecorator 
         }
     }
 
+    @Override
+    public void exception(MessagingException e)
+    {
+        if (e instanceof ConnectionException)
+        {
+            handleConnectionException((ConnectionException)e);
+        }
+        else if (e instanceof SessionException)
+        {
+            handleSessionException((SessionException)e);
+        }
+    }
+
+    @Override
+    public void recreate() throws MessagingException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public ConnectionExt getConnectionExt()
+    {
+        return _conn;
+    }
+
     private void checkClosedAndThrowException() throws SessionException
     {
         checkClosedAndThrowException("Session is closed. You cannot invoke 
methods on a closed sesion");
@@ -470,7 +500,15 @@ public class SessionManagementDecorator 
     {
         synchronized (_connectionLock)
         {
-            _state = SessionState.CLOSED;
+            try
+            {
+                close();
+            }
+            catch(MessagingException ex)
+            {
+                // Should not throw an exception here.
+                // Even if it did, it doesn't matter as we are closing.
+            }
         }
         return new SessionException("Session has been closed",e);
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to