Author: shuston
Date: Mon Dec 14 21:16:30 2009
New Revision: 890481

URL: http://svn.apache.org/viewvc?rev=890481&view=rev
Log:
Move the TCPConnector class to its own file to allow deriving from it; resolves 
QPID-2270.

Added:
    qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=890481&r1=890480&r2=890481&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Mon Dec 14 21:16:30 2009
@@ -426,17 +426,25 @@
     qpid/sys/windows/Thread.cpp
     qpid/sys/windows/Time.cpp
     qpid/sys/windows/uuid.cpp
+    ${sslcommon_windows_SOURCES}
   )
   set (qpidcommon_platform_LIBS
-    rpcrt4 ws2_32
+    ${windows_ssl_libs} rpcrt4 ws2_32
   )
   set (qpidbroker_platform_SOURCES
     qpid/broker/windows/BrokerDefaults.cpp
     qpid/broker/windows/SaslAuthenticator.cpp
+    ${sslbroker_windows_SOURCES}
+  )
+  set (qpidbroker_platform_LIBS
+    ${windows_ssl_libs}
   )
-
   set (qpidclient_platform_SOURCES
     qpid/client/windows/SaslFactory.cpp
+    ${sslclient_windows_SOURCES}
+  )
+  set (qpidclient_platform_LIBS
+    ${windows_ssl_libs}
   )
 
   set (qpidd_platform_SOURCES
@@ -625,6 +633,7 @@
      qpid/client/SubscriptionImpl.cpp
      qpid/client/SubscriptionManager.cpp
      qpid/client/SubscriptionManagerImpl.cpp
+     qpid/client/TCPConnector.cpp
      qpid/messaging/Address.cpp
      qpid/messaging/Connection.cpp
      qpid/messaging/ConnectionImpl.h
@@ -665,7 +674,7 @@
 )
 
 add_library (qpidclient SHARED ${qpidclient_SOURCES})
-target_link_libraries (qpidclient qpidcommon)
+target_link_libraries (qpidclient qpidcommon ${qpidclient_platform_LIBS})
 set_target_properties (qpidclient PROPERTIES VERSION ${qpidc_version})
 install (TARGETS qpidclient
          DESTINATION ${QPID_INSTALL_LIBDIR}
@@ -751,7 +760,7 @@
      qpid/sys/TCPIOPlugin.cpp
 )
 add_library (qpidbroker SHARED ${qpidbroker_SOURCES})
-target_link_libraries (qpidbroker qpidcommon)
+target_link_libraries (qpidbroker qpidcommon ${qpidbroker_platform_LIBS})
 set_target_properties (qpidbroker PROPERTIES VERSION ${qpidc_version})
 if (MSVC)
   set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290)

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=890481&r1=890480&r2=890481&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Dec 14 21:16:30 2009
@@ -691,6 +691,8 @@
   qpid/client/SubscriptionManager.cpp          \
   qpid/client/SubscriptionManagerImpl.cpp      \
   qpid/client/SubscriptionManagerImpl.h                \
+  qpid/client/TCPConnector.cpp                 \
+  qpid/client/TCPConnector.h                   \
   qpid/messaging/Address.cpp                   \
   qpid/messaging/Connection.cpp                        \
   qpid/messaging/ListContent.cpp               \

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=890481&r1=890480&r2=890481&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Mon Dec 14 21:16:30 2009
@@ -18,9 +18,9 @@
  * under the License.
  *
  */
+
 #include "qpid/client/Connector.h"
 
-#include "qpid/client/Bounds.h"
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/ConnectionSettings.h"
 #include "qpid/log/Statement.h"
@@ -35,10 +35,8 @@
 
 #include <iostream>
 #include <map>
-#include <deque>
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
-#include <boost/weak_ptr.hpp>
 
 namespace qpid {
 namespace client {
@@ -81,353 +79,4 @@
 {
 }
 
-class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
-{
-    typedef std::deque<framing::AMQFrame> Frames;
-    struct Buff;
-
-    const uint16_t maxFrameSize;
-
-    sys::Mutex lock;
-    Frames frames; // Outgoing frame queue
-    size_t lastEof; // Position after last EOF in frames
-    uint64_t currentSize;
-    Bounds* bounds;
-
-    framing::ProtocolVersion version;
-    bool initiated;
-    bool closed;
-    bool joined;
-
-    sys::ShutdownHandler* shutdownHandler;
-    framing::InputHandler* input;
-    framing::InitiationHandler* initialiser;
-    framing::OutputHandler* output;
-
-    sys::Thread receiver;
-
-    sys::Socket socket;
-
-    sys::AsynchIO* aio;
-    std::string identifier;
-    boost::shared_ptr<sys::Poller> poller;
-    std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
-
-    ~TCPConnector();
-
-    void run();
-    void handleClosed();
-    bool closeInternal();
-
-    void connected(const Socket&);
-    void connectFailed(const std::string& msg);
-    bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
-    void writebuff(qpid::sys::AsynchIO&);
-    void writeDataBlock(const framing::AMQDataBlock& data);
-    void eof(qpid::sys::AsynchIO&);
-
-    boost::weak_ptr<ConnectionImpl> impl;
-
-    void connect(const std::string& host, int port);
-    void close();
-    void send(framing::AMQFrame& frame);
-    void abort();
-
-    void setInputHandler(framing::InputHandler* handler);
-    void setShutdownHandler(sys::ShutdownHandler* handler);
-    sys::ShutdownHandler* getShutdownHandler() const;
-    framing::OutputHandler* getOutputHandler();
-    const std::string& getIdentifier() const;
-    void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
-
-    size_t decode(const char* buffer, size_t size);
-    size_t encode(const char* buffer, size_t size);
-    bool canEncode();
-
-public:
-    TCPConnector(framing::ProtocolVersion pVersion,
-              const ConnectionSettings&, 
-              ConnectionImpl*);
-    unsigned int getSSF() { return 0; }
-};
-
-// Static constructor which registers connector here
-namespace {
-    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
-        return new TCPConnector(v, s, c);
-    }
-
-    struct StaticInit {
-        StaticInit() {
-            Connector::registerFactory("tcp", &create);
-        };
-    } init;
-}
-
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
-    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
-    ~Buff() { delete [] bytes;}
-};
-
-TCPConnector::TCPConnector(ProtocolVersion ver,
-                     const ConnectionSettings& settings,
-                     ConnectionImpl* cimpl)
-    : maxFrameSize(settings.maxFrameSize),
-      lastEof(0),
-      currentSize(0),
-      bounds(cimpl),
-      version(ver), 
-      initiated(false),
-      closed(true),
-      joined(true),
-      shutdownHandler(0),
-      aio(0),
-      impl(cimpl->shared_from_this())
-{
-    QPID_LOG(debug, "TCPConnector created for " << version.toString());
-    settings.configureSocket(socket);
-}
-
-TCPConnector::~TCPConnector() {
-    close();
-}
-
-void TCPConnector::connect(const std::string& host, int port){
-    Mutex::ScopedLock l(lock);
-    assert(closed);
-    assert(joined);
-    poller = Poller::shared_ptr(new Poller);
-    AsynchConnector::create(socket,
-                            poller,
-                            host, port,
-                            boost::bind(&TCPConnector::connected, this, _1),
-                            boost::bind(&TCPConnector::connectFailed, this, _3));
-    closed = false;
-    joined = false;
-    receiver = Thread(this);
-}
-
-void TCPConnector::connected(const Socket&) {
-    aio = AsynchIO::create(socket,
-                       boost::bind(&TCPConnector::readbuff, this, _1, _2),
-                       boost::bind(&TCPConnector::eof, this, _1),
-                       boost::bind(&TCPConnector::eof, this, _1),
-                       0, // closed
-                       0, // nobuffs
-                       boost::bind(&TCPConnector::writebuff, this, _1));
-    for (int i = 0; i < 32; i++) {
-        aio->queueReadBuffer(new Buff(maxFrameSize));
-    }
-    aio->start(poller);
-
-    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
-    ProtocolInitiation init(version);
-    writeDataBlock(init);
-}
-
-void TCPConnector::connectFailed(const std::string& msg) {
-    QPID_LOG(warning, "Connecting failed: " << msg);
-    closed = true;
-    poller->shutdown();
-    closeInternal();
-    if (shutdownHandler)
-        shutdownHandler->shutdown();
-}
-
-bool TCPConnector::closeInternal() {
-    bool ret;
-    {
-    Mutex::ScopedLock l(lock);
-    ret = !closed;
-    if (!closed) {
-        closed = true;
-        aio->queueForDeletion();
-        poller->shutdown();
-    }
-    if (joined || receiver.id() == Thread::current().id()) {
-        return ret;
-    }
-    joined = true;
-    }
-    receiver.join();
-    return ret;
-}
-
-void TCPConnector::close() {
-    closeInternal();
-}
-
-void TCPConnector::abort() {
-    // Can't abort a closed connection
-    if (!closed) {
-        if (aio) {
-            // Established connection
-            aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
-        } else {
-            // We're still connecting
-            connectFailed("Connection timedout");
-        }
-    }
-}
-
-void TCPConnector::setInputHandler(InputHandler* handler){
-    input = handler;
-}
-
-void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
-    shutdownHandler = handler;
-}
-
-OutputHandler* TCPConnector::getOutputHandler() {
-    return this; 
-}
-
-sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
-    return shutdownHandler;
-}
-
-const std::string& TCPConnector::getIdentifier() const { 
-    return identifier;
-}
-
-void TCPConnector::send(AMQFrame& frame) {
-    Mutex::ScopedLock l(lock);
-    frames.push_back(frame);
-    //only ask to write if this is the end of a frameset or if we
-    //already have a buffers worth of data
-    currentSize += frame.encodedSize();
-    bool notifyWrite = false;
-    if (frame.getEof()) {
-        lastEof = frames.size();
-        notifyWrite = true;
-    } else {
-        notifyWrite = (currentSize >= maxFrameSize);
-    }
-    if (notifyWrite && !closed) aio->notifyPendingWrite();
-}
-
-void TCPConnector::handleClosed() {
-    if (closeInternal() && shutdownHandler)
-        shutdownHandler->shutdown();
-}
-
-void TCPConnector::writebuff(AsynchIO& /*aio*/) 
-{
-    Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
-    if (codec->canEncode()) {
-        std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
-        if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
-        size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
-
-        buffer->dataStart = 0;
-        buffer->dataCount = encoded;
-        aio->queueWrite(buffer.release());
-    }
-}
-
-// Called in IO thread.
-bool TCPConnector::canEncode()
-{
-    Mutex::ScopedLock l(lock);
-    //have at least one full frameset or a whole buffers worth of data
-    return lastEof || currentSize >= maxFrameSize;
-}
-
-// Called in IO thread.
-size_t TCPConnector::encode(const char* buffer, size_t size)
-{
-    framing::Buffer out(const_cast<char*>(buffer), size);
-    size_t bytesWritten(0);
-    {
-        Mutex::ScopedLock l(lock);
-        while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
-            frames.front().encode(out);
-            QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
-            frames.pop_front();
-            if (lastEof) --lastEof;
-        }
-        bytesWritten = size - out.available();
-        currentSize -= bytesWritten;
-    }
-    if (bounds) bounds->reduce(bytesWritten);
-    return bytesWritten;
-}
-
-bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) 
-{
-    Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
-    int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
-    // TODO: unreading needs to go away, and when we can cope
-    // with multiple sub-buffers in the general buffer scheme, it will
-    if (decoded < buff->dataCount) {
-        // Adjust buffer for used bytes and then "unread them"
-        buff->dataStart += decoded;
-        buff->dataCount -= decoded;
-        aio.unread(buff);
-    } else {
-        // Give whole buffer back to aio subsystem
-        aio.queueReadBuffer(buff);
-    }
-    return true;
-}
-
-size_t TCPConnector::decode(const char* buffer, size_t size) 
-{
-    framing::Buffer in(const_cast<char*>(buffer), size);
-    if (!initiated) {
-        framing::ProtocolInitiation protocolInit;
-        if (protocolInit.decode(in)) {
-            QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
-            if(!(protocolInit==version)){
-                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
-                                         << " supported version " << version));
-            }
-        }
-        initiated = true;
-    }
-    AMQFrame frame;
-    while(frame.decode(in)){
-        QPID_LOG(trace, "RECV " << identifier << ": " << frame);
-        input->received(frame);
-    }
-    return size - in.available();
-}
-
-void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
-    AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
-    framing::Buffer out(buff->bytes, buff->byteCount);
-    data.encode(out);
-    buff->dataCount = data.encodedSize();
-    aio->queueWrite(buff);
-}
-
-void TCPConnector::eof(AsynchIO&) {
-    handleClosed();
-}
-
-void TCPConnector::run() {
-    // Keep the connection impl in memory until run() completes.
-    boost::shared_ptr<ConnectionImpl> protect = impl.lock();
-    assert(protect);
-    try {
-        Dispatcher d(poller);
-
-        d.run();
-    } catch (const std::exception& e) {
-        QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
-        handleClosed();
-    }
-    try {
-        socket.close();
-    } catch (const std::exception&) {}
-}
-
-void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
-{
-    securityLayer = sl;
-    securityLayer->init(this);
-}
-
-
 }} // namespace qpid::client

Added: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=890481&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp Mon Dec 14 21:16:30 2009
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/TCPConnector.h"
+
+#include "qpid/client/ConnectionImpl.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Codec.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/Msg.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+// Static constructor which registers connector here
+namespace {
+    Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+        return new TCPConnector(v, s, c);
+    }
+
+    struct StaticInit {
+        StaticInit() {
+            Connector::registerFactory("tcp", &create);
+        };
+    } init;
+}
+
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+    ~Buff() { delete [] bytes;}
+};
+
+TCPConnector::TCPConnector(ProtocolVersion ver,
+                           const ConnectionSettings& settings,
+                           ConnectionImpl* cimpl)
+    : maxFrameSize(settings.maxFrameSize),
+      lastEof(0),
+      currentSize(0),
+      bounds(cimpl),
+      version(ver), 
+      initiated(false),
+      closed(true),
+      joined(true),
+      shutdownHandler(0),
+      aio(0),
+      impl(cimpl->shared_from_this())
+{
+    QPID_LOG(debug, "TCPConnector created for " << version.toString());
+    settings.configureSocket(socket);
+}
+
+TCPConnector::~TCPConnector() {
+    close();
+}
+
+void TCPConnector::connect(const std::string& host, int port) {
+    Mutex::ScopedLock l(lock);
+    assert(closed);
+    assert(joined);
+    poller = Poller::shared_ptr(new Poller);
+    AsynchConnector::create(socket,
+                            poller,
+                            host, port,
+                            boost::bind(&TCPConnector::connected, this, _1),
+                            boost::bind(&TCPConnector::connectFailed, this, _3));
+    closed = false;
+    joined = false;
+    receiver = Thread(this);
+}
+
+void TCPConnector::connected(const Socket&) {
+    aio = AsynchIO::create(socket,
+                       boost::bind(&TCPConnector::readbuff, this, _1, _2),
+                       boost::bind(&TCPConnector::eof, this, _1),
+                       boost::bind(&TCPConnector::eof, this, _1),
+                       0, // closed
+                       0, // nobuffs
+                       boost::bind(&TCPConnector::writebuff, this, _1));
+    for (int i = 0; i < 32; i++) {
+        aio->queueReadBuffer(new Buff(maxFrameSize));
+    }
+    aio->start(poller);
+
+    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+    ProtocolInitiation init(version);
+    writeDataBlock(init);
+}
+
+void TCPConnector::connectFailed(const std::string& msg) {
+    QPID_LOG(warning, "Connecting failed: " << msg);
+    closed = true;
+    poller->shutdown();
+    closeInternal();
+    if (shutdownHandler)
+        shutdownHandler->shutdown();
+}
+
+bool TCPConnector::closeInternal() {
+    bool ret;
+    {
+    Mutex::ScopedLock l(lock);
+    ret = !closed;
+    if (!closed) {
+        closed = true;
+        aio->queueForDeletion();
+        poller->shutdown();
+    }
+    if (joined || receiver.id() == Thread::current().id()) {
+        return ret;
+    }
+    joined = true;
+    }
+    receiver.join();
+    return ret;
+}
+
+void TCPConnector::close() {
+    closeInternal();
+}
+
+void TCPConnector::abort() {
+    // Can't abort a closed connection
+    if (!closed) {
+        if (aio) {
+            // Established connection
+            aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
+        } else {
+            // We're still connecting
+            connectFailed("Connection timedout");
+        }
+    }
+}
+
+void TCPConnector::setInputHandler(InputHandler* handler){
+    input = handler;
+}
+
+void TCPConnector::setShutdownHandler(ShutdownHandler* handler){
+    shutdownHandler = handler;
+}
+
+OutputHandler* TCPConnector::getOutputHandler() {
+    return this; 
+}
+
+sys::ShutdownHandler* TCPConnector::getShutdownHandler() const {
+    return shutdownHandler;
+}
+
+const std::string& TCPConnector::getIdentifier() const { 
+    return identifier;
+}
+
+void TCPConnector::send(AMQFrame& frame) {
+    Mutex::ScopedLock l(lock);
+    frames.push_back(frame);
+    //only ask to write if this is the end of a frameset or if we
+    //already have a buffers worth of data
+    currentSize += frame.encodedSize();
+    bool notifyWrite = false;
+    if (frame.getEof()) {
+        lastEof = frames.size();
+        notifyWrite = true;
+    } else {
+        notifyWrite = (currentSize >= maxFrameSize);
+    }
+    if (notifyWrite && !closed) aio->notifyPendingWrite();
+}
+
+void TCPConnector::handleClosed() {
+    if (closeInternal() && shutdownHandler)
+        shutdownHandler->shutdown();
+}
+
+void TCPConnector::writebuff(AsynchIO& /*aio*/) 
+{
+    Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+    if (codec->canEncode()) {
+        std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
+        if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+        size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
+
+        buffer->dataStart = 0;
+        buffer->dataCount = encoded;
+        aio->queueWrite(buffer.release());
+    }
+}
+
+// Called in IO thread.
+bool TCPConnector::canEncode()
+{
+    Mutex::ScopedLock l(lock);
+    //have at least one full frameset or a whole buffers worth of data
+    return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t TCPConnector::encode(const char* buffer, size_t size)
+{
+    framing::Buffer out(const_cast<char*>(buffer), size);
+    size_t bytesWritten(0);
+    {
+        Mutex::ScopedLock l(lock);
+        while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+            frames.front().encode(out);
+            QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
+            frames.pop_front();
+            if (lastEof) --lastEof;
+        }
+        bytesWritten = size - out.available();
+        currentSize -= bytesWritten;
+    }
+    if (bounds) bounds->reduce(bytesWritten);
+    return bytesWritten;
+}
+
+bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) 
+{
+    Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
+    int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+    // TODO: unreading needs to go away, and when we can cope
+    // with multiple sub-buffers in the general buffer scheme, it will
+    if (decoded < buff->dataCount) {
+        // Adjust buffer for used bytes and then "unread them"
+        buff->dataStart += decoded;
+        buff->dataCount -= decoded;
+        aio.unread(buff);
+    } else {
+        // Give whole buffer back to aio subsystem
+        aio.queueReadBuffer(buff);
+    }
+    return true;
+}
+
+size_t TCPConnector::decode(const char* buffer, size_t size) 
+{
+    framing::Buffer in(const_cast<char*>(buffer), size);
+    if (!initiated) {
+        framing::ProtocolInitiation protocolInit;
+        if (protocolInit.decode(in)) {
+            QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+            if(!(protocolInit==version)){
+                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+                                         << " supported version " << version));
+            }
+        }
+        initiated = true;
+    }
+    AMQFrame frame;
+    while(frame.decode(in)){
+        QPID_LOG(trace, "RECV " << identifier << ": " << frame);
+        input->received(frame);
+    }
+    return size - in.available();
+}
+
+void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
+    AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    buff->dataCount = data.encodedSize();
+    aio->queueWrite(buff);
+}
+
+void TCPConnector::eof(AsynchIO&) {
+    handleClosed();
+}
+
+void TCPConnector::run() {
+    // Keep the connection impl in memory until run() completes.
+    boost::shared_ptr<ConnectionImpl> protect = impl.lock();
+    assert(protect);
+    try {
+        Dispatcher d(poller);
+
+        d.run();
+    } catch (const std::exception& e) {
+        QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
+        handleClosed();
+    }
+    try {
+        socket.close();
+    } catch (const std::exception&) {}
+}
+
+void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
+{
+    securityLayer = sl;
+    securityLayer->init(this);
+}
+
+
+}} // namespace qpid::client

Added: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h?rev=890481&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h Mon Dec 14 21:16:30 2009
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _TCPConnector_
+#define _TCPConnector_
+
+#include "Connector.h"
+#include "qpid/client/Bounds.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Codec.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+#include <deque>
+#include <string>
+
+namespace qpid {
+namespace client {
+
+class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
+{
+    typedef std::deque<framing::AMQFrame> Frames;
+    struct Buff;
+
+    const uint16_t maxFrameSize;
+
+    sys::Mutex lock;
+    Frames frames; // Outgoing frame queue
+    size_t lastEof; // Position after last EOF in frames
+    uint64_t currentSize;
+    Bounds* bounds;
+
+    framing::ProtocolVersion version;
+    bool initiated;
+    bool closed;
+    bool joined;
+
+    sys::ShutdownHandler* shutdownHandler;
+    framing::InputHandler* input;
+    framing::InitiationHandler* initialiser;
+    framing::OutputHandler* output;
+
+    sys::Thread receiver;
+
+    sys::Socket socket;
+
+    sys::AsynchIO* aio;
+    std::string identifier;
+    boost::shared_ptr<sys::Poller> poller;
+    std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+
+    ~TCPConnector();
+
+    void run();
+    void handleClosed();
+    bool closeInternal();
+
+    virtual void connected(const qpid::sys::Socket&);
+    void connectFailed(const std::string& msg);
+    bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+    void writebuff(qpid::sys::AsynchIO&);
+    void writeDataBlock(const framing::AMQDataBlock& data);
+    void eof(qpid::sys::AsynchIO&);
+
+    boost::weak_ptr<ConnectionImpl> impl;
+
+    void connect(const std::string& host, int port);
+    void close();
+    void send(framing::AMQFrame& frame);
+    void abort();
+
+    void setInputHandler(framing::InputHandler* handler);
+    void setShutdownHandler(sys::ShutdownHandler* handler);
+    sys::ShutdownHandler* getShutdownHandler() const;
+    framing::OutputHandler* getOutputHandler();
+    const std::string& getIdentifier() const;
+    void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
+
+    size_t decode(const char* buffer, size_t size);
+    size_t encode(const char* buffer, size_t size);
+    bool canEncode();
+
+public:
+    TCPConnector(framing::ProtocolVersion pVersion,
+                 const ConnectionSettings&, 
+                 ConnectionImpl*);
+    unsigned int getSSF() { return 0; }
+};
+
+}}   // namespace qpid::client
+
+#endif  /* _TCPConnector_ */



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to