Author: rgodfrey
Date: Fri Oct 17 14:29:21 2014
New Revision: 1632585

URL: http://svn.apache.org/r1632585
Log:
QPID-6164 : Add synchronous publish capability to 0-8/9/9-1

Added:
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
      - copied, changed from r1631344, 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
      - copied, changed from r1631344, 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
      - copied, changed from r1631344, 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
    
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java
   (with props)
Modified:
    
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
    qpid/trunk/qpid/java/test-profiles/Java010Excludes

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Fri Oct 17 14:29:21 2014
@@ -201,6 +201,8 @@ public class AMQChannel
     private final ConfigurationChangeListener _consumerClosedListener = new 
ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = 
new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
+    private boolean _confirmOnPublish;
+    private long _confirmedMessageCounter;
 
 
     public AMQChannel(AMQProtocolEngine connection, int channelId, final 
MessageStore messageStore)
@@ -394,6 +396,11 @@ public class AMQChannel
         // check and deliver if header says body length is zero
         if (_currentMessage.allContentReceived())
         {
+            if(_confirmOnPublish)
+            {
+                _confirmedMessageCounter++;
+            }
+
             try
             {
 
@@ -421,6 +428,10 @@ public class AMQChannel
 
                     if(!checkMessageUserId(_currentMessage.getContentHeader()))
                     {
+                        if(_confirmOnPublish)
+                        {
+                            _connection.writeFrame(new AMQFrame(_channelId, 
new BasicNackBody(_confirmedMessageCounter, false, false)));
+                        }
                         _transaction.addPostTransactionAction(new 
WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
                     }
                     else
@@ -461,6 +472,12 @@ public class AMQChannel
                         }
                         else
                         {
+                            if(_confirmOnPublish)
+                            {
+                                BasicAckBody responseBody = 
_connection.getMethodRegistry()
+                                        
.createBasicAckBody(_confirmedMessageCounter, false);
+                                
_connection.writeFrame(responseBody.generateFrame(_channelId));
+                            }
                             incrementOutstandingTxnsIfNecessary();
                         }
                     }
@@ -503,7 +520,7 @@ public class AMQChannel
                     description, mandatory, isTransactional(), 
closeOnNoRoute));
         }
 
-        if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
+        if (mandatory && isTransactional() && !_confirmOnPublish && 
_connection.isCloseWhenNoRoute())
         {
             _connection.closeConnection(AMQConstant.NO_ROUTE,
                     "No route for message " + currentMessageDescription(), 
_channelId);
@@ -512,6 +529,10 @@ public class AMQChannel
         {
             if (mandatory || message.isImmediate())
             {
+                if(_confirmOnPublish)
+                {
+                    _connection.writeFrame(new AMQFrame(_channelId, new 
BasicNackBody(_confirmedMessageCounter, false, false)));
+                }
                 _transaction.addPostTransactionAction(new 
WriteReturnAction(AMQConstant.NO_ROUTE,
                                                                             
"No Route for message "
                                                                             + 
currentMessageDescription(),
@@ -2236,8 +2257,6 @@ public class AMQChannel
 
                 if (requeue)
                 {
-                    //this requeue represents a message rejected from the 
pre-dispatch queue
-                    //therefore we need to amend the delivery counter.
                     message.decrementDeliveryCount();
 
                     requeue(deliveryTag);
@@ -2359,6 +2378,85 @@ public class AMQChannel
     }
 
     @Override
+    public void receiveBasicNack(final long deliveryTag, final boolean 
multiple, final boolean requeue)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] BasicNack[" +" 
deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + 
requeue + " ]");
+        }
+
+        Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>();
+        _unacknowledgedMessageMap.collect(deliveryTag, multiple, 
nackedMessageMap);
+
+        for(MessageInstance message : nackedMessageMap.values())
+        {
+
+            if (message == null)
+            {
+                _logger.warn("Ignoring nack request as message is null for 
tag:" + deliveryTag);
+            }
+            else
+            {
+
+                if (message.getMessage() == null)
+                {
+                    _logger.warn("Message has already been purged, unable to 
nack.");
+                }
+                else
+                {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Nack-ing: DT:" + deliveryTag
+                                      + "-" + message.getMessage() +
+                                      ": Requeue:" + requeue
+                                      +
+                                      " on channel:" + debugIdentity());
+                    }
+
+                    if (requeue)
+                    {
+                        message.decrementDeliveryCount();
+
+                        requeue(deliveryTag);
+                    }
+                    else
+                    {
+                        message.reject();
+
+                        final boolean maxDeliveryCountEnabled = 
isMaxDeliveryCountEnabled(deliveryTag);
+                        _logger.debug("maxDeliveryCountEnabled: "
+                                      + maxDeliveryCountEnabled
+                                      + " deliveryTag "
+                                      + deliveryTag);
+                        if (maxDeliveryCountEnabled)
+                        {
+                            final boolean deliveredTooManyTimes = 
isDeliveredTooManyTimes(deliveryTag);
+                            _logger.debug("deliveredTooManyTimes: "
+                                          + deliveredTooManyTimes
+                                          + " deliveryTag "
+                                          + deliveryTag);
+                            if (deliveredTooManyTimes)
+                            {
+                                deadLetter(deliveryTag);
+                            }
+                            else
+                            {
+                                message.incrementDeliveryCount();
+                            }
+                        }
+                        else
+                        {
+                            requeue(deliveryTag);
+                        }
+                    }
+                }
+            }
+
+        }
+
+    }
+
+    @Override
     public void receiveChannelFlow(final boolean active)
     {
         if(_logger.isDebugEnabled())
@@ -3355,6 +3453,21 @@ public class AMQChannel
         resend();
     }
 
+    @Override
+    public void receiveConfirmSelect(final boolean nowait)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + _channelId + "] ConfirmSelect [ nowait: " 
+ nowait + " ]");
+        }
+        _confirmOnPublish = true;
+
+        if(!nowait)
+        {
+            _connection.writeFrame(new AMQFrame(_channelId, 
ConfirmSelectOkBody.INSTANCE));
+        }
+    }
+
 
     private void closeChannel(final AMQConstant cause, final String message)
     {

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Fri Oct 17 14:29:21 2014
@@ -85,6 +85,7 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
@@ -432,6 +433,7 @@ public class AMQProtocolEngine implement
                     String.valueOf(_closeWhenNoRoute));
             
serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED,
                                        
String.valueOf(_broker.isMessageCompressionEnabled()));
+            
serverProperties.setString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED,
 Boolean.TRUE.toString());
 
             AMQMethodBody responseBody = 
getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
                                                                                
        (short) pv.getActualMinorVersion(),
@@ -1119,9 +1121,17 @@ public class AMQProtocolEngine implement
                                                                                
              _currentClassId,
                                                                                
              _currentMethodId);
 
-                writeFrame(closeBody.generateFrame(0));
+                try
+                {
+                    writeFrame(closeBody.generateFrame(0));
+
+                    _sender.close();
+                }
+                catch(SenderException e)
+                {
+                    // ignore
+                }
 
-                _sender.close();
             }
             finally
             {

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
 Fri Oct 17 14:29:21 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.AMQException;
@@ -63,7 +64,7 @@ public interface UnacknowledgedMessageMa
     Set<Long> getDeliveryTags();
 
     Collection<MessageInstance> acknowledge(long deliveryTag, boolean 
multiple);
-
+    void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs);
 }
 
 

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 Fri Oct 17 14:29:21 2014
@@ -46,6 +46,8 @@ import org.apache.qpid.common.ServerProp
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.ChannelOpenBody;
 import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConfirmSelectBody;
+import org.apache.qpid.framing.ConfirmSelectOkBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.framing.TxSelectBody;
@@ -68,6 +70,8 @@ public class AMQConnectionDelegate_8_0 i
     private final AMQConnection _conn;
     private boolean _messageCompressionSupported;
     private boolean _addrSyntaxSupported;
+    private boolean _confirmedPublishSupported;
+    private boolean _confirmedPublishNonTransactionalSupported;
 
     public void closeConnection(long timeout) throws JMSException, AMQException
     {
@@ -94,6 +98,11 @@ public class AMQConnectionDelegate_8_0 i
         return ((cause instanceof ConnectException) || (cause instanceof 
UnresolvedAddressException));
     }
 
+    public boolean isConfirmedPublishSupported()
+    {
+        return _confirmedPublishSupported;
+    }
+
     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) 
throws AMQException, IOException
     {
         if (_logger.isDebugEnabled())
@@ -146,6 +155,8 @@ public class AMQConnectionDelegate_8_0 i
             _conn.setConnected(true);
             _conn.logConnected(network.getLocalAddress(), 
network.getRemoteAddress());
             _messageCompressionSupported = checkMessageCompressionSupported();
+            _confirmedPublishSupported = checkConfirmedPublishSupported();
+            _confirmedPublishNonTransactionalSupported = 
checkConfirmedPublishNonTransactionalSupported();
             return null;
         }
         else
@@ -155,6 +166,32 @@ public class AMQConnectionDelegate_8_0 i
 
     }
 
+    // RabbitMQ supports confirmed publishing, but only on non transactional 
sessions
+    private boolean checkConfirmedPublishNonTransactionalSupported()
+    {
+        FieldTable serverProperties = 
_conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+        if( serverProperties != null
+            && serverProperties.containsKey("capabilities")
+            && serverProperties.get("capabilities") instanceof FieldTable)
+        {
+            FieldTable capabilities = 
serverProperties.getFieldTable("capabilities");
+            if(capabilities.containsKey("publisher_confirms")
+               && capabilities.get("publisher_confirms") instanceof Boolean
+               && capabilities.getBoolean("publisher_confirms"))
+            {
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+        else
+        {
+            return false;
+        }
+    }
+
     public org.apache.qpid.jms.Session createSession(final boolean transacted, 
final int acknowledgeMode, final int prefetch)
             throws JMSException
     {
@@ -266,9 +303,21 @@ public class AMQConnectionDelegate_8_0 i
             }
             TxSelectBody body = 
_conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
 
-            // TODO: Be aware of possible changes to parameter order as 
versions change.
+
             
_conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), 
TxSelectOkBody.class);
         }
+        boolean useConfirms = (_confirmedPublishSupported || (!transacted && 
_confirmedPublishNonTransactionalSupported))
+                              && "all".equals(_conn.getSyncPublish());
+        if(useConfirms)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Issuing ConfirmSelect for " + channelId);
+            }
+            ConfirmSelectBody body = new ConfirmSelectBody(false);
+
+            
_conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), 
ConfirmSelectOkBody.class);
+        }
     }
 
     public void failoverPrep()
@@ -340,7 +389,7 @@ public class AMQConnectionDelegate_8_0 i
                 }
                 catch (IllegalStateException e)
                 {
-                    if (!(e.getMessage().startsWith("Fail-over interupted 
no-op failover support")))
+                    if (!(e.getMessage().startsWith("Fail-over interrupted 
no-op failover support")))
                     {
                         throw e;
                     }
@@ -424,6 +473,14 @@ public class AMQConnectionDelegate_8_0 i
 
     }
 
+    private boolean checkConfirmedPublishSupported()
+    {
+        FieldTable serverProperties = 
_conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
+        return serverProperties != null
+               && 
Boolean.parseBoolean(serverProperties.getString(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED));
+
+    }
+
     public boolean isMessageCompressionSupported()
     {
         return _messageCompressionSupported;
@@ -433,4 +490,9 @@ public class AMQConnectionDelegate_8_0 i
     {
         return _addrSyntaxSupported;
     }
+
+    public boolean isConfirmedPublishNonTransactionalSupported()
+    {
+        return _confirmedPublishNonTransactionalSupported;
+    }
 }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 Fri Oct 17 14:29:21 2014
@@ -122,7 +122,7 @@ public abstract class BasicMessageProduc
                             ? System.getProperty("qpid.default_mandatory")
                             : "false"));
 
-    private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
+    private PublishMode _publishMode = PublishMode.ASYNC_PUBLISH_ALL;
 
     protected BasicMessageProducer(Logger logger,AMQConnection connection, 
AMQDestination destination, boolean transacted, int channelId,
                                    AMQSession session, long producerId, 
Boolean immediate, Boolean mandatory) throws AMQException
@@ -165,16 +165,16 @@ public abstract class BasicMessageProduc
         // Support for deprecated option sync_persistence
         if (syncPub.equals("persistent") || _connection.getSyncPersistence())
         {
-            publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
+            _publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
         }
         else if (syncPub.equals("all"))
         {
-            publishMode = PublishMode.SYNC_PUBLISH_ALL;
+            _publishMode = PublishMode.SYNC_PUBLISH_ALL;
         }
 
         if (_logger.isDebugEnabled())
         {
-               _logger.debug("MessageProducer " + toString() + " using publish 
mode : " + publishMode);
+               _logger.debug("MessageProducer " + toString() + " using publish 
mode : " + _publishMode);
         }
     }
 
@@ -720,12 +720,12 @@ public abstract class BasicMessageProduc
 
     protected PublishMode getPublishMode()
     {
-        return publishMode;
+        return _publishMode;
     }
 
     protected void setPublishMode(PublishMode publishMode)
     {
-        this.publishMode = publishMode;
+        _publishMode = publishMode;
     }
 
     Logger getLogger()

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
 Fri Oct 17 14:29:21 2014
@@ -32,14 +32,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicAckBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicNackBody;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.CompositeAMQDataBlock;
 import org.apache.qpid.framing.ContentBody;
@@ -211,9 +216,6 @@ public class BasicMessageProducer_0_8 ex
         }
 
 
-        // TODO: This is a hacky way of getting the AMQP class-id for the 
Basic class
-        int classIfForBasic = 
getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
-
         AMQFrame contentHeaderFrame =
             ContentHeaderBody.createAMQFrame(getChannelId(),
                                              contentHeaderProperties, size);
@@ -232,7 +234,7 @@ public class BasicMessageProducer_0_8 ex
 
         frames[0] = publishFrame;
         frames[1] = contentHeaderFrame;
-        CompositeAMQDataBlock compositeFrame = new 
CompositeAMQDataBlock(frames);
+        final CompositeAMQDataBlock compositeFrame = new 
CompositeAMQDataBlock(frames);
 
         try
         {
@@ -246,7 +248,40 @@ public class BasicMessageProducer_0_8 ex
             throw jmse;
         }
 
-        getConnection().getProtocolHandler().writeFrame(compositeFrame);
+        AMQConnectionDelegate_8_0 connectionDelegate80 = 
(AMQConnectionDelegate_8_0) (getConnection().getDelegate());
+
+        boolean useConfirms = getPublishMode() == PublishMode.SYNC_PUBLISH_ALL
+                              && 
(connectionDelegate80.isConfirmedPublishSupported()
+                               || (!getSession().isTransacted() && 
connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));
+
+        if(!useConfirms)
+        {
+            getConnection().getProtocolHandler().writeFrame(compositeFrame);
+        }
+        else
+        {
+            final PublishConfirmMessageListener frameListener = new 
PublishConfirmMessageListener(getChannelId());
+            try
+            {
+
+                
getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame,
+                                                                               
       frameListener);
+
+                if(frameListener.isRejected())
+                {
+                    throw new JMSException("The message was not accepted by 
the server (e.g. because the address was no longer valid)");
+                }
+            }
+            catch (AMQException e)
+            {
+                throw new JMSAMQException(e);
+            }
+            catch (FailoverException e)
+            {
+                throw new JMSAMQException("Fail-over interrupted send. Status 
of the send is uncertain.", e);
+
+            }
+        }
     }
 
     /**
@@ -290,7 +325,7 @@ public class BasicMessageProducer_0_8 ex
 
     private int calculateContentBodyFrameCount(ByteBuffer payload)
     {
-        // we substract one from the total frame maximum size to account for 
the end of frame marker in a body frame
+        // we subtract one from the total frame maximum size to account for 
the end of frame marker in a body frame
         // (0xCE byte).
         int frameCount;
         if ((payload == null) || (payload.remaining() == 0))
@@ -313,4 +348,42 @@ public class BasicMessageProducer_0_8 ex
     {
         return (AMQSession_0_8) super.getSession();
     }
+
+    private static class PublishConfirmMessageListener extends 
BlockingMethodFrameListener
+    {
+        private boolean _rejected;
+
+        /**
+         * Creates a new method listener, that filters incoming method to just 
those that match the specified channel id.
+         *
+         * @param channelId The channel id to filter incoming methods with.
+         */
+        public PublishConfirmMessageListener(final int channelId)
+        {
+            super(channelId);
+        }
+
+        @Override
+        public boolean processMethod(final int channelId, final AMQMethodBody 
frame)
+        {
+            if (frame instanceof BasicAckBody)
+            {
+                return true;
+            }
+            else if (frame instanceof BasicNackBody)
+            {
+                _rejected = true;
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+
+        public boolean isRejected()
+        {
+            return _rejected;
+        }
+    }
 }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
 Fri Oct 17 14:29:21 2014
@@ -68,7 +68,7 @@ public class FailoverNoopSupport<T, E ex
         }
         catch (FailoverException e)
         {
-            throw new IllegalStateException("Fail-over interupted no-op 
failover support. "
+            throw new IllegalStateException("Fail-over interrupted no-op 
failover support. "
                 + "No-op support should only be used where the caller is 
certain fail-over cannot occur.", e);
         }
     }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
 Fri Oct 17 14:29:21 2014
@@ -45,7 +45,7 @@ import org.apache.qpid.client.AMQConnect
  * <p>
  * Wrapping a synchronous method in a FailoverRetrySupport will have the 
effect that the operation will not be
  * started during fail-over, but be delayed until any current fail-over has 
completed. Should a fail-over process want
- * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport 
will detect this and rety the operation
+ * to start whilst waiting for the synchronous reply, the FailoverRetrySupport 
will detect this and retry the operation
  * until it succeeds. Synchronous methods are usually coordinated with a
  * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which 
is notified when a fail-over process wants
  * to start and throws a FailoverException in response to this.
@@ -53,12 +53,6 @@ import org.apache.qpid.client.AMQConnect
  * Wrapping an asynchronous method in a FailoverRetrySupport will have the 
effect that the operation will not be
  * started during fail-over, but be delayed until any current fail-over has 
completed.
  * <p>
- * TODO  Another continuation. Could use an interface Continuation (as 
described in other todos)
- *      Then have a wrapping continuation (this), which blocks on an arbitrary
- *      Condition or Latch (specified in constructor call), that this blocks 
on before calling the wrapped Continuation.
- *      Must work on Java 1.4, so check retrotranslator works on 
Lock/Condition or latch first. Argument and return type
- *      to match wrapped condition as type parameters. Rename to 
AsyncConditionalContinuation or something like that.
- * <p>
  * TODO  InterruptedException not handled well.
  */
 public class FailoverRetrySupport<T, E extends Exception> implements 
FailoverSupport<T, E>

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
 Fri Oct 17 14:29:21 2014
@@ -53,6 +53,7 @@ public class ClientMethodDispatcherImpl 
 
     private static final Logger _logger = 
LoggerFactory.getLogger(ClientMethodDispatcherImpl.class);
 
+
     private static interface DispatcherFactory
     {
         public ClientMethodDispatcherImpl 
createMethodDispatcher(AMQProtocolSession session);
@@ -147,6 +148,13 @@ public class ClientMethodDispatcherImpl 
         return false;
     }
 
+    @Override
+    public boolean dispatchConfirmSelectOk(final ConfirmSelectOkBody 
confirmSelectOkBody, final int channelId)
+            throws AMQException
+    {
+        return false;
+    }
+
     public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int 
channelId) throws AMQException
     {
         _basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
@@ -271,11 +279,19 @@ public class ClientMethodDispatcherImpl 
         throw new AMQMethodNotImplementedException(body);
     }
 
+    @Override
     public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws 
AMQException
     {
-        throw new AMQMethodNotImplementedException(body);
+        return false;
     }
 
+    @Override
+    public boolean dispatchBasicNack(final BasicNackBody basicNackBody, final 
int channelId)
+    {
+        return false;
+    }
+
+
     public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) 
throws AMQException
     {
         throw new AMQMethodNotImplementedException(body);
@@ -400,6 +416,12 @@ public class ClientMethodDispatcherImpl 
         return false;
     }
 
+    @Override
+    public boolean dispatchConfirmSelect(final ConfirmSelectBody body, final 
int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
     public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int 
channelId) throws AMQException
     {
         _exchangeBoundOkMethodHandler.methodReceived(_session, body, 
channelId);

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
 Fri Oct 17 14:29:21 2014
@@ -661,7 +661,7 @@ public class AMQProtocolHandler implemen
      * @param frame
      * @param listener the blocking listener. Note the calling thread will 
block.
      */
-    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, 
BlockingMethodFrameListener listener)
+    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, 
BlockingMethodFrameListener listener)
             throws AMQException, FailoverException
     {
         return writeCommandFrameAndWaitForReply(frame, listener, 
DEFAULT_SYNC_TIMEOUT);
@@ -674,7 +674,7 @@ public class AMQProtocolHandler implemen
      * @param frame
      * @param listener the blocking listener. Note the calling thread will 
block.
      */
-    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, 
BlockingMethodFrameListener listener,
+    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, 
BlockingMethodFrameListener listener,
                                                            long timeout) 
throws AMQException, FailoverException
     {
         try

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
 Fri Oct 17 14:29:21 2014
@@ -209,6 +209,9 @@ public class ClientDecoder extends AMQDe
                 case 0x003c0048:
                     BasicGetEmptyBody.process(in, channelMethodProcessor);
                     break;
+                case 0x003c0050:
+                    BasicAckBody.process(in, channelMethodProcessor);
+                    break;
                 case 0x003c0065:
                     if(!channelMethodProcessor.ignoreAllButCloseOk())
                     {
@@ -221,6 +224,18 @@ public class ClientDecoder extends AMQDe
                         channelMethodProcessor.receiveBasicRecoverSyncOk();
                     }
                     break;
+                case 0x003c0078:
+                    BasicNackBody.process(in, channelMethodProcessor);
+                    break;
+
+                // CONFIRM CLASS:
+
+                case 0x0055000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveConfirmSelectOk();
+                    }
+                    break;
 
                 // TX_CLASS:
 

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
 Fri Oct 17 14:29:21 2014
@@ -197,6 +197,15 @@ public class ServerDecoder extends AMQDe
                 case 0x003c006e:
                     BasicRecoverSyncBody.process(in, channelMethodProcessor);
                     break;
+                case 0x003c0078:
+                    BasicNackBody.process(in, channelMethodProcessor);
+                    break;
+
+                // CONFIRM CLASS:
+
+                case 0x0055000a:
+                    ConfirmSelectBody.process(in, channelMethodProcessor);
+                    break;
 
                 // TX_CLASS:
 
@@ -219,6 +228,7 @@ public class ServerDecoder extends AMQDe
                     }
                     break;
 
+
                 default:
                     throw newUnknownMethodException(classId, methodId,
                                                     
methodProcessor.getProtocolVersion());

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
 Fri Oct 17 14:29:21 2014
@@ -113,7 +113,7 @@ public class BasicAckBody extends AMQMet
     }
 
     public static void process(final MarkableDataInput buffer,
-                               final ServerChannelMethodProcessor dispatcher) 
throws IOException
+                               final ChannelMethodProcessor dispatcher) throws 
IOException
     {
 
         long deliveryTag = buffer.readLong();

Copied: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
 (from r1631344, 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java&p1=qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java&r1=1631344&r2=1632585&rev=1632585&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
 Fri Oct 17 14:29:21 2014
@@ -33,33 +33,39 @@ import java.io.IOException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 
-public class BasicAckBody extends AMQMethodBodyImpl implements 
EncodableAMQDataBlock, AMQMethodBody
+public class BasicNackBody extends AMQMethodBodyImpl implements 
EncodableAMQDataBlock, AMQMethodBody
 {
 
     public static final int CLASS_ID =  60;
-    public static final int METHOD_ID = 80;
+    public static final int METHOD_ID = 120;
 
     // Fields declared in specification
     private final long _deliveryTag; // [deliveryTag]
     private final byte _bitfield0; // [multiple]
 
     // Constructor
-    public BasicAckBody(MarkableDataInput buffer) throws 
AMQFrameDecodingException, IOException
+    public BasicNackBody(MarkableDataInput buffer) throws 
AMQFrameDecodingException, IOException
     {
         _deliveryTag = buffer.readLong();
         _bitfield0 = buffer.readByte();
     }
 
-    public BasicAckBody(
+    public BasicNackBody(
             long deliveryTag,
-            boolean multiple
-                       )
+            boolean multiple,
+            boolean requeue
+                        )
     {
         _deliveryTag = deliveryTag;
         byte bitfield0 = (byte)0;
         if( multiple )
         {
-            bitfield0 = (byte) (((int) bitfield0) | (1 << 0));
+            bitfield0 = (byte) (((int) bitfield0) | 1);
+
+        }
+        if( requeue )
+        {
+            bitfield0 = (byte) (((int) bitfield0) | 2);
         }
         _bitfield0 = bitfield0;
     }
@@ -78,9 +84,15 @@ public class BasicAckBody extends AMQMet
     {
         return _deliveryTag;
     }
+
     public final boolean getMultiple()
     {
-        return (((int)(_bitfield0)) & ( 1 << 0)) != 0;
+        return (((int)(_bitfield0)) &  1) != 0;
+    }
+
+    public final boolean getRequeue()
+    {
+        return (((int)(_bitfield0)) &  2 ) != 0;
     }
 
     protected int getBodySize()
@@ -97,30 +109,35 @@ public class BasicAckBody extends AMQMet
 
     public boolean execute(MethodDispatcher dispatcher, int channelId) throws 
AMQException
        {
-        return dispatcher.dispatchBasicAck(this, channelId);
+        return dispatcher.dispatchBasicNack(this, channelId);
        }
 
     public String toString()
     {
-        StringBuilder buf = new StringBuilder("[BasicAckBodyImpl: ");
+        StringBuilder buf = new StringBuilder("[BasicNackBodyImpl: ");
         buf.append( "deliveryTag=" );
         buf.append(  getDeliveryTag() );
         buf.append( ", " );
         buf.append( "multiple=" );
         buf.append(  getMultiple() );
+        buf.append( ", " );
+        buf.append( "requeue=" );
+        buf.append(  getRequeue() );
         buf.append("]");
         return buf.toString();
     }
 
     public static void process(final MarkableDataInput buffer,
-                               final ServerChannelMethodProcessor dispatcher) 
throws IOException
+                               final ChannelMethodProcessor dispatcher) throws 
IOException
     {
 
         long deliveryTag = buffer.readLong();
-        boolean multiple = (buffer.readByte() & 0x01) != 0;
+        byte bitfield = buffer.readByte();
+        boolean multiple = (bitfield & 0x01) != 0;
+        boolean requeue = (bitfield & 0x02) != 0;
         if(!dispatcher.ignoreAllButCloseOk())
         {
-            dispatcher.receiveBasicAck(deliveryTag, multiple);
+            dispatcher.receiveBasicNack(deliveryTag, multiple, requeue);
         }
     }
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
 Fri Oct 17 14:29:21 2014
@@ -35,4 +35,8 @@ public interface ChannelMethodProcessor
     void receiveMessageHeader(BasicContentHeaderProperties properties, long 
bodySize);
 
     boolean ignoreAllButCloseOk();
+
+    void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue);
+
+    void receiveBasicAck(long deliveryTag, boolean multiple);
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientChannelMethodProcessor.java
 Fri Oct 17 14:29:21 2014
@@ -75,4 +75,5 @@ public interface ClientChannelMethodProc
 
     void receiveTxRollbackOk();
 
+    void receiveConfirmSelectOk();
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ClientMethodDispatcher.java
 Fri Oct 17 14:29:21 2014
@@ -72,4 +72,6 @@ public interface ClientMethodDispatcher
             throws AMQException;
 
     boolean dispatchChannelAlert(ChannelAlertBody channelAlertBody, int 
channelId) throws AMQException;
+
+    boolean dispatchConfirmSelectOk(ConfirmSelectOkBody confirmSelectOkBody, 
int channelId) throws AMQException;
 }

Copied: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
 (from r1631344, 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java&p1=qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java&r1=1631344&r2=1632585&rev=1632585&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
 Fri Oct 17 14:29:21 2014
@@ -33,24 +33,24 @@ import java.io.IOException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.MarkableDataInput;
 
-public class ChannelFlowOkBody extends AMQMethodBodyImpl implements 
EncodableAMQDataBlock, AMQMethodBody
+public class ConfirmSelectBody extends AMQMethodBodyImpl implements 
EncodableAMQDataBlock, AMQMethodBody
 {
 
-    public static final int CLASS_ID =  20;
-    public static final int METHOD_ID = 21;
+    public static final int CLASS_ID =  85;
+    public static final int METHOD_ID = 10;
 
     // Fields declared in specification
-    private final boolean _active; // [active]
+    private final boolean _nowait; // [active]
 
     // Constructor
-    public ChannelFlowOkBody(MarkableDataInput buffer) throws 
AMQFrameDecodingException, IOException
+    public ConfirmSelectBody(MarkableDataInput buffer) throws 
AMQFrameDecodingException, IOException
     {
-        _active = (buffer.readByte() & 0x01) == 0x01;
+        _nowait = (buffer.readByte() & 0x01) == 0x01;
     }
 
-    public ChannelFlowOkBody(boolean active)
+    public ConfirmSelectBody(boolean nowait)
     {
-        _active = active;
+        _nowait = nowait;
     }
 
     public int getClazz()
@@ -63,44 +63,43 @@ public class ChannelFlowOkBody extends A
         return METHOD_ID;
     }
 
-    public final boolean getActive()
+    public final boolean getNowait()
     {
-        return _active;
+        return _nowait;
     }
 
     protected int getBodySize()
     {
-        int size = 1;
-        return size;
+        return 1;
     }
 
     public void writeMethodPayload(DataOutput buffer) throws IOException
     {
-        writeBitfield( buffer, _active ? (byte)1 : (byte)0 );
+        writeBitfield( buffer, _nowait ? (byte)1 : (byte)0 );
     }
 
     public boolean execute(MethodDispatcher dispatcher, int channelId) throws 
AMQException
        {
-        return dispatcher.dispatchChannelFlowOk(this, channelId);
+        return dispatcher.dispatchConfirmSelect(this, channelId);
        }
 
     public String toString()
     {
-        StringBuilder buf = new StringBuilder("[ChannelFlowOkBodyImpl: ");
+        StringBuilder buf = new StringBuilder("[ConfirmSelectBody: ");
         buf.append( "active=" );
-        buf.append(  getActive() );
+        buf.append(  getNowait() );
         buf.append("]");
         return buf.toString();
     }
 
     public static void process(final MarkableDataInput buffer,
-                               final ChannelMethodProcessor dispatcher)
+                               final ServerChannelMethodProcessor dispatcher)
             throws IOException
     {
-        boolean active = (buffer.readByte() & 0x01) == 0x01;
+        boolean nowait = (buffer.readByte() & 0x01) == 0x01;
         if(!dispatcher.ignoreAllButCloseOk())
         {
-            dispatcher.receiveChannelFlowOk(active);
+            dispatcher.receiveConfirmSelect(nowait);
         }
     }
 }

Copied: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
 (from r1631344, 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java&p1=qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java&r1=1631344&r2=1632585&rev=1632585&view=diff
==============================================================================
--- 
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
 Fri Oct 17 14:29:21 2014
@@ -32,15 +32,15 @@ import java.io.IOException;
 
 import org.apache.qpid.AMQException;
 
-public class TxSelectOkBody extends AMQMethodBodyImpl implements 
EncodableAMQDataBlock, AMQMethodBody
+public class ConfirmSelectOkBody extends AMQMethodBodyImpl implements 
EncodableAMQDataBlock, AMQMethodBody
 {
 
-    public static final int CLASS_ID =  90;
+    public static final int CLASS_ID =  85;
     public static final int METHOD_ID = 11;
 
-    public static final TxSelectOkBody INSTANCE = new TxSelectOkBody();
+    public static final ConfirmSelectOkBody INSTANCE = new 
ConfirmSelectOkBody();
 
-    private TxSelectOkBody()
+    private ConfirmSelectOkBody()
     {
     }
 
@@ -66,12 +66,12 @@ public class TxSelectOkBody extends AMQM
 
     public boolean execute(MethodDispatcher dispatcher, int channelId) throws 
AMQException
        {
-        return dispatcher.dispatchTxSelectOk(this, channelId);
+        return dispatcher.dispatchConfirmSelectOk(this, channelId);
        }
 
     public String toString()
     {
-        return "[TxSelectOkBody]";
+        return "[ConfirmSelectOkBody]";
     }
 
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
 Fri Oct 17 14:29:21 2014
@@ -358,6 +358,12 @@ public class FrameCreatingMethodProcesso
         }
 
         @Override
+        public void receiveConfirmSelectOk()
+        {
+            _processedMethods.add(new AMQFrame(_channelId, 
ConfirmSelectOkBody.INSTANCE));
+        }
+
+        @Override
         public void receiveAccessRequest(final AMQShortString realm,
                                          final boolean exclusive,
                                          final boolean passive,
@@ -564,6 +570,12 @@ public class FrameCreatingMethodProcesso
         }
 
         @Override
+        public void receiveConfirmSelect(final boolean nowait)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new 
ConfirmSelectBody(nowait)));
+        }
+
+        @Override
         public void receiveChannelFlow(final boolean active)
         {
             _processedMethods.add(new AMQFrame(_channelId, new 
ChannelFlowBody(active)));
@@ -607,5 +619,11 @@ public class FrameCreatingMethodProcesso
         {
             return false;
         }
+
+        @Override
+        public void receiveBasicNack(final long deliveryTag, final boolean 
multiple, final boolean requeue)
+        {
+            _processedMethods.add(new AMQFrame(_channelId, new 
BasicNackBody(deliveryTag, multiple, requeue)));
+        }
     }
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodDispatcher.java
 Fri Oct 17 14:29:21 2014
@@ -32,4 +32,6 @@ package org.apache.qpid.framing;
 public interface MethodDispatcher extends
                      ClientMethodDispatcher, ServerMethodDispatcher
 {
+
+    boolean dispatchBasicNack(BasicNackBody basicNackBody, int channelId);
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerChannelMethodProcessor.java
 Fri Oct 17 14:29:21 2014
@@ -89,4 +89,5 @@ public interface ServerChannelMethodProc
 
     void receiveTxRollback();
 
+    void receiveConfirmSelect(boolean nowait);
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ServerMethodDispatcher.java
 Fri Oct 17 14:29:21 2014
@@ -68,4 +68,6 @@ public interface ServerMethodDispatcher
     boolean dispatchQueueUnbind(QueueUnbindBody queueUnbindBody, int 
channelId) throws AMQException;
 
     boolean dispatchBasicRecoverSync(BasicRecoverSyncBody 
basicRecoverSyncBody, int channelId) throws AMQException;
+
+    boolean dispatchConfirmSelect(ConfirmSelectBody confirmSelectBody, int 
channelId) throws AMQException;
 }

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
 Fri Oct 17 14:29:21 2014
@@ -60,6 +60,8 @@ public class ConnectionStartProperties
 
     public static final String SESSION_FLOW = "qpid.session_flow";
 
+    public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = 
"qpid.confirmed_publish_supported";
+
     public static int _pid;
 
     public static final String _platformInfo;

Added: 
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java?rev=1632585&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java
 (added)
+++ 
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java
 Fri Oct 17 14:29:21 2014
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class SyncPublishTest extends QpidBrokerTestCase
+{
+    private Connection _connection;
+
+    @Override
+    public void setUp() throws Exception
+    {
+
+        super.setUp();
+        Map<String, String> options = new HashMap<>();
+        options.put(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all");
+        _connection = getConnectionWithOptions(options);
+    }
+
+    @Override
+    public void tearDown() throws Exception
+    {
+        _connection.close();
+        super.tearDown();
+    }
+
+    public void testAnonPublisherUnknownDestination() throws Exception
+    {
+        Session session = _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+        try
+        {
+            
producer.send(session.createQueue("direct://amq.direct/unknown/unknown"),session.createTextMessage("hello"));
+            fail("Send to unknown destination should result in error");
+        }
+        catch (JMSException e)
+        {
+            // pass
+        }
+    }
+
+
+    public void testAnonPublisherUnknownDestinationTransactional() throws 
Exception
+    {
+        Session session = _connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(null);
+        try
+        {
+            
producer.send(session.createQueue("direct://amq.direct/unknown/unknown"),session.createTextMessage("hello"));
+            fail("Send to unknown destination should result in error");
+        }
+        catch (JMSException e)
+        {
+            // pass
+        }
+        try
+        {
+            session.commit();
+        }
+        catch (JMSException e)
+        {
+            fail("session should commit successfully even though the message 
was not sent");
+        }
+
+    }
+
+    public void testQueueRemovedAfterConsumerCreated() throws JMSException
+    {
+        Session session = _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue queue = session.createTemporaryQueue();
+        MessageProducer producer = session.createProducer(queue);
+        try
+        {
+            producer.send(session.createTextMessage("hello"));
+        }
+        catch (JMSException e)
+        {
+            fail("Send to temporary queue should succeed");
+        }
+
+        try
+        {
+            queue.delete();
+        }
+        catch (JMSException e)
+        {
+            fail("temporary queue should be deletable");
+        }
+
+        try
+        {
+            producer.send(session.createTextMessage("hello"));
+            fail("Send to deleted temporary queue should not succeed");
+        }
+        catch (JMSException e)
+        {
+            // pass
+        }
+
+
+    }
+}

Propchange: 
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/SyncPublishTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1632585&r1=1632584&r2=1632585&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Fri Oct 17 14:29:21 2014
@@ -76,3 +76,5 @@ org.apache.qpid.systest.management.jmx.Q
 
org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
 
 
org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
+
+org.apache.qpid.client.SyncPublishTest#*



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to