Date: Monday, January 29, 2007 @ 17:43:57
Author: gilles
Path: /cvsroot/carob/carob
Added: include/BufferedSocket.hpp (1.1) src/BufferedSocket.cpp (1.1)
(new class)
Implemented socket with buffered read and write operations
----------------------------+
include/BufferedSocket.hpp | 152 +++++++++++++++++++++++++++++++++++++++++++
src/BufferedSocket.cpp | 141 +++++++++++++++++++++++++++++++++++++++
2 files changed, 293 insertions(+)
Index: carob/include/BufferedSocket.hpp
diff -u /dev/null carob/include/BufferedSocket.hpp:1.1
--- /dev/null Mon Jan 29 17:43:57 2007
+++ carob/include/BufferedSocket.hpp Mon Jan 29 17:43:57 2007
@@ -0,0 +1,152 @@
+/*
+ * Sequoia: Database clustering technology.
+ * Copyright (C) 2007 Continuent, Inc.
+ * Contact: [EMAIL PROTECTED]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Initial developer(s): Gilles Rayrat
+ * Contributor(s):
+ */
+
+#ifndef _BUFFERED_SOCKET_H_
+#define _BUFFERED_SOCKET_H_
+
+#include "JavaSocket.hpp"
+
+namespace CarobNS {
+
+/** Size of the input buffer */
+#define READ_BUFFER_SIZE 2048
+#define WRITE_BUFFER_SIZE 512
+
+/**
+ * Holds a buffer for read operations
+ */
+class ReadCache
+{
+public:
+ /** Creates and empty cache */
+ ReadCache() : data_boundary(0), data_index(0) {} ;
+ /** Returns the size of valid data left to be read*/
+ int getDataSize() { return data_boundary - data_index; }
+ /** Returns a pointer to the first byte of data available for reading */
+ java_byte* getData() { return &(buffer[data_index]); }
+ /** Tells that the given length of data has been read */
+ void discard(int length) { data_index += length; }
+ /** Set the new valid data size */
+ void setDataSize(int newSize) { data_boundary = newSize; }
+ /** Discards any data */
+ void reset() { data_boundary = 0; data_index = 0; }
+private:
+ /** Buffer in which data read from the network is stored */
+ java_byte buffer[READ_BUFFER_SIZE];
+ /** Last available byte of read data + 1 */
+ int data_boundary;
+ /** Index of the next byte to be read */
+ int data_index;
+};
+
+class WriteCache
+{
+public:
+ /** Creates and empty cache */
+ WriteCache() : write_index(0) {};
+ /** Returns the size of data written until now */
+ int getDataSize() { return write_index; }
+ /** Returns a pointer to the next byte to write to */
+ java_byte* getWritePtr() { return &(buffer[write_index]); }
+ /** Returns a pointer to the first available byte */
+ java_byte* getData() { return buffer; }
+ /** Tells that the given length of data has been written */
+ void forward(int len) { write_index += len; }
+ /** Discards all data */
+ void reset() { write_index = 0; }
+ /** Returns the size left in the buffer */
+ int getRemainingSize() { return WRITE_BUFFER_SIZE -
write_index; }
+//private:
+ /** Buffer in which data read from the network is stored */
+ java_byte buffer[WRITE_BUFFER_SIZE];
+ /** Index of the next byte to be read */
+ int write_index;
+};
+
+
+/**
+ * Extends the <code>JavaSocket</code> class to allow buffered read operations
+ * @see JavaSocket
+ */
+class BufferedSocket : public JavaSocket
+{
+public:
+ /**
+ * Empty constructor
+ */
+ BufferedSocket() throw (CodecException) : JavaSocket() {} ;
+ /**
+ * Sends the buffered write data to the socket
+ */
+ void flush() const throw (SocketIOException,
UnexpectedException);
+protected:
+ /**
+ * Substitute for recv. Waits for incomming data by calling pollOnSingleFd
+ * and loops until full length has been received or an erro occured. If
+ * shutdown() is called during the loop, throws a SocketIOException to inform
+ * callers that the socket is not longer readable
+ * @param buf data to send
+ * @param len full buffer length
+ * @param flags send options, see recv man page
+ * @return the total number of bytes send, -1 in case of failure
+ * @throw SocketIOException if interrupted by shutdown() function
+ */
+ int32_t recvFully(void *buf, const int len, const int flags)
+ const throw (SocketIOException, UnexpectedException);
+ /**
+ * Wrapper over send(...) function to handle errors and throw exceptions
+ * @param fctName name of the calling function (for logging purposes)
+ * @param objName name of the object to be send (for logging purposes)
+ * @param buf data to be send
+ * @param len length of buf
+ * @param flags send option, see recv man page
+ * @throws SocketIOException
+ */
+ void sendToSocket(const wchar_t fctName[],
+ const std::wstring& objName, const void* buf, int
len,
+ int flags) const throw (SocketIOException,
+ UnexpectedException);
+private:
+ /**
+ * Buffer in which data read from the network is stored.<br>
+ * As read operations don't really modify the socket, this object doesn't
+ * affects constness
+ */
+ mutable ReadCache read_cache;
+ /**
+ * Buffer for data to be written<br>
+ */
+ mutable WriteCache write_cache;
+ /**
+ * Buffered socket receive function.<br>
+ * Reads available data in <code>read_buffer</code> and returns the number of
+ * bytes read (ie. available)
+ *
+ * @param recvFlags flags to pass to recv function
+ * @throw SocketIOException if an error occurs during the receive operation
+ * @return the number of bytes received
+ */
+ int fillReadBuffer(int recvFlags) const throw
(SocketIOException,
+ UnexpectedException);
+};
+
+} //namespace CarobNS
+#endif /*_BUFFERED_SOCKET_H_*/
Index: carob/src/BufferedSocket.cpp
diff -u /dev/null carob/src/BufferedSocket.cpp:1.1
--- /dev/null Mon Jan 29 17:43:57 2007
+++ carob/src/BufferedSocket.cpp Mon Jan 29 17:43:57 2007
@@ -0,0 +1,141 @@
+/*
+ * Sequoia: Database clustering technology.
+ * Copyright (C) 2007 Emic Networks
+ * Contact: [EMAIL PROTECTED]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Initial developer(s): Gilles Rayrat
+ * Contributor(s):
+ */
+
+#include "BufferedSocket.hpp"
+#include "SystemDependantDefs.hpp"
+
+#ifdef __MINGW32__
+ #include <ws2tcpip.h>
+#else
+ #include <sys/types.h> // socket(), send(), etc.
+ #include <sys/socket.h> // "
+#endif
+
+using namespace CarobNS;
+
+/** Returns the number of octets read or the (negative) error from recv() */
+int BufferedSocket::recvFully(void *buf, const int len, const int flags)
+ const throw (SocketIOException, UnexpectedException)
+{
+ int alreadyRead = 0; // Keeps the whole size read
+ int remainingBytesToRead = len;
+ while (!canceled && (remainingBytesToRead > 0))
+ {
+ int availableBytes = read_cache.getDataSize();
+ if (availableBytes < 1)
+ {
+ // First send data that has been buffered
+ flush();
+ availableBytes = fillReadBuffer(flags);
+ }
+ int toBeReadThisTime = remainingBytesToRead;
+ if (toBeReadThisTime > availableBytes)
+ {
+ // not enough data ready, just read those available. Next bytes will be
+ // read in the next loop round
+ toBeReadThisTime = availableBytes;
+ }
+ memcpy(static_cast<java_byte*>(static_cast<java_byte*>(buf)+alreadyRead),
+ read_cache.getData(),
+ toBeReadThisTime);
+ read_cache.discard(toBeReadThisTime);
+ alreadyRead += toBeReadThisTime;
+ remainingBytesToRead -= toBeReadThisTime;
+ }
+ // We were shutdown()
+ if (!connected)
+ throw SocketIOException(L"JavaSocket has been shutdown, read aborted");
+ return alreadyRead;
+}
+
+int BufferedSocket::fillReadBuffer(int recvFlags) const throw
(SocketIOException, UnexpectedException)
+{
+ // Wait for incoming data
+ int pollRetVal = pollOnSingleFd(socket_fd, 1000); // 1000 = 1 second timeout
+ int readThisTime = 0;
+ if (pollRetVal > 0) // data is ready to be read
+ // if (pollRetVal == 0) => timeout, no response, do nothing
+ {
+ read_cache.reset();
+ int readThisTime = recv(socket_fd, read_cache.getData(), READ_BUFFER_SIZE,
recvFlags);
+ read_cache.setDataSize(readThisTime);
+ if (readThisTime < 0)
+ throw SocketIOException(L"Could not read from socket.");
+ if (readThisTime == 0)
+ throw SocketIOException(L"Peer reset connection while we were receiving
data");
+ }
+ return readThisTime;
+}
+
+void BufferedSocket::sendToSocket(const wchar_t fctName[], const std::wstring&
objName,
+ const void* buf, int len, int flags) const
+ throw (SocketIOException, UnexpectedException)
+{
+ int lenToCopy = len;
+ if (write_cache.getRemainingSize() < lenToCopy)
+ {
+ // not enough room: copy only # of bytes remaining in buffer
+ lenToCopy = write_cache.getRemainingSize();
+ }
+ memcpy(write_cache.getWritePtr(), buf, lenToCopy);
+ write_cache.forward(lenToCopy);
+ if (len > lenToCopy)
+ {
+ // did not copy everything => have to flush and copy the rest
+ flush();
+ sendToSocket(fctName, objName, static_cast<const
java_byte*>(buf)+lenToCopy,
+ len-lenToCopy, flags);
+ }
+}
+
+void BufferedSocket::flush() const throw (SocketIOException,
UnexpectedException)
+{
+ const wchar_t fctName[] = L"BufferedSocket::flush";
+
+ if (write_cache.getDataSize() == 0)
+ {
+ return;
+ }
+ int ret = send(socket_fd,
+ static_cast<const void*>(write_cache.getData()),
+ write_cache.getDataSize(),
+ SOCKET_SEND_FLAGS);
+ if ( ret == -1 )
+ {
+ throw SocketIOException(std::wstring(fctName) + L": could not write to
socket");
+ }
+ if (ret != write_cache.getDataSize())
+ {
+ // should never happen
+ throw SocketIOException(std::wstring(fctName) + L": only " +
toUserString(ret)
+ + L"/" + toWString(write_cache.getDataSize())
+ + L" bytes could be send to socket");
+ }
+ write_cache.reset();
+}
+
+/*
+ * Local Variables:
+ * c-file-style: "bsd"
+ * c-basic-offset: 2
+ * indent-tabs-mode: nil
+ * End:
+ */
_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits