Author: nmittler
Date: Sun Apr 23 15:08:21 2006
New Revision: 396329

URL: http://svn.apache.org/viewcvs?rev=396329&view=rev
Log:
Fixed handling of bytes message for Jira issue AMQ-685

Modified:
    
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
    
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
    
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
    
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp

Modified: 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
 (original)
+++ 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
 Sun Apr 23 15:08:21 2006
@@ -86,41 +86,39 @@
 
////////////////////////////////////////////////////////////////////////////////
 int BufferedInputStream::read( char* buffer, 
        const int bufferSize ) throw (ActiveMQException){
-               
-       int totalRead = 0;
-       
-       // Get the number of bytes that can be copied directly from
-       // the buffer.
-       int bytesToCopy = min( tail-head, bufferSize );
-       
-       // Copy the data to the output buffer.  
-       memcpy( buffer, this->buffer+head, bytesToCopy );
-       
-       // Increment the total bytes read.
-       totalRead += bytesToCopy;
        
-       // Increment the head position.  If the buffer is now empty,
-       // reset the positions.
-       head += bytesToCopy;
-       if( head == tail ){
-               head = tail = 0;
-       }
-       
-       // If we still haven't filled the output buffer, read a buffer's
+       // If we still haven't filled the output buffer AND there is data
+       // on the input stream to be read, read a buffer's
        // worth from the stream.
-       if( bytesToCopy < bufferSize ){
-               
-               // Buffer as much data as we can.
-               bufferData();
+       int totalRead = 0;
+       while( totalRead < bufferSize ){                
                
                // Get the remaining bytes to copy.
-               bytesToCopy = min( tail-head, (bufferSize-bytesToCopy) );
+               int bytesToCopy = min( tail-head, (bufferSize-totalRead) );
                
                // Copy the data to the output buffer.  
                memcpy( buffer+totalRead, this->buffer+head, bytesToCopy );
                
                // Increment the total bytes read.
                totalRead += bytesToCopy;
+               
+               // Increment the head position.  If the buffer is now empty,
+               // reset the positions and buffer more data.
+               head += bytesToCopy;
+               if( head == tail ){
+                       
+                       // Reset the buffer indicies.
+                       head = tail = 0;
+                       
+                       // If there is no more data currently available on the 
+                       // input stream, stop the loop.
+                       if( stream->available() == 0 ){
+                               break;
+                       }
+                       
+                       // Buffer as much data as we can.
+                       bufferData();
+               }                               
        }
        
        // Return the total number of bytes read.

Modified: 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
 (original)
+++ 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
 Sun Apr 23 15:08:21 2006
@@ -66,21 +66,29 @@
 }
 
 
////////////////////////////////////////////////////////////////////////////////
-void BufferedOutputStream::flush() throw (ActiveMQException){
+void BufferedOutputStream::emptyBuffer() throw (ActiveMQException){
        
        if( head != tail ){
                stream->write( buffer+head, tail-head );
        }
        head = tail = 0;
+}
+               
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::flush() throw (ActiveMQException){
        
+       // Empty the contents of the buffer to the output stream.
+       emptyBuffer();
+       
+       // Flush the output stream.
        stream->flush();
 }
 
 
////////////////////////////////////////////////////////////////////////////////
 void BufferedOutputStream::write( const char c ) throw (ActiveMQException){
        
-       if( tail == bufferSize-1 ){
-               flush();
+       if( tail >= bufferSize ){
+               emptyBuffer();
        }
        
        buffer[tail++] = c;     
@@ -89,13 +97,14 @@
 
////////////////////////////////////////////////////////////////////////////////
               
 void BufferedOutputStream::write( const char* buffer, const int len ) 
        throw (ActiveMQException)
-{
-       
-       int pos = 0;
-       
+{              
        // Iterate until all the data is written.
-       while( pos < len ){
+       for( int pos=0; pos < len; ){
                
+               if( tail >= bufferSize ){
+                       emptyBuffer();
+               }
+       
                // Get the number of bytes left to write.
                int bytesToWrite = min( bufferSize-tail, len-pos );
                
@@ -106,12 +115,7 @@
                tail += bytesToWrite;
                
                // Decrease the number of bytes to write.
-               pos += bytesToWrite;
-               
-               // If we don't have enough space in the buffer, flush it.
-               if( bytesToWrite < len || tail >= bufferSize ){
-                       flush();
-               }               
+               pos += bytesToWrite;    
        }       
 }
 

Modified: 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h 
(original)
+++ 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h 
Sun Apr 23 15:08:21 2006
@@ -42,6 +42,11 @@
        
                void init( OutputStream* stream, const int bufSize );
                
+               /**
+                * Writes the contents of the buffer to the output stream.
+                */
+       void emptyBuffer() throw (ActiveMQException);
+        
        private:
        
                OutputStream* stream;

Modified: 
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp 
(original)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp 
Sun Apr 23 15:08:21 2006
@@ -74,30 +74,84 @@
 
 
////////////////////////////////////////////////////////////////////////////////
 int SocketStream::read( char* buffer, const int bufferSize ) throw 
(ActiveMQException){
-       
-       int len = recv( socket->getHandle(), buffer, bufferSize, 0 );
-       if( len < 0 ){
-        socket->close();
-               char buf[500];
-               strerror_r( errno, buf, 500 );
-               throw IOException( 
string("stomp::io::SocketStream::read(char*,int) - ") + buf );
-       }
-       
-    /*printf("SocketStream:read():");
-    for( int ix=0; ix<len; ++ix ){
-        if( buffer[ix] > 20 )
-            printf("%c", buffer[ix] );
-        else
-            printf("[%d]", buffer[ix] );
-    }
-    printf("\n");*/
     
-       return len;
+    int bytesAvailable = available();
+    
+    while( true ){
+        
+        int len = ::recv(socket->getHandle(), (char*)buffer, bufferSize, 0);
+        
+        // Check for typical error conditions.
+        if( len < 0 ){
+                        
+            #if defined(unix) && !defined(__CYGWIN__)
+            
+                // If the socket was temporarily unavailable - just try again.
+                if( errno == EAGAIN ){
+                    continue;
+                }
+                
+                // Create the error string.
+                char* errorString = ::strerror(errno);
+                
+            #else
+            
+                // If the socket was temporarily unavailable - just try again.
+                int errorCode = ::WSAGetLastError();
+                if( errorCode == WSAEWOULDBLOCK ){
+                    continue;
+                }
+                
+                // Create the error string.
+                static const int errorStringSize = 512;
+                char errorString[errorStringSize];
+                memset( errorString, 0, errorStringSize );
+                FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
+                  0,
+                  errorCode,
+                  0,
+                  errorString,
+                  errorStringSize - 1,
+                  NULL);
+                  
+            #endif
+            
+            // Otherwise, this was a bad error - throw an exception.
+            throw IOException( string("stomp::io::SocketStream::write(char) - 
") + errorString );
+        }
+        
+        // No error, but no data - check for a broken socket.
+        if( len == 0 ){
+            
+            // If the poll showed data, but we failed to read any,
+            // the socket is broken.
+            if( bytesAvailable > 0 ){
+                throw IOException( "activemq::io::SocketInputStream::read - 
The connection is broken" );
+            }
+            
+            // Socket is not broken, just had no data.
+            return 0;
+        }
+        
+        #ifdef SOCKET_IO_DEBUG
+            printf("SocketStream:read(), numbytes:%d -", len);
+            for( int ix=0; ix<len; ++ix ){
+                if( buffer[ix] > 20 )
+                    printf("%c", buffer[ix] );
+                else
+                    printf("[%d]", buffer[ix] );
+            }
+            printf("\n");
+        #endif
+    
+        // Data was read successfully - return the bytes read.
+        return len;
+    }
 }
 
 
////////////////////////////////////////////////////////////////////////////////
 void SocketStream::write( const char c ) throw (ActiveMQException){
-       
+           
        /*if( c > 20 ){
                printf("%c", c );
        }
@@ -109,29 +163,41 @@
                char buf[500];
                strerror_r( errno, buf, 500 );
                throw IOException( string("stomp::io::SocketStream::write(char) 
- ") + buf );
-       }
+       }    
 }
 
 
////////////////////////////////////////////////////////////////////////////////
 void SocketStream::write( const char* buffer, const int len ) 
        throw (ActiveMQException)
 {
-       /*for( int ix=0; ix<len; ++ix ){
-               char c = buffer[ix];
-               if( c > 20 ){
-                       printf("%c", c );
-               }
-               else printf("[%d]", c );
-       }*/
+    #ifdef SOCKET_IO_DEBUG
+        printf("SocketStream:write(), numbytes:%d -", len);
+       for( int ix=0; ix<len; ++ix ){
+               char c = buffer[ix];
+               if( c > 20 ){
+                       printf("%c", c );
+               }
+               else printf("[%d]", c );
+       }
+        printf("\n" );
+    #endif
        
        int remaining = len;
        while( remaining > 0 ) {
        
-       int length = send( socket->getHandle(), buffer, remaining, MSG_NOSIGNAL 
);              
+        int flags = 0;
+        #if defined(OSX)
+            flags = SO_NOSIGPIPE;
+        #elif defined( unix )
+            flags = MSG_NOSIGNAL;
+        #endif
+        
+       int length = send( socket->getHandle(), buffer, remaining, flags );     
        
        if( length < 0 ){
             socket->close();
                char buf[500];
                        strerror_r( errno, buf, 500 );
+            printf("exception in write\n" );
                        throw IOException( 
string("stomp::io::SocketStream::write(char*,int) - ") + buf );
        }
        

Modified: 
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- 
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
 (original)
+++ 
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
 Sun Apr 23 15:08:21 2006
@@ -63,6 +63,22 @@
 
////////////////////////////////////////////////////////////////////////////////
 int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw 
(ActiveMQException){
        
+    int content_length = 0;
+   
+   // Check for the content-length header.  This is optional - if not provided
+   // we stop when we encounter a \0\n.
+   const StompFrame::HeaderInfo* headerInfo = 
frame.getHeaderInfo(StompFrame::HEADER_CONTENTLENGTH);
+   if( headerInfo != NULL )
+   {
+      const char* lengthProperty = headerInfo->value;
+      char* stopped_string = NULL;
+
+      content_length = strtoul(
+         lengthProperty, 
+         &stopped_string, 
+         10);
+   }
+   
        int pos = 0;
        
        while( pos < bufLen ){
@@ -72,12 +88,12 @@
                
                // Increment the position pointer.
                pos++;
-               
-               // If we've reached the end of the body - return.
-               if( (buf[pos-1]=='\0' && pos==1) ||
-                       (pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n') ){ 
                                                        
-                       return pos;
-               }
+        
+        // Are we at the end of the frame?  The end frame pattern is \0\n
+        bool foundFrameEndPattern = (pos >= 2 && buf[pos-2]=='\0' && 
buf[pos-1] == '\n'); 
+        if( (pos > content_length) && foundFrameEndPattern ){                  
           
+            return pos;
+        }
        }
        
        // Reading is not complete.
@@ -220,6 +236,7 @@
        write( body, frame.getBodyLength() );
        }
        write( '\0' );
+       write( '\n' );
        
        // Flush the stream.
        flush();


Reply via email to