Author: rgodfrey
Date: Fri Oct 19 21:02:03 2012
New Revision: 1400287

URL: http://svn.apache.org/viewvc?rev=1400287&view=rev
Log:
QPID-4382 : Wait for subsequent frames to arrive in synchronous receive

Modified:
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1400287&r1=1400286&r2=1400287&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Fri Oct 19 21:02:03 2012
@@ -20,6 +20,15 @@
  */
 package org.apache.qpid.amqp_1_0.client;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
 import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
 import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.Fra
 import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 public class Connection
 {
     private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
@@ -224,7 +222,6 @@ public class Connection
             }
 
 
-            //ConnectionHandler.OutputHandler outputHandler = new 
ConnectionHandler.OutputHandler(outputStream, out, 
_conn.getDescribedTypeRegistry());
             ConnectionHandler.BytesOutputHandler outputHandler = new 
ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
             Thread outputThread = new Thread(outputHandler);
             outputThread.setDaemon(true);
@@ -236,8 +233,6 @@ public class Connection
             final ConnectionHandler handler = new ConnectionHandler(_conn);
             final InputStream inputStream = s.getInputStream();
 
-            //final AMQPTransport transport = new AMQPTransport(new 
AMQPFrameTransport(_conn));
-
             Thread inputThread = new Thread(new Runnable()
             {
 
@@ -246,7 +241,6 @@ public class Connection
                     try
                     {
                         doRead(handler, inputStream);
-//                        doRead(transport, inputStream);
                     }
                     finally
                     {
@@ -268,85 +262,6 @@ public class Connection
             inputThread.setDaemon(true);
             inputThread.start();
 
-/*
-            Thread outputThread = new Thread(new Runnable()
-            {
-
-                private int _lastWrite;
-
-                public void run()
-                {
-                    try
-                    {
-//                        doRead(handler, inputStream);
-                        final Object lock = new Object();
-                        transport.setOutputStateChangeListener(new 
StateChangeListener()
-                        {
-
-                            public void onStateChange(final boolean active)
-                            {
-                                synchronized (lock)
-                                {
-                                    lock.notifyAll();
-                                }
-                            }
-                        });
-
-                        synchronized(lock)
-                        {
-                            while(transport.isOpenForOutput())
-                            {
-                                _lastWrite = 0;
-                                transport.getNextBytes(new BytesProcessor()
-                                {
-
-                                    public void processBytes(final ByteBuffer 
buf)
-                                    {
-                                        _lastWrite = buf.remaining();
-                                        try
-                                        {
-                                            outputStream.write(buf.array(),
-                                                               
buf.arrayOffset() + buf.position(),
-                                                               buf.limit() - 
buf.position());
-                                        }
-                                        catch (IOException e)
-                                        {
-                                            e.printStackTrace();  //To change 
body of catch statement use File | Settings | File Templates.
-                                        }
-                                    }
-                                });
-                                if(_lastWrite == 0 && 
transport.isOpenForOutput())
-                                {
-                                    try
-                                    {
-                                        lock.wait(1000);
-                                    }
-                                    catch (InterruptedException e)
-                                    {
-                                        e.printStackTrace();  //To change body 
of catch statement use File | Settings | File Templates.
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    finally
-                    {
-                        if(_conn.closedForInput() && _conn.closedForOutput())
-                        {
-                            try
-                            {
-                                s.close();
-                            }
-                            catch (IOException e)
-                            {
-                                e.printStackTrace();  //To change body of 
catch statement use File | Settings | File Templates.
-                            }
-                        }
-                    }
-                }
-            });
-*/
-
             _conn.open();
 
         }
@@ -394,7 +309,7 @@ public class Connection
         }
         catch (IOException e)
         {
-            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
+            e.printStackTrace();
         }
 
     }
@@ -419,7 +334,7 @@ public class Connection
         {
             int read;
             boolean done = false;
-            while(!done && (read = inputStream.read(buf)) != -1)
+            while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
             {
                 ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
                 Binary b = new Binary(buf,0,read);
@@ -428,12 +343,6 @@ public class Connection
                 {
                     RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : 
" + b.toString());
                 }
-                /*System.err.println(b);
-                System.err.println("XXX: " + bbuf.hasRemaining() + "; " + 
handler.isDone());
-                if(handler.isDone())
-                {
-                    System.err.println(handler.getClass().getName() + "IS 
DONE!");
-                } */
                 while(bbuf.hasRemaining() && !handler.isDone())
                 {
                     handler.parse(bbuf);
@@ -444,7 +353,7 @@ public class Connection
         }
         catch (IOException e)
         {
-            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
+            e.printStackTrace();
         }
     }
 

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1400287&r1=1400286&r2=1400287&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Fri Oct 19 21:02:03 2012
@@ -241,7 +241,7 @@ public class Receiver implements Deliver
                     }
                     if(hasMore)
                     {
-                        xfr = receiveFromPrefetch(0L);
+                        xfr = receiveFromPrefetch(-1l);
                         if(xfr== null)
                         {
                             // TODO - this is wrong!!!!
@@ -558,4 +558,4 @@ public class Receiver implements Deliver
         void messageArrived(Receiver receiver);
     }
 
-}
\ No newline at end of file
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to