consolidate handling of consumer flow handling and subsequent transfer and 
drain flow responses


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b5e981c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b5e981c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b5e981c4

Branch: refs/heads/master
Commit: b5e981c49d11cc925a4d55ffe5b5d5bf8c00e5d2
Parents: 1b75b7e
Author: Robert Gemmell <[email protected]>
Authored: Tue Dec 9 14:40:03 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Tue Dec 9 14:40:03 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 155 +++++++++++--------
 1 file changed, 93 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b5e981c4/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 0152cbf..cca5ab5 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -707,61 +707,7 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, 
Matcher<UnsignedInteger> creditMatcher)
     {
-        Matcher<Boolean> drainMatcher = null;
-        if(drain)
-        {
-            drainMatcher = equalTo(true);
-        }
-        else
-        {
-            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
-        }
-
-        final FlowMatcher flowMatcher = new FlowMatcher()
-                        .withLinkCredit(creditMatcher)
-                        .withHandle(Matchers.notNullValue())
-                        .withDrain(drainMatcher);
-
-        if(drain && sendDrainFlowResponse)
-        {
-            final FlowFrame drainResponse = new FlowFrame();
-            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: 
shouldnt be hard coded
-            
drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); 
//TODO: shouldnt be hard coded
-            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
-            drainResponse.setDrain(true);
-
-            // The flow frame channel will be dynamically set based on the 
incoming frame. Using the -1 is an illegal placeholder.
-            final FrameSender flowResponseSender = new FrameSender(this, 
FrameType.AMQP, -1, drainResponse, null);
-            flowResponseSender.setValueProvider(new ValueProvider()
-            {
-                @Override
-                public void setValues()
-                {
-                    
flowResponseSender.setChannel(flowMatcher.getActualChannel());
-                    drainResponse.setHandle(calculateLinkHandle(flowMatcher));
-                    
drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
-                    
drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // 
Assuming no 'in-flight' messages.
-                    
drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
-                }
-            });
-
-            flowMatcher.onSuccess(flowResponseSender);
-        }
-
-        addHandler(flowMatcher);
-    }
-
-    private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) 
{
-        UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
-
-        return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
-    }
-
-    private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) 
{
-        UnsignedInteger dc = (UnsignedInteger) 
flowMatcher.getReceivedDeliveryCount();
-        UnsignedInteger lc = (UnsignedInteger) 
flowMatcher.getReceivedLinkCredit();
-
-        return dc.add(lc);
+        expectLinkFlowRespondWithTransfer(null, null, null, null, null, 0, 
drain, sendDrainFlowResponse, creditMatcher, null);
     }
 
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType 
headerDescribedType,
@@ -770,7 +716,8 @@ public class TestAmqpPeer implements AutoCloseable
                                                  final 
ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                                  final DescribedType content)
     {
-        expectLinkFlowRespondWithTransfer(headerDescribedType, 
messageAnnotationsDescribedType, propertiesDescribedType, 
appPropertiesDescribedType, content, 1);
+        expectLinkFlowRespondWithTransfer(headerDescribedType, 
messageAnnotationsDescribedType, propertiesDescribedType,
+                                          appPropertiesDescribedType, content, 
1);
     }
 
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType 
headerDescribedType,
@@ -780,19 +727,54 @@ public class TestAmqpPeer implements AutoCloseable
                                                   final DescribedType content,
                                                   final int count)
     {
-        if(count <= 0)
+        expectLinkFlowRespondWithTransfer(headerDescribedType, 
messageAnnotationsDescribedType, propertiesDescribedType,
+                                          appPropertiesDescribedType, content, 
count, false, false,
+                                          
Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1);
+    }
+
+    public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType 
headerDescribedType,
+            final MessageAnnotationsDescribedType 
messageAnnotationsDescribedType,
+            final PropertiesDescribedType propertiesDescribedType,
+            final ApplicationPropertiesDescribedType 
appPropertiesDescribedType,
+            final DescribedType content,
+            final int count,
+            final boolean drain,
+            final boolean sendDrainFlowResponse,
+            Matcher<UnsignedInteger> creditMatcher,
+            final Integer nextIncomingId)
+    {
+        if (nextIncomingId == null && count > 0)
+        {
+            throw new IllegalArgumentException("The remote NextIncomingId must 
be specified if transfers have been requested");
+        }
+
+        Matcher<Boolean> drainMatcher = null;
+        if(drain)
+        {
+            drainMatcher = equalTo(true);
+        }
+        else
         {
-            throw new IllegalArgumentException("Message count must be >= 1");
+            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
         }
 
-        int nextIncomingId = 1; // TODO: we shouldn't assume this will be the 
first transfer on the session
+        Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null;
+        if(nextIncomingId != null)
+        {
+             remoteNextIncomingIdMatcher = 
Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId));
+        }
+        else
+        {
+            remoteNextIncomingIdMatcher = 
Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE);
+        }
 
         final FlowMatcher flowMatcher = new FlowMatcher()
                         
.withLinkCredit(Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)))
-                        .withDrain(Matchers.anyOf(equalTo(false), nullValue()))
-                        
.withNextIncomingId(Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId)));
+                        .withDrain(drainMatcher)
+                        .withNextIncomingId(remoteNextIncomingIdMatcher);
 
         CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+        boolean addComposite = false;
 
         for(int i = 0; i < count; i++)
         {
@@ -822,14 +804,63 @@ public class TestAmqpPeer implements AutoCloseable
                 }
             });
 
+            addComposite = true;
             composite.add(transferResponseSender);
         }
 
-        flowMatcher.onSuccess(composite);
+        if(drain && sendDrainFlowResponse)
+        {
+            final FlowFrame drainResponse = new FlowFrame();
+            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: 
shouldnt be hard coded
+            
drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); 
//TODO: shouldnt be hard coded
+            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+            drainResponse.setDrain(true);
+
+            // The flow frame channel will be dynamically set based on the 
incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender flowResponseSender = new FrameSender(this, 
FrameType.AMQP, -1, drainResponse, null);
+            flowResponseSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    
flowResponseSender.setChannel(flowMatcher.getActualChannel());
+                    drainResponse.setHandle(calculateLinkHandle(flowMatcher));
+                    
drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+                    
drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count));
+                    
drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+                }
+            });
+
+            addComposite = true;
+            composite.add(flowResponseSender);
+        }
+
+        if(addComposite) {
+            flowMatcher.onSuccess(composite);
+        }
 
         addHandler(flowMatcher);
     }
 
+    private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) 
{
+        UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
+
+        return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
+    }
+
+    private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) 
{
+        UnsignedInteger dc = (UnsignedInteger) 
flowMatcher.getReceivedDeliveryCount();
+        UnsignedInteger lc = (UnsignedInteger) 
flowMatcher.getReceivedLinkCredit();
+
+        return dc.add(lc);
+    }
+
+    private UnsignedInteger calculateNewOutgoingId(FlowMatcher flowMatcher, 
int sentCount) {
+        UnsignedInteger nid = (UnsignedInteger) 
flowMatcher.getReceivedNextIncomingId();
+
+        return nid.add(UnsignedInteger.valueOf(sentCount));
+    }
+
     private Binary prepareTransferPayload(final HeaderDescribedType 
headerDescribedType,
                                           final 
MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                           final PropertiesDescribedType 
propertiesDescribedType,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to