Date: Monday, January 29, 2007 @ 17:43:57
  Author: gilles
    Path: /cvsroot/carob/carob

   Added: include/BufferedSocket.hpp (1.1) src/BufferedSocket.cpp (1.1)

(new class)
Implemented socket with buffered read and write operations


----------------------------+
 include/BufferedSocket.hpp |  152 +++++++++++++++++++++++++++++++++++++++++++
 src/BufferedSocket.cpp     |  141 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 293 insertions(+)


Index: carob/include/BufferedSocket.hpp
diff -u /dev/null carob/include/BufferedSocket.hpp:1.1
--- /dev/null   Mon Jan 29 17:43:57 2007
+++ carob/include/BufferedSocket.hpp    Mon Jan 29 17:43:57 2007
@@ -0,0 +1,152 @@
+/*
+ * Sequoia: Database clustering technology.
+ * Copyright (C) 2007 Continuent, Inc.
+ * Contact: [EMAIL PROTECTED]
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Initial developer(s): Gilles Rayrat
+ * Contributor(s):
+ */
+
+#ifndef _BUFFERED_SOCKET_H_
+#define _BUFFERED_SOCKET_H_
+
+#include "JavaSocket.hpp"
+
+namespace CarobNS {
+
+/** Size of the input buffer: number of elements in ReadCache::buffer */
+#define READ_BUFFER_SIZE    2048
+/** Size of the output buffer: number of elements in WriteCache::buffer */
+#define WRITE_BUFFER_SIZE    512
+
+/**
+ * Fixed-size buffer holding data received from the network until it is
+ * consumed. Valid data lies between the read position and the fill limit.
+ */
+class ReadCache
+{
+public:
+  /** Creates an empty cache */
+  ReadCache() : fill_limit(0), read_pos(0) {}
+  /** Returns the number of valid bytes that have not been consumed yet */
+  int               getDataSize() { return fill_limit - read_pos; }
+  /** Returns a pointer to the next byte to be consumed */
+  java_byte*        getData() { return buffer + read_pos; }
+  /** Marks the next <code>length</code> bytes as consumed (no bounds check) */
+  void              discard(int length) { read_pos += length; }
+  /** Declares that the buffer holds <code>newSize</code> valid bytes */
+  void              setDataSize(int newSize) { fill_limit = newSize; }
+  /** Empties the cache: read position and fill limit both return to 0 */
+  void              reset() { read_pos = 0; fill_limit = 0; }
+private:
+  /** Storage for the data read from the network */
+  java_byte         buffer[READ_BUFFER_SIZE];
+  /** Index of the last valid byte + 1 */
+  int               fill_limit;
+  /** Index of the next byte to be consumed */
+  int               read_pos;
+};
+
+/**
+ * Fixed-size buffer accumulating outgoing data until it is sent
+ */
+class WriteCache
+{
+public:
+  /** Creates an empty cache */
+  WriteCache() : write_index(0) {}
+  /** Returns the number of bytes stored so far */
+  int               getDataSize() { return write_index; }
+  /** Returns a pointer to the first free byte, where new data goes */
+  java_byte*        getWritePtr() { return buffer + write_index; }
+  /** Returns a pointer to the beginning of the stored data */
+  java_byte*        getData() { return &(buffer[0]); }
+  /** Records that <code>len</code> more bytes were stored (no bounds check) */
+  void              forward(int len) { write_index += len; }
+  /** Drops all stored data */
+  void              reset() { write_index = 0; }
+  /** Returns the number of bytes that still fit in the buffer */
+  int               getRemainingSize()
+  {
+    return WRITE_BUFFER_SIZE - write_index;
+  }
+//private:
+  /** Storage for the data waiting to be sent */
+  java_byte         buffer[WRITE_BUFFER_SIZE];
+  /** Index of the next free byte, i.e. number of bytes stored */
+  int               write_index;
+};
+
+
+/**
+ * Extends the <code>JavaSocket</code> class with buffered read and write
+ * operations
+ * @see JavaSocket
+ */
+class BufferedSocket : public JavaSocket
+{
+public:
+  /** Default constructor: only invokes the <code>JavaSocket</code> one */
+  BufferedSocket() throw (CodecException) : JavaSocket() {}
+  /**
+   * Sends the content of the write buffer to the socket, then empties the
+   * buffer. Does nothing when the buffer is empty.
+   * @throw SocketIOException if the buffered data could not be entirely sent
+   */
+  void                flush() const
+                          throw (SocketIOException, UnexpectedException);
+protected:
+  /**
+   * Buffered substitute for recv. Copies data from the read buffer into
+   * <code>buf</code>, flushing pending writes and refilling the read buffer
+   * from the socket each time it runs empty. Loops until <code>len</code>
+   * bytes have been copied or the socket is canceled.
+   * @param buf destination of the received data
+   * @param len number of bytes wanted, <code>buf</code> must be that large
+   * @param flags receive options, see recv man page
+   * @return the number of bytes copied into <code>buf</code>
+   * @throw SocketIOException if the socket is no longer connected when the
+   * loop ends, or if receiving failed
+   */
+  int32_t             recvFully(void *buf, const int len, const int flags)
+                          const
+                          throw (SocketIOException, UnexpectedException);
+  /**
+   * Buffered substitute for send. Appends the data to the write buffer and
+   * flushes the buffer to the socket each time it is full while data remains
+   * to be stored. Data that fits is only sent by a later flush().
+   * @param fctName name of the calling function (meant for logging, not used
+   * by this implementation)
+   * @param objName name of the object to be sent (meant for logging, not
+   * used by this implementation)
+   * @param buf data to be sent
+   * @param len length of buf
+   * @param flags send options; not applied by this implementation, flush()
+   * sends with SOCKET_SEND_FLAGS
+   * @throw SocketIOException if a flush fails
+   */
+  void                sendToSocket(const wchar_t fctName[],
+                          const std::wstring& objName,
+                          const void* buf,
+                          int len,
+                          int flags) const
+                          throw (SocketIOException, UnexpectedException);
+private:
+  /**
+   * Buffer in which data read from the network is stored.<br>
+   * Declared mutable so that const read operations can update it
+   */
+  mutable ReadCache   read_cache;
+  /**
+   * Buffer for data to be written.<br>
+   * Declared mutable so that const write operations can update it
+   */
+  mutable WriteCache  write_cache;
+  /**
+   * Buffered socket receive function.<br>
+   * Waits for incoming data, then reads what is available into
+   * <code>read_cache</code>
+   *
+   * @param recvFlags flags to pass to recv function
+   * @throw SocketIOException if an error occurs during the receive operation
+   * @return the number of bytes received
+   */
+  int                 fillReadBuffer(int recvFlags) const
+                          throw (SocketIOException, UnexpectedException);
+};
+
+} //namespace CarobNS
+#endif /*_BUFFERED_SOCKET_H_*/
Index: carob/src/BufferedSocket.cpp
diff -u /dev/null carob/src/BufferedSocket.cpp:1.1
--- /dev/null   Mon Jan 29 17:43:57 2007
+++ carob/src/BufferedSocket.cpp        Mon Jan 29 17:43:57 2007
@@ -0,0 +1,141 @@
+/*
+ * Sequoia: Database clustering technology.
+ * Copyright (C) 2007 Emic Networks
+ * Contact: [EMAIL PROTECTED]
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Initial developer(s): Gilles Rayrat
+ * Contributor(s):
+ */
+
+#include "BufferedSocket.hpp"
+#include "SystemDependantDefs.hpp"
+
+#ifdef __MINGW32__
+  #include <ws2tcpip.h>
+#else
+  #include <sys/types.h>   //  socket(), send(), etc.
+  #include <sys/socket.h>  //         "
+#endif
+
+using namespace CarobNS;
+
+/**
+ * Buffered substitute for recv(): copies up to <code>len</code> bytes from the
+ * read cache into <code>buf</code>. Each time the cache is empty, pending
+ * buffered writes are flushed first, then the cache is refilled from the
+ * socket. Failures are reported by exceptions, never by a negative return.
+ * @param buf destination buffer, at least <code>len</code> bytes long
+ * @param len number of bytes wanted
+ * @param flags options forwarded to recv()
+ * @return the number of bytes copied into <code>buf</code>; less than
+ * <code>len</code> only when the loop was stopped by <code>canceled</code>
+ * @throw SocketIOException if the socket is not connected when the loop ends
+ */
+int32_t BufferedSocket::recvFully(void *buf, const int len, const int flags)
+    const throw (SocketIOException, UnexpectedException)
+{
+  // Return type spelled as in the class declaration (int32_t)
+  java_byte* const destination = static_cast<java_byte*>(buf);
+  int alreadyRead = 0; // Keeps the whole size read
+  int remainingBytesToRead = len;
+  while (!canceled && (remainingBytesToRead > 0))
+  {
+    int availableBytes = read_cache.getDataSize();
+    if (availableBytes < 1)
+    {
+      // First send data that has been buffered
+      flush();
+      // May return 0 (nothing received yet): the loop then simply retries
+      availableBytes = fillReadBuffer(flags);
+    }
+    int toBeReadThisTime = remainingBytesToRead;
+    if (toBeReadThisTime > availableBytes)
+    {
+      // not enough data ready, just read those available. Next bytes will be
+      // read in the next loop round
+      toBeReadThisTime = availableBytes;
+    }
+    memcpy(destination + alreadyRead,
+           read_cache.getData(),
+           toBeReadThisTime);
+    read_cache.discard(toBeReadThisTime);
+    alreadyRead += toBeReadThisTime;
+    remainingBytesToRead -= toBeReadThisTime;
+  }
+  // We were shutdown()
+  // NOTE(review): the loop stops on 'canceled' but this test reads
+  // 'connected'; presumably shutdown() updates both - confirm
+  if (!connected)
+    throw SocketIOException(L"JavaSocket has been shutdown, read aborted");
+  return alreadyRead;
+}
+
+/**
+ * Waits for incoming data and, when the poll reports some, replaces the
+ * content of the read cache with what recv() delivers.
+ * @param recvFlags flags passed to recv()
+ * @return the number of bytes now available in the read cache, 0 when the
+ * poll reported no data (timeout); the cache is left untouched in that case
+ * @throw SocketIOException if recv() fails or returns 0
+ */
+int BufferedSocket::fillReadBuffer(int recvFlags) const
+    throw (SocketIOException, UnexpectedException)
+{
+  // Wait for incoming data
+  const int pollRetVal = pollOnSingleFd(socket_fd, 1000); // 1000 = 1 second timeout
+  if (pollRetVal <= 0)
+  {
+    // timeout, no response: do nothing
+    return 0;
+  }
+  // data is ready to be read
+  read_cache.reset();
+  const int readThisTime = recv(socket_fd, read_cache.getData(),
+                                READ_BUFFER_SIZE, recvFlags);
+  // Check the result before publishing it, so that a failed recv() never
+  // leaves a negative size in the cache
+  if (readThisTime < 0)
+    throw SocketIOException(L"Could not read from socket.");
+  if (readThisTime == 0)
+    throw SocketIOException(L"Peer reset connection while we were receiving data");
+  read_cache.setDataSize(readThisTime);
+  // Report the real byte count (a shadowed local used to make this always 0)
+  return readThisTime;
+}
+
+/**
+ * Buffered substitute for send(): appends <code>len</code> bytes of
+ * <code>buf</code> to the write cache. Whenever the cache is full and data
+ * remains to be stored, the cache is flushed to the socket. Data that fits in
+ * the cache is not sent here, only by a later flush().
+ * @param fctName name of the calling function (not used here)
+ * @param objName name of the object to be sent (not used here)
+ * @param buf data to be sent
+ * @param len length of buf; nothing is done when it is zero or negative
+ * @param flags send options (not used here, flush() sends with
+ * SOCKET_SEND_FLAGS)
+ * @throw SocketIOException if a flush fails
+ */
+void BufferedSocket::sendToSocket(const wchar_t fctName[],
+    const std::wstring& objName,
+    const void* buf, int len, int flags) const
+    throw (SocketIOException, UnexpectedException)
+{
+  // Kept for interface compatibility, the buffered path does not need them
+  (void) fctName;
+  (void) objName;
+  (void) flags;
+
+  const java_byte* source = static_cast<const java_byte*>(buf);
+  int remaining = len;
+  // Iterative instead of recursive: the stack depth no longer grows with len,
+  // and a negative len never reaches memcpy
+  while (remaining > 0)
+  {
+    int lenToCopy = write_cache.getRemainingSize();
+    if (lenToCopy > remaining)
+    {
+      // enough room: copy everything that is left
+      lenToCopy = remaining;
+    }
+    memcpy(write_cache.getWritePtr(), source, lenToCopy);
+    write_cache.forward(lenToCopy);
+    source += lenToCopy;
+    remaining -= lenToCopy;
+    if (remaining > 0)
+    {
+      // did not copy everything => have to flush and copy the rest
+      flush();
+    }
+  }
+}
+
+/**
+ * Sends the whole content of the write cache to the socket, then empties the
+ * cache. Does nothing more than resetting an already empty cache when there
+ * is no data. send() is called again for the remaining bytes when it accepts
+ * only part of the data.
+ * @throw SocketIOException if send() fails or makes no progress; the cache is
+ * left unchanged in that case
+ */
+void BufferedSocket::flush() const
+    throw (SocketIOException, UnexpectedException)
+{
+  const wchar_t fctName[] = L"BufferedSocket::flush";
+
+  const int total = write_cache.getDataSize();
+  int sent = 0;
+  while (sent < total)
+  {
+    const int ret = send(socket_fd,
+              static_cast<const void*>(write_cache.getData() + sent),
+              total - sent,
+              SOCKET_SEND_FLAGS);
+    if (ret < 0)
+    {
+      throw SocketIOException(std::wstring(fctName)
+          + L": could not write to socket");
+    }
+    if (ret == 0)
+    {
+      // No byte accepted: give up rather than loop forever
+      throw SocketIOException(std::wstring(fctName) + L": only "
+          + toWString(sent) + L"/" + toWString(total)
+          + L" bytes could be send to socket");
+    }
+    // A short count is legal for send(): continue with what is left instead
+    // of failing with already-sent bytes still in the cache
+    sent += ret;
+  }
+  write_cache.reset();
+}
+
+/*
+ * Local Variables:
+ * c-file-style: "bsd"
+ * c-basic-offset: 2
+ * indent-tabs-mode: nil
+ * End:
+ */

_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits

Reply via email to