Author: rajith
Date: Tue Jun 26 15:54:10 2012
New Revision: 1354074
URL: http://svn.apache.org/viewvc?rev=1354074&view=rev
Log:
QPID-4027 Made modifications to reflect the changes made to interfaces.
Fixed bugs identified in testing.
Modified:
qpid/branches/address-refactor2/qpid/cpp/bindings/swig_java_cpp_helper.i
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
qpid/branches/address-refactor2/qpid/java/tools/etc/test.log4j
Modified:
qpid/branches/address-refactor2/qpid/cpp/bindings/swig_java_cpp_helper.i
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/cpp/bindings/swig_java_cpp_helper.i?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
--- qpid/branches/address-refactor2/qpid/cpp/bindings/swig_java_cpp_helper.i
(original)
+++ qpid/branches/address-refactor2/qpid/cpp/bindings/swig_java_cpp_helper.i
Tue Jun 26 15:54:10 2012
@@ -581,7 +581,7 @@ void WriteOnlyVariantMapWrapper::put(con
qpid::types::Variant v = convertJavaObjectToVariant(env,obj);
- if (v)
+ if (!v.isVoid())
{
varMap_[key] = v;
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java
Tue Jun 26 15:54:10 2012
@@ -17,6 +17,8 @@
*/
package org.apache.qpid.messaging;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,4 +51,6 @@ public abstract class ConnectionFactory
}
public abstract Connection createConnection(String url);
+
+ public abstract Connection createConnection(String url, Map<String,Object>
options);
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java
Tue Jun 26 15:54:10 2012
@@ -24,7 +24,7 @@ package org.apache.qpid.messaging;
* to whatever settings have been configured), then an instance of
* this class will be thrown to signal that.
*/
-public class TransportFailureException extends ConnectionException
+public class TransportFailureException extends MessagingException
{
public TransportFailureException(String message, Throwable cause)
@@ -36,5 +36,4 @@ public class TransportFailureException e
{
super(message);
}
-
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
Tue Jun 26 15:54:10 2012
@@ -17,11 +17,18 @@
*/
package org.apache.qpid.messaging.cpp;
-import org.apache.qpid.messaging.Connection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.messaging.ConnectionException;
import org.apache.qpid.messaging.MessageFactory;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.TransportFailureException;
import org.apache.qpid.messaging.cpp.jni.NativeConnection;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ConnectionEventListener;
+import org.apache.qpid.messaging.internal.SessionInternal;
/**
* This class relies on the ConnectionManagementDecorator for
@@ -29,21 +36,60 @@ import org.apache.qpid.messaging.cpp.jni
* This class is merely a delegate/wrapper for the,
* underlying c++ connection object.
*/
-public class CppConnection implements Connection
+public class CppConnection implements ConnectionInternal
{
private static MessageFactory _MSG_FACTORY = new CppMessageFactory();
private NativeConnection _cppConn;
+ private String _url;
+ private Map<String,Object> _options;
+ private long _serialNumber = 0L; // used for avoiding spurious failover
calls.
+
+ public CppConnection(String url, Map<String,Object> options)
+ {
+ _cppConn = createNativeConnection(url,options);
+ }
- public CppConnection(String url)
+ private NativeConnection createNativeConnection(String url,
Map<String,Object> options)
{
- _cppConn = new NativeConnection(url);
+ _url = url;
+ _options = options;
+ if (options == null || options.size() == 0)
+ {
+ return new NativeConnection(url);
+ }
+ else
+ {
+ return new NativeConnection(url,options);
+ }
}
@Override
public void open() throws MessagingException
{
_cppConn.open();
+ _serialNumber++; //wrap around ?
+ }
+
+ public void reconnect(String url,Map<String,Object> options) throws
TransportFailureException
+ {
+ try
+ {
+ if (_cppConn != null && _cppConn.isOpen())
+ {
+ close();
+ }
+ _cppConn = createNativeConnection(url,options);
+ open();
+ }
+ catch (TransportFailureException e)
+ {
+ throw e;
+ }
+ catch (MessagingException e)
+ {
+ throw new TransportFailureException("Error reconnecting",e);
+ }
}
@Override
@@ -62,19 +108,20 @@ public class CppConnection implements Co
finally
{
_cppConn.delete(); //clean up the c++ object
+ _cppConn = null;
}
}
@Override
public Session createSession(String name) throws MessagingException
{
- return new CppSession(this,_cppConn.createSession());
+ return new CppSession(this,_cppConn.createSession(name),name);
}
@Override
public Session createTransactionalSession(String name) throws
MessagingException
{
- return new CppSession(this,_cppConn.createTransactionalSession());
+ return new
CppSession(this,_cppConn.createTransactionalSession(name),name);
}
@Override
@@ -88,4 +135,66 @@ public class CppConnection implements Co
{
return _MSG_FACTORY;
}
+
+ @Override
+ public void addConnectionEventListener(ConnectionEventListener l)
+ throws ConnectionException
+ { // NOOP
+ }
+
+ @Override
+ public void removeConnectionEventListener(ConnectionEventListener l)
+ throws ConnectionException
+ { // NOOP
+ }
+
+ @Override
+ public List<SessionInternal> getSessions() throws ConnectionException
+ { // NOOP
+ return null;
+ }
+
+ @Override
+ public void exception(TransportFailureException e, long serialNumber)
+ { // NOOP
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ { // NOOP
+ }
+
+ @Override
+ public void unregisterSession(SessionInternal sesion)
+ { // NOOP
+ }
+
+ @Override
+ public Object getConnectionLock()
+ { // NOOP
+ return null;
+ }
+
+ @Override
+ public String getConnectionURL()
+ {
+ return _url;
+ }
+
+ @Override
+ public Map<String, Object> getConnectionOptions()
+ {
+ return _options;
+ }
+
+ @Override
+ public long getSerialNumber()
+ {
+ return _serialNumber;
+ }
+
+ NativeConnection getNativeConnection()
+ {
+ return _cppConn;
+ }
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java
Tue Jun 26 15:54:10 2012
@@ -17,9 +17,11 @@
*/
package org.apache.qpid.messaging.cpp;
+import java.util.Map;
+
import org.apache.qpid.messaging.Connection;
import org.apache.qpid.messaging.ConnectionFactory;
-import org.apache.qpid.messaging.util.ConnectionManagementDecorator;
+import org.apache.qpid.messaging.util.failover.ConnectionFailoverDecorator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +40,15 @@ public class CppConnectionFactory extend
{
}
+ @Override
public Connection createConnection(String url)
{
- return new ConnectionManagementDecorator(new CppConnection(url));
+ return createConnection(url, null);
+ }
+
+ @Override
+ public Connection createConnection(String url, Map<String, Object> options)
+ {
+ return new ConnectionFailoverDecorator(new CppConnection(url,options),
new Object());
}
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
Tue Jun 26 15:54:10 2012
@@ -19,22 +19,24 @@ package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessagingException;
-import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.cpp.jni.NativeMessage;
import org.apache.qpid.messaging.cpp.jni.NativeReceiver;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
-public class CppReceiver implements Receiver
+public class CppReceiver implements ReceiverInternal
{
- private CppSession _ssn;
+ private final CppSession _ssn;
private NativeReceiver _cppReceiver;
- private CppMessageFactory _msgFactory;
+ private final CppMessageFactory _msgFactory;
+ private final String _address;
- public CppReceiver(CppSession ssn, NativeReceiver cppReceiver) throws
MessagingException
+ public CppReceiver(CppSession ssn, NativeReceiver cppReceiver, String
address) throws MessagingException
{
_ssn = ssn;
_cppReceiver = cppReceiver;
_msgFactory =
(CppMessageFactory)ssn.getConnection().getMessageFactory();
+ _address = address;
}
@Override
@@ -106,4 +108,10 @@ public class CppReceiver implements Rece
_ssn.checkError();
return _ssn;
}
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ _cppReceiver = _ssn.getNativeSession().createReceiver(_address);
+ }
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
Tue Jun 26 15:54:10 2012
@@ -25,18 +25,21 @@ import org.apache.qpid.messaging.cpp.Cpp
import org.apache.qpid.messaging.cpp.jni.NativeMessage;
import org.apache.qpid.messaging.cpp.jni.NativeSender;
import org.apache.qpid.messaging.internal.MessageInternal;
+import org.apache.qpid.messaging.internal.SenderInternal;
-public class CppSender implements Sender
+public class CppSender implements SenderInternal
{
- private CppSession _ssn;
+ private final CppSession _ssn;
private NativeSender _cppSender;
- private CppMessageFactory _msgFactory;
+ private final CppMessageFactory _msgFactory;
+ private final String _address;
- public CppSender(CppSession ssn, NativeSender cppSender) throws
MessagingException
+ public CppSender(CppSession ssn, NativeSender cppSender, String address)
throws MessagingException
{
_ssn = ssn;
_cppSender = cppSender;
_msgFactory =
(CppMessageFactory)ssn.getConnection().getMessageFactory();
+ _address = address;
}
@Override
@@ -121,4 +124,10 @@ public class CppSender implements Sender
_ssn.checkError();
return _ssn;
}
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ _cppSender = _ssn.getNativeSession().createSender(_address);
+ }
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
Tue Jun 26 15:54:10 2012
@@ -23,10 +23,14 @@ import org.apache.qpid.messaging.Message
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Sender;
-import org.apache.qpid.messaging.Session;
-import org.apache.qpid.messaging.cpp.jni.Duration;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
import org.apache.qpid.messaging.cpp.jni.NativeMessage;
import org.apache.qpid.messaging.cpp.jni.NativeSession;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
+import org.apache.qpid.messaging.internal.SenderInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
/**
* This class relies on the SessionManagementDecorator for
@@ -34,15 +38,17 @@ import org.apache.qpid.messaging.cpp.jni
* This class is merely a delegate/wrapper for the,
* underlying c++ session object.
*/
-public class CppSession implements Session
+public class CppSession implements SessionInternal
{
private NativeSession _cppSession;
private CppConnection _conn;
+ private String _name;
- public CppSession(CppConnection conn,NativeSession cppSsn)
+ public CppSession(CppConnection conn,NativeSession cppSsn, String name)
{
_cppSession = cppSsn;
_conn = conn;
+ _name = name;
}
@Override
@@ -122,31 +128,32 @@ public class CppSession implements Sessi
public Receiver nextReceiver(long timeout) throws MessagingException
{
// This needs to be revisited.
- return new
CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout)));
+ //return new
CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout)));
+ return null;
}
@Override
public Sender createSender(Address address) throws MessagingException
- {
- return new CppSender(this,
_cppSession.createSender(address.toString()));
+ {
+ return new CppSender(this,
_cppSession.createSender(address.toString()),address.toString());
}
@Override
public Sender createSender(String address) throws MessagingException
{
- return new CppSender(this,_cppSession.createSender(address));
+ return new CppSender(this,_cppSession.createSender(address),address);
}
@Override
public Receiver createReceiver(Address address) throws MessagingException
{
- return new CppReceiver(this,
_cppSession.createReceiver(address.toString()));
+ return new CppReceiver(this,
_cppSession.createReceiver(address.toString()),address.toString());
}
@Override
public Receiver createReceiver(String address) throws MessagingException
{
- return new CppReceiver(this,_cppSession.createReceiver(address));
+ return new
CppReceiver(this,_cppSession.createReceiver(address),address);
}
@Override
@@ -168,4 +175,48 @@ public class CppSession implements Sessi
{
_cppSession.checkError();
}
+
+ @Override
+ public ConnectionInternal getConnectionInternal()
+ {
+ return _conn;
+ }
+
+ @Override
+ public void exception(TransportFailureException e, long serialNumber)
+ {//NOOP
+ }
+
+ @Override
+ public void exception(SessionException e)
+ {//NOOP
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO need to keep track if it's transactional or not
+ _cppSession = _conn.getNativeConnection().createSession(_name);
+ }
+
+ @Override
+ public String getName()
+ {//NOOP
+ return _name;
+ }
+
+ @Override
+ public void unregisterReceiver(ReceiverInternal receiver)
+ {//NOOP
+ }
+
+ @Override
+ public void unregisterSender(SenderInternal sender)
+ {//NOOP
+ }
+
+ NativeSession getNativeSession()
+ {
+ return _cppSession;
+ }
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
Tue Jun 26 15:54:10 2012
@@ -36,7 +36,9 @@ public class CppTest
{
public static void main(String[] args) throws Exception
{
- Connection con =
ConnectionFactory.get().createConnection("localhost:5672");
+ HashMap<String,Object> options = new HashMap<String,Object>();
+ options.put("reconnect_urls", "localhost:6672,localhost:7672");
+ Connection con =
ConnectionFactory.get().createConnection("localhost:5672",options);
con.open();
Session ssn = con.createSession(null);
Sender sender = ssn.createSender("amq.topic/test");
@@ -50,6 +52,8 @@ public class CppTest
msg.setProperty("boolean", true);
sender.send(msg, false);
+ Thread.sleep(2000);
+
StringMessage stringMsg = (StringMessage) receiver.fetch(0);
System.out.println("Received message " + stringMsg + " with content
type : " + stringMsg.getContentType() + " and content : " +
stringMsg.getString());
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
Tue Jun 26 15:54:10 2012
@@ -19,11 +19,9 @@ package org.apache.qpid.messaging.intern
import org.apache.qpid.messaging.ConnectionException;
-public interface ConnectionStateListener
+public interface ConnectionEventListener
{
public void exception(ConnectionException e);
- public void opened();
-
- public void closed();
+ public void eventOccured(ConnectionEvent event);
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java
Tue Jun 26 15:54:10 2012
@@ -18,24 +18,28 @@
package org.apache.qpid.messaging.internal;
import java.util.List;
+import java.util.Map;
import org.apache.qpid.messaging.Connection;
import org.apache.qpid.messaging.ConnectionException;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.TransportFailureException;
/**
* An extended interface meant for API implementors.
*/
public interface ConnectionInternal extends Connection
{
- public void addConnectionStateListener(ConnectionStateListener l) throws
ConnectionException;
+ public void addConnectionEventListener(ConnectionEventListener l) throws
ConnectionException;
- public void removeConnectionStateListener(ConnectionStateListener l)
throws ConnectionException;
+ public void removeConnectionEventListener(ConnectionEventListener l)
throws ConnectionException;
public List<SessionInternal> getSessions() throws ConnectionException;
- public void exception(ConnectionException e);
+ public void exception(TransportFailureException e, long serialNumber);
+
+ public void reconnect(String url, Map<String,Object> options) throws
TransportFailureException;
public void recreate() throws MessagingException;
@@ -48,4 +52,16 @@ public interface ConnectionInternal exte
* perhaps at the cost of a minor perf degradation.
*/
public Object getConnectionLock();
+
+ public String getConnectionURL();
+
+ public Map<String,Object> getConnectionOptions();
+
+ /**
+ * Every time a protocol connection is established a new serial number
+ * is assigned to the connection to distinguish itself from a previous
+ * version. This is useful in avoiding the same connection exception being
+ * notified by multiple sessions (and it's children), resulting in
spurious failover calls.
+ */
+ public long getSerialNumber();
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
Tue Jun 26 15:54:10 2012
@@ -20,10 +20,10 @@ package org.apache.qpid.messaging.intern
public interface FailoverStrategy
{
public boolean failoverAllowed();
-
+
public ConnectionString getNextConnectionString();
-
+
public ConnectionString getCurrentConnectionString();
-
+
public void connectionAttained(ConnectionInternal conn);
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
Tue Jun 26 15:54:10 2012
@@ -29,5 +29,4 @@ public abstract class FailoverStrategyFa
}
public abstract FailoverStrategy getFailoverStrategy(ConnectionInternal
con);
-
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java
Tue Jun 26 15:54:10 2012
@@ -19,12 +19,16 @@ package org.apache.qpid.messaging.intern
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
public interface SessionInternal extends Session
{
public ConnectionInternal getConnectionInternal();
- public void exception(MessagingException e);
+ public void exception(TransportFailureException e, long serialNumber);
+
+ public void exception(SessionException e);
public void recreate() throws MessagingException;
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
Tue Jun 26 15:54:10 2012
@@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractConnectionDecorator implements ConnectionInternal
{
private final Logger _logger = LoggerFactory.getLogger(getClass());
-
+
protected ConnectionInternal _delegate;
protected final Object _connectionLock;
protected List<ConnectionEventListener> _stateListeners = new
ArrayList<ConnectionEventListener>();
protected Map<String, SessionInternal> _sessions = new
ConcurrentHashMap<String,SessionInternal>();
-
+
protected AbstractConnectionDecorator(ConnectionInternal delegate, Object
lock)
{
_delegate = delegate;
@@ -58,7 +58,7 @@ public abstract class AbstractConnection
_delegate.open();
}
}
-
+
public void reconnect(String url, Map<String,Object> options) throws
TransportFailureException
{
synchronized (_connectionLock)
@@ -168,7 +168,7 @@ public abstract class AbstractConnection
@Override
public Object getConnectionLock()
- {
+ {
return _connectionLock;
}
@@ -189,7 +189,7 @@ public abstract class AbstractConnection
{
return _delegate.getSerialNumber();
}
-
+
protected void notifyEvent(ConnectionEvent event)
{
for (ConnectionEventListener l: _stateListeners)
@@ -197,6 +197,6 @@ public abstract class AbstractConnection
l.eventOccured(event);
}
}
-
+
protected abstract void checkPreConditions() throws ConnectionException;
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
Tue Jun 26 15:54:10 2012
@@ -54,7 +54,7 @@ public abstract class AbstractSenderDeco
public int getCapacity() throws MessagingException
{
checkPreConditions();
- return _delegate.getCapacity();
+ return _delegate.getCapacity();
}
@Override
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
Tue Jun 26 15:54:10 2012
@@ -90,7 +90,7 @@ public abstract class AbstractSessionDec
_conn.unregisterSession(this);
}
}
-
+
@Override
public boolean isClosed()
{
@@ -137,14 +137,14 @@ public abstract class AbstractSessionDec
public void release(Message message) throws MessagingException
{
checkPreConditions();
- _delegate.release(message);
+ _delegate.release(message);
}
@Override
public void sync(boolean block) throws MessagingException
{
checkPreConditions();
- _delegate.sync(block);
+ _delegate.sync(block);
}
@Override
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
Tue Jun 26 15:54:10 2012
@@ -32,8 +32,8 @@ public class DefaultFailoverStrategy imp
/** seconds (give up and report failure after specified time) */
private long _reconnectTimeout = 1000;
- /** n (give up and report failure after specified number of attempts) */
- private int _reconnectLimit = 1;
+ /** n (give up and report failure after specified number of attempts) */
+ private int _reconnectLimit = 1;
/** seconds (initial delay between failed reconnection attempts) */
private long _reconnectIntervalMin = 1000;
@@ -74,7 +74,7 @@ public class DefaultFailoverStrategy imp
@Override
public boolean failoverAllowed()
{
- return (_attempts < _reconnectLimit);
+ return (_attempts < _reconnectLimit);
}
@Override
@@ -94,7 +94,7 @@ public class DefaultFailoverStrategy imp
@Override
public ConnectionString getCurrentConnectionString()
{
- return _currentUrl;
+ return _currentUrl;
}
@Override
@@ -102,7 +102,7 @@ public class DefaultFailoverStrategy imp
{
_attempts = 0;
}
-
+
class ConnectionStringImpl implements ConnectionString
{
private final String _url;
@@ -113,7 +113,7 @@ public class DefaultFailoverStrategy imp
_url = url;
_options = options;
}
-
+
public String getUrl()
{
return _url;
@@ -125,4 +125,4 @@ public class DefaultFailoverStrategy imp
}
}
-}
\ No newline at end of file
+}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
Tue Jun 26 15:54:10 2012
@@ -6,7 +6,6 @@ import org.apache.qpid.messaging.interna
public class DefaultFailoverStrategyFactory extends FailoverStrategyFactory
{
-
@Override
public FailoverStrategy getFailoverStrategy(ConnectionInternal con)
{
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
Tue Jun 26 15:54:10 2012
@@ -187,7 +187,7 @@ public class ReceiverFailoverDecorator e
if (_state == ReceiverState.CLOSED)
{
throw new MessagingException("Receiver is already closed");
- }
+ }
_state = ReceiverState.CLOSED;
super.close();
}
@@ -216,7 +216,7 @@ public class ReceiverFailoverDecorator e
_delegate.recreate();
}
}
-
+
@Override
public void eventOccured(ConnectionEvent event)
{
@@ -234,7 +234,7 @@ public class ReceiverFailoverDecorator e
case POST_FAILOVER:
try
{
- if (_state != ReceiverState.OPENED)
+ if (_state != ReceiverState.OPENED)
{
close();
}
@@ -246,7 +246,7 @@ public class ReceiverFailoverDecorator e
_connectionLock.notifyAll();
break;
default:
- break; //ignore the rest
+ break; //ignore the rest
}
}
}
@@ -255,7 +255,7 @@ public class ReceiverFailoverDecorator e
public void exception(ConnectionException e)
{// NOOP
}
-
+
protected void checkPreConditions() throws ReceiverException
{
switch (_state)
@@ -266,7 +266,7 @@ public class ReceiverFailoverDecorator e
waitForFailoverToComplete();
}
}
-
+
protected void waitForFailoverToComplete() throws ReceiverException
{
synchronized (_connectionLock)
@@ -295,7 +295,7 @@ public class ReceiverFailoverDecorator e
}
return new ReceiverException("Session has been closed",e);
}
-
+
protected void failover(TransportFailureException e, long serialNumber)
throws ReceiverException
{
synchronized (_connectionLock)
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
Tue Jun 26 15:54:10 2012
@@ -34,7 +34,7 @@ public class SenderFailoverDecorator ext
private long _failoverTimeout = Long.getLong("qpid.failover-timeout",
1000);
private ReceiverException _lastException;
private long _connSerialNumber = 0;
-
+
public SenderFailoverDecorator(SessionInternal ssn, SenderInternal
delegate)
{
super(ssn,delegate);
@@ -72,7 +72,7 @@ public class SenderFailoverDecorator ext
if (_state == SenderState.CLOSED)
{
throw new MessagingException("Sender is already closed");
- }
+ }
_state = SenderState.CLOSED;
super.close();
}
@@ -188,7 +188,7 @@ public class SenderFailoverDecorator ext
_delegate.recreate();
}
}
-
+
@Override
public void eventOccured(ConnectionEvent event)
{
@@ -206,7 +206,7 @@ public class SenderFailoverDecorator ext
case POST_FAILOVER:
try
{
- if (_state != SenderState.OPENED)
+ if (_state != SenderState.OPENED)
{
close();
}
@@ -218,7 +218,7 @@ public class SenderFailoverDecorator ext
_connectionLock.notifyAll();
break;
default:
- break; //ignore the rest
+ break; //ignore the rest
}
}
}
@@ -246,7 +246,7 @@ public class SenderFailoverDecorator ext
}
}
}
-
+
protected void failover(TransportFailureException e, long serialNumber)
throws SenderException
{
synchronized (_connectionLock)
@@ -260,7 +260,7 @@ public class SenderFailoverDecorator ext
waitForFailoverToComplete();
}
}
-
+
protected void checkPreConditions() throws SenderException
{
switch (_state)
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
Tue Jun 26 15:54:10 2012
@@ -45,7 +45,7 @@ public class SessionFailoverDecorator ex
private long _failoverTimeout = Long.getLong("qpid.failover-timeout",
1000);
private SessionException _lastException;
private long _connSerialNumber = 0;
-
+
public SessionFailoverDecorator(ConnectionInternal conn, SessionInternal
delegate)
{
super(conn,delegate);
@@ -278,7 +278,7 @@ public class SessionFailoverDecorator ex
try
{
SenderInternal sender = new SenderFailoverDecorator(this,
- (SenderInternal) _delegate.createSender(address));
+ (SenderInternal) _delegate.createSender(address));
synchronized (_connectionLock)
{
_senders.add(sender);
@@ -394,7 +394,7 @@ public class SessionFailoverDecorator ex
throw handleSessionException(e);
}
}
-
+
@Override
public boolean isClosed()
{
@@ -420,7 +420,7 @@ public class SessionFailoverDecorator ex
_lastException = ex;
}
}
-
+
public void exception(SessionException e)
{
handleSessionException(e);
@@ -473,7 +473,7 @@ public class SessionFailoverDecorator ex
case POST_FAILOVER:
try
{
- if (_state != SessionState.OPENED)
+ if (_state != SessionState.OPENED)
{
close();
}
@@ -485,7 +485,7 @@ public class SessionFailoverDecorator ex
_connectionLock.notifyAll();
break;
default:
- break; //ignore the rest
+ break; //ignore the rest
}
}
}
@@ -503,7 +503,7 @@ public class SessionFailoverDecorator ex
waitForFailoverToComplete();
}
}
-
+
protected void checkPreConditions() throws SessionException
{
switch (_state)
@@ -537,7 +537,7 @@ public class SessionFailoverDecorator ex
}
return new SessionException("Session has been closed",e);
}
-
+
protected void waitForFailoverToComplete() throws SessionException
{
synchronized (_connectionLock)
Modified: qpid/branches/address-refactor2/qpid/java/tools/etc/test.log4j
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/tools/etc/test.log4j?rev=1354074&r1=1354073&r2=1354074&view=diff
==============================================================================
--- qpid/branches/address-refactor2/qpid/java/tools/etc/test.log4j (original)
+++ qpid/branches/address-refactor2/qpid/java/tools/etc/test.log4j Tue Jun 26
15:54:10 2012
@@ -18,7 +18,7 @@
#
log4j.rootLogger=${root.logging.level}
-log4j.logger.org.apache.qpid=ERROR, console
+log4j.logger.org.apache.qpid=DEBUG, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]