Author: nmittler
Date: Sun Apr 23 15:08:21 2006
New Revision: 396329
URL: http://svn.apache.org/viewcvs?rev=396329&view=rev
Log:
Fixed handling of bytes message for Jira issue AMQ-685
Modified:
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
Modified:
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
---
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
(original)
+++
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
Sun Apr 23 15:08:21 2006
@@ -86,41 +86,39 @@
////////////////////////////////////////////////////////////////////////////////
int BufferedInputStream::read( char* buffer,
const int bufferSize ) throw (ActiveMQException){
-
- int totalRead = 0;
-
- // Get the number of bytes that can be copied directly from
- // the buffer.
- int bytesToCopy = min( tail-head, bufferSize );
-
- // Copy the data to the output buffer.
- memcpy( buffer, this->buffer+head, bytesToCopy );
-
- // Increment the total bytes read.
- totalRead += bytesToCopy;
- // Increment the head position. If the buffer is now empty,
- // reset the positions.
- head += bytesToCopy;
- if( head == tail ){
- head = tail = 0;
- }
-
- // If we still haven't filled the output buffer, read a buffer's
+ // If we still haven't filled the output buffer AND there is data
+ // on the input stream to be read, read a buffer's
// worth from the stream.
- if( bytesToCopy < bufferSize ){
-
- // Buffer as much data as we can.
- bufferData();
+ int totalRead = 0;
+ while( totalRead < bufferSize ){
// Get the remaining bytes to copy.
- bytesToCopy = min( tail-head, (bufferSize-bytesToCopy) );
+ int bytesToCopy = min( tail-head, (bufferSize-totalRead) );
// Copy the data to the output buffer.
memcpy( buffer+totalRead, this->buffer+head, bytesToCopy );
// Increment the total bytes read.
totalRead += bytesToCopy;
+
+ // Increment the head position. If the buffer is now empty,
+ // reset the positions and buffer more data.
+ head += bytesToCopy;
+ if( head == tail ){
+
+ // Reset the buffer indicies.
+ head = tail = 0;
+
+ // If there is no more data currently available on the
+ // input stream, stop the loop.
+ if( stream->available() == 0 ){
+ break;
+ }
+
+ // Buffer as much data as we can.
+ bufferData();
+ }
}
// Return the total number of bytes read.
Modified:
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
---
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
(original)
+++
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp
Sun Apr 23 15:08:21 2006
@@ -66,21 +66,29 @@
}
////////////////////////////////////////////////////////////////////////////////
-void BufferedOutputStream::flush() throw (ActiveMQException){
+void BufferedOutputStream::emptyBuffer() throw (ActiveMQException){
if( head != tail ){
stream->write( buffer+head, tail-head );
}
head = tail = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::flush() throw (ActiveMQException){
+ // Empty the contents of the buffer to the output stream.
+ emptyBuffer();
+
+ // Flush the output stream.
stream->flush();
}
////////////////////////////////////////////////////////////////////////////////
void BufferedOutputStream::write( const char c ) throw (ActiveMQException){
- if( tail == bufferSize-1 ){
- flush();
+ if( tail >= bufferSize ){
+ emptyBuffer();
}
buffer[tail++] = c;
@@ -89,13 +97,14 @@
////////////////////////////////////////////////////////////////////////////////
void BufferedOutputStream::write( const char* buffer, const int len )
throw (ActiveMQException)
-{
-
- int pos = 0;
-
+{
// Iterate until all the data is written.
- while( pos < len ){
+ for( int pos=0; pos < len; ){
+ if( tail >= bufferSize ){
+ emptyBuffer();
+ }
+
// Get the number of bytes left to write.
int bytesToWrite = min( bufferSize-tail, len-pos );
@@ -106,12 +115,7 @@
tail += bytesToWrite;
// Decrease the number of bytes to write.
- pos += bytesToWrite;
-
- // If we don't have enough space in the buffer, flush it.
- if( bytesToWrite < len || tail >= bufferSize ){
- flush();
- }
+ pos += bytesToWrite;
}
}
Modified:
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
---
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
(original)
+++
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h
Sun Apr 23 15:08:21 2006
@@ -42,6 +42,11 @@
void init( OutputStream* stream, const int bufSize );
+ /**
+ * Writes the contents of the buffer to the output stream.
+ */
+ void emptyBuffer() throw (ActiveMQException);
+
private:
OutputStream* stream;
Modified:
incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
(original)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp
Sun Apr 23 15:08:21 2006
@@ -74,30 +74,84 @@
////////////////////////////////////////////////////////////////////////////////
int SocketStream::read( char* buffer, const int bufferSize ) throw
(ActiveMQException){
-
- int len = recv( socket->getHandle(), buffer, bufferSize, 0 );
- if( len < 0 ){
- socket->close();
- char buf[500];
- strerror_r( errno, buf, 500 );
- throw IOException(
string("stomp::io::SocketStream::read(char*,int) - ") + buf );
- }
-
- /*printf("SocketStream:read():");
- for( int ix=0; ix<len; ++ix ){
- if( buffer[ix] > 20 )
- printf("%c", buffer[ix] );
- else
- printf("[%d]", buffer[ix] );
- }
- printf("\n");*/
- return len;
+ int bytesAvailable = available();
+
+ while( true ){
+
+ int len = ::recv(socket->getHandle(), (char*)buffer, bufferSize, 0);
+
+ // Check for typical error conditions.
+ if( len < 0 ){
+
+ #if defined(unix) && !defined(__CYGWIN__)
+
+ // If the socket was temporarily unavailable - just try again.
+ if( errno == EAGAIN ){
+ continue;
+ }
+
+ // Create the error string.
+ char* errorString = ::strerror(errno);
+
+ #else
+
+ // If the socket was temporarily unavailable - just try again.
+ int errorCode = ::WSAGetLastError();
+ if( errorCode == WSAEWOULDBLOCK ){
+ continue;
+ }
+
+ // Create the error string.
+ static const int errorStringSize = 512;
+ char errorString[errorStringSize];
+ memset( errorString, 0, errorStringSize );
+ FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
+ 0,
+ errorCode,
+ 0,
+ errorString,
+ errorStringSize - 1,
+ NULL);
+
+ #endif
+
+ // Otherwise, this was a bad error - throw an exception.
+ throw IOException( string("stomp::io::SocketStream::write(char) -
") + errorString );
+ }
+
+ // No error, but no data - check for a broken socket.
+ if( len == 0 ){
+
+ // If the poll showed data, but we failed to read any,
+ // the socket is broken.
+ if( bytesAvailable > 0 ){
+ throw IOException( "activemq::io::SocketInputStream::read -
The connection is broken" );
+ }
+
+ // Socket is not broken, just had no data.
+ return 0;
+ }
+
+ #ifdef SOCKET_IO_DEBUG
+ printf("SocketStream:read(), numbytes:%d -", len);
+ for( int ix=0; ix<len; ++ix ){
+ if( buffer[ix] > 20 )
+ printf("%c", buffer[ix] );
+ else
+ printf("[%d]", buffer[ix] );
+ }
+ printf("\n");
+ #endif
+
+ // Data was read successfully - return the bytes read.
+ return len;
+ }
}
////////////////////////////////////////////////////////////////////////////////
void SocketStream::write( const char c ) throw (ActiveMQException){
-
+
/*if( c > 20 ){
printf("%c", c );
}
@@ -109,29 +163,41 @@
char buf[500];
strerror_r( errno, buf, 500 );
throw IOException( string("stomp::io::SocketStream::write(char)
- ") + buf );
- }
+ }
}
////////////////////////////////////////////////////////////////////////////////
void SocketStream::write( const char* buffer, const int len )
throw (ActiveMQException)
{
- /*for( int ix=0; ix<len; ++ix ){
- char c = buffer[ix];
- if( c > 20 ){
- printf("%c", c );
- }
- else printf("[%d]", c );
- }*/
+ #ifdef SOCKET_IO_DEBUG
+ printf("SocketStream:write(), numbytes:%d -", len);
+ for( int ix=0; ix<len; ++ix ){
+ char c = buffer[ix];
+ if( c > 20 ){
+ printf("%c", c );
+ }
+ else printf("[%d]", c );
+ }
+ printf("\n" );
+ #endif
int remaining = len;
while( remaining > 0 ) {
- int length = send( socket->getHandle(), buffer, remaining, MSG_NOSIGNAL
);
+ int flags = 0;
+ #if defined(OSX)
+ flags = SO_NOSIGPIPE;
+ #elif defined( unix )
+ flags = MSG_NOSIGNAL;
+ #endif
+
+ int length = send( socket->getHandle(), buffer, remaining, flags );
if( length < 0 ){
socket->close();
char buf[500];
strerror_r( errno, buf, 500 );
+ printf("exception in write\n" );
throw IOException(
string("stomp::io::SocketStream::write(char*,int) - ") + buf );
}
Modified:
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp?rev=396329&r1=396328&r2=396329&view=diff
==============================================================================
---
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
(original)
+++
incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
Sun Apr 23 15:08:21 2006
@@ -63,6 +63,22 @@
////////////////////////////////////////////////////////////////////////////////
int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw
(ActiveMQException){
+ int content_length = 0;
+
+ // Check for the content-length header. This is optional - if not provided
+ // we stop when we encounter a \0\n.
+ const StompFrame::HeaderInfo* headerInfo =
frame.getHeaderInfo(StompFrame::HEADER_CONTENTLENGTH);
+ if( headerInfo != NULL )
+ {
+ const char* lengthProperty = headerInfo->value;
+ char* stopped_string = NULL;
+
+ content_length = strtoul(
+ lengthProperty,
+ &stopped_string,
+ 10);
+ }
+
int pos = 0;
while( pos < bufLen ){
@@ -72,12 +88,12 @@
// Increment the position pointer.
pos++;
-
- // If we've reached the end of the body - return.
- if( (buf[pos-1]=='\0' && pos==1) ||
- (pos >= 2 && buf[pos-2]=='\0' && buf[pos-1] == '\n') ){
- return pos;
- }
+
+ // Are we at the end of the frame? The end frame pattern is \0\n
+ bool foundFrameEndPattern = (pos >= 2 && buf[pos-2]=='\0' &&
buf[pos-1] == '\n');
+ if( (pos > content_length) && foundFrameEndPattern ){
+ return pos;
+ }
}
// Reading is not complete.
@@ -220,6 +236,7 @@
write( body, frame.getBodyLength() );
}
write( '\0' );
+ write( '\n' );
// Flush the stream.
flush();