Author: rajith
Date: Fri Jun 15 17:20:48 2012
New Revision: 1350702
URL: http://svn.apache.org/viewvc?rev=1350702&view=rev
Log:
QPID-4027 Added extension interfaces for Session, Sender and Receiver.
Added convenience class that converts a Java long timeout to the C++
Duration object.
Added ReceiverManagementDecorator and SenderManagementDecorator, which
provide state management and error handling via the decorator pattern
for a Receiver and a Sender respectively.
Added:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
Added:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java?rev=1350702&view=auto
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
(added)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
Fri Jun 15 17:20:48 2012
@@ -0,0 +1,48 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.cpp;
+
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.cpp.jni.Duration;
+
+/**
+ * Helper that maps a timeout given as a Java {@code long} onto the
+ * {@link Duration} type exposed by the C++ (JNI) binding.
+ */
+public class CppDuration
+{
+    /**
+     * Returns the {@link Duration} that corresponds to the given timeout.
+     * The well-known {@link Receiver} constants (FOREVER, IMMEDIATE, SECOND,
+     * MINUTE) are mapped to the matching pre-defined Duration objects; any
+     * other value is wrapped in a new Duration.
+     *
+     * @param duration the timeout value as used by the Java messaging API.
+     * @return the equivalent Duration object.
+     */
+    public static Duration getDuration(long duration)
+    {
+        if (duration == Receiver.FOREVER)
+        {
+            return Duration.getFOREVER();
+        }
+        if (duration == Receiver.IMMEDIATE)
+        {
+            return Duration.getIMMEDIATE();
+        }
+        if (duration == Receiver.SECOND)
+        {
+            return Duration.getSECOND();
+        }
+        if (duration == Receiver.MINUTE)
+        {
+            return Duration.getMINUTE();
+        }
+        // No pre-defined constant matches, so build a fresh Duration.
+        return new Duration(duration);
+    }
+}
Added:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java?rev=1350702&view=auto
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
(added)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
Fri Jun 15 17:20:48 2012
@@ -0,0 +1,26 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+
+/**
+ * Extension of {@link Receiver} that adds a {@link #recreate()} operation.
+ */
+public interface ReceiverExt extends Receiver
+{
+    /**
+     * Recreates this receiver.
+     * NOTE(review): the exact contract is not defined here; presumably this
+     * is invoked to re-establish the receiver after a failover - confirm
+     * against the implementations.
+     *
+     * @throws MessagingException if the receiver cannot be recreated.
+     */
+    void recreate() throws MessagingException;
+}
Added:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java?rev=1350702&view=auto
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
(added)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
Fri Jun 15 17:20:48 2012
@@ -0,0 +1,26 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Sender;
+
+/**
+ * Extension of {@link Sender} that adds a {@link #recreate()} operation.
+ */
+public interface SenderExt extends Sender
+{
+    /**
+     * Recreates this sender.
+     * NOTE(review): the exact contract is not defined here; presumably this
+     * is invoked to re-establish the sender after a failover - confirm
+     * against the implementations.
+     *
+     * @throws MessagingException if the sender cannot be recreated.
+     */
+    void recreate() throws MessagingException;
+}
Added:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java?rev=1350702&view=auto
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
(added)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
Fri Jun 15 17:20:48 2012
@@ -0,0 +1,30 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Session;
+
+/**
+ * Extension of {@link Session} that exposes the owning connection and adds
+ * hooks for error notification and recreation.
+ */
+public interface SessionExt extends Session
+{
+    /**
+     * @return the extended connection this session belongs to.
+     */
+    ConnectionExt getConnectionExt();
+
+    /**
+     * Notifies the session of an exception raised by one of its child
+     * objects (the sender and receiver decorators call this when the
+     * delegate throws a connection or session exception).
+     * NOTE(review): what the session does in response (failover, closing
+     * its children) is up to the implementation - confirm there.
+     *
+     * @param e the exception being reported.
+     */
+    void exception(MessagingException e);
+
+    /**
+     * Recreates this session.
+     * NOTE(review): the exact contract is not defined here; presumably this
+     * is invoked to re-establish the session after a failover - confirm
+     * against the implementations.
+     *
+     * @throws MessagingException if the session cannot be recreated.
+     */
+    void recreate() throws MessagingException;
+}
Added:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java?rev=1350702&view=auto
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
(added)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
Fri Jun 15 17:20:48 2012
@@ -0,0 +1,267 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.ReceiverExt;
+import org.apache.qpid.messaging.ext.SessionExt;
+import org.apache.qpid.messaging.util.SessionManagementDecorator.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Receiver.
+ * This class adds,
+ * 1. State management.
+ * 2. Exception handling.
+ *
+ */
+public class ReceiverManagementDecorator implements ReceiverExt
+{
+ private static Logger _logger =
LoggerFactory.getLogger(ReceiverManagementDecorator.class);
+
+ public enum ReceiverState {OPENED, CLOSED, ERROR};
+
+ private Receiver _delegate;
+ private ReceiverState _state = ReceiverState.OPENED;
+ private SessionExt _ssn;
+ private final Object _connectionLock; // global per connection lock
+
+ public ReceiverManagementDecorator(SessionExt ssn, Receiver delegate)
+ {
+ _ssn = ssn;
+ _delegate = delegate;
+ _connectionLock = ssn.getConnectionExt().getConnectionLock();
+ }
+
+ @Override
+ public Message get(long timeout) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.get(timeout);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public Message fetch(long timeout) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.fetch(timeout);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void setCapacity(int capacity) throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ _delegate.setCapacity(capacity);
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getCapacity() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getCapacity();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getAvailable() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getAvailable();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public int getUnsettled() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ try
+ {
+ return _delegate.getUnsettled();
+ }
+ catch (ConnectionException e)
+ {
+ throw handleConnectionException(e);
+ }
+ catch (SessionException e)
+ {
+ throw handleSessionException(e);
+ }
+ }
+
+ @Override
+ public void close() throws MessagingException
+ {
+ checkClosedAndThrowException("Receiver is already closed");
+ synchronized (_connectionLock)
+ {
+ _state = ReceiverState.CLOSED;
+ _delegate.close();
+ }
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return _state == ReceiverState.CLOSED;
+ }
+
+ @Override
+ public String getName() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ return _delegate.getName();
+ }
+
+ @Override
+ public Session getSession() throws MessagingException
+ {
+ checkClosedAndThrowException();
+ _ssn.checkError();
+ return _ssn;
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ private void checkClosedAndThrowException() throws ReceiverException
+ {
+ checkClosedAndThrowException("Receiver is closed. You cannot invoke
methods on a closed receiver");
+ }
+
+ private void checkClosedAndThrowException(String closedMessage) throws
ReceiverException
+ {
+ switch (_state)
+ {
+ case ERROR:
+ throw new ReceiverException("Receiver is in a temporary error
state. The session may or may not recover from this");
+ case CLOSED:
+ throw new ReceiverException(closedMessage);
+ }
+ }
+
+ /**
+ * A ConnectionException will cause the Session/Receiver to go into a
temporary error state,
+ * which prevents it from being used further.
+ * From there the Session and Receiver can be moved into OPENED (if
failover works) or
+ * CLOSED if there is no failover or if failover has failed.
+ * @param e
+ * @throws MessagingException
+ */
+ private ReceiverException handleConnectionException(ConnectionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ _state = ReceiverState.ERROR;
+ _ssn.exception(e); // This might trigger failover in a layer above.
+ if (_state == ReceiverState.CLOSED)
+ {
+ // The connection has instructed the session and it's child
objects to be closed.
+ // Either there was no failover, or failover has failed.
+ return new ReceiverException("Receiver is closed due to
connection error",e);
+ }
+ else
+ {
+ // Asking the application or the Parent handler to retry the
operation.
+ // The Session and Receiver should be in OPENED state at this
time.
+ return new ReceiverException("Receiver was in a temporary
error state due to connection error." +
+ "Plase retry your operation",e);
+ }
+ }
+ }
+
+ /**
+ * Session Exceptions will generally invalidate the Session.
+ * TODO this needs to be revisited again.
+ * A new session will need to be created in that case.
+ * @param e
+ * @throws MessagingException
+ */
+ private ReceiverException handleSessionException(SessionException e)
+ {
+ synchronized (_connectionLock)
+ {
+ // This should close all receivers (including this) and senders.
+ _ssn.exception(e);
+ }
+ return new ReceiverException("Session has been closed",e);
+ }
+}
Added:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java?rev=1350702&view=auto
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
(added)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
Fri Jun 15 17:20:48 2012
@@ -0,0 +1,234 @@
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.SenderException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.SenderExt;
+import org.apache.qpid.messaging.ext.SessionExt;
+import
org.apache.qpid.messaging.util.ReceiverManagementDecorator.ReceiverState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Sender.
+ * This class adds,
+ * 1. State management (OPENED, CLOSED, ERROR).
+ * 2. Exception handling: connection and session exceptions thrown by the
+ *    delegate are reported to the owning session and rethrown as a
+ *    {@link SenderException}.
+ *
+ * State changes are made while holding the per connection lock obtained
+ * from the session's connection.
+ */
+public class SenderManagementDecorator implements SenderExt
+{
+    private static final Logger _logger = LoggerFactory.getLogger(SenderManagementDecorator.class);
+
+    public enum SenderState {OPENED, CLOSED, ERROR};
+
+    // Not final: presumably recreate() will replace the delegate once it is
+    // implemented - TODO confirm.
+    private Sender _delegate;
+    // volatile because the state is written under the connection lock but
+    // read without it by isClosed() and the pre-condition check.
+    private volatile SenderState _state = SenderState.OPENED;
+    private final SessionExt _ssn;
+    private final Object _connectionLock; // global per connection lock
+
+    /**
+     * @param ssn the session that owns the sender; also supplies the
+     *            connection lock used to guard state changes.
+     * @param delegate the sender that does the actual work.
+     */
+    public SenderManagementDecorator(SessionExt ssn, Sender delegate)
+    {
+        _ssn = ssn;
+        _delegate = delegate;
+        _connectionLock = ssn.getConnectionExt().getConnectionLock();
+    }
+
+    @Override
+    public void send(Message message, boolean sync) throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            _delegate.send(message, sync);
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    /**
+     * Marks the sender as CLOSED and closes the delegate.
+     * Closing is allowed from both the OPENED and the ERROR state, so that a
+     * sender that hit a connection error can still be shut down; only a
+     * second close is rejected. The check and the state change happen under
+     * the connection lock so two concurrent calls cannot both close the
+     * delegate.
+     *
+     * @throws MessagingException if the sender is already closed, or if the
+     *         delegate fails to close.
+     */
+    @Override
+    public void close() throws MessagingException
+    {
+        synchronized (_connectionLock)
+        {
+            if (_state == SenderState.CLOSED)
+            {
+                throw new SenderException("Sender is already closed");
+            }
+            _state = SenderState.CLOSED;
+            _delegate.close();
+        }
+    }
+
+    @Override
+    public void setCapacity(int capacity) throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            _delegate.setCapacity(capacity);
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getCapacity() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getCapacity();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getAvailable() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getAvailable();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getUnsettled() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getUnsettled();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public boolean isClosed() throws MessagingException
+    {
+        return _state == SenderState.CLOSED;
+    }
+
+    @Override
+    public String getName() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        return _delegate.getName();
+    }
+
+    @Override
+    public Session getSession() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        _ssn.checkError();
+        return _ssn;
+    }
+
+    @Override
+    public void recreate() throws MessagingException
+    {
+        // TODO not implemented yet.
+        // NOTE(review): nothing in this class currently moves the state from
+        // ERROR back to OPENED; presumably this method should do so once it
+        // is implemented - confirm.
+    }
+
+    /**
+     * Pre-condition check for operations that need a usable sender.
+     *
+     * @throws SenderException if the sender is in the ERROR or CLOSED state.
+     */
+    private void checkClosedAndThrowException() throws SenderException
+    {
+        switch (_state)
+        {
+        case ERROR:
+            throw new SenderException("Sender is in a temporary error state. The session may or may not recover from this");
+        case CLOSED:
+            throw new SenderException("Sender is closed. You cannot invoke methods on a closed sender");
+        default:
+            break;
+        }
+    }
+
+    /**
+     * A ConnectionException will cause the Session/Sender to go into a temporary error state,
+     * which prevents it from being used further.
+     * From there the Session and Sender can be moved into OPENED (if failover works) or
+     * CLOSED if there is no failover or if failover has failed.
+     *
+     * @param e the connection exception thrown by the delegate.
+     * @return the exception the caller should throw to the application.
+     */
+    private SenderException handleConnectionException(ConnectionException e)
+    {
+        synchronized (_connectionLock)
+        {
+            _state = SenderState.ERROR;
+            _ssn.exception(e); // This might trigger failover in a layer above.
+            if (_state == SenderState.CLOSED)
+            {
+                // The connection has instructed the session and its child objects to be closed.
+                // Either there was no failover, or failover has failed.
+                return new SenderException("Sender is closed due to connection error", e);
+            }
+            else
+            {
+                // Asking the application or the Parent handler to retry the operation.
+                // The Session and Sender should be in OPENED state at this time.
+                return new SenderException("Sender was in a temporary error state due to connection error. " +
+                        "Please retry your operation", e);
+            }
+        }
+    }
+
+    /**
+     * Session Exceptions will generally invalidate the Session.
+     * TODO this needs to be revisited again.
+     * A new session will need to be created in that case.
+     *
+     * @param e the session exception thrown by the delegate.
+     * @return the exception the caller should throw to the application.
+     */
+    private SenderException handleSessionException(SessionException e)
+    {
+        synchronized (_connectionLock)
+        {
+            // This should close all senders (including this) and receivers.
+            _ssn.exception(e);
+        }
+        return new SenderException("Session has been closed", e);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]