Author: rajith
Date: Fri Jun 15 17:20:48 2012
New Revision: 1350702

URL: http://svn.apache.org/viewvc?rev=1350702&view=rev
Log:
QPID-4027 Added extension interfaces for Session, Sender and Receiver.
Added a convenience class that converts a Java long timeout to the C++
Duration object.
Added ReceiverManagementDecorator and SenderManagementDecorator, which
provide state management and error handling via the decorator pattern
for a Receiver and a Sender respectively.

Added:
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
    
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java

Added: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java?rev=1350702&view=auto
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
 (added)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppDuration.java
 Fri Jun 15 17:20:48 2012
@@ -0,0 +1,48 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.cpp;
+
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.cpp.jni.Duration;
+
+/**
+ * Maps a Java {@code long} timeout onto the {@link Duration} type of the
+ * C++ (JNI) binding.
+ */
+public class CppDuration
+{
+    /**
+     * Returns the {@link Duration} that corresponds to the given timeout.
+     * The {@link Receiver} constants FOREVER, IMMEDIATE, SECOND and MINUTE
+     * are translated to the matching values exposed by {@link Duration};
+     * any other value is wrapped in a newly created {@link Duration}.
+     *
+     * @param duration the timeout value to convert
+     * @return the equivalent {@link Duration}
+     */
+    public static Duration getDuration(long duration)
+    {
+        // The checks are evaluated in the same order as the constants are
+        // listed above; the first match wins.
+        if (duration == Receiver.FOREVER)
+        {
+            return Duration.getFOREVER();
+        }
+        if (duration == Receiver.IMMEDIATE)
+        {
+            return Duration.getIMMEDIATE();
+        }
+        if (duration == Receiver.SECOND)
+        {
+            return Duration.getSECOND();
+        }
+        if (duration == Receiver.MINUTE)
+        {
+            return Duration.getMINUTE();
+        }
+        return new Duration(duration);
+    }
+}

Added: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java?rev=1350702&view=auto
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
 (added)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ReceiverExt.java
 Fri Jun 15 17:20:48 2012
@@ -0,0 +1,26 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+
+/**
+ * Extension of {@link Receiver} that adds a {@link #recreate()} operation.
+ */
+public interface ReceiverExt extends Receiver
+{
+    /**
+     * Recreates this receiver.
+     * NOTE(review): presumably used to re-establish the receiver after a
+     * failover - confirm against the implementations.
+     *
+     * @throws MessagingException if the operation fails
+     */
+    void recreate() throws MessagingException;
+}

Added: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java?rev=1350702&view=auto
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
 (added)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SenderExt.java
 Fri Jun 15 17:20:48 2012
@@ -0,0 +1,26 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Sender;
+
+/**
+ * Extension of {@link Sender} that adds a {@link #recreate()} operation.
+ */
+public interface SenderExt extends Sender
+{
+    /**
+     * Recreates this sender.
+     * NOTE(review): presumably used to re-establish the sender after a
+     * failover - confirm against the implementations.
+     *
+     * @throws MessagingException if the operation fails
+     */
+    void recreate() throws MessagingException;
+}

Added: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java?rev=1350702&view=auto
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
 (added)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/SessionExt.java
 Fri Jun 15 17:20:48 2012
@@ -0,0 +1,30 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.ext;
+
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Session;
+
+/**
+ * Extension of {@link Session} that exposes the owning connection, accepts
+ * exception notifications and adds a {@link #recreate()} operation.
+ */
+public interface SessionExt extends Session
+{
+    /**
+     * @return the {@link ConnectionExt} associated with this session
+     */
+    ConnectionExt getConnectionExt();
+
+    /**
+     * Notifies this session of an exception.
+     * NOTE(review): how the session reacts (e.g. triggering failover or
+     * closing its senders and receivers) is up to the implementation - confirm.
+     *
+     * @param e the exception being reported
+     */
+    void exception(MessagingException e);
+
+    /**
+     * Recreates this session.
+     * NOTE(review): presumably used to re-establish the session after a
+     * failover - confirm against the implementations.
+     *
+     * @throws MessagingException if the operation fails
+     */
+    void recreate() throws MessagingException;
+}

Added: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java?rev=1350702&view=auto
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
 (added)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ReceiverManagementDecorator.java
 Fri Jun 15 17:20:48 2012
@@ -0,0 +1,267 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.ReceiverExt;
+import org.apache.qpid.messaging.ext.SessionExt;
+import org.apache.qpid.messaging.util.SessionManagementDecorator.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Receiver.
+ * This class adds,
+ * 1. State management.
+ * 2. Exception handling.
+ *
+ * Operations that talk to the wrapped Receiver first verify that this
+ * receiver is neither CLOSED nor in ERROR state. ConnectionExceptions and
+ * SessionExceptions raised by the delegate are reported to the owning
+ * session and surfaced to the caller as ReceiverExceptions.
+ */
+public class ReceiverManagementDecorator implements ReceiverExt
+{
+    private static final Logger _logger =
+            LoggerFactory.getLogger(ReceiverManagementDecorator.class);
+
+    public enum ReceiverState {OPENED, CLOSED, ERROR}
+
+    private final Receiver _delegate;
+    // Written while holding _connectionLock, but read without it (e.g. in
+    // isClosed()), hence volatile so readers see the latest value.
+    private volatile ReceiverState _state = ReceiverState.OPENED;
+    private final SessionExt _ssn;
+    private final Object _connectionLock;  // global per connection lock
+
+    /**
+     * @param ssn      the session that owns this receiver; also supplies the
+     *                 per connection lock
+     * @param delegate the Receiver that performs the actual work
+     */
+    public ReceiverManagementDecorator(SessionExt ssn, Receiver delegate)
+    {
+        _ssn = ssn;
+        _delegate = delegate;
+        _connectionLock = ssn.getConnectionExt().getConnectionLock();
+    }
+
+    @Override
+    public Message get(long timeout) throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.get(timeout);
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public Message fetch(long timeout) throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.fetch(timeout);
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public void setCapacity(int capacity) throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            _delegate.setCapacity(capacity);
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getCapacity() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getCapacity();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getAvailable() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getAvailable();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getUnsettled() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getUnsettled();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    /**
+     * Marks this receiver as CLOSED and closes the delegate.
+     *
+     * @throws MessagingException if the receiver is already closed or in
+     *         ERROR state, or if closing the delegate fails
+     */
+    @Override
+    public void close() throws MessagingException
+    {
+        synchronized (_connectionLock)
+        {
+            // The state check is done under the lock so that two concurrent
+            // callers cannot both pass it and close the delegate twice.
+            checkClosedAndThrowException("Receiver is already closed");
+            _state = ReceiverState.CLOSED;
+            _delegate.close();
+        }
+    }
+
+    @Override
+    public boolean isClosed()
+    {
+        return _state == ReceiverState.CLOSED;
+    }
+
+    @Override
+    public String getName() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        return _delegate.getName();
+    }
+
+    @Override
+    public Session getSession() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        _ssn.checkError();
+        return _ssn;
+    }
+
+    @Override
+    public void recreate() throws MessagingException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * Throws a ReceiverException with the default "closed" message if this
+     * receiver is CLOSED, or with an error-state message if it is in ERROR.
+     */
+    private void checkClosedAndThrowException() throws ReceiverException
+    {
+        checkClosedAndThrowException(
+                "Receiver is closed. You cannot invoke methods on a closed receiver");
+    }
+
+    /**
+     * Throws a ReceiverException unless this receiver is OPENED.
+     *
+     * @param closedMessage the message to use when the receiver is CLOSED
+     * @throws ReceiverException if the receiver is CLOSED or in ERROR state
+     */
+    private void checkClosedAndThrowException(String closedMessage)
+            throws ReceiverException
+    {
+        switch (_state)
+        {
+        case ERROR:
+            throw new ReceiverException("Receiver is in a temporary error state. "
+                    + "The session may or may not recover from this");
+        case CLOSED:
+            throw new ReceiverException(closedMessage);
+        default:
+            // OPENED, nothing to report.
+            break;
+        }
+    }
+
+    /**
+     * A ConnectionException will cause the Session/Receiver to go into a
+     * temporary error state, which prevents it from being used further.
+     * From there the Session and Receiver can be moved into OPENED (if
+     * failover works) or CLOSED if there is no failover or if failover has
+     * failed.
+     *
+     * @param e the connection level failure raised by the delegate
+     * @return the ReceiverException the caller should throw
+     */
+    private ReceiverException handleConnectionException(ConnectionException e)
+    {
+        synchronized (_connectionLock)
+        {
+            _state = ReceiverState.ERROR;
+            _ssn.exception(e); // This might trigger failover in a layer above.
+            if (_state == ReceiverState.CLOSED)
+            {
+                // The connection has instructed the session and its child
+                // objects to be closed.
+                // Either there was no failover, or failover has failed.
+                return new ReceiverException(
+                        "Receiver is closed due to connection error", e);
+            }
+            else
+            {
+                // Asking the application or the Parent handler to retry the
+                // operation.
+                // The Session and Receiver should be in OPENED state at this
+                // time.
+                return new ReceiverException("Receiver was in a temporary error "
+                        + "state due to connection error. "
+                        + "Please retry your operation", e);
+            }
+        }
+    }
+
+    /**
+     * Session Exceptions will generally invalidate the Session.
+     * TODO this needs to be revisited again.
+     * A new session will need to be created in that case.
+     *
+     * @param e the session level failure raised by the delegate
+     * @return the ReceiverException the caller should throw
+     */
+    private ReceiverException handleSessionException(SessionException e)
+    {
+        synchronized (_connectionLock)
+        {
+            // This should close all receivers (including this) and senders.
+            _ssn.exception(e);
+        }
+        return new ReceiverException("Session has been closed", e);
+    }
+}

Added: 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java?rev=1350702&view=auto
==============================================================================
--- 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
 (added)
+++ 
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SenderManagementDecorator.java
 Fri Jun 15 17:20:48 2012
@@ -0,0 +1,234 @@
+package org.apache.qpid.messaging.util;
+
+import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
+import org.apache.qpid.messaging.Receiver;
+import org.apache.qpid.messaging.ReceiverException;
+import org.apache.qpid.messaging.Sender;
+import org.apache.qpid.messaging.SenderException;
+import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.ext.SenderExt;
+import org.apache.qpid.messaging.ext.SessionExt;
+import 
org.apache.qpid.messaging.util.ReceiverManagementDecorator.ReceiverState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Decorator that adds basic housekeeping tasks to a Sender.
+ * This class adds,
+ * 1. State management.
+ * 2. Exception handling.
+ *
+ * Operations that talk to the wrapped Sender first verify that this sender
+ * is neither CLOSED nor in ERROR state. ConnectionExceptions and
+ * SessionExceptions raised by the delegate are reported to the owning
+ * session and surfaced to the caller as SenderExceptions.
+ */
+public class SenderManagementDecorator implements SenderExt
+{
+    private static final Logger _logger =
+            LoggerFactory.getLogger(SenderManagementDecorator.class);
+
+    public enum SenderState {OPENED, CLOSED, ERROR}
+
+    private final Sender _delegate;
+    // Written while holding _connectionLock, but read without it (e.g. in
+    // isClosed()), hence volatile so readers see the latest value.
+    private volatile SenderState _state = SenderState.OPENED;
+    private final SessionExt _ssn;
+    private final Object _connectionLock;  // global per connection lock
+
+    /**
+     * @param ssn      the session that owns this sender; also supplies the
+     *                 per connection lock
+     * @param delegate the Sender that performs the actual work
+     */
+    public SenderManagementDecorator(SessionExt ssn, Sender delegate)
+    {
+        _ssn = ssn;
+        _delegate = delegate;
+        _connectionLock = ssn.getConnectionExt().getConnectionLock();
+    }
+
+    @Override
+    public void send(Message message, boolean sync) throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            _delegate.send(message, sync);
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    /**
+     * Marks this sender as CLOSED and closes the delegate.
+     *
+     * @throws MessagingException if the sender is already closed or in
+     *         ERROR state, or if closing the delegate fails
+     */
+    @Override
+    public void close() throws MessagingException
+    {
+        synchronized (_connectionLock)
+        {
+            // The state check is done under the lock so that two concurrent
+            // callers cannot both pass it and close the delegate twice.
+            checkClosedAndThrowException("Sender is already closed");
+            _state = SenderState.CLOSED;
+            _delegate.close();
+        }
+    }
+
+    @Override
+    public void setCapacity(int capacity) throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            _delegate.setCapacity(capacity);
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getCapacity() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getCapacity();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getAvailable() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getAvailable();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public int getUnsettled() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        try
+        {
+            return _delegate.getUnsettled();
+        }
+        catch (ConnectionException e)
+        {
+            throw handleConnectionException(e);
+        }
+        catch (SessionException e)
+        {
+            throw handleSessionException(e);
+        }
+    }
+
+    @Override
+    public boolean isClosed() throws MessagingException
+    {
+        return _state == SenderState.CLOSED;
+    }
+
+    @Override
+    public String getName() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        return _delegate.getName();
+    }
+
+    @Override
+    public Session getSession() throws MessagingException
+    {
+        checkClosedAndThrowException();
+        _ssn.checkError();
+        return _ssn;
+    }
+
+    @Override
+    public void recreate() throws MessagingException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * Throws a SenderException with the default "closed" message if this
+     * sender is CLOSED, or with an error-state message if it is in ERROR.
+     */
+    private void checkClosedAndThrowException() throws SenderException
+    {
+        checkClosedAndThrowException(
+                "Sender is closed. You cannot invoke methods on a closed sender");
+    }
+
+    /**
+     * Throws a SenderException unless this sender is OPENED.
+     *
+     * @param closedMessage the message to use when the sender is CLOSED
+     * @throws SenderException if the sender is CLOSED or in ERROR state
+     */
+    private void checkClosedAndThrowException(String closedMessage)
+            throws SenderException
+    {
+        switch (_state)
+        {
+        case ERROR:
+            throw new SenderException("Sender is in a temporary error state. "
+                    + "The session may or may not recover from this");
+        case CLOSED:
+            throw new SenderException(closedMessage);
+        default:
+            // OPENED, nothing to report.
+            break;
+        }
+    }
+
+    /**
+     * A ConnectionException will cause the Session/Sender to go into a
+     * temporary error state, which prevents it from being used further.
+     * From there the Session and Sender can be moved into OPENED (if
+     * failover works) or CLOSED if there is no failover or if failover has
+     * failed.
+     *
+     * @param e the connection level failure raised by the delegate
+     * @return the SenderException the caller should throw
+     */
+    private SenderException handleConnectionException(ConnectionException e)
+    {
+        synchronized (_connectionLock)
+        {
+            _state = SenderState.ERROR;
+            _ssn.exception(e); // This might trigger failover in a layer above.
+            if (_state == SenderState.CLOSED)
+            {
+                // The connection has instructed the session and its child
+                // objects to be closed.
+                // Either there was no failover, or failover has failed.
+                return new SenderException(
+                        "Sender is closed due to connection error", e);
+            }
+            else
+            {
+                // Asking the application or the Parent handler to retry the
+                // operation.
+                // The Session and Sender should be in OPENED state at this
+                // time.
+                return new SenderException("Sender was in a temporary error "
+                        + "state due to connection error. "
+                        + "Please retry your operation", e);
+            }
+        }
+    }
+
+    /**
+     * Session Exceptions will generally invalidate the Session.
+     * TODO this needs to be revisited again.
+     * A new session will need to be created in that case.
+     *
+     * @param e the session level failure raised by the delegate
+     * @return the SenderException the caller should throw
+     */
+    private SenderException handleSessionException(SessionException e)
+    {
+        synchronized (_connectionLock)
+        {
+            // This should close all senders (including this) and receivers.
+            _ssn.exception(e);
+        }
+        return new SenderException("Session has been closed", e);
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to